Posted to dev@kafka.apache.org by "Pierre-Henri Dezanneau (JIRA)" <ji...@apache.org> on 2018/04/17 13:42:00 UTC
[jira] [Created] (KAFKA-6799) Consumer livelock during consumer group rebalance
Pierre-Henri Dezanneau created KAFKA-6799:
---------------------------------------------
Summary: Consumer livelock during consumer group rebalance
Key: KAFKA-6799
URL: https://issues.apache.org/jira/browse/KAFKA-6799
Project: Kafka
Issue Type: Bug
Components: clients, consumer
Affects Versions: 1.1.0, 0.11.0.2, 1.0.0
Reporter: Pierre-Henri Dezanneau
We have the following environment:
* 1 kafka cluster with 3 brokers
* 1 topic with 3 partitions
* 1 producer
* 1 consumer group with 3 consumers
From this setup, we remove one broker from the cluster the hard way, by simply killing it. Quite often, the consumer group is not rebalanced correctly: all 3 consumers stop consuming and get stuck in a loop forever.
The thread dump shows that the consumer threads aren't blocked but loop forever in {{AbstractCoordinator.ensureCoordinatorReady}}, holding the lock on the {{ConsumerCoordinator}} instance because that method is declared {{synchronized}}. The heartbeat threads are blocked, waiting for the consumer threads to release that lock. As a result, none of the consumers consume any more records.
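To make the locking pattern concrete, here is a minimal Java model of it. This is a simplified sketch, not Kafka's code: {{FakeCoordinator}}, {{coordinatorKnown}} and {{sendHeartbeat}} are hypothetical names, and a short sleep stands in for the network poll. The consumer thread spins inside a {{synchronized}} method, so the heartbeat thread, which needs the same monitor, stays {{BLOCKED}}. Unlike the real bug, the model ends the loop at the end so that it terminates.

```java
import java.util.concurrent.TimeUnit;

/**
 * Simplified model (NOT Kafka's actual code) of the locking pattern described above:
 * the consumer thread spins inside a synchronized method while the heartbeat thread
 * needs the same monitor and therefore stays BLOCKED.
 */
public class LivelockModel {

    /** Hypothetical stand-in for AbstractCoordinator. */
    static class FakeCoordinator {
        // Stands in for "a coordinator has been found"; in the reported bug it never becomes true.
        volatile boolean coordinatorKnown = false;
        volatile boolean spinning = false;

        // Holds this object's monitor for the whole loop (sleep does not release monitors),
        // like the synchronized ensureCoordinatorReady in the dump.
        synchronized void ensureCoordinatorReady() throws InterruptedException {
            spinning = true;
            while (!coordinatorKnown) {
                // Stand-in for a bounded network poll that returns without finding a coordinator.
                TimeUnit.MILLISECONDS.sleep(10);
            }
        }

        // The heartbeat thread must acquire the same monitor before doing any work.
        synchronized void sendHeartbeat() {
            // No-op in this model.
        }
    }

    /** Returns the heartbeat thread's state observed while the consumer thread spins. */
    static Thread.State observeHeartbeatState() throws InterruptedException {
        FakeCoordinator coordinator = new FakeCoordinator();

        Thread consumer = new Thread(() -> {
            try {
                coordinator.ensureCoordinatorReady();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "consumer");
        consumer.start();
        // Once "spinning" is true, the consumer thread already owns the monitor.
        while (!coordinator.spinning) {
            Thread.sleep(1);
        }

        Thread heartbeat = new Thread(coordinator::sendHeartbeat, "heartbeat");
        heartbeat.start();

        // Wait (at most 5 seconds) until the heartbeat thread is stuck on the monitor.
        Thread.State state = heartbeat.getState();
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        while (state != Thread.State.BLOCKED && System.nanoTime() < deadline) {
            Thread.sleep(1);
            state = heartbeat.getState();
        }

        // Let the demo terminate; in the bug report this never happens.
        coordinator.coordinatorKnown = true;
        consumer.join();
        heartbeat.join();
        return state;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("heartbeat thread state: " + observeHeartbeatState());
    }
}
```

Running {{main}} should print {{heartbeat thread state: BLOCKED}}. In this model the consumer thread shows {{TIMED_WAITING}} rather than {{RUNNABLE}}, because it sleeps instead of waiting in {{epollWait}}, but the effect on the heartbeat thread is the same.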
We built a simple project that seems to reproduce this reliably:
{code:sh}
$ mkdir -p /tmp/sandbox && cd /tmp/sandbox
$ git clone https://github.com/phdezann/helloworld-kafka-livelock
$ cd helloworld-kafka-livelock && ./spin.sh
...
$ livelock detecte
{code}
{code:sh|title=Consumer thread|borderStyle=solid}
"kafka-consumer-1@10733" daemon prio=5 tid=0x31 nid=NA runnable
java.lang.Thread.State: RUNNABLE
blocks kafka-coordinator-heartbeat-thread | helloWorldGroup@10728
at sun.nio.ch.EPollArrayWrapper.epollWait(EPollArrayWrapper.java:-1)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x2a15> (a sun.nio.ch.EPollSelectorImpl)
- locked <0x2a16> (a java.util.Collections$UnmodifiableSet)
- locked <0x2a17> (a sun.nio.ch.Util$3)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at org.apache.kafka.common.network.Selector.select(Selector.java:684)
at org.apache.kafka.common.network.Selector.poll(Selector.java:408)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:261)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:156)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:228)
- locked <0x2a0c> (a org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:205)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:279)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1149)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
at org.helloworld.kafka.bus.HelloWorldKafkaListener.lambda$createConsumerInDedicatedThread$0(HelloWorldKafkaListener.java:45)
at org.helloworld.kafka.bus.HelloWorldKafkaListener$$Lambda$42.1776656466.run(Unknown Source:-1)
at java.lang.Thread.run(Thread.java:748)
{code}
{code:sh|title=Heartbeat thread|borderStyle=solid}
"kafka-coordinator-heartbeat-thread | helloWorldGroup@10728" daemon prio=5 tid=0x36 nid=NA waiting for monitor entry
java.lang.Thread.State: BLOCKED
waiting for kafka-consumer-1@10733 to release lock on <0x2a0c> (a org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
at java.lang.Object.wait(Object.java:-1)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:955)
{code}
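The relationship between the two dumps above (a thread {{BLOCKED}} on the {{ConsumerCoordinator}} monitor owned by another thread) can also be checked from inside the JVM with the standard {{java.lang.management}} API. This is a sketch under our own assumptions, not part of the reproduction project: {{CoordinatorLockCheck}} and {{findBlockedOn}} are hypothetical names, and the check only reports threads blocked on a monitor whose class name matches exactly, together with the owning thread.

```java
import java.lang.management.LockInfo;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical in-process check: lists threads that are BLOCKED on a monitor of the
 * given class, together with the name of the thread that currently owns that monitor.
 */
public class CoordinatorLockCheck {

    /** Returns "blockedThread -> ownerThread" for every thread BLOCKED on a monitor of the given class. */
    static List<String> findBlockedOn(String lockClassName) {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        List<String> result = new ArrayList<>();
        // Lock info for the monitor a thread is blocked on is reported even when the
        // lockedMonitors/lockedSynchronizers flags are false.
        for (ThreadInfo info : mx.dumpAllThreads(false, false)) {
            LockInfo lock = info.getLockInfo();
            if (info.getThreadState() == Thread.State.BLOCKED
                    && lock != null
                    && lock.getClassName().equals(lockClassName)) {
                result.add(info.getThreadName() + " -> " + info.getLockOwnerName());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Class whose monitor the heartbeat thread waits on in the dump above.
        String coordinatorClass = "org.apache.kafka.clients.consumer.internals.ConsumerCoordinator";
        findBlockedOn(coordinatorClass).forEach(s -> System.out.println("blocked: " + s));
    }
}
```

A single match is not proof of a livelock, since a heartbeat thread can briefly wait for the coordinator lock during normal operation; a check like this is only meaningful if the same blocked/owner pair persists across several samples.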
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)