Posted to dev@kafka.apache.org by "Xavier Léauté (JIRA)" <ji...@apache.org> on 2017/04/14 17:24:41 UTC

[jira] [Created] (KAFKA-5073) Kafka Streams stuck rebalancing after exception thrown in rebalance listener

Xavier Léauté created KAFKA-5073:
------------------------------------

             Summary: Kafka Streams stuck rebalancing after exception thrown in rebalance listener
                 Key: KAFKA-5073
                 URL: https://issues.apache.org/jira/browse/KAFKA-5073
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 0.11.0.0, 0.10.2.1
            Reporter: Xavier Léauté


An exception thrown in the Streams rebalance listener causes the Kafka consumer coordinator to log an error, but the Streams app does not propagate the exception to the uncaught exception handler.

As a result, the app is left stuck in the rebalancing state if the consumer throws an exception when starting state restore.
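The failure mode described above can be modeled in a small self-contained Java sketch. Every name in it (`AssignmentListener`, `SwallowingCoordinator`, `CapturingListener`, `RebalanceFailureDemo`) is a hypothetical stand-in, not a Kafka API. The coordinator logs and swallows listener exceptions, as this issue reports. The capturing listener shows one possible workaround: it records the failure so the poll loop can rethrow it where the thread's uncaught exception handler would see it. This illustrates the pattern only and is not the actual fix for this issue.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

class RebalanceFailureDemo {
    // Hypothetical assignment callback that fails, e.g. because state restore hit a corrupt record.
    static final AssignmentListener FAILING_RESTORE = partitions -> {
        throw new RuntimeException("Record is corrupt");
    };

    // Simulates one poll-loop iteration that completes a group join.
    static void pollOnce(SwallowingCoordinator coordinator, AssignmentListener listener) {
        coordinator.completeJoin(listener, Collections.singletonList("_my_topic-0"));
        if (listener instanceof CapturingListener) {
            // Surface any failure recorded during the callback.
            ((CapturingListener) listener).rethrowIfFailed();
        }
    }

    public static void main(String[] args) {
        SwallowingCoordinator coordinator = new SwallowingCoordinator();

        // Without capture, the failure is only logged and the loop keeps going.
        pollOnce(coordinator, FAILING_RESTORE);
        System.out.println("logged errors: " + coordinator.log.size()); // logged errors: 1

        // With capture, the same failure escapes the poll loop.
        try {
            pollOnce(coordinator, new CapturingListener(FAILING_RESTORE));
        } catch (IllegalStateException e) {
            System.out.println("surfaced: " + e.getCause().getMessage()); // surfaced: Record is corrupt
        }
    }
}

// Hypothetical callback type; not Kafka's ConsumerRebalanceListener.
interface AssignmentListener {
    void onAssigned(List<String> partitions);
}

// Hypothetical coordinator that logs and swallows listener exceptions,
// mirroring the behavior reported in this issue.
class SwallowingCoordinator {
    final List<String> log = new ArrayList<>();

    void completeJoin(AssignmentListener listener, List<String> partitions) {
        try {
            listener.onAssigned(partitions);
        } catch (RuntimeException e) {
            // Logged here, but never seen by the caller of the poll loop.
            log.add("ERROR listener failed on partition assignment: " + e.getMessage());
        }
    }
}

// Workaround sketch: remember the failure so the poll loop can rethrow it.
class CapturingListener implements AssignmentListener {
    private final AtomicReference<RuntimeException> failure = new AtomicReference<>();
    private final AssignmentListener delegate;

    CapturingListener(AssignmentListener delegate) {
        this.delegate = delegate;
    }

    @Override
    public void onAssigned(List<String> partitions) {
        try {
            delegate.onAssigned(partitions);
        } catch (RuntimeException e) {
            failure.set(e);
            throw e; // the coordinator still logs it as before
        }
    }

    // Throws (and clears) a failure recorded during the last callback, if any.
    void rethrowIfFailed() {
        RuntimeException e = failure.getAndSet(null);
        if (e != null) {
            throw new IllegalStateException("Rebalance listener failed", e);
        }
    }
}
```

In this sketch the rethrow happens in the poll loop, after the join completes, rather than inside the callback. This is necessary because the coordinator catches whatever the callback throws.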

Here is an example log that shows the error when the consumer throws a CRC error during state restore.

{code}
[2017-04-13 14:46:41,409] ERROR [XXX-StreamThread-1] User provided listener org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener for group XXXXXXX failed on partition assignment (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:269)
org.apache.kafka.common.KafkaException: Record batch for partition _my_topic-0 at offset 42 is invalid, cause: Record is corrupt (stored crc = 1982353474, computed crc = 1572524932)
        at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.maybeEnsureValid(Fetcher.java:904)
        at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.nextFetchedRecord(Fetcher.java:936)
        at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:960)
        at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1200(Fetcher.java:864)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:517)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:482)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1069)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1002)
        at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:145)
        at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:1329)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:296)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1037)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1002)
        at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:546)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:702)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:326)
{code}
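The `Record is corrupt (stored crc = ..., computed crc = ...)` message in the log above means that the checksum recomputed when the batch was read did not match the checksum stored with it. The following is a simplified sketch of that kind of check, using `java.util.zip.CRC32` over a hypothetical payload. The `CrcCheckDemo` and `StoredRecord` names are illustrative only. The sketch does not reproduce Kafka's record layout, and Kafka's newer (v2) record batch format uses CRC-32C rather than CRC-32.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

class CrcCheckDemo {
    // Hypothetical record: a payload plus the checksum stored when it was written.
    static final class StoredRecord {
        final byte[] payload;
        final long storedCrc;

        StoredRecord(byte[] payload, long storedCrc) {
            this.payload = payload;
            this.storedCrc = storedCrc;
        }
    }

    // CRC-32 of the whole payload, as an unsigned 32-bit value in a long.
    static long crcOf(byte[] bytes) {
        CRC32 crc = new CRC32();
        crc.update(bytes, 0, bytes.length);
        return crc.getValue();
    }

    // Recompute the checksum on read and reject the record if it differs.
    static void ensureValid(StoredRecord record) {
        long computed = crcOf(record.payload);
        if (computed != record.storedCrc) {
            throw new IllegalStateException("Record is corrupt (stored crc = "
                    + record.storedCrc + ", computed crc = " + computed + ")");
        }
    }

    public static void main(String[] args) {
        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
        StoredRecord good = new StoredRecord(payload, crcOf(payload));
        ensureValid(good); // checksum matches, no exception

        byte[] flipped = payload.clone();
        flipped[0] ^= 0x01; // simulate a single corrupted bit
        StoredRecord bad = new StoredRecord(flipped, good.storedCrc);
        try {
            ensureValid(bad);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```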



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)