You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Boyang Chen (Jira)" <ji...@apache.org> on 2020/04/01 04:46:00 UTC

[jira] [Updated] (KAFKA-9793) Stream HandleAssignment should guarantee task close

     [ https://issues.apache.org/jira/browse/KAFKA-9793?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Boyang Chen updated KAFKA-9793:
-------------------------------
    Issue Type: Bug  (was: Improvement)

> Stream HandleAssignment should guarantee task close
> ---------------------------------------------------
>
>                 Key: KAFKA-9793
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9793
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 2.6.0
>            Reporter: Boyang Chen
>            Assignee: Boyang Chen
>            Priority: Major
>
> When triggering the `handleAssignment` call, if task preCommit throws, the doom-to-fail task shall not be closed, thus causing a RocksDB metrics recorder re-addition, which is fatal:
>  
>  
> [2020-03-31T16:50:43-07:00] (streams-soak-trunk-eos_soak_i-022f109d75764a250_streamslog) [2020-03-31 23:50:42,668] INFO [stream-soak-test-714fba71-3f5c-4418-8613-22d7b085949c-StreamThread-3] stream-thread [stream-soak-test-714fba71-3f5c-4418-8613-22d7b085949c-StreamThread-3] Handle new assignment with:
>         New active tasks: [1_0, 0_1, 2_0]
>         New standby tasks: []
>         Existing active tasks: [0_1, 1_0, 2_0, 3_1]
>         Existing standby tasks: [] (org.apache.kafka.streams.processor.internals.TaskManager)
>  
> [2020-03-31T16:50:43-07:00] (streams-soak-trunk-eos_soak_i-022f109d75764a250_streamslog) [2020-03-31 23:50:42,671] INFO [stream-soak-test-714fba71-3f5c-4418-8613-22d7b085949c-StreamThread-3] stream-thread [stream-soak-test-714fba71-3f5c-4418-8613-22d7b085949c-StreamThread-3] task [3_1] Prepared clean close (org.apache.kafka.streams.processor.internals.StreamTask)
> [2020-03-31T16:50:43-07:00] (streams-soak-trunk-eos_soak_i-022f109d75764a250_streamslog) [2020-03-31 23:50:42,671] INFO [stream-soak-test-714fba71-3f5c-4418-8613-22d7b085949c-StreamThread-3] stream-thread [stream-soak-test-714fba71-3f5c-4418-8613-22d7b085949c-StreamThread-3] task [0_1] Prepared task for committing (org.apache.kafka.streams.processor.internals.StreamTask)
> [2020-03-31T16:50:43-07:00] (streams-soak-trunk-eos_soak_i-022f109d75764a250_streamslog) [2020-03-31 23:50:42,682] ERROR [stream-soak-test-714fba71-3f5c-4418-8613-22d7b085949c-StreamThread-3] stream-thread [stream-soak-test-714fba71-3f5c-4418-8613-22d7b085949c-StreamThread-3] task [1_0] Failed to flush state store logData10MinuteFinalCount-store:  (org.apache.kafka.streams.processor.internals.ProcessorStateManager)
> [2020-03-31T16:50:43-07:00] (streams-soak-trunk-eos_soak_i-022f109d75764a250_streamslog) org.apache.kafka.streams.errors.TaskMigratedException: Error encountered sending record to topic windowed-node-counts for task 1_0 due to:
> [2020-03-31T16:50:43-07:00] (streams-soak-trunk-eos_soak_i-022f109d75764a250_streamslog) org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
> [2020-03-31T16:50:43-07:00] (streams-soak-trunk-eos_soak_i-022f109d75764a250_streamslog) Written offsets would not be recorded and no more records would be sent since the producer is fenced, indicating the task may be migrated out; it means all tasks belonging to this thread should be migrated.
>         at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:202)
>         at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$0(RecordCollectorImpl.java:185)
>         at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1352)
>         at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231)
>         at org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:159)
>         at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:768)
>         at org.apache.kafka.clients.producer.internals.Sender.maybeAbortBatches(Sender.java:485)
>         at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:304)
>         at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240)
>         at java.lang.Thread.run(Thread.java:748)
>  
> The correct solution is to wrap the whole code block by try-catch to avoid unexpected close failure.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)