Posted to users@kafka.apache.org by Bhavesh Mistry <mi...@gmail.com> on 2014/11/17 23:30:57 UTC

How to recover from ConsumerRebalanceFailedException ?

Hi Kafka Team,


I intermittently get the following exception because of ZK/network issues.
How do I recover *programmatically* when the consumer thread dies, and
restart the source?  We have alerts showing that, after this error,
partition OWNERSHIP is *none*.  Please let me know how to detect that the
consumer thread has died and needs to be restarted, and how to restart the
source.



17 Nov 2014 04:29:41,180 ERROR [ZkClient-EventThread-65-dare-msgq00.sv.walmartlabs.com:9091,dare-msgq01.sv.walmartlabs.com:9091,dare-msgq02.sv.walmartlabs.com:9091] (org.I0Itec.zkclient.ZkEventThread.run:77)  - Error handling event ZkEvent[New session event sent to kafka.consumer.ZookeeperConsumerConnector$ZKSessionExpireListener@552c5ea8]
kafka.common.ConsumerRebalanceFailedException: mupd_statsd_logmon_metric_events18_sdc-stg-logsearch-app1-1416023311208-14501d68 can't rebalance after 8 retries
        at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:626)
        at kafka.consumer.ZookeeperConsumerConnector$ZKSessionExpireListener.handleNewSession(ZookeeperConsumerConnector.scala:481)
        at org.I0Itec.zkclient.ZkClient$4.run(ZkClient.java:472)
        at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)





ZK Connection Issues:

java.net.SocketException: Transport endpoint is not connected
        at sun.nio.ch.SocketChannelImpl.shutdown(Native Method)
        at sun.nio.ch.SocketChannelImpl.shutdownInput(SocketChannelImpl.java:633)
        at sun.nio.ch.SocketAdaptor.shutdownInput(SocketAdaptor.java:360)
        at org.apache.zookeeper.ClientCnxn$SendThread.cleanup(ClientCnxn.java:1205)
        at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1170)




        at org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47)
        at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:685)
        at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:766)
        at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
        at kafka.utils.ZkUtils$.readData(ZkUtils.scala:449)
        at kafka.consumer.TopicCount$.constructTopicCount(TopicCount.scala:61)
        at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:630)
        at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1$$anonfun$apply$mcV$sp$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:601)
        at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
        at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:595)
        at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:592)
        at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:592)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:591)
        at kafka.consumer.ZookeeperConsumerConnector$ZKSessionExpireListener.handleNewSession(ZookeeperConsumerConnector.scala:481)
        at org.I0Itec.zkclient.ZkClient$4.run(ZkClient.java:472)
        at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
Caused by: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/mupd_statsd_logmon_metric_events18/ids/mupd_statsd_logmon_metric_events18_sdc-stg-logsearch-app1-1416023311208-14501d68
        at org.apache.zookeeper.KeeperException.create(KeeperException.java:102)
        at org.apache.zookeeper.KeeperException.create(KeeperException.java:42)
        at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:921)
        at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:950)
        at org.I0Itec.zkclient.ZkConnection.readData(ZkConnection.java:103)
        at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:770)
        at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:766)
        at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)

Re: How to recover from ConsumerRebalanceFailedException ?

Posted by Guozhang Wang <wa...@gmail.com>.
Bhavesh,

In 0.9 the new consumer will not talk to ZK and will be single-threaded,
which will make it easier to provide monitoring mechanisms.

Guozhang

On Thu, Nov 20, 2014 at 8:15 PM, Jun Rao <ju...@gmail.com> wrote:

> Can you just monitor the consumer byte/message/fetch rate?
>
> Thanks,
>
> Jun



-- 
-- Guozhang

Re: How to recover from ConsumerRebalanceFailedException ?

Posted by Jun Rao <ju...@gmail.com>.
Can you just monitor the consumer byte/message/fetch rate?

Thanks,

Jun
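
One way to act on the rate-monitoring suggestion above is a small stall detector kept in application code. The sketch below is only an illustration under stated assumptions: the class name `ConsumerStallDetector` and its methods are hypothetical, the application is assumed to call `recordMessage()` from its own consuming loop, and time is passed in explicitly so the logic is easy to test. It does not read Kafka's own consumer metrics, and it cannot tell a dead consumer from a topic that is simply quiet.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical helper: flags a consumer as stalled when its message count stops advancing. */
public class ConsumerStallDetector {
    private final AtomicLong consumed = new AtomicLong();
    private final long stallThresholdMillis;
    private long lastSeenCount = 0;
    private long lastProgressMillis;

    public ConsumerStallDetector(long stallThresholdMillis, long nowMillis) {
        this.stallThresholdMillis = stallThresholdMillis;
        this.lastProgressMillis = nowMillis;
    }

    /** Call from the consuming thread once per message handled. */
    public void recordMessage() {
        consumed.incrementAndGet();
    }

    /**
     * Call periodically from a monitoring thread.
     * Returns true when the count has not changed for longer than the threshold.
     */
    public synchronized boolean isStalled(long nowMillis) {
        long current = consumed.get();
        if (current != lastSeenCount) {
            // Progress was made since the last check: remember when we saw it.
            lastSeenCount = current;
            lastProgressMillis = nowMillis;
            return false;
        }
        return nowMillis - lastProgressMillis > stallThresholdMillis;
    }
}
```

A monitoring thread could call `isStalled(System.currentTimeMillis())` on a fixed schedule and raise an alert, or trigger a restart, when it returns true.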

On Thu, Nov 20, 2014 at 5:31 PM, Bhavesh Mistry <mi...@gmail.com>
wrote:

> Hi Jun,
>
> Do you want me to file a Jira ticket requesting a feature, for both the new
> consumer API and the old consumer, that notifies the application when a
> consumer stream is dying, so that the application can try to restart it
> programmatically?  I understand this is due to network or ZK cluster
> instability.
>
> Let me know if you have an alternative proposal for the new and old
> high-level consumer APIs.
>
> Thanks,
>
> Bhavesh

Re: How to recover from ConsumerRebalanceFailedException ?

Posted by Bhavesh Mistry <mi...@gmail.com>.
Hi Jun,

Do you want me to file a Jira ticket requesting a feature, for both the new
consumer API and the old consumer, that notifies the application when a
consumer stream is dying, so that the application can try to restart it
programmatically?  I understand this is due to network or ZK cluster
instability.

Let me know if you have an alternative proposal for the new and old
high-level consumer APIs.

Thanks,

Bhavesh

On Tue, Nov 18, 2014 at 9:53 PM, Bhavesh Mistry <mi...@gmail.com>
wrote:

> Hi Jun,
>
> The ZK cluster is up and running.  What is the best way to recover
> programmatically?  I am willing to implement an exponential-backoff
> recovery process.  Do you think monitoring the status of the
> "ZkClient-EventThread*" threads will be enough to tell that the source
> thread is dead, and therefore to start the exponential reconnect process?
>
> Could you, at least for the new consumer API (0.9.0), provide a callback
> method or notification that the consumer has died, along with the reason?
>
>
> Thanks,
> Bhavesh

Re: How to recover from ConsumerRebalanceFailedException ?

Posted by Bhavesh Mistry <mi...@gmail.com>.
Hi Jun,

The ZK cluster is up and running.  What is the best way to recover
programmatically?  I am willing to implement an exponential-backoff
recovery process.  Do you think monitoring the status of the
"ZkClient-EventThread*" threads will be enough to tell that the source
thread is dead, and therefore to start the exponential reconnect process?

Could you, at least for the new consumer API (0.9.0), provide a callback
method or notification that the consumer has died, along with the reason?


Thanks,
Bhavesh
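
The exponential recovery process described above could look roughly like the sketch below. Everything in it is an assumption for illustration: the class `BackoffRestarter`, its method names, and the `restart` callback are hypothetical, and the callback is expected to do the real work (for the old high-level consumer, presumably shutting down the existing connector and creating a new one). The `hasLiveThread` helper shows the thread-status check, with one caveat: the "Error handling event" log line in the original report suggests the ZkClient event thread logged the exception and carried on, so that thread may still be alive after a failed rebalance and its status alone may not be a reliable signal.

```java
import java.util.concurrent.Callable;

/** Hypothetical helper: retries a restart action with capped exponential backoff. */
public class BackoffRestarter {

    /** Delay before retrying after the given 0-based attempt: base * 2^attempt, capped at maxMillis. */
    public static long delayMillis(int attempt, long baseMillis, long maxMillis) {
        long delay = baseMillis;
        for (int i = 0; i < attempt && delay < maxMillis; i++) {
            delay *= 2;
        }
        return Math.min(delay, maxMillis);
    }

    /** Returns true if any live JVM thread has a name starting with the given prefix. */
    public static boolean hasLiveThread(String namePrefix) {
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            if (t.isAlive() && t.getName().startsWith(namePrefix)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Calls restart until it returns true or maxAttempts is reached,
     * sleeping with exponential backoff between failed attempts.
     */
    public static boolean restartWithBackoff(Callable<Boolean> restart, int maxAttempts,
                                             long baseMillis, long maxMillis)
            throws InterruptedException {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                if (restart.call()) {
                    return true;
                }
            } catch (InterruptedException e) {
                throw e; // let the caller handle shutdown requests
            } catch (Exception e) {
                // Treat any other failure as "not restarted yet" and retry.
            }
            if (attempt + 1 < maxAttempts) {
                Thread.sleep(delayMillis(attempt, baseMillis, maxMillis));
            }
        }
        return false;
    }
}
```

A caller might pair this with a progress check, starting `restartWithBackoff` only once the consumer has made no progress for some time, rather than relying on thread status alone.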



On Tue, Nov 18, 2014 at 9:34 PM, Jun Rao <ju...@gmail.com> wrote:

> Is your ZK service alive at that point? If not, you just need to monitor
> the ZK server properly.
>
> Thanks,
>
> Jun

Re: How to recover from ConsumerRebalanceFailedException ?

Posted by Jun Rao <ju...@gmail.com>.
Is your ZK service alive at that point? If not, you just need to monitor
the ZK server properly.

Thanks,

Jun
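
A basic liveness probe for the ZK server can be sketched with ZooKeeper's four-letter `ruok` command, to which a healthy server replies `imok`. The class name `ZkHealthCheck` and its parameters are hypothetical, and the sketch assumes the four-letter commands are enabled on the server (newer ZooKeeper releases require them to be whitelisted). A positive answer only shows that one server process is responding; it does not prove that the consumer's session or its registration node is intact.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: probes a ZooKeeper server with the "ruok" four-letter command. */
public class ZkHealthCheck {

    /** Returns true only if the server answers "imok" within the timeout. */
    public static boolean isOk(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            socket.setSoTimeout(timeoutMillis);

            OutputStream out = socket.getOutputStream();
            out.write("ruok".getBytes(StandardCharsets.US_ASCII));
            out.flush();

            // Read up to four bytes of reply; stop early if the server closes the connection.
            InputStream in = socket.getInputStream();
            byte[] buf = new byte[4];
            int read = 0;
            while (read < buf.length) {
                int n = in.read(buf, read, buf.length - read);
                if (n < 0) {
                    break;
                }
                read += n;
            }
            return read == 4 && "imok".equals(new String(buf, StandardCharsets.US_ASCII));
        } catch (IOException e) {
            // Connection refused, timeout, or reset: report the server as not OK.
            return false;
        }
    }
}
```

Running this against each host in the ZK connect string would show whether individual servers are reachable from the consumer's machine.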

On Mon, Nov 17, 2014 at 2:30 PM, Bhavesh Mistry <mi...@gmail.com>
wrote:

> Hi Kafka Team,
>
>
> I intermittently get the following exception because of ZK/network issues.
> How do I recover *programmatically* when the consumer thread dies, and
> restart the source?  We have alerts showing that, after this error,
> partition OWNERSHIP is *none*.  Please let me know how to detect that the
> consumer thread has died and needs to be restarted, and how to restart the
> source.
>
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:595)
>         at
>
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:592)
>         at
>
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:592)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at
>
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:591)
>         at
>
> kafka.consumer.ZookeeperConsumerConnector$ZKSessionExpireListener.handleNewSession(ZookeeperConsumerConnector.scala:481)
>         at org.I0Itec.zkclient.ZkClient$4.run(ZkClient.java:472)
>         at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> Caused by: org.apache.zookeeper.KeeperException$NoNodeException:
> KeeperErrorCode = NoNode for
>
> /consumers/mupd_statsd_logmon_metric_events18/ids/mupd_statsd_logmon_metric_events18_sdc-stg-logsearch-app1-1416023311208-14501d68
>         at
> org.apache.zookeeper.KeeperException.create(KeeperException.java:102)
>         at
> org.apache.zookeeper.KeeperException.create(KeeperException.java:42)
>         at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:921)
>         at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:950)
>         at org.I0Itec.zkclient.ZkConnection.readData(ZkConnection.java:103)
>         at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:770)
>         at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:766)
>         at
> org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
>

Re: How to recover from ConsumerRebalanceFailedException ?

Posted by Bhavesh Mistry <mi...@gmail.com>.
Hi Kafka team,

So should I just monitor the "ZkClient-EventThread*" threads via
ThreadInfo[] threads = ManagementFactory.getThreadMXBean().....; and, if a
ZkClient-EventThread* thread dies, restart the sources?  Is there an
alternative approach, or a lifecycle method that an API consumer can attach
to so that it gets notified when the consumer is dying and can take some
action?
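
The thread-monitoring idea above could be sketched as follows. This is only an illustration: the class name, the method names, and the restart callback are hypothetical, and the only APIs used are the standard java.lang.management ones. The "ZkClient-EventThread" name prefix is taken from the log in the original post.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

// Illustrative helper; not part of Kafka or zkclient.
class ZkEventThreadWatcher {

    // Counts live JVM threads whose name starts with the given prefix.
    static int countLiveThreads(String namePrefix) {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        ThreadInfo[] infos = mx.getThreadInfo(mx.getAllThreadIds());
        int count = 0;
        for (ThreadInfo info : infos) {
            // An entry is null if the thread exited between the two calls.
            if (info != null && info.getThreadName().startsWith(namePrefix)) {
                count++;
            }
        }
        return count;
    }

    // Runs onMissing (e.g. code that restarts the source) when no matching
    // thread is alive; returns whether a matching thread was found.
    static boolean checkOnce(String namePrefix, Runnable onMissing) {
        boolean alive = countLiveThreads(namePrefix) > 0;
        if (!alive) {
            onMissing.run();
        }
        return alive;
    }

    public static void main(String[] args) {
        boolean alive = checkOnce("ZkClient-EventThread",
                () -> System.out.println("no ZkClient-EventThread found; restart the source here"));
        System.out.println("alive=" + alive);
    }
}
```

In practice checkOnce would be called periodically, for example from a scheduled executor. Note that the ERROR line in the original post is logged from ZkEventThread.run itself, which suggests the event thread caught the exception and kept running; if so, a liveness check like this would not fire for that failure on its own.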

Thanks,

Bhavesh

On Mon, Nov 17, 2014 at 2:30 PM, Bhavesh Mistry <mi...@gmail.com>
wrote:

> Hi Kafka Team,
>
>
> I get following exception due to ZK/Network issues intermittently.  How do
> I recover from consumer thread dying *programmatically* and restart
> source because we have alerts that due to this error we have partition
> OWNERSHIP is *none* ?  Please let me know how to restart source and
> detect consumer thread died and need to be restarted ?
>
>
>
> 17 Nov 2014 04:29:41,180 ERROR [
> ZkClient-EventThread-65-dare-msgq00.sv.walmartlabs.com:9091,
> dare-msgq01.sv.walmartlabs.com:9091,dare-msgq02.sv.walmartlabs.com:9091]
> (org.I0Itec.zkclient.ZkEventThread.run:77)  - Error handling event
> ZkEvent[New session event sent to
> kafka.consumer.ZookeeperConsumerConnector$ZKSessionExpireListener@552c5ea8
> ]
> kafka.common.ConsumerRebalanceFailedException:
> mupd_statsd_logmon_metric_events18_sdc-stg-logsearch-app1-1416023311208-14501d68
> can't rebalance after 8 retries
>         at
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:626)
>         at
> kafka.consumer.ZookeeperConsumerConnector$ZKSessionExpireListener.handleNewSession(ZookeeperConsumerConnector.scala:481)
>         at org.I0Itec.zkclient.ZkClient$4.run(ZkClient.java:472)
>         at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
>
>
>
>
>
> ZK Connection Issues:
>
> java.net.SocketException: Transport endpoint is not connected
>         at sun.nio.ch.SocketChannelImpl.shutdown(Native Method)
>         at
> sun.nio.ch.SocketChannelImpl.shutdownInput(SocketChannelImpl.java:633)
>         at sun.nio.ch.SocketAdaptor.shutdownInput(SocketAdaptor.java:360)
>         at
> org.apache.zookeeper.ClientCnxn$SendThread.cleanup(ClientCnxn.java:1205)
>         at
> org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1170)
>
>
>
>
>         at
> org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47)
>         at
> org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:685)
>         at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:766)
>         at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
>         at kafka.utils.ZkUtils$.readData(ZkUtils.scala:449)
>         at
> kafka.consumer.TopicCount$.constructTopicCount(TopicCount.scala:61)
>         at
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:630)
>         at
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1$$anonfun$apply$mcV$sp$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:601)
>         at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
>         at
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:595)
>         at
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:592)
>         at
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:592)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:591)
>         at
> kafka.consumer.ZookeeperConsumerConnector$ZKSessionExpireListener.handleNewSession(ZookeeperConsumerConnector.scala:481)
>         at org.I0Itec.zkclient.ZkClient$4.run(ZkClient.java:472)
>         at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> Caused by: org.apache.zookeeper.KeeperException$NoNodeException:
> KeeperErrorCode = NoNode for
> /consumers/mupd_statsd_logmon_metric_events18/ids/mupd_statsd_logmon_metric_events18_sdc-stg-logsearch-app1-1416023311208-14501d68
>         at
> org.apache.zookeeper.KeeperException.create(KeeperException.java:102)
>         at
> org.apache.zookeeper.KeeperException.create(KeeperException.java:42)
>         at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:921)
>         at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:950)
>         at org.I0Itec.zkclient.ZkConnection.readData(ZkConnection.java:103)
>         at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:770)
>         at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:766)
>         at
> org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
>
>
