You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Attila Sasvari (JIRA)" <ji...@apache.org> on 2018/04/19 11:34:00 UTC

[jira] [Commented] (KAFKA-6799) Consumer livelock during consumer group rebalance

    [ https://issues.apache.org/jira/browse/KAFKA-6799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16443921#comment-16443921 ] 

Attila Sasvari commented on KAFKA-6799:
---------------------------------------

[~phdezann] thanks for reporting this issue and creating the docker environment for reproducing the issue.

- I had to set the environment variable M2_REPOSITORY before running the shell script: {{M2_REPOSITORY=/root/.m2/ ./spin.sh}}
Then I saw the issue you described in the description.
- I looked a bit around in the helloworld-kafka-1 docker container, and noticed that the replication factor for the internal topic was set to 1:
{code}
root@b6e9218f1761:/install/kafka# bin/kafka-topics.sh --describe -zookeeper 172.170.0.80:2181
Topic:__consumer_offsets	PartitionCount:50	ReplicationFactor:1	Configs:segment.bytes=104857600,cleanup.policy=compact,compression.type=producer
	Topic: __consumer_offsets	Partition: 0	Leader: 103	Replicas: 103	Isr: 103
	Topic: __consumer_offsets	Partition: 1	Leader: 101	Replicas: 101	Isr: 101
	Topic: __consumer_offsets	Partition: 2	Leader: -1	Replicas: 102	Isr: 102
{code}
In this situation, consumer cannot contact the partition leader for  __consumer_offsets, Partition: 2 as it was killed by the test. So it won't be able to commit the offset, for that specific partition.
- I changed replication factor to 3 for {{__consumer_offsets}} and then I did not see this issue.
- Can you add the following to {{docker/entrypoint.sh}} and re-test?
{code}
cat >>config/server.properties <<ENDL


offsets.topic.replication.factor=3
default.replication.factor=3
min.insync.replicas=2
ENDL
{code}
before 
{code}
# Start kafka
 ./bin/kafka-server-start.sh config/server.properties --override advertised.listeners=${KAFKA_ADVERTISED_LISTENERS} --override broker.id=${KAFKA_BROKER_ID} --override zookeeper.connect=${KAFKA_ZOOKEEPER_CONNECT}
{code}

> Consumer livelock during consumer group rebalance
> -------------------------------------------------
>
>                 Key: KAFKA-6799
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6799
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>    Affects Versions: 1.0.0, 0.11.0.2, 1.1.0
>            Reporter: Pierre-Henri Dezanneau
>            Priority: Critical
>
> We have the following environment:
> * 1 kafka cluster with 3 brokers
> * 1 topic with 3 partitions
> * 1 producer
> * 1 consumer group with 3 consumers
> From this setup, we remove one broker from the cluster, the hard way, by simply killing it. Quite often, we see that the consumer group is not rebalanced correctly. By that I mean that all 3 consumers stop consuming and get stuck in a loop, forever.
> The thread dump shows that the consumer threads aren't blocked but run forever in {{AbstractCoordinator.ensureCoordinatorReady}}, holding a lock due to the {{synchonized}} keyword on the calling method. Heartbeat threads are blocked, waiting for the consumer threads to release the lock. This situation prevents all consumers from consuming any more record.
> We build a simple project which seems to reliably demonstrate this:
> {code:sh}
> $ mkdir -p /tmp/sandbox && cd /tmp/sandbox
> $ git clone https://github.com/phdezann/helloworld-kafka-livelock
> $ cd helloworld-kafka-livelock && ./spin.sh
> ...
> livelock detected
> {code}
> {code:sh|title=Consumer thread|borderStyle=solid}
> "kafka-consumer-1@10733" daemon prio=5 tid=0x31 nid=NA runnable
>   java.lang.Thread.State: RUNNABLE
> 	 blocks kafka-coordinator-heartbeat-thread | helloWorldGroup@10728
> 	  at sun.nio.ch.EPollArrayWrapper.epollWait(EPollArrayWrapper.java:-1)
> 	  at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
> 	  at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
> 	  at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
> 	  - locked <0x2a15> (a sun.nio.ch.EPollSelectorImpl)
> 	  - locked <0x2a16> (a java.util.Collections$UnmodifiableSet)
> 	  - locked <0x2a17> (a sun.nio.ch.Util$3)
> 	  at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
> 	  at org.apache.kafka.common.network.Selector.select(Selector.java:684)
> 	  at org.apache.kafka.common.network.Selector.poll(Selector.java:408)
> 	  at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460)
> 	  at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:261)
> 	  at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
> 	  at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
> 	  at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:156)
> 	  at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:228)
> 	  - locked <0x2a0c> (a org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
> 	  at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:205)
> 	  at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:279)
> 	  at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1149)
> 	  at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
> 	  at org.helloworld.kafka.bus.HelloWorldKafkaListener.lambda$createConsumerInDedicatedThread$0(HelloWorldKafkaListener.java:45)
> 	  at org.helloworld.kafka.bus.HelloWorldKafkaListener$$Lambda$42.1776656466.run(Unknown Source:-1)
> 	  at java.lang.Thread.run(Thread.java:748)
> {code}
> {code:sh|title=Heartbeat thread|borderStyle=solid}
> "kafka-coordinator-heartbeat-thread | helloWorldGroup@10728" daemon prio=5 tid=0x36 nid=NA waiting for monitor entry
>   java.lang.Thread.State: BLOCKED
> 	 waiting for kafka-consumer-1@10733 to release lock on <0x2a0c> (a org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
> 	  at java.lang.Object.wait(Object.java:-1)
> 	  at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:955)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)