Posted to dev@kafka.apache.org by "Guozhang Wang (JIRA)" <ji...@apache.org> on 2013/08/15 18:28:48 UTC

[jira] [Commented] (KAFKA-1010) Concurrency issue in getCluster() causes rebalance failure and dead consumer

    [ https://issues.apache.org/jira/browse/KAFKA-1010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13741148#comment-13741148 ] 

Guozhang Wang commented on KAFKA-1010:
--------------------------------------

This looks good to me. If the broker's ephemeral node disappears after the first ZooKeeper call, another rebalance should be triggered, so it is safe to fail fast on this attempt.

+1
                
> Concurrency issue in getCluster() causes rebalance failure and dead consumer
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-1010
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1010
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.8
>            Reporter: Sam Meder
>            Assignee: Neha Narkhede
>            Priority: Blocker
>             Fix For: 0.8
>
>         Attachments: get_cluster_0_8.patch
>
>
> We're seeing the following stack trace on the consumer when brokers are (forcefully) removed from the cluster:
> Thu Aug 15 05:10:06 GMT 2013 Exception in thread "main" org.I0Itec.zkclient.exception.ZkNoNodeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /brokers/ids/4
> at org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47)
> at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:685)
> at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:766)
> at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
> at kafka.utils.ZkUtils$.readData(ZkUtils.scala:407)
> at kafka.utils.ZkUtils$$anonfun$getCluster$1.apply(ZkUtils.scala:453)
> at kafka.utils.ZkUtils$$anonfun$getCluster$1.apply(ZkUtils.scala:452)
> at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> at scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:549)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
> at scala.collection.JavaConversions$JListWrapper.foreach(JavaConversions.scala:596)
> at kafka.utils.ZkUtils$.getCluster(ZkUtils.scala:452)
> at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:394)
> at scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:282)
> at scala.collection.immutable.Range$$anon$2.foreach$mVc$sp(Range.scala:265)
> at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:391)
> at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:722)
> at kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:206)
> at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:77)
> at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:89)
> I'm pretty sure this is due to the following logic in getCluster():
>     val nodes = getChildrenParentMayNotExist(zkClient, BrokerIdsPath)
>     for (node <- nodes) {
>       val brokerZKString = readData(zkClient, BrokerIdsPath + "/" + node)._1
>       cluster.add(Broker.createBroker(node.toInt, brokerZKString))
>     }
> which is obviously not safe since the nodes retrieved in the first call may have disappeared by the time we iterate to get the values.
> getCluster() seems to only be used in ZookeeperConsumerConnector.syncedRebalance and in ImportZkOffsets.updateZkOffsets (which doesn't actually look like it is using the values), so the simplest solution may be to just move the getCluster() call into the try block in syncedRebalance and kill the usage in the other call.
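
For illustration, here is a minimal sketch of the race described above and of why keeping getCluster() inside the retried try block is enough. It is not the attached patch. FakeZk, NoNodeException, the afterList hook and the broker strings are all hypothetical stand-ins so the example runs without ZooKeeper or zkclient, and the retry loop is only loosely modeled on syncedRebalance.

```scala
// Hypothetical stand-in for ZkNoNodeException, so the sketch needs no zkclient.
final class NoNodeException(path: String)
  extends RuntimeException("NoNode for " + path)

// Hypothetical in-memory stand-in for the /brokers/ids subtree.
final class FakeZk(initial: Map[String, String]) {
  private var nodes: Map[String, String] = initial

  def getChildren: List[String] = nodes.keys.toList.sorted

  def readData(id: String): String =
    nodes.getOrElse(id, throw new NoNodeException("/brokers/ids/" + id))

  def delete(id: String): Unit = nodes -= id
}

object RebalanceSketch {
  // Same two-step shape as getCluster(): list the children, then read each one.
  // afterList is a hypothetical hook that runs between the two steps; it is
  // used only to simulate a broker node vanishing at the worst moment.
  def getCluster(zk: FakeZk, afterList: () => Unit = () => ()): Map[Int, String] = {
    val ids = zk.getChildren
    afterList()
    ids.map(id => id.toInt -> zk.readData(id)).toMap
  }

  // Retry loop with the getCluster() call inside the try block. A node that
  // vanished fails the current attempt only; the next attempt lists the
  // children again and no longer sees the dead broker.
  def syncedRebalance(zk: FakeZk, maxRetries: Int,
                      afterList: () => Unit = () => ()): Option[Map[Int, String]] = {
    var attempt = 0
    var result: Option[Map[Int, String]] = None
    while (result.isEmpty && attempt < maxRetries) {
      attempt += 1
      try {
        result = Some(getCluster(zk, afterList))
      } catch {
        case e: NoNodeException =>
          println(s"attempt $attempt failed: ${e.getMessage}; retrying")
      }
    }
    result
  }

  def main(args: Array[String]): Unit = {
    // Made-up broker payloads; broker 4 is removed between list and read.
    val zk = new FakeZk(Map("3" -> "broker-3", "4" -> "broker-4"))
    println(syncedRebalance(zk, maxRetries = 4, afterList = () => zk.delete("4")))
  }
}
```

In this sketch the first attempt fails on broker 4 and the second succeeds with broker 3 only. Calling getCluster() outside the try block with the same hook lets the exception escape to the caller instead, which corresponds to the stack trace in the report.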

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira