Posted to users@kafka.apache.org by Aravind Dongara <ad...@yahoo.com.INVALID> on 2018/10/01 16:27:14 UTC

Re: KafkaStreams: ProducerFencedException causing StreamThread death

Thanks Guozhang. Yes, we are hitting the same issue.

I see in that ticket that it’s fixed in 1.1.2, 2.0.1, and 2.1.0, but I don’t see those versions on the Kafka downloads page (https://kafka.apache.org/downloads). How can we get access to any of these versions?

Thanks
Aravind
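As context for the version question above: between releases, backported fixes live on the release branches of the Kafka source tree, so one way to get a build containing the fix before 1.1.2/2.0.1 ship is to build from the branch yourself. This is an editorial sketch, not something stated in the thread; the branch name and the releaseTarGz Gradle task reflect the Kafka build of that era, so check the project README before relying on it:

```shell
# Hypothetical workflow: build a Kafka distribution from the 1.1 release
# branch, which carries the backported KAFKA-7284 fix ahead of the 1.1.2
# release.
git clone https://github.com/apache/kafka.git
cd kafka
git checkout 1.1          # release branch for the 1.1.x line
./gradlew releaseTarGz    # tarball is produced under core/build/distributions/
```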





> On Sep 30, 2018, at 1:17 PM, Guozhang Wang <wa...@gmail.com> wrote:
> 
> Could you take a look at https://issues.apache.org/jira/browse/KAFKA-7284
> and see if you are hitting this?
> 
> Guozhang
> 
> On Fri, Sep 28, 2018 at 5:22 PM, Aravind Dongara <adongara@yahoo.com.invalid> wrote:
> 
>> Hi Guozhang
>> 
>> Thanks for your reply.
>> We are using Kafka 1.1.1
>> 
>> Thanks
>> Aravind
>> 
>> 
>>> On Sep 28, 2018, at 4:45 PM, Guozhang Wang <wa...@gmail.com> wrote:
>>> 
>>> Hello Aravind,
>>> 
>>> Which version of Kafka are you currently using? What you described seems
>>> to be fixed in the latest version already, so I want to check if you are
>>> using an older version and, if yes, what's the best way to work around it.
>>> 
>>> 
>>> Guozhang
>>> 
>>> 
>>> On Thu, Sep 27, 2018 at 12:54 PM, Aravind Dongara <adongara@yahoo.com.invalid> wrote:
>>> 
>>>> 
>>>> During a rebalance triggered by the kafka-coordinator-heartbeat-thread
>>>> losing its connection to the group coordinator, we noticed that a
>>>> stream thread shuts down when it catches a ProducerFencedException
>>>> while flushing the state store.
>>>> This also causes the stream state on that node to be stuck in the
>>>> ‘REBALANCING’ state, even though the partitions have been rebalanced
>>>> to other threads across nodes.
>>>> During the rebalance there appears to be a race condition between
>>>> flushState on one node and ProducerId creation on another node for the
>>>> same partition. If flushState on the first node is slower, it
>>>> encounters a ProducerFencedException.
>>>> 
>>>> It would be nice if Kafka Streams could handle this exception
>>>> gracefully and not shut down the thread, so that we don’t end up with
>>>> an uneven number of threads across nodes.
>>>> Could you please suggest any workarounds for this situation?
>>>> 
>>>> Thanks
>>>> Aravind
>>>> 
>>>> 
>>>> [2018-09-26T15:39:54,662Z]  [ERROR]  [kafka-producer-network-thread |
>>>> upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-
>>>> 0a4b8d6753a2-StreamThread-16-0_55-producer]
>> [o.a.k.c.producer.internals.Sender]
>>>> [Producer clientId=upsert-merger-stream-oa43-1-53c3713a-443f-4b89-
>>>> a27a-0a4b8d6753a2-StreamThread-16-0_55-producer,
>>>> transactionalId=upsert-merger-stream-oa43-1-0_55] Aborting producer
>>>> batches due to fatal error
>>>> [2018-09-26T15:39:54,665Z]  [ERROR]  [upsert-merger-stream-oa43-1-
>>>> 53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16]  [o.a.k.s.p.i.
>> ProcessorStateManager]
>>>> task [0_55] Failed to flush state store upsert-store:
>>>> org.apache.kafka.streams.errors.StreamsException: task [0_55] Abort
>>>> sending since an error caught with a previous record (key
>>>> de796abe-d174-40ee-99fb-36695447402e value [B@79f4a0fd timestamp
>>>> 1537976392104) to topic upsert-merger-stream-oa43-1-
>> upsert-store-changelog
>>>> due to Cannot perform send because at least one previous transactional
>> or
>>>> idempotent request has failed with errors.
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> RecordCollectorImpl.send(RecordCollectorImpl.java:197)
>>>>       at org.apache.kafka.streams.state.internals.
>>>> StoreChangeLogger.logChange(StoreChangeLogger.java:59)
>>>>       at org.apache.kafka.streams.state.internals.
>>>> ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStor
>>>> e.java:69)
>>>>       at org.apache.kafka.streams.state.internals.
>>>> ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStor
>>>> e.java:29)
>>>>       at org.apache.kafka.streams.state.internals.
>> CachingKeyValueStore.
>>>> putAndMaybeForward(CachingKeyValueStore.java:105)
>>>>       at org.apache.kafka.streams.state.internals.
>>>> CachingKeyValueStore.access$000(CachingKeyValueStore.java:38)
>>>>       at org.apache.kafka.streams.state.internals.
>>>> CachingKeyValueStore$1.apply(CachingKeyValueStore.java:83)
>>>>       at org.apache.kafka.streams.state.internals.NamedCache.
>>>> flush(NamedCache.java:142)
>>>>       at org.apache.kafka.streams.state.internals.NamedCache.
>>>> flush(NamedCache.java:100)
>>>>       at org.apache.kafka.streams.state.internals.ThreadCache.
>>>> flush(ThreadCache.java:127)
>>>>       at org.apache.kafka.streams.state.internals.
>>>> CachingKeyValueStore.flush(CachingKeyValueStore.java:123)
>>>>       at org.apache.kafka.streams.state.internals.
>>>> InnerMeteredKeyValueStore.flush(InnerMeteredKeyValueStore.java:267)
>>>>       at org.apache.kafka.streams.state.internals.
>>>> MeteredKeyValueBytesStore.flush(MeteredKeyValueBytesStore.java:149)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> ProcessorStateManager.flush(ProcessorStateManager.java:243)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> AbstractTask.flushState(AbstractTask.java:195)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> StreamTask.flushState(StreamTask.java:339)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> StreamTask$1.run(StreamTask.java:312)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:211)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> StreamTask.commit(StreamTask.java:307)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> StreamTask.suspend(StreamTask.java:440)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> StreamTask.suspend(StreamTask.java:422)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> AssignedTasks.suspendTasks(AssignedTasks.java:182)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> AssignedTasks.suspend(AssignedTasks.java:147)
>>>>       at org.apache.kafka.streams.processor.internals.TaskManager.
>>>> suspendTasksAndState(TaskManager.java:242)
>>>>       at org.apache.kafka.streams.processor.internals.StreamThread$
>>>> RebalanceListener.onPartitionsRevoked(StreamThread.java:291)
>>>>       at org.apache.kafka.clients.consumer.internals.
>>>> ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:414)
>>>>       at org.apache.kafka.clients.consumer.internals.
>>>> AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:359)
>>>>       at org.apache.kafka.clients.consumer.internals.
>>>> AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
>>>>       at org.apache.kafka.clients.consumer.internals.
>>>> ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
>>>>       at org.apache.kafka.clients.consumer.KafkaConsumer.
>>>> pollOnce(KafkaConsumer.java:1149)
>>>>       at org.apache.kafka.clients.consumer.KafkaConsumer.poll(
>>>> KafkaConsumer.java:1115)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> StreamThread.pollRequests(StreamThread.java:831)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> StreamThread.runOnce(StreamThread.java:788)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> StreamThread.runLoop(StreamThread.java:749)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> StreamThread.run(StreamThread.java:719)
>>>> Caused by: org.apache.kafka.common.KafkaException: Cannot perform send
>>>> because at least one previous transactional or idempotent request has
>>>> failed with errors.
>>>>       at org.apache.kafka.clients.producer.internals.
>> TransactionManager.
>>>> failIfNotReadyForSend(TransactionManager.java:279)
>>>>       at org.apache.kafka.clients.producer.internals.
>> TransactionManager.
>>>> maybeAddPartitionToTransaction(TransactionManager.java:264)
>>>>       at org.apache.kafka.clients.producer.KafkaProducer.doSend(
>>>> KafkaProducer.java:828)
>>>>       at org.apache.kafka.clients.producer.KafkaProducer.send(
>>>> KafkaProducer.java:784)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> RecordCollectorImpl.send(RecordCollectorImpl.java:153)
>>>>       ... 34 common frames omitted
>>>> Caused by: org.apache.kafka.common.errors.ProducerFencedException:
>>>> Producer attempted an operation with an old epoch. Either there is a
>> newer
>>>> producer with the same transactionalId, or the producer's transaction
>> has
>>>> been expired by the broker.
>>>> [2018-09-26T15:39:54,665Z]  [ERROR]  [upsert-merger-stream-oa43-1-
>>>> 53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16]  [o.a.k.s.p.i.
>> AssignedStreamsTasks]
>>>> stream-thread [upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-
>> 0a4b8d6753a2-StreamThread-16]
>>>> Suspending stream task 0_55 failed due to the following error:
>>>> org.apache.kafka.streams.errors.ProcessorStateException: task [0_55]
>>>> Failed to flush state store upsert-store
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> ProcessorStateManager.flush(ProcessorStateManager.java:246)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> AbstractTask.flushState(AbstractTask.java:195)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> StreamTask.flushState(StreamTask.java:339)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> StreamTask$1.run(StreamTask.java:312)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:211)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> StreamTask.commit(StreamTask.java:307)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> StreamTask.suspend(StreamTask.java:440)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> StreamTask.suspend(StreamTask.java:422)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> AssignedTasks.suspendTasks(AssignedTasks.java:182)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> AssignedTasks.suspend(AssignedTasks.java:147)
>>>>       at org.apache.kafka.streams.processor.internals.TaskManager.
>>>> suspendTasksAndState(TaskManager.java:242)
>>>>       at org.apache.kafka.streams.processor.internals.StreamThread$
>>>> RebalanceListener.onPartitionsRevoked(StreamThread.java:291)
>>>>       at org.apache.kafka.clients.consumer.internals.
>>>> ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:414)
>>>>       at org.apache.kafka.clients.consumer.internals.
>>>> AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:359)
>>>>       at org.apache.kafka.clients.consumer.internals.
>>>> AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
>>>>       at org.apache.kafka.clients.consumer.internals.
>>>> ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
>>>>       at org.apache.kafka.clients.consumer.KafkaConsumer.
>>>> pollOnce(KafkaConsumer.java:1149)
>>>>       at org.apache.kafka.clients.consumer.KafkaConsumer.poll(
>>>> KafkaConsumer.java:1115)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> StreamThread.pollRequests(StreamThread.java:831)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> StreamThread.runOnce(StreamThread.java:788)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> StreamThread.runLoop(StreamThread.java:749)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> StreamThread.run(StreamThread.java:719)
>>>> Caused by: org.apache.kafka.streams.errors.StreamsException: task
>> [0_55]
>>>> Abort sending since an error caught with a previous record (key
>>>> de796abe-d174-40ee-99fb-36695447402e value [B@79f4a0fd timestamp
>>>> 1537976392104) to topic upsert-merger-stream-oa43-1-
>> upsert-store-changelog
>>>> due to Cannot perform send because at least one previous transactional
>> or
>>>> idempotent request has failed with errors.
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> RecordCollectorImpl.send(RecordCollectorImpl.java:197)
>>>>       at org.apache.kafka.streams.state.internals.
>>>> StoreChangeLogger.logChange(StoreChangeLogger.java:59)
>>>>       at org.apache.kafka.streams.state.internals.
>>>> ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStor
>>>> e.java:69)
>>>>       at org.apache.kafka.streams.state.internals.
>>>> ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStor
>>>> e.java:29)
>>>>       at org.apache.kafka.streams.state.internals.
>> CachingKeyValueStore.
>>>> putAndMaybeForward(CachingKeyValueStore.java:105)
>>>>       at org.apache.kafka.streams.state.internals.
>>>> CachingKeyValueStore.access$000(CachingKeyValueStore.java:38)
>>>>       at org.apache.kafka.streams.state.internals.
>>>> CachingKeyValueStore$1.apply(CachingKeyValueStore.java:83)
>>>>       at org.apache.kafka.streams.state.internals.NamedCache.
>>>> flush(NamedCache.java:142)
>>>>       at org.apache.kafka.streams.state.internals.NamedCache.
>>>> flush(NamedCache.java:100)
>>>>       at org.apache.kafka.streams.state.internals.ThreadCache.
>>>> flush(ThreadCache.java:127)
>>>>       at org.apache.kafka.streams.state.internals.
>>>> CachingKeyValueStore.flush(CachingKeyValueStore.java:123)
>>>>       at org.apache.kafka.streams.state.internals.
>>>> InnerMeteredKeyValueStore.flush(InnerMeteredKeyValueStore.java:267)
>>>>       at org.apache.kafka.streams.state.internals.
>>>> MeteredKeyValueBytesStore.flush(MeteredKeyValueBytesStore.java:149)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> ProcessorStateManager.flush(ProcessorStateManager.java:243)
>>>>       ... 21 common frames omitted
>>>> Caused by: org.apache.kafka.common.KafkaException: Cannot perform send
>>>> because at least one previous transactional or idempotent request has
>>>> failed with errors.
>>>>       at org.apache.kafka.clients.producer.internals.
>> TransactionManager.
>>>> failIfNotReadyForSend(TransactionManager.java:279)
>>>>       at org.apache.kafka.clients.producer.internals.
>> TransactionManager.
>>>> maybeAddPartitionToTransaction(TransactionManager.java:264)
>>>>       at org.apache.kafka.clients.producer.KafkaProducer.doSend(
>>>> KafkaProducer.java:828)
>>>>       at org.apache.kafka.clients.producer.KafkaProducer.send(
>>>> KafkaProducer.java:784)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> RecordCollectorImpl.send(RecordCollectorImpl.java:153)
>>>>       ... 34 common frames omitted
>>>> Caused by: org.apache.kafka.common.errors.ProducerFencedException:
>>>> Producer attempted an operation with an old epoch. Either there is a
>> newer
>>>> producer with the same transactionalId, or the producer's transaction
>> has
>>>> been expired by the broker.
>>>> [2018-09-26T15:39:54,666Z]  [WARN ]  [kafka-producer-network-thread |
>>>> upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-
>>>> 0a4b8d6753a2-StreamThread-16-0_55-producer]  [o.a.k.s.p.i.
>> RecordCollectorImpl]
>>>> task [0_55] Error sending record (key 5a07e803-9323-457c-8e1d-
>> 29c0b0bc0fa9
>>>> value [0, 0, 1, 102, 22, -119, 88, -32, 0, 0, 0, -95, 97, 112, 112, 100,
>>>> 121, 110, 97, 109, 105, 99, 115, 95, 101, 101, 101, 49, 100, 52, 102,
>> 56,
>>>> 45, 54, 55, 97, 50, 45, 52, 57, 56, 101, 45, 97, 55, 50, 53, 45, 52, 55,
>>>> 101, 50, 57, 56, 48, 51, 56, 50, 50, 101, -62, -79, 98, 105, 122, 95,
>> 116,
>>>> 120, 110, 95, 118, 49, -62, -79, 57, 54, 53, 49, 48, 100, 50, 52, -62,
>> -79,
>>>> 48, -62, -79, 115, 101, 103, 109, 101, 110, 116, 115, -62, -79, 114,
>> 101,
>>>> 113, 117, 101, 115, 116, 71, 85, 73, 68, -62, -79, 53, 97, 48, 55, 101,
>> 56,
>>>> 48, 51, 45, 57, 51, 50, 51, 45, 52, 53, 55, 99, 45, 56, 101, 49, 100,
>> 45,
>>>> 50, 57, 99, 48, 98, 48, 98, 99, 48, 102, 97, 57, -62, -79, 102, 97, 108,
>>>> 115, 101, -62, -79, 116, 114, 117, 101, -62, -79, 102, 97, 108, 115,
>> 101,
>>>> -62, -79, 102, 97, 108, 115, 101, 123, 10, 34, 101, 118, 101, 110, 116,
>> 84,
>>>> 105, 109, 101, 115, 116, 97, 109, 112, 34, 58, 34, 50, 48, 49, 56, 45,
>> 48,
>>>> 57, 45, 50, 54, 84, 49, 53, 58, 51, 57, 58, 52, 52, 46, 49, 50, 50, 43,
>> 48,
>>>> 48, 48, 48, 34, 44, 10, 34, 115, 101, 103, 109, 101, 110, 116, 115, 34,
>> 58,
>>>> 91, 123, 10, 34, 115, 101, 103, 109, 101, 110, 116, 84, 105, 109, 101,
>> 115,
>>>> 116, 97, 109, 112, 34, 58, 34, 50, 48, 49, 56, 45, 48, 57, 45, 50, 54,
>> 84,
>>>> 49, 53, 58, 51, 57, 58, 52, 52, 46, 49, 50, 50, 43, 48, 48, 48, 48, 34,
>> 44,
>>>> 10, 34, 117, 115, 101, 114, 68, 97, 116, 97, 34, 58, 123, 10, 34, 65,
>> 99,
>>>> 99, 111, 117, 110, 116, 78, 97, 109, 101, 34, 58, 34, 112, 114, 105,
>> 118,
>>>> 97, 108, 105, 97, 118, 101, 110, 116, 97, 100, 105, 114, 101, 99, 116,
>> 97,
>>>> 115, 97, 45, 97, 48, 113, 51, 52, 48, 48, 48, 48, 48, 100, 106, 115,
>> 118,
>>>> 52, 101, 97, 100, 34, 44, 34, 69, 115, 73, 110, 100, 101, 120, 67, 108,
>>>> 117, 115, 116, 101, 114, 34, 58, 34, 112, 114, 100, 52, 52, 45, 49, 50,
>> 34,
>>>> 10, 125, 44, 10, 34, 116, 105, 101, 114, 34, 58, 34, 97, 112, 105, 95,
>> 112,
>>>> 114, 100, 52, 53, 50, 34, 44, 34, 116, 105, 101, 114, 73, 100, 34, 58,
>> 34,
>>>> 51, 57, 57, 52, 57, 50, 55, 49, 34, 44, 34, 110, 111, 100, 101, 34, 58,
>> 34,
>>>> 97, 112, 105, 118, 49, 45, 48, 49, 52, 45, 112, 114, 100, 52, 53, 50,
>> 34,
>>>> 44, 34, 110, 111, 100, 101, 73, 100, 34, 58, 34, 49, 57, 52, 54, 57, 54,
>>>> 53, 55, 48, 34, 44, 34, 114, 101, 113, 117, 101, 115, 116, 69, 120, 112,
>>>> 101, 114, 105, 101, 110, 99, 101, 34, 58, 34, 78, 79, 82, 77, 65, 76,
>> 34,
>>>> 44, 34, 101, 110, 116, 114, 121, 80, 111, 105, 110, 116, 34, 58, 116,
>> 114,
>>>> 117, 101, 44, 34, 117, 110, 105, 113, 117, 101, 83, 101, 103, 109, 101,
>>>> 110, 116, 73, 100, 34, 58, 53, 44, 34, 116, 114, 97, 110, 115, 97, 99,
>> 116,
>>>> 105, 111, 110, 84, 105, 109, 101, 34, 58, 49, 49, 10, 125, 93, 44, 10,
>> 34,
>>>> 97, 112, 112, 108, 105, 99, 97, 116, 105, 111, 110, 34, 58, 34, 112,
>> 114,
>>>> 100, 52, 52, 45, 97, 110, 97, 108, 121, 116, 105, 99, 115, 34, 44, 34,
>> 97,
>>>> 112, 112, 108, 105, 99, 97, 116, 105, 111, 110, 73, 100, 34, 58, 34, 53,
>>>> 49, 48, 54, 34, 44, 34, 114, 101, 113, 117, 101, 115, 116, 71, 85, 73,
>> 68,
>>>> 34, 58, 34, 53, 97, 48, 55, 101, 56, 48, 51, 45, 57, 51, 50, 51, 45, 52,
>>>> 53, 55, 99, 45, 56, 101, 49, 100, 45, 50, 57, 99, 48, 98, 48, 98, 99,
>> 48,
>>>> 102, 97, 57, 34, 44, 34, 116, 114, 97, 110, 115, 97, 99, 116, 105, 111,
>>>> 110, 78, 97, 109, 101, 34, 58, 34, 97, 112, 105, 95, 118, 51, 46, 92,
>> 47,
>>>> 118, 51, 92, 47, 101, 118, 101, 110, 116, 115, 92, 47, 66, 114, 111,
>> 119,
>>>> 115, 101, 114, 82, 101, 99, 111, 114, 100, 92, 47, 101, 118, 101, 110,
>> 116,
>>>> 34, 44, 34, 116, 114, 97, 110, 115, 97, 99, 116, 105, 111, 110, 73, 100,
>>>> 34, 58, 56, 48, 50, 54, 50, 56, 10, 125] timestamp 1537976391822) to
>> topic
>>>> upsert-merger-stream-oa43-1-upsert-store-changelog due to Producer
>>>> attempted an operation with an old epoch. Either there is a newer
>> producer
>>>> with the same transactionalId, or the producer's transaction has been
>>>> expired by the broker.; No more records will be sent and no more offsets
>>>> will be recorded for this task.
>>>> 
>>>> 
>>>> [2018-09-26T15:39:54,784Z]  [ERROR]  [upsert-merger-stream-oa43-1-
>>>> 53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16]  [o.a.k.s.p.i.
>> AssignedStreamsTasks]
>>>> stream-thread [upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-
>> 0a4b8d6753a2-StreamThread-16]
>>>> After suspending failed, closing the same stream task 0_55 failed again
>> due
>>>> to the following error:
>>>> org.apache.kafka.common.KafkaException: Cannot execute transactional
>>>> method because we are in an error state
>>>>       at org.apache.kafka.clients.producer.internals.
>> TransactionManager.
>>>> maybeFailWithError(TransactionManager.java:784)
>>>>       at org.apache.kafka.clients.producer.internals.
>>>> TransactionManager.beginAbort(TransactionManager.java:229)
>>>>       at org.apache.kafka.clients.producer.KafkaProducer.
>>>> abortTransaction(KafkaProducer.java:660)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> StreamTask.closeSuspended(StreamTask.java:493)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> StreamTask.close(StreamTask.java:553)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> AssignedTasks.suspendTasks(AssignedTasks.java:192)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> AssignedTasks.suspend(AssignedTasks.java:147)
>>>>       at org.apache.kafka.streams.processor.internals.TaskManager.
>>>> suspendTasksAndState(TaskManager.java:242)
>>>>       at org.apache.kafka.streams.processor.internals.StreamThread$
>>>> RebalanceListener.onPartitionsRevoked(StreamThread.java:291)
>>>>       at org.apache.kafka.clients.consumer.internals.
>>>> ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:414)
>>>>       at org.apache.kafka.clients.consumer.internals.
>>>> AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:359)
>>>>       at org.apache.kafka.clients.consumer.internals.
>>>> AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
>>>>       at org.apache.kafka.clients.consumer.internals.
>>>> ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
>>>>       at org.apache.kafka.clients.consumer.KafkaConsumer.
>>>> pollOnce(KafkaConsumer.java:1149)
>>>>       at org.apache.kafka.clients.consumer.KafkaConsumer.poll(
>>>> KafkaConsumer.java:1115)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> StreamThread.pollRequests(StreamThread.java:831)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> StreamThread.runOnce(StreamThread.java:788)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> StreamThread.runLoop(StreamThread.java:749)
>>>>       at org.apache.kafka.streams.processor.internals.
>>>> StreamThread.run(StreamThread.java:719)
>>>> Caused by: org.apache.kafka.common.errors.ProducerFencedException:
>>>> Producer attempted an operation with an old epoch. Either there is a
>> newer
>>>> producer with the same transactionalId, or the producer's transaction
>> has
>>>> been expired by the broker.
>>>> 
>>>> 
>>>> [2018-09-26T15:39:54,801Z]  [ERROR]  [upsert-merger-stream-oa43-1-
>>>> 53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16]
>>>> [o.a.k.s.p.internals.StreamThread]  stream-thread
>>>> [upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-
>> 0a4b8d6753a2-StreamThread-16]
>>>> Error caught during partition revocation, will abort the current process
>>>> and re-throw at the end of rebalance: stream-thread
>>>> [upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-
>> 0a4b8d6753a2-StreamThread-16]
>>>> failed to suspend stream tasks
>>>> [2018-09-26T15:39:54,801Z]  [INFO ]  [upsert-merger-stream-oa43-1-
>>>> 53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16]
>>>> [o.a.k.s.p.internals.StreamThread]  stream-thread
>>>> [upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-
>> 0a4b8d6753a2-StreamThread-16]
>>>> partition revocation took 156 ms.
>>>>       suspended active tasks: [0_55]
>>>>       suspended standby tasks: [0_50]
>>>> [2018-09-26T15:39:54,801Z]  [INFO ]  [upsert-merger-stream-oa43-1-
>>>> 53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16]  [o.a.k.c.c.i.
>> AbstractCoordinator]
>>>> [Consumer clientId=upsert-merger-stream-oa43-1-53c3713a-443f-4b89-
>>>> a27a-0a4b8d6753a2-StreamThread-16-consumer,
>> groupId=upsert-merger-stream-oa43-1]
>>>> (Re-)joining group
>>>> 
>>>> [2018-09-26T15:39:56,277Z]  [INFO ]  [upsert-merger-stream-oa43-1-
>>>> 53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16]  [o.a.k.c.c.i.
>> AbstractCoordinator]
>>>> [Consumer clientId=upsert-merger-stream-oa43-1-53c3713a-443f-4b89-
>>>> a27a-0a4b8d6753a2-StreamThread-16-consumer,
>> groupId=upsert-merger-stream-oa43-1]
>>>> Successfully joined group with generation 113
>>>> 
>>>> [2018-09-26T15:39:56,278Z]  [INFO ]  [upsert-merger-stream-oa43-1-
>>>> 53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16]  [o.a.k.c.c.i.
>> ConsumerCoordinator]
>>>> [Consumer clientId=upsert-merger-stream-oa43-1-53c3713a-443f-4b89-
>>>> a27a-0a4b8d6753a2-StreamThread-16-consumer,
>> groupId=upsert-merger-stream-oa43-1]
>>>> Setting newly assigned partitions [oa43-1-event-upsert-48]
>>>> [2018-09-26T15:39:56,278Z]  [INFO ]  [upsert-merger-stream-oa43-1-
>>>> 53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-15]  [o.a.k.c.c.i.
>> ConsumerCoordinator]
>>>> [Consumer clientId=upsert-merger-stream-oa43-1-53c3713a-443f-4b89-
>>>> a27a-0a4b8d6753a2-StreamThread-15-consumer,
>> groupId=upsert-merger-stream-oa43-1]
>>>> Setting newly assigned partitions [oa43-1-event-upsert-65]
>>>> [2018-09-26T15:39:56,278Z]  [INFO ]  [upsert-merger-stream-oa43-1-
>>>> 53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16]
>>>> [o.a.k.s.p.internals.StreamThread]  stream-thread
>>>> [upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-
>> 0a4b8d6753a2-StreamThread-16]
>>>> State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
>>>> 
>>>> [2018-09-26T15:39:56,495Z]  [INFO ]  [kafka-producer-network-thread |
>>>> upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-
>>>> 0a4b8d6753a2-StreamThread-16-0_48-producer]  [o.a.k.c.p.i.
>> TransactionManager]
>>>> [Producer clientId=upsert-merger-stream-oa43-1-53c3713a-443f-4b89-
>>>> a27a-0a4b8d6753a2-StreamThread-16-0_48-producer,
>>>> transactionalId=upsert-merger-stream-oa43-1-0_48] ProducerId set to 13
>>>> with epoch 81
>>>> [2018-09-26T15:39:56,495Z]  [INFO ]  [upsert-merger-stream-oa43-1-
>>>> 53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16]
>>>> [o.a.k.s.p.internals.StreamThread]  stream-thread
>>>> [upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-
>> 0a4b8d6753a2-StreamThread-16]
>>>> partition assignment took 217 ms.
>>>>       current active tasks: [0_48]
>>>>       current standby tasks: [0_49]
>>>>       previous active tasks: [0_55]
>>>> 
>>>> [2018-09-26T15:39:56,496Z]  [INFO ]  [upsert-merger-stream-oa43-1-
>>>> 53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16]
>>>> [o.a.k.s.p.internals.StreamThread]  stream-thread
>>>> [upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-
>> 0a4b8d6753a2-StreamThread-16]
>>>> State transition from PARTITIONS_ASSIGNED to PENDING_SHUTDOWN
>>>> [2018-09-26T15:39:56,496Z]  [INFO ]  [upsert-merger-stream-oa43-1-
>>>> 53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16]
>>>> [o.a.k.s.p.internals.StreamThread]  stream-thread
>>>> [upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-
>> 0a4b8d6753a2-StreamThread-16]
>>>> Shutting down
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>> 
>>> 
>>> --
>>> -- Guozhang
>> 
>> 
> 
> 
> -- 
> -- Guozhang


Re: KafkaStreams: ProducerFencedException causing StreamThread death

Posted by Guozhang Wang <wa...@gmail.com>.
Hi Aravind,

The tagged versions may not be released yet; we tag them as soon as the
corresponding fix commit is merged to the release branch (e.g. 1.1.x, 2.0.x, etc.).
The next release that will include this commit is 2.1.0, whose feature
freeze date is today:

https://mail.google.com/mail/u/0/#search/Dong+Lin/165c0e4c0eb67579

Guozhang
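Until an upgrade is possible, one common stopgap (an editorial sketch, not something proposed in this thread) is to register an uncaught-exception handler so the application at least notices when a StreamThread dies, and can then close and rebuild the KafkaStreams instance rather than run with fewer threads. In 1.1.x, KafkaStreams#setUncaughtExceptionHandler takes a plain Thread.UncaughtExceptionHandler; the hook is demonstrated here with an ordinary java.lang.Thread so the snippet runs without a Kafka dependency:

```java
// Editorial sketch: observing thread death via an UncaughtExceptionHandler.
// In a Kafka Streams app, the same handler type can be passed to
// KafkaStreams#setUncaughtExceptionHandler before start(), and the handler
// body would schedule a close()/rebuild instead of just logging.
public class ThreadDeathWatch {
    public static void main(String[] args) throws InterruptedException {
        Thread worker = new Thread(() -> {
            // Simulates the fatal path seen in the logs above, where a
            // ProducerFencedException kills StreamThread-16.
            throw new IllegalStateException("simulated fatal error");
        }, "StreamThread-1");

        // The handler is the hook point for reacting to thread death.
        worker.setUncaughtExceptionHandler((t, e) ->
                System.out.println("thread " + t.getName() + " died: " + e.getMessage()));

        worker.start();
        worker.join(); // returns once the thread has terminated
        System.out.println("worker alive: " + worker.isAlive());
    }
}
```

Note that this only detects the death; whether restarting is safe depends on why the thread died, which is exactly why the KAFKA-7284 fix distinguishes recoverable rebalance-time fencing from genuinely fatal errors.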


> >>>> AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:359)
> >>>>       at org.apache.kafka.clients.consumer.internals.
> >>>> AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
> >>>>       at org.apache.kafka.clients.consumer.internals.
> >>>> ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
> >>>>       at org.apache.kafka.clients.consumer.KafkaConsumer.
> >>>> pollOnce(KafkaConsumer.java:1149)
> >>>>       at org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> >>>> KafkaConsumer.java:1115)
> >>>>       at org.apache.kafka.streams.processor.internals.
> >>>> StreamThread.pollRequests(StreamThread.java:831)
> >>>>       at org.apache.kafka.streams.processor.internals.
> >>>> StreamThread.runOnce(StreamThread.java:788)
> >>>>       at org.apache.kafka.streams.processor.internals.
> >>>> StreamThread.runLoop(StreamThread.java:749)
> >>>>       at org.apache.kafka.streams.processor.internals.
> >>>> StreamThread.run(StreamThread.java:719)
> >>>> Caused by: org.apache.kafka.common.KafkaException: Cannot perform send
> >>>> because at least one previous transactional or idempotent request has
> >>>> failed with errors.
> >>>>       at org.apache.kafka.clients.producer.internals.TransactionManager.
> >>>> failIfNotReadyForSend(TransactionManager.java:279)
> >>>>       at org.apache.kafka.clients.producer.internals.TransactionManager.
> >>>> maybeAddPartitionToTransaction(TransactionManager.java:264)
> >>>>       at org.apache.kafka.clients.producer.KafkaProducer.doSend(
> >>>> KafkaProducer.java:828)
> >>>>       at org.apache.kafka.clients.producer.KafkaProducer.send(
> >>>> KafkaProducer.java:784)
> >>>>       at org.apache.kafka.streams.processor.internals.
> >>>> RecordCollectorImpl.send(RecordCollectorImpl.java:153)
> >>>>       ... 34 common frames omitted
> >>>> Caused by: org.apache.kafka.common.errors.ProducerFencedException:
> >>>> Producer attempted an operation with an old epoch. Either there is a
> >>>> newer producer with the same transactionalId, or the producer's
> >>>> transaction has been expired by the broker.
> >>>> [2018-09-26T15:39:54,665Z]  [ERROR]  [upsert-merger-stream-oa43-1-
> >>>> 53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16]
> >>>> [o.a.k.s.p.i.AssignedStreamsTasks]
> >>>> stream-thread [upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-
> >>>> 0a4b8d6753a2-StreamThread-16]
> >>>> Suspending stream task 0_55 failed due to the following error:
> >>>> org.apache.kafka.streams.errors.ProcessorStateException: task [0_55]
> >>>> Failed to flush state store upsert-store
> >>>>       at org.apache.kafka.streams.processor.internals.
> >>>> ProcessorStateManager.flush(ProcessorStateManager.java:246)
> >>>>       at org.apache.kafka.streams.processor.internals.
> >>>> AbstractTask.flushState(AbstractTask.java:195)
> >>>>       at org.apache.kafka.streams.processor.internals.
> >>>> StreamTask.flushState(StreamTask.java:339)
> >>>>       at org.apache.kafka.streams.processor.internals.
> >>>> StreamTask$1.run(StreamTask.java:312)
> >>>>       at org.apache.kafka.streams.processor.internals.
> >>>> StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:211)
> >>>>       at org.apache.kafka.streams.processor.internals.
> >>>> StreamTask.commit(StreamTask.java:307)
> >>>>       at org.apache.kafka.streams.processor.internals.
> >>>> StreamTask.suspend(StreamTask.java:440)
> >>>>       at org.apache.kafka.streams.processor.internals.
> >>>> StreamTask.suspend(StreamTask.java:422)
> >>>>       at org.apache.kafka.streams.processor.internals.
> >>>> AssignedTasks.suspendTasks(AssignedTasks.java:182)
> >>>>       at org.apache.kafka.streams.processor.internals.
> >>>> AssignedTasks.suspend(AssignedTasks.java:147)
> >>>>       at org.apache.kafka.streams.processor.internals.TaskManager.
> >>>> suspendTasksAndState(TaskManager.java:242)
> >>>>       at org.apache.kafka.streams.processor.internals.StreamThread$
> >>>> RebalanceListener.onPartitionsRevoked(StreamThread.java:291)
> >>>>       at org.apache.kafka.clients.consumer.internals.
> >>>> ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:414)
> >>>>       at org.apache.kafka.clients.consumer.internals.
> >>>> AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:359)
> >>>>       at org.apache.kafka.clients.consumer.internals.
> >>>> AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
> >>>>       at org.apache.kafka.clients.consumer.internals.
> >>>> ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
> >>>>       at org.apache.kafka.clients.consumer.KafkaConsumer.
> >>>> pollOnce(KafkaConsumer.java:1149)
> >>>>       at org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> >>>> KafkaConsumer.java:1115)
> >>>>       at org.apache.kafka.streams.processor.internals.
> >>>> StreamThread.pollRequests(StreamThread.java:831)
> >>>>       at org.apache.kafka.streams.processor.internals.
> >>>> StreamThread.runOnce(StreamThread.java:788)
> >>>>       at org.apache.kafka.streams.processor.internals.
> >>>> StreamThread.runLoop(StreamThread.java:749)
> >>>>       at org.apache.kafka.streams.processor.internals.
> >>>> StreamThread.run(StreamThread.java:719)
> >>>> Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_55]
> >>>> Abort sending since an error caught with a previous record (key
> >>>> de796abe-d174-40ee-99fb-36695447402e value [B@79f4a0fd timestamp
> >>>> 1537976392104) to topic
> >>>> upsert-merger-stream-oa43-1-upsert-store-changelog
> >>>> due to Cannot perform send because at least one previous transactional
> >>>> or idempotent request has failed with errors.
> >>>>       at org.apache.kafka.streams.processor.internals.
> >>>> RecordCollectorImpl.send(RecordCollectorImpl.java:197)
> >>>>       at org.apache.kafka.streams.state.internals.
> >>>> StoreChangeLogger.logChange(StoreChangeLogger.java:59)
> >>>>       at org.apache.kafka.streams.state.internals.
> >>>> ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:69)
> >>>>       at org.apache.kafka.streams.state.internals.
> >>>> ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:29)
> >>>>       at org.apache.kafka.streams.state.internals.CachingKeyValueStore.
> >>>> putAndMaybeForward(CachingKeyValueStore.java:105)
> >>>>       at org.apache.kafka.streams.state.internals.
> >>>> CachingKeyValueStore.access$000(CachingKeyValueStore.java:38)
> >>>>       at org.apache.kafka.streams.state.internals.
> >>>> CachingKeyValueStore$1.apply(CachingKeyValueStore.java:83)
> >>>>       at org.apache.kafka.streams.state.internals.NamedCache.
> >>>> flush(NamedCache.java:142)
> >>>>       at org.apache.kafka.streams.state.internals.NamedCache.
> >>>> flush(NamedCache.java:100)
> >>>>       at org.apache.kafka.streams.state.internals.ThreadCache.
> >>>> flush(ThreadCache.java:127)
> >>>>       at org.apache.kafka.streams.state.internals.
> >>>> CachingKeyValueStore.flush(CachingKeyValueStore.java:123)
> >>>>       at org.apache.kafka.streams.state.internals.
> >>>> InnerMeteredKeyValueStore.flush(InnerMeteredKeyValueStore.java:267)
> >>>>       at org.apache.kafka.streams.state.internals.
> >>>> MeteredKeyValueBytesStore.flush(MeteredKeyValueBytesStore.java:149)
> >>>>       at org.apache.kafka.streams.processor.internals.
> >>>> ProcessorStateManager.flush(ProcessorStateManager.java:243)
> >>>>       ... 21 common frames omitted
> >>>> Caused by: org.apache.kafka.common.KafkaException: Cannot perform send
> >>>> because at least one previous transactional or idempotent request has
> >>>> failed with errors.
> >>>>       at org.apache.kafka.clients.producer.internals.TransactionManager.
> >>>> failIfNotReadyForSend(TransactionManager.java:279)
> >>>>       at org.apache.kafka.clients.producer.internals.TransactionManager.
> >>>> maybeAddPartitionToTransaction(TransactionManager.java:264)
> >>>>       at org.apache.kafka.clients.producer.KafkaProducer.doSend(
> >>>> KafkaProducer.java:828)
> >>>>       at org.apache.kafka.clients.producer.KafkaProducer.send(
> >>>> KafkaProducer.java:784)
> >>>>       at org.apache.kafka.streams.processor.internals.
> >>>> RecordCollectorImpl.send(RecordCollectorImpl.java:153)
> >>>>       ... 34 common frames omitted
> >>>> Caused by: org.apache.kafka.common.errors.ProducerFencedException:
> >>>> Producer attempted an operation with an old epoch. Either there is a
> >>>> newer producer with the same transactionalId, or the producer's
> >>>> transaction has been expired by the broker.
> >>>> [2018-09-26T15:39:54,666Z]  [WARN ]  [kafka-producer-network-thread |
> >>>> upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-
> >>>> 0a4b8d6753a2-StreamThread-16-0_55-producer]
> >>>> [o.a.k.s.p.i.RecordCollectorImpl]
> >>>> task [0_55] Error sending record (key 5a07e803-9323-457c-8e1d-29c0b0bc0fa9
> >>>> value [0, 0, 1, 102, 22, -119, 88, -32, 0, 0, 0, -95, 97, 112, 112,
> >>>> 100,
> >>>> 121, 110, 97, 109, 105, 99, 115, 95, 101, 101, 101, 49, 100, 52, 102,
> >>>> 56,
> >>>> 45, 54, 55, 97, 50, 45, 52, 57, 56, 101, 45, 97, 55, 50, 53, 45, 52,
> >>>> 55,
> >>>> 101, 50, 57, 56, 48, 51, 56, 50, 50, 101, -62, -79, 98, 105, 122, 95,
> >>>> 116,
> >>>> 120, 110, 95, 118, 49, -62, -79, 57, 54, 53, 49, 48, 100, 50, 52, -62,
> >>>> -79,
> >>>> 48, -62, -79, 115, 101, 103, 109, 101, 110, 116, 115, -62, -79, 114,
> >>>> 101,
> >>>> 113, 117, 101, 115, 116, 71, 85, 73, 68, -62, -79, 53, 97, 48, 55,
> >>>> 101,
> >>>> 56,
> >>>> 48, 51, 45, 57, 51, 50, 51, 45, 52, 53, 55, 99, 45, 56, 101, 49, 100,
> >>>> 45,
> >>>> 50, 57, 99, 48, 98, 48, 98, 99, 48, 102, 97, 57, -62, -79, 102, 97,
> >>>> 108,
> >>>> 115, 101, -62, -79, 116, 114, 117, 101, -62, -79, 102, 97, 108, 115,
> >>>> 101,
> >>>> -62, -79, 102, 97, 108, 115, 101, 123, 10, 34, 101, 118, 101, 110,
> >>>> 116,
> >>>> 84,
> >>>> 105, 109, 101, 115, 116, 97, 109, 112, 34, 58, 34, 50, 48, 49, 56, 45,
> >>>> 48,
> >>>> 57, 45, 50, 54, 84, 49, 53, 58, 51, 57, 58, 52, 52, 46, 49, 50, 50,
> >>>> 43,
> >>>> 48,
> >>>> 48, 48, 48, 34, 44, 10, 34, 115, 101, 103, 109, 101, 110, 116, 115,
> >>>> 34,
> >>>> 58,
> >>>> 91, 123, 10, 34, 115, 101, 103, 109, 101, 110, 116, 84, 105, 109, 101,
> >>>> 115,
> >>>> 116, 97, 109, 112, 34, 58, 34, 50, 48, 49, 56, 45, 48, 57, 45, 50, 54,
> >>>> 84,
> >>>> 49, 53, 58, 51, 57, 58, 52, 52, 46, 49, 50, 50, 43, 48, 48, 48, 48,
> >>>> 34,
> >>>> 44,
> >>>> 10, 34, 117, 115, 101, 114, 68, 97, 116, 97, 34, 58, 123, 10, 34, 65,
> >>>> 99,
> >>>> 99, 111, 117, 110, 116, 78, 97, 109, 101, 34, 58, 34, 112, 114, 105,
> >>>> 118,
> >>>> 97, 108, 105, 97, 118, 101, 110, 116, 97, 100, 105, 114, 101, 99, 116,
> >>>> 97,
> >>>> 115, 97, 45, 97, 48, 113, 51, 52, 48, 48, 48, 48, 48, 100, 106, 115,
> >>>> 118,
> >>>> 52, 101, 97, 100, 34, 44, 34, 69, 115, 73, 110, 100, 101, 120, 67,
> >>>> 108,
> >>>> 117, 115, 116, 101, 114, 34, 58, 34, 112, 114, 100, 52, 52, 45, 49,
> >>>> 50,
> >>>> 34,
> >>>> 10, 125, 44, 10, 34, 116, 105, 101, 114, 34, 58, 34, 97, 112, 105, 95,
> >>>> 112,
> >>>> 114, 100, 52, 53, 50, 34, 44, 34, 116, 105, 101, 114, 73, 100, 34, 58,
> >>>> 34,
> >>>> 51, 57, 57, 52, 57, 50, 55, 49, 34, 44, 34, 110, 111, 100, 101, 34,
> >>>> 58,
> >>>> 34,
> >>>> 97, 112, 105, 118, 49, 45, 48, 49, 52, 45, 112, 114, 100, 52, 53, 50,
> >>>> 34,
> >>>> 44, 34, 110, 111, 100, 101, 73, 100, 34, 58, 34, 49, 57, 52, 54, 57,
> >>>> 54,
> >>>> 53, 55, 48, 34, 44, 34, 114, 101, 113, 117, 101, 115, 116, 69, 120,
> >>>> 112,
> >>>> 101, 114, 105, 101, 110, 99, 101, 34, 58, 34, 78, 79, 82, 77, 65, 76,
> >>>> 34,
> >>>> 44, 34, 101, 110, 116, 114, 121, 80, 111, 105, 110, 116, 34, 58, 116,
> >>>> 114,
> >>>> 117, 101, 44, 34, 117, 110, 105, 113, 117, 101, 83, 101, 103, 109,
> >>>> 101,
> >>>> 110, 116, 73, 100, 34, 58, 53, 44, 34, 116, 114, 97, 110, 115, 97, 99,
> >>>> 116,
> >>>> 105, 111, 110, 84, 105, 109, 101, 34, 58, 49, 49, 10, 125, 93, 44, 10,
> >>>> 34,
> >>>> 97, 112, 112, 108, 105, 99, 97, 116, 105, 111, 110, 34, 58, 34, 112,
> >>>> 114,
> >>>> 100, 52, 52, 45, 97, 110, 97, 108, 121, 116, 105, 99, 115, 34, 44, 34,
> >>>> 97,
> >>>> 112, 112, 108, 105, 99, 97, 116, 105, 111, 110, 73, 100, 34, 58, 34,
> >>>> 53,
> >>>> 49, 48, 54, 34, 44, 34, 114, 101, 113, 117, 101, 115, 116, 71, 85, 73,
> >>>> 68,
> >>>> 34, 58, 34, 53, 97, 48, 55, 101, 56, 48, 51, 45, 57, 51, 50, 51, 45,
> >>>> 52,
> >>>> 53, 55, 99, 45, 56, 101, 49, 100, 45, 50, 57, 99, 48, 98, 48, 98, 99,
> >>>> 48,
> >>>> 102, 97, 57, 34, 44, 34, 116, 114, 97, 110, 115, 97, 99, 116, 105,
> >>>> 111,
> >>>> 110, 78, 97, 109, 101, 34, 58, 34, 97, 112, 105, 95, 118, 51, 46, 92,
> >>>> 47,
> >>>> 118, 51, 92, 47, 101, 118, 101, 110, 116, 115, 92, 47, 66, 114, 111,
> >>>> 119,
> >>>> 115, 101, 114, 82, 101, 99, 111, 114, 100, 92, 47, 101, 118, 101, 110,
> >>>> 116,
> >>>> 34, 44, 34, 116, 114, 97, 110, 115, 97, 99, 116, 105, 111, 110, 73,
> >>>> 100,
> >>>> 34, 58, 56, 48, 50, 54, 50, 56, 10, 125] timestamp 1537976391822) to
> >>>> topic upsert-merger-stream-oa43-1-upsert-store-changelog due to Producer
> >>>> attempted an operation with an old epoch. Either there is a newer
> >>>> producer with the same transactionalId, or the producer's transaction
> >>>> has been expired by the broker.; No more records will be sent and no
> >>>> more offsets will be recorded for this task.
> >>>>
> >>>>
> >>>> [2018-09-26T15:39:54,784Z]  [ERROR]  [upsert-merger-stream-oa43-1-
> >>>> 53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16]
> >>>> [o.a.k.s.p.i.AssignedStreamsTasks]
> >>>> stream-thread [upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-
> >>>> 0a4b8d6753a2-StreamThread-16]
> >>>> After suspending failed, closing the same stream task 0_55 failed
> >>>> again due to the following error:
> >>>> org.apache.kafka.common.KafkaException: Cannot execute transactional
> >>>> method because we are in an error state
> >>>>       at org.apache.kafka.clients.producer.internals.TransactionManager.
> >>>> maybeFailWithError(TransactionManager.java:784)
> >>>>       at org.apache.kafka.clients.producer.internals.
> >>>> TransactionManager.beginAbort(TransactionManager.java:229)
> >>>>       at org.apache.kafka.clients.producer.KafkaProducer.
> >>>> abortTransaction(KafkaProducer.java:660)
> >>>>       at org.apache.kafka.streams.processor.internals.
> >>>> StreamTask.closeSuspended(StreamTask.java:493)
> >>>>       at org.apache.kafka.streams.processor.internals.
> >>>> StreamTask.close(StreamTask.java:553)
> >>>>       at org.apache.kafka.streams.processor.internals.
> >>>> AssignedTasks.suspendTasks(AssignedTasks.java:192)
> >>>>       at org.apache.kafka.streams.processor.internals.
> >>>> AssignedTasks.suspend(AssignedTasks.java:147)
> >>>>       at org.apache.kafka.streams.processor.internals.TaskManager.
> >>>> suspendTasksAndState(TaskManager.java:242)
> >>>>       at org.apache.kafka.streams.processor.internals.StreamThread$
> >>>> RebalanceListener.onPartitionsRevoked(StreamThread.java:291)
> >>>>       at org.apache.kafka.clients.consumer.internals.
> >>>> ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:414)
> >>>>       at org.apache.kafka.clients.consumer.internals.
> >>>> AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:359)
> >>>>       at org.apache.kafka.clients.consumer.internals.
> >>>> AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
> >>>>       at org.apache.kafka.clients.consumer.internals.
> >>>> ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
> >>>>       at org.apache.kafka.clients.consumer.KafkaConsumer.
> >>>> pollOnce(KafkaConsumer.java:1149)
> >>>>       at org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> >>>> KafkaConsumer.java:1115)
> >>>>       at org.apache.kafka.streams.processor.internals.
> >>>> StreamThread.pollRequests(StreamThread.java:831)
> >>>>       at org.apache.kafka.streams.processor.internals.
> >>>> StreamThread.runOnce(StreamThread.java:788)
> >>>>       at org.apache.kafka.streams.processor.internals.
> >>>> StreamThread.runLoop(StreamThread.java:749)
> >>>>       at org.apache.kafka.streams.processor.internals.
> >>>> StreamThread.run(StreamThread.java:719)
> >>>> Caused by: org.apache.kafka.common.errors.ProducerFencedException:
> >>>> Producer attempted an operation with an old epoch. Either there is a
> >>>> newer producer with the same transactionalId, or the producer's
> >>>> transaction has been expired by the broker.
> >>>>
> >>>>
> >>>> [2018-09-26T15:39:54,801Z]  [ERROR]  [upsert-merger-stream-oa43-1-
> >>>> 53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16]
> >>>> [o.a.k.s.p.internals.StreamThread]  stream-thread
> >>>> [upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-
> >>>> 0a4b8d6753a2-StreamThread-16]
> >>>> Error caught during partition revocation, will abort the current
> >>>> process and re-throw at the end of rebalance: stream-thread
> >>>> [upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-
> >>>> 0a4b8d6753a2-StreamThread-16]
> >>>> failed to suspend stream tasks
> >>>> [2018-09-26T15:39:54,801Z]  [INFO ]  [upsert-merger-stream-oa43-1-
> >>>> 53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16]
> >>>> [o.a.k.s.p.internals.StreamThread]  stream-thread
> >>>> [upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-
> >>>> 0a4b8d6753a2-StreamThread-16]
> >>>> partition revocation took 156 ms.
> >>>>       suspended active tasks: [0_55]
> >>>>       suspended standby tasks: [0_50]
> >>>> [2018-09-26T15:39:54,801Z]  [INFO ]  [upsert-merger-stream-oa43-1-
> >>>> 53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16]
> >>>> [o.a.k.c.c.i.AbstractCoordinator]
> >>>> [Consumer clientId=upsert-merger-stream-oa43-1-53c3713a-443f-4b89-
> >>>> a27a-0a4b8d6753a2-StreamThread-16-consumer,
> >>>> groupId=upsert-merger-stream-oa43-1]
> >>>> (Re-)joining group
> >>>>
> >>>> [2018-09-26T15:39:56,277Z]  [INFO ]  [upsert-merger-stream-oa43-1-
> >>>> 53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16]
> >>>> [o.a.k.c.c.i.AbstractCoordinator]
> >>>> [Consumer clientId=upsert-merger-stream-oa43-1-53c3713a-443f-4b89-
> >>>> a27a-0a4b8d6753a2-StreamThread-16-consumer,
> >>>> groupId=upsert-merger-stream-oa43-1]
> >>>> Successfully joined group with generation 113
> >>>>
> >>>> [2018-09-26T15:39:56,278Z]  [INFO ]  [upsert-merger-stream-oa43-1-
> >>>> 53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16]
> >>>> [o.a.k.c.c.i.ConsumerCoordinator]
> >>>> [Consumer clientId=upsert-merger-stream-oa43-1-53c3713a-443f-4b89-
> >>>> a27a-0a4b8d6753a2-StreamThread-16-consumer,
> >>>> groupId=upsert-merger-stream-oa43-1]
> >>>> Setting newly assigned partitions [oa43-1-event-upsert-48]
> >>>> [2018-09-26T15:39:56,278Z]  [INFO ]  [upsert-merger-stream-oa43-1-
> >>>> 53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-15]
> >>>> [o.a.k.c.c.i.ConsumerCoordinator]
> >>>> [Consumer clientId=upsert-merger-stream-oa43-1-53c3713a-443f-4b89-
> >>>> a27a-0a4b8d6753a2-StreamThread-15-consumer,
> >>>> groupId=upsert-merger-stream-oa43-1]
> >>>> Setting newly assigned partitions [oa43-1-event-upsert-65]
> >>>> [2018-09-26T15:39:56,278Z]  [INFO ]  [upsert-merger-stream-oa43-1-
> >>>> 53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16]
> >>>> [o.a.k.s.p.internals.StreamThread]  stream-thread
> >>>> [upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-
> >>>> 0a4b8d6753a2-StreamThread-16]
> >>>> State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
> >>>>
> >>>> [2018-09-26T15:39:56,495Z]  [INFO ]  [kafka-producer-network-thread |
> >>>> upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-
> >>>> 0a4b8d6753a2-StreamThread-16-0_48-producer]
> >>>> [o.a.k.c.p.i.TransactionManager]
> >>>> [Producer clientId=upsert-merger-stream-oa43-1-53c3713a-443f-4b89-
> >>>> a27a-0a4b8d6753a2-StreamThread-16-0_48-producer,
> >>>> transactionalId=upsert-merger-stream-oa43-1-0_48] ProducerId set to 13
> >>>> with epoch 81
> >>>> [2018-09-26T15:39:56,495Z]  [INFO ]  [upsert-merger-stream-oa43-1-
> >>>> 53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16]
> >>>> [o.a.k.s.p.internals.StreamThread]  stream-thread
> >>>> [upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-
> >>>> 0a4b8d6753a2-StreamThread-16]
> >>>> partition assignment took 217 ms.
> >>>>       current active tasks: [0_48]
> >>>>       current standby tasks: [0_49]
> >>>>       previous active tasks: [0_55]
> >>>>
> >>>> [2018-09-26T15:39:56,496Z]  [INFO ]  [upsert-merger-stream-oa43-1-
> >>>> 53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16]
> >>>> [o.a.k.s.p.internals.StreamThread]  stream-thread
> >>>> [upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-
> >>>> 0a4b8d6753a2-StreamThread-16]
> >>>> State transition from PARTITIONS_ASSIGNED to PENDING_SHUTDOWN
> >>>> [2018-09-26T15:39:56,496Z]  [INFO ]  [upsert-merger-stream-oa43-1-
> >>>> 53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16]
> >>>> [o.a.k.s.p.internals.StreamThread]  stream-thread
> >>>> [upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-
> >>>> 0a4b8d6753a2-StreamThread-16]
> >>>> Shutting down
> >>>>
> >>>
> >>>
> >>> --
> >>> -- Guozhang
> >>
> >>
> >
> >
>
>


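A note for readers who land on this thread with the same symptom before picking up a release containing the KAFKA-7284 fix: the fencing in the logs above comes out of the exactly-once transactional machinery. The fragment below is a minimal sketch of the configuration knobs involved, with the default values of these releases shown for illustration (not as tuning advice); processing.guarantee is a Kafka Streams setting, transaction.timeout.ms a producer setting, and transactional.id.expiration.ms a broker setting.

```properties
# Kafka Streams application config: exactly-once processing creates one
# transactional producer per task; those producers are what get fenced
# during rebalances.
processing.guarantee=exactly_once

# Producer config (default 60000 ms): how long the transaction
# coordinator lets a transaction stay open before proactively aborting it.
transaction.tim.ms is not a config; the real name is:
transaction.timeout.ms=60000

# Broker config (default 604800000 ms, i.e. 7 days): how long an idle
# transactional.id is retained before the broker expires it; an expired
# id produces exactly the ProducerFencedException message quoted above.
transactional.id.expiration.ms=604800000
```

The actual fix discussed in this thread is the code change tracked in KAFKA-7284; the settings above only influence how readily a producer can be fenced or expired, they do not prevent the stream-thread death described here.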