Posted to users@kafka.apache.org by tao xiao <xi...@gmail.com> on 2015/06/09 13:49:08 UTC

Mirror maker doesn't rebalance after getting ZkNoNodeException

Hi,

I have two mirror makers, A and B, both subscribed to the same whitelist.
During a topic rebalance, mirror maker A encountered a ZkNoNodeException
and then stopped all of its connections, but mirror maker B didn't pick up
the topics that were being consumed by A, which left some of the topics
unassigned. I think this is because A never released ownership of those
topics. My question is: why didn't A release ownership upon receiving the
error?
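
For context: the old high-level consumer records partition ownership as
ephemeral znodes under /consumers/<group>/owners/<topic>/<partition>, so if
A never deletes its nodes, a rebalance on B cannot claim those partitions.
Below is a minimal sketch of how the owner nodes can be dumped for
inspection (the group name "test" matches the path in the stack trace
below; the ZooKeeper address and the DumpOwners/StringSerializer names are
placeholders for illustration):

import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.serialize.ZkSerializer
import scala.collection.JavaConverters._

// Plain-string serializer: zkclient's default uses Java serialization,
// but the consumer znodes store plain strings.
object StringSerializer extends ZkSerializer {
  def serialize(data: Object): Array[Byte] = data.toString.getBytes("UTF-8")
  def deserialize(bytes: Array[Byte]): Object =
    if (bytes == null) null else new String(bytes, "UTF-8")
}

object DumpOwners extends App {
  val zk = new ZkClient("localhost:2181", 30000, 30000, StringSerializer)
  val ownersPath = "/consumers/test/owners" // group "test", as in the stack trace
  for (topic <- zk.getChildren(ownersPath).asScala;
       partition <- zk.getChildren(s"$ownersPath/$topic").asScala) {
    val owner: String = zk.readData(s"$ownersPath/$topic/$partition")
    println(s"$topic-$partition -> $owner") // value is the owning consumer thread id
  }
  zk.close()
}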

Here is the stack trace:

[2015-06-08 15:55:46,379] INFO [test_some-ip-1433797157389-247c1fc4], exception during rebalance  (kafka.consumer.ZookeeperConsumerConnector)
org.I0Itec.zkclient.exception.ZkNoNodeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/test/ids/test_some-ip-1433797157389-247c1fc4
        at org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47)
        at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:685)
        at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:766)
        at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
        at kafka.utils.ZkUtils$.readData(ZkUtils.scala:473)
        at kafka.consumer.TopicCount$.constructTopicCount(TopicCount.scala:61)
        at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:657)
        at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1$$anonfun$apply$mcV$sp$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:629)
        at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
        at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:620)
        at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:620)
        at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:620)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:619)
        at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:572)
Caused by: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/test/ids/test_some-ip-1433797157389-247c1fc4
        at org.apache.zookeeper.KeeperException.create(KeeperException.java:111)
        at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
        at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:1155)
        at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:1184)
        at org.I0Itec.zkclient.ZkConnection.readData(ZkConnection.java:103)
        at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:770)
        at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:766)
        at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
        ... 13 more



Here is the last part of the log:

[2015-06-08 15:55:49,848] INFO [ConsumerFetcherManager-1433797157399] Stopping leader finder thread (kafka.consumer.ConsumerFetcherManager)
[2015-06-08 15:55:49,848] INFO [ConsumerFetcherManager-1433797157399] Stopping all fetchers (kafka.consumer.ConsumerFetcherManager)
[2015-06-08 15:55:49,848] INFO [ConsumerFetcherManager-1433797157399] All connections stopped (kafka.consumer.ConsumerFetcherManager)
[2015-06-08 15:55:49,848] INFO [test_some-ip-1433797157389-247c1fc4], Cleared all relevant queues for this fetcher (kafka.consumer.ZookeeperConsumerConnector)
[2015-06-08 15:55:49,849] INFO [test_some-ip-1433797157389-247c1fc4], Cleared the data chunks in all the consumer message iterators (kafka.consumer.ZookeeperConsumerConnector)
[2015-06-08 15:55:49,849] INFO [test_some-ip-1433797157389-247c1fc4], Invoking rebalance listener before relasing partition ownerships. (kafka.consumer.ZookeeperConsumerConnector)


As seen in the log, mirror maker A didn't release ownership, and it didn't
attempt to trigger another round of rebalancing either. I checked
ZooKeeper: the node that was reported missing actually existed, and it was
created at the same time the error was thrown.
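
For reference, the registration node can be inspected roughly like this;
zkclient's readData(path, stat) variant fills in a ZooKeeper Stat, whose
ctime is the creation timestamp I compared against the error time (a
sketch, reusing the connection setup and StringSerializer from the snippet
earlier in this mail):

import java.util.Date
import org.apache.zookeeper.data.Stat
import org.I0Itec.zkclient.ZkClient

object CheckRegistrationNode extends App {
  // Reuses the StringSerializer defined in the earlier sketch.
  val zk = new ZkClient("localhost:2181", 30000, 30000, StringSerializer)
  val idPath = "/consumers/test/ids/test_some-ip-1433797157389-247c1fc4"
  val stat = new Stat()
  // readData(path, stat) populates `stat` with the znode metadata;
  // stat.getCtime is the creation time in epoch milliseconds.
  val payload: String = zk.readData(idPath, stat)
  println(s"payload=$payload created=${new Date(stat.getCtime)}")
  zk.close()
}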


I'm using the latest trunk code.

Re: Mirror maker doesn't rebalance after getting ZkNoNodeException

Posted by tao xiao <xi...@gmail.com>.
I'm using commit 9e894aa0173b14d64a900bcf780d6b7809368384 from trunk.

On Wed, 10 Jun 2015 at 01:09 Jiangjie Qin <jq...@linkedin.com.invalid> wrote:

> Which version of MM are you running?
>
> On 6/9/15, 4:49 AM, "tao xiao" <xi...@gmail.com> wrote:
>
> > [original message snipped]
>

Re: Mirror maker doesn't rebalance after getting ZkNoNodeException

Posted by Jiangjie Qin <jq...@linkedin.com.INVALID>.
Which version of MM are you running?

On 6/9/15, 4:49 AM, "tao xiao" <xi...@gmail.com> wrote:

> [original message snipped]