Posted to users@kafka.apache.org by "Bae, Jae Hyeon" <me...@gmail.com> on 2014/03/24 23:19:13 UTC

Reinstating ephemeral nodes and watchers on zk session timeout

Hi

On a ZooKeeper session timeout, caused for example by a long stop-the-world
GC pause or a ZooKeeper server outage, the ephemeral nodes of the Kafka
broker and consumer should be recreated, but in my test environment
handleNewSession() is not called.

My test scenario is to start a Kafka broker locally and put a breakpoint
somewhere to simulate a long pause. I expected handleNewSession() to be
called, but it was not, and I saw that the broker's ZK registration was
gone.

Previously, to avoid this problem, I overrode the zkclient implementation
internally to replace the createEphemeral() call with Apache Curator's
PersistentEphemeralNode, and to reinstate watchers I implemented a
ConnectionStateListener that reinstates all watchers when RECONNECTED happens.

Do you know why I cannot get handleNewSession() to be called?

Thank you
Best, Jae
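Background sketch (not Kafka's, ZkClient's or Curator's code): a ZooKeeper ephemeral node lives only as long as the session that created it, so once the session expires its owner has to create it again from a new-session callback. That is the job handleNewSession() is expected to do for the broker registration. The self-contained Java model below uses an in-memory stand-in instead of a real ZooKeeper; FakeZooKeeper, SessionListener and the example path /brokers/ids/0 are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class EphemeralReRegistration {

    /** Hypothetical callback, loosely shaped like a new-session listener. */
    interface SessionListener {
        void handleNewSession();
    }

    /** In-memory stand-in for ZooKeeper: each ephemeral node remembers the session that created it. */
    static class FakeZooKeeper {
        private final Map<String, Long> ephemeralOwners = new HashMap<>();
        private final List<SessionListener> listeners = new ArrayList<>();
        private long sessionId = 1;

        void subscribe(SessionListener listener) {
            listeners.add(listener);
        }

        void createEphemeral(String path) {
            ephemeralOwners.put(path, sessionId);
        }

        boolean exists(String path) {
            return ephemeralOwners.containsKey(path);
        }

        /** Simulates expiry: ephemeral nodes of the old session are deleted,
            a new session is opened, and every listener is notified. */
        void expireSession() {
            long expired = sessionId;
            ephemeralOwners.values().removeIf(owner -> owner == expired);
            sessionId++;
            for (SessionListener listener : listeners) {
                listener.handleNewSession();
            }
        }
    }

    public static void main(String[] args) {
        FakeZooKeeper zk = new FakeZooKeeper();
        String path = "/brokers/ids/0";               // example registration path
        zk.createEphemeral(path);
        zk.subscribe(() -> zk.createEphemeral(path)); // re-register on every new session

        zk.expireSession();
        System.out.println(zk.exists(path));          // true: re-created under the new session
    }
}
```

Without the subscribed listener, exists(path) would be false after expireSession(). Curator's PersistentEphemeralNode, mentioned above, packages the same idea: it tries to keep its node present across connection and session interruptions without the caller writing the re-creation callback.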

Re: Reinstating ephemeral nodes and watchers on zk session timeout

Posted by Neha Narkhede <ne...@gmail.com>.
Here are the remaining issues still in progress:
https://issues.apache.org/jira/browse/KAFKA/fixforversion/12326539/?selectedTab=com.atlassian.jira.jira-projects-plugin:version-summary-panel
I believe we should be able to do a release in mid-April.

Thanks,
Neha


On Tue, Mar 25, 2014 at 10:28 AM, Bae, Jae Hyeon <me...@gmail.com> wrote:

> Do you have any ETA for 0.8.1.1?

Re: Reinstating ephemeral nodes and watchers on zk session timeout

Posted by "Bae, Jae Hyeon" <me...@gmail.com>.
Do you have any ETA for 0.8.1.1?


On Tue, Mar 25, 2014 at 9:53 AM, Neha Narkhede <ne...@gmail.com> wrote:

> You are probably hitting https://issues.apache.org/jira/browse/KAFKA-1317.
> We are trying to fix it in time for 0.8.1.1.

Re: Reinstating ephemeral nodes and watchers on zk session timeout

Posted by Neha Narkhede <ne...@gmail.com>.
You are probably hitting https://issues.apache.org/jira/browse/KAFKA-1317.
We are trying to fix it in time for 0.8.1.1.
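A hedged illustration of why a blocked callback can make handleNewSession() look as if it is never called. As far as can be told from the ZkClient$4.run and ZkEventThread.run frames in the stack trace in this thread, ZkClient delivers each listener's handleNewSession() as a separate event on a single event thread. If one listener's callback blocks (here, the controller's), the callbacks queued behind it, plausibly including the KafkaHealthcheck one that logs "re-registering broker info", never run. The sketch models this with a single-threaded ExecutorService; it is not ZkClient's implementation, and reRegistrationRan is a hypothetical name.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class BlockedEventThread {

    /**
     * Queues two "new session" callbacks on ONE event thread and reports whether the
     * second one (the re-registration) ran within waitMs.
     */
    static boolean reRegistrationRan(boolean firstCallbackBlocks, long waitMs) {
        ExecutorService eventThread = Executors.newSingleThreadExecutor();
        CountDownLatch neverReleased = new CountDownLatch(1); // nothing ever counts this down
        CountDownLatch secondDone = new CountDownLatch(1);
        AtomicBoolean reRegistered = new AtomicBoolean(false);

        // Callback 1: stands in for a listener that blocks, e.g. waiting on a shutdown latch.
        eventThread.submit(() -> {
            if (firstCallbackBlocks) {
                try {
                    neverReleased.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        // Callback 2: stands in for the listener that re-creates the ephemeral registration.
        eventThread.submit(() -> {
            reRegistered.set(true);
            secondDone.countDown();
        });

        try {
            secondDone.await(waitMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            // Interrupts the blocked callback and discards anything still queued behind it.
            eventThread.shutdownNow();
        }
        return reRegistered.get();
    }

    public static void main(String[] args) {
        System.out.println(reRegistrationRan(false, 1000)); // true
        System.out.println(reRegistrationRan(true, 200));   // false: stuck behind the blocked callback
    }
}
```

The point of the model is only ordering: with one dispatch thread, a single callback that never returns starves every callback after it.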

Thanks,
Neha


On Tue, Mar 25, 2014 at 9:45 AM, Bae, Jae Hyeon <me...@gmail.com> wrote:

> ZkEventThread is blocked with the following stack trace:
>
> [...]
>
> What did I do wrong in my test environment?

Re: Reinstating ephemeral nodes and watchers on zk session timeout

Posted by "Bae, Jae Hyeon" <me...@gmail.com>.
ZkEventThread is blocked with the following stack trace:

"ZkClient-EventThread-18-localhost:2181" daemon prio=5 tid=7fb31b95c000 nid=0x1194a6000 waiting on condition [1194a5000]
   java.lang.Thread.State: WAITING (parking)
  at sun.misc.Unsafe.park(Native Method)
  - parking to wait for  <7c2201800> (a java.util.concurrent.CountDownLatch$Sync)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
  at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
  at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
  at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:207)
  at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36)
  at kafka.controller.TopicDeletionManager.shutdown(TopicDeletionManager.scala:93)
  at kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply$mcV$sp(KafkaController.scala:340)
  at kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply(KafkaController.scala:337)
  at kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply(KafkaController.scala:337)
  at kafka.utils.Utils$.inLock(Utils.scala:538)
  at kafka.controller.KafkaController.onControllerResignation(KafkaController.scala:337)
  at kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply$mcZ$sp(KafkaController.scala:1068)
  at kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply(KafkaController.scala:1067)
  at kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply(KafkaController.scala:1067)
  at kafka.utils.Utils$.inLock(Utils.scala:538)
  at kafka.controller.KafkaController$SessionExpirationListener.handleNewSession(KafkaController.scala:1067)
  at org.I0Itec.zkclient.ZkClient$4.run(ZkClient.java:472)
  at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)

What did I do wrong in my test environment?
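The top frames of this trace show the event thread parked in CountDownLatch.await() inside ShutdownableThread.shutdown(). A latch-based shutdown of that shape only returns once the worker's run() method finishes and counts the latch down, so it waits forever if run() never exits, or was never started. The sketch below is a generic illustration of that pattern, not Kafka's ShutdownableThread and not a claim about the exact cause of KAFKA-1317; LoopingWorker and the timed shutdown(timeoutMs) are hypothetical, and the timeout exists only so the demo can report the hang instead of hanging.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class LatchShutdownDemo {

    /** Hypothetical worker: shutdown() waits on a latch that only run() releases. */
    static class LoopingWorker extends Thread {
        private final AtomicBoolean running = new AtomicBoolean(true);
        private final CountDownLatch stopped = new CountDownLatch(1);

        @Override
        public void run() {
            try {
                while (running.get()) {
                    Thread.sleep(10);     // placeholder for one unit of work
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                stopped.countDown();      // the only place the latch is released
            }
        }

        /** Asks the loop to stop and waits up to timeoutMs for run() to finish.
            With an untimed await() this call would hang whenever run() never finishes. */
        boolean shutdown(long timeoutMs) {
            running.set(false);
            try {
                return stopped.await(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        LoopingWorker started = new LoopingWorker();
        started.start();
        System.out.println(started.shutdown(1000));     // true: run() exited and released the latch

        LoopingWorker neverStarted = new LoopingWorker();
        System.out.println(neverStarted.shutdown(200)); // false: run() never executed
    }
}
```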


On Tue, Mar 25, 2014 at 9:24 AM, Bae, Jae Hyeon <me...@gmail.com> wrote:

> Nope, linux doesn't work. Let me debug why it's not triggered.

Re: Reinstating ephemeral nodes and watchers on zk session timeout

Posted by "Bae, Jae Hyeon" <me...@gmail.com>.
Nope, it doesn't work on Linux either. Let me debug why it's not triggered.


On Tue, Mar 25, 2014 at 9:21 AM, Bae, Jae Hyeon <me...@gmail.com> wrote:

> Hm... I cannot reproduce in my local, I downloaded kafka_2.8.0-0.8.1
> package but it didn't work. Let me try in my linux machine.

Re: Reinstating ephemeral nodes and watchers on zk session timeout

Posted by "Bae, Jae Hyeon" <me...@gmail.com>.
Hm... I cannot reproduce it locally. I downloaded the kafka_2.8.0-0.8.1
package but it didn't work. Let me try on my Linux machine.


On Mon, Mar 24, 2014 at 6:11 PM, Neha Narkhede <ne...@gmail.com> wrote:

> I think you are trying to introduce a session expiration, then could you
> try to do the following and see if you can reproduce the session
> expiration?

Re: Reinstating ephemeral nodes and watchers on zk session timeout

Posted by Neha Narkhede <ne...@gmail.com>.
It sounds like you are trying to induce a session expiration. Could you try
the following and see whether you can reproduce the session expiration?

./bin/kafka-server-start.sh config/server.properties
kill -SIGSTOP <kafka_server_pid>
sleep 6s

At this point the session will have expired and the broker's registration
node will disappear from ZooKeeper. Then you can do the following:

kill -SIGCONT <kafka_server_pid>

At this point you should see the following log message, which is logged
from inside the handleNewSession() method:

INFO re-registering broker info in ZK for broker 0
(kafka.server.KafkaHealthcheck)
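The manual SIGSTOP / sleep / SIGCONT steps above can be scripted so that the pause is reliably long enough and the broker is always resumed. A minimal Java sketch, assuming a Unix-like system with a kill command on the PATH and a broker using Kafka 0.8's default zookeeper.session.timeout.ms of 6000 ms (so it pauses for 8 s to leave some margin); PauseProcess and pauseFor are hypothetical names, and the broker pid is passed as the first argument.

```java
import java.io.IOException;
import java.util.concurrent.TimeUnit;

public class PauseProcess {

    /** Sends a signal through the system kill command, e.g. "STOP" or "CONT". */
    static void signal(long pid, String signalName) throws IOException, InterruptedException {
        Process kill = new ProcessBuilder("kill", "-" + signalName, Long.toString(pid))
                .inheritIO()
                .start();
        if (kill.waitFor() != 0) {
            throw new IOException("kill -" + signalName + " " + pid + " failed");
        }
    }

    /** Freezes the process for pauseMs (SIGSTOP), then resumes it (SIGCONT). */
    static void pauseFor(long pid, long pauseMs) throws IOException, InterruptedException {
        signal(pid, "STOP");
        try {
            TimeUnit.MILLISECONDS.sleep(pauseMs);
        } finally {
            signal(pid, "CONT");  // resume even if the sleep is interrupted
        }
    }

    public static void main(String[] args) throws Exception {
        long brokerPid = Long.parseLong(args[0]);
        // Assumed: zookeeper.session.timeout.ms is the 0.8 default of 6000 ms;
        // pause a little longer so the server has time to expire the session.
        pauseFor(brokerPid, 8000);
    }
}
```

Run it as `java PauseProcess <kafka_server_pid>` against a running broker, then check the broker log for the re-registration message. If the broker overrides zookeeper.session.timeout.ms, the pause should be raised accordingly.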

Hope that helps.

Thanks,
Neha



On Mon, Mar 24, 2014 at 3:19 PM, Bae, Jae Hyeon <me...@gmail.com> wrote:

> [...]
> Do you know why I cannot reproduce handleNewSession()?