Posted to jira@kafka.apache.org by "AD (Jira)" <ji...@apache.org> on 2020/09/16 10:09:00 UTC

[jira] [Created] (KAFKA-10489) Committed consumer offsets not sent to consumer on rebalance

AD created KAFKA-10489:
--------------------------

             Summary: Committed consumer offsets not sent to consumer on rebalance
                 Key: KAFKA-10489
                 URL: https://issues.apache.org/jira/browse/KAFKA-10489
             Project: Kafka
          Issue Type: Bug
          Components: consumer, offset manager
    Affects Versions: 2.1.0
            Reporter: AD


Hi, I recently ran into an issue where my Kafka cluster did not report committed consumer offsets to a client even though they were available in the {{__consumer_offsets}} topic.
h2. Preparation
h3. Setup
 * Kafka 2.1.0 (2.12)
 * Three-node cluster (one Kafka broker and one ZooKeeper instance per node)
 * Consumer application (Spring Boot 2.2.6 with spring-kafka-2.3.7 and kafka-clients-2.3.1)
 * All timestamps are in UTC

h3. Scenario
 * Topic: topic-0 (one partition, time-based retention: -1)
 * Group: group-0 (one group member only)

h2. Issue

After one node of the cluster was rebooted, a rebalance took place. The consumer did not receive the offsets stored in the {{__consumer_offsets}} topic and instead fell back to the default reset policy (which in our case is {{latest}}).
 This led to the consumer missing the data that was produced in the meantime.
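
For illustration, the consumer settings involved in this fallback can be sketched as below. This is a minimal sketch and not our actual application configuration: the class name, the helper method and the {{localhost:9092}} bootstrap address are hypothetical; only the group name and the {{latest}} policy come from this report. With {{auto.offset.reset=none}} the consumer would throw a {{NoOffsetForPartitionException}} instead of silently jumping to the end of the log, which might make a case like this one visible earlier.

```java
import java.util.Properties;

// Hypothetical sketch: consumer properties relevant to the offset fallback.
class OffsetResetConfigSketch {

    // resetPolicy is one of Kafka's documented values: "latest", "earliest" or "none".
    static Properties consumerProps(String resetPolicy) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // assumption, not from the report
        props.setProperty("group.id", "group-0");                 // group from the scenario
        props.setProperty("auto.offset.reset", resetPolicy);      // used only when no committed offset is found
        return props;
    }

    public static void main(String[] args) {
        // Policy described in this report: jump to the log end when no offset is found.
        System.out.println(consumerProps("latest").getProperty("auto.offset.reset"));
        // Stricter alternative: fail instead of resetting.
        System.out.println(consumerProps("none").getProperty("auto.offset.reset"));
    }
}
```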
h2. Expectation

I would have expected the consumer to receive the last committed offset, in this case {{334389}}.
h2. Logs
h3. Kafka
h4. Server Log
{noformat}
2020-09-10 10:12:11,349 INFO [KafkaServer id=0] Starting controlled shutdown (kafka.server.KafkaServer)
...
2020-09-10 10:12:12,299 INFO [ProducerStateManager partition=consumer-topic-0] Writing producer snapshot at offset 334408 (kafka.log.ProducerStateManager)
...
2020-09-10 10:12:12,601 INFO Shutdown complete. (kafka.log.LogManager)
...
2020-09-10 10:13:31,095 INFO starting (kafka.server.KafkaServer)
{noformat}
h4. Group coordinator
{noformat}
[consumer,topic,0]::OffsetAndMetadata(offset=334389, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1599732710804, expireTimestamp=None)  // timestamp:  2020-09-10 10:11:50.804
[consumer,topic,0]::OffsetAndMetadata(offset=334408, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1599732737661, expireTimestamp=None)  // timestamp:  2020-09-10 10:12:17.661
{noformat}
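
The UTC timestamps in the comments above are derived from the {{commitTimestamp}} values (epoch milliseconds). A small sketch of that conversion follows; the class and method names are hypothetical and only the two timestamp values are taken from the output above. The first commit (offset {{334389}}) was written before the controlled shutdown began at 10:12:11,349, and the second (offset {{334408}}) carries the same timestamp as the consumer's "Resetting offset" log line.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical sketch: convert commitTimestamp (epoch milliseconds) to UTC instants.
class CommitTimestampSketch {

    static Instant toUtc(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis);
    }

    public static void main(String[] args) {
        Instant first = toUtc(1599732710804L);   // commit of offset 334389
        Instant second = toUtc(1599732737661L);  // commit of offset 334408

        System.out.println(first);   // 2020-09-10T10:11:50.804Z
        System.out.println(second);  // 2020-09-10T10:12:17.661Z

        // Gap between the two commits in milliseconds.
        System.out.println(Duration.between(first, second).toMillis()); // 26857
    }
}
```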
h3. Consumer
h4. Log
{noformat}
Sep 10, 2020 @ 10:12:17.541 [Consumer clientId=consumer-topic-0, groupId=group-0] Successfully joined group with generation 1
Sep 10, 2020 @ 10:12:17.549 [Consumer clientId=consumer-topic-0, groupId=group-0] Setting newly assigned partitions: topic-0
Sep 10, 2020 @ 10:12:17.553 [Consumer clientId=consumer-topic-0, groupId=group-0] Found no committed offset for partition topic-0
Sep 10, 2020 @ 10:12:17.661 [Consumer clientId=consumer-topic-0, groupId=group-0] Resetting offset for partition topic-0 to offset 334408.
{noformat}
h4. Stacktrace
{noformat}
 Consumer exception
java.lang.IllegalStateException: This error handler cannot process 'org.apache.kafka.clients.consumer.CommitFailedException's; no record information is available
    at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:193)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1173)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:955)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:820)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:692)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1454)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:2039)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:1862)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:983)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:929)
    ... 3 common frames omitted
{noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)