Posted to jira@kafka.apache.org by "Boyang Chen (Jira)" <ji...@apache.org> on 2020/04/01 04:45:00 UTC

[jira] [Created] (KAFKA-9793) Stream HandleAssignment should guarantee task close

Boyang Chen created KAFKA-9793:
----------------------------------

             Summary: Stream HandleAssignment should guarantee task close
                 Key: KAFKA-9793
                 URL: https://issues.apache.org/jira/browse/KAFKA-9793
             Project: Kafka
          Issue Type: Improvement
            Reporter: Boyang Chen
            Assignee: Boyang Chen


When `handleAssignment` is triggered and a task's preCommit throws, the doomed task is never closed. Its RocksDB metrics recorder is therefore never removed, so re-adding the recorder when the task is later re-created is fatal:

[2020-03-31T16:50:43-07:00] (streams-soak-trunk-eos_soak_i-022f109d75764a250_streamslog) [2020-03-31 23:50:42,668] INFO [stream-soak-test-714fba71-3f5c-4418-8613-22d7b085949c-StreamThread-3] stream-thread [stream-soak-test-714fba71-3f5c-4418-8613-22d7b085949c-StreamThread-3] Handle new assignment with:
        New active tasks: [1_0, 0_1, 2_0]
        New standby tasks: []
        Existing active tasks: [0_1, 1_0, 2_0, 3_1]
        Existing standby tasks: [] (org.apache.kafka.streams.processor.internals.TaskManager)

[2020-03-31T16:50:43-07:00] (streams-soak-trunk-eos_soak_i-022f109d75764a250_streamslog) [2020-03-31 23:50:42,671] INFO [stream-soak-test-714fba71-3f5c-4418-8613-22d7b085949c-StreamThread-3] stream-thread [stream-soak-test-714fba71-3f5c-4418-8613-22d7b085949c-StreamThread-3] task [3_1] Prepared clean close (org.apache.kafka.streams.processor.internals.StreamTask)

[2020-03-31T16:50:43-07:00] (streams-soak-trunk-eos_soak_i-022f109d75764a250_streamslog) [2020-03-31 23:50:42,671] INFO [stream-soak-test-714fba71-3f5c-4418-8613-22d7b085949c-StreamThread-3] stream-thread [stream-soak-test-714fba71-3f5c-4418-8613-22d7b085949c-StreamThread-3] task [0_1] Prepared task for committing (org.apache.kafka.streams.processor.internals.StreamTask)

[2020-03-31T16:50:43-07:00] (streams-soak-trunk-eos_soak_i-022f109d75764a250_streamslog) [2020-03-31 23:50:42,682] ERROR [stream-soak-test-714fba71-3f5c-4418-8613-22d7b085949c-StreamThread-3] stream-thread [stream-soak-test-714fba71-3f5c-4418-8613-22d7b085949c-StreamThread-3] task [1_0] Failed to flush state store logData10MinuteFinalCount-store:  (org.apache.kafka.streams.processor.internals.ProcessorStateManager)
[2020-03-31T16:50:43-07:00] (streams-soak-trunk-eos_soak_i-022f109d75764a250_streamslog) org.apache.kafka.streams.errors.TaskMigratedException: Error encountered sending record to topic windowed-node-counts for task 1_0 due to:
[2020-03-31T16:50:43-07:00] (streams-soak-trunk-eos_soak_i-022f109d75764a250_streamslog) org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
[2020-03-31T16:50:43-07:00] (streams-soak-trunk-eos_soak_i-022f109d75764a250_streamslog) Written offsets would not be recorded and no more records would be sent since the producer is fenced, indicating the task may be migrated out; it means all tasks belonging to this thread should be migrated.
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:202)
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$0(RecordCollectorImpl.java:185)
        at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1352)
        at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231)
        at org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:159)
        at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:768)
        at org.apache.kafka.clients.producer.internals.Sender.maybeAbortBatches(Sender.java:485)
        at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:304)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240)
        at java.lang.Thread.run(Thread.java:748)
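
To make the failure mode concrete, below is a small self-contained simulation. All names here are illustrative stand-ins, not the actual Kafka Streams classes; the real recorder lives in the RocksDB metrics code and the real failure is the TaskMigratedException above.

import java.util.HashSet;
import java.util.Set;

// Simulates the skipped close: an exception in preCommit prevents the
// task close, so the task's metrics recorder entry is never removed and
// the next registration for the same task id is fatal.
public class SkippedCloseDemo {

    // Stand-in for the RocksDB metrics recorder registry, which must not
    // see the same task registered twice.
    static final Set<String> recorders = new HashSet<>();

    static void addRecorder(String taskId) {
        if (!recorders.add(taskId)) {
            throw new IllegalStateException("Recorder already added for task " + taskId);
        }
    }

    static void closeTask(String taskId) {
        recorders.remove(taskId); // close releases the recorder entry
    }

    static void preCommit(String taskId) {
        // Stand-in for the TaskMigratedException from the fenced producer.
        throw new RuntimeException("producer fenced during preCommit of " + taskId);
    }

    public static void main(String[] args) {
        addRecorder("1_0");     // task 1_0 created, recorder registered

        try {
            preCommit("1_0");   // throws, so ...
            closeTask("1_0");   // ... the close is never reached
        } catch (RuntimeException e) {
            System.out.println("handleAssignment failed: " + e.getMessage());
        }

        // After the rebalance the task is re-created on this instance:
        addRecorder("1_0");     // fatal IllegalStateException
    }
}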


The correct solution is to wrap the whole code block in a try-catch so that an exception thrown during preCommit cannot skip the task close.
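
A minimal sketch of that shape, reusing the stand-ins from the simulation above (hypothetical code, not the actual patch):

// Close is moved into a finally block, so a preCommit failure can no
// longer skip it; the first exception is remembered and rethrown only
// after every task has been closed.
static void closeTasks(Iterable<String> tasksToClose) {
    RuntimeException firstException = null;
    for (String taskId : tasksToClose) {
        try {
            preCommit(taskId);
        } catch (RuntimeException e) {
            if (firstException == null) {
                firstException = e; // remember the first failure ...
            }
        } finally {
            closeTask(taskId);      // ... but always close the task
        }
    }
    if (firstException != null) {
        throw firstException;       // rethrow after all tasks are closed
    }
}

With the close guaranteed, the recorder entry is released even when preCommit fails, so re-creating the task after the rebalance no longer trips the fatal re-addition.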



--
This message was sent by Atlassian Jira
(v8.3.4#803005)