You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Guozhang Wang (Jira)" <ji...@apache.org> on 2022/02/11 07:34:00 UTC

[jira] [Updated] (KAFKA-8104) Consumer cannot rejoin to the group after rebalancing

     [ https://issues.apache.org/jira/browse/KAFKA-8104?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang updated KAFKA-8104:
---------------------------------
    Labels: new-consumer-threading-should-fix  (was: )

> Consumer cannot rejoin to the group after rebalancing
> -----------------------------------------------------
>
>                 Key: KAFKA-8104
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8104
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 2.0.0, 2.1.0, 2.2.0, 2.3.0
>            Reporter: Gregory Koshelev
>            Assignee: Nikolay Izhikov
>            Priority: Critical
>              Labels: new-consumer-threading-should-fix
>             Fix For: 2.4.0
>
>         Attachments: consumer-rejoin-fail.log
>
>
> TL;DR; {{KafkaConsumer}} cannot rejoin to the group due to inconsistent {{AbstractCoordinator.generation}} (which is {{NO_GENERATION}} and {{AbstractCoordinator.joinFuture}} (which is succeeded {{RequestFuture}}). See explanation below.
> There are 16 consumers in single process (threads from pool-4-thread-1 to pool-4-thread-16). All of them belong to single consumer group {{hercules.sink.elastic.legacy_logs_elk_c2}}. Rebalancing has been acquired and consumers have got {{CommitFailedException}} as expected:
> {noformat}
> 2019-03-10T03:16:37.023Z [pool-4-thread-10] WARN  r.k.vostok.hercules.sink.SimpleSink - Commit failed due to rebalancing
> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:798)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:681)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1334)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1298)
> 	at ru.kontur.vostok.hercules.sink.Sink.commit(Sink.java:156)
> 	at ru.kontur.vostok.hercules.sink.SimpleSink.run(SimpleSink.java:104)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 	at java.lang.Thread.run(Thread.java:748)
> {noformat}
> After that, most of them successfully rejoined to the group with generation 10699:
> {noformat}
> 2019-03-10T03:16:39.208Z [pool-4-thread-13] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-13, groupId=hercules.sink.elastic.legacy_logs_elk_c2] Successfully joined group with generation 10699
> 2019-03-10T03:16:39.209Z [pool-4-thread-13] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-13, groupId=hercules.sink.elastic.legacy_logs_elk_c2] Setting newly assigned partitions [legacy_logs_elk_c2-18]
> ...
> 2019-03-10T03:16:39.216Z [pool-4-thread-11] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-11, groupId=hercules.sink.elastic.legacy_logs_elk_c2] Successfully joined group with generation 10699
> 2019-03-10T03:16:39.217Z [pool-4-thread-11] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-11, groupId=hercules.sink.elastic.legacy_logs_elk_c2] Setting newly assigned partitions [legacy_logs_elk_c2-10, legacy_logs_elk_c2-11]
> ...
> 2019-03-10T03:16:39.218Z [pool-4-thread-15] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-15, groupId=hercules.sink.elastic.legacy_logs_elk_c2] Setting newly assigned partitions [legacy_logs_elk_c2-24]
> 2019-03-10T03:16:42.320Z [kafka-coordinator-heartbeat-thread | hercules.sink.elastic.legacy_logs_elk_c2] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-6, groupId=hercules.sink.elastic.legacy_logs_elk_c2] Attempt to heartbeat failed since group is rebalancing
> 2019-03-10T03:16:42.320Z [kafka-coordinator-heartbeat-thread | hercules.sink.elastic.legacy_logs_elk_c2] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-5, groupId=hercules.sink.elastic.legacy_logs_elk_c2] Attempt to heartbeat failed since group is rebalancing
> 2019-03-10T03:16:42.323Z [kafka-coordinator-heartbeat-thread | hercules.sink.elastic.legacy_logs_elk_c2] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-7, groupId=hercules.sink.elastic.legacy_logs_elk_c2] Attempt to heartbeat failed since group is rebalancing
> 2019-03-10T03:17:13.235Z [pool-4-thread-4] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-4, groupId=hercules.sink.elastic.legacy_logs_elk_c2] Successfully joined group with generation -1
> {noformat}
> But one consumer (pool-4-thread-4) got strange generation -1 (see last log record from above).
> Further log records in attached log file.
> Finally, 15 consumers successfully rejoined. But consumer with thread {{pool-4-thread-4}} didn't rejoin:
> {noformat}
> 2019-03-10T03:17:13.355Z [pool-4-thread-4] ERROR r.k.vostok.hercules.sink.SimpleSink - Unspecified exception has been acquired
> java.lang.IllegalStateException: Coordinator selected invalid assignment protocol: null
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:241)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:422)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:352)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:337)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:333)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1175)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1154)
> 	at ru.kontur.vostok.hercules.sink.Sink.poll(Sink.java:152)
> 	at ru.kontur.vostok.hercules.sink.SimpleSink.run(SimpleSink.java:70)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 	at java.lang.Thread.run(Thread.java:748)
> 2019-03-10T03:17:13.360Z [pool-4-thread-4] ERROR r.k.vostok.hercules.sink.SimpleSink - Unspecified exception has been acquired
> java.lang.IllegalStateException: Coordinator selected invalid assignment protocol: null
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:241)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:422)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:352)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:337)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:333)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1175)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1154)
> 	at ru.kontur.vostok.hercules.sink.Sink.poll(Sink.java:152)
> 	at ru.kontur.vostok.hercules.sink.SimpleSink.run(SimpleSink.java:70)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 	at java.lang.Thread.run(Thread.java:748)}}
> {noformat}
> It is important to note, that {{KafkaConsumer.coordinator.joinFuture}} is not null and succeeded, but {{ConsumerCoordinator}} cannot perform {{resetJoinGroupFuture()}} due to exception was thrown from {{onJoinComplete()}}:
> {code:java}
>             if (future.succeeded()) {
>                 // Duplicate the buffer in case `onJoinComplete` does not complete and needs to be retried.
>                 ByteBuffer memberAssignment = future.value().duplicate();
>                 onJoinComplete(generation.generationId, generation.memberId, generation.protocol, memberAssignment);
>                 // We reset the join group future only after the completion callback returns. This ensures
>                 // that if the callback is woken up, we will retry it on the next joinGroupIfNeeded.
>                 resetJoinGroupFuture();
>                 needsJoinPrepare = true;
>             }
> {code}
> If I understood correctly, the generation was changed to {{NO_GENERATION}} in another thread by one of CoordinatorResponseHandlers.
>  
>  [^consumer-rejoin-fail.log] 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)