Posted to user@zookeeper.apache.org by Matt Wise <ma...@nextdoor.com> on 2012/12/09 22:22:23 UTC

Kazoo 'state listener' issue...

I've got a weird connection issue playing around with Kazoo... If I do something simple like:

>  k = KazooClient()
>  k.start()
>  k.create('/foo')
>  k.stop()
>  k.start()
>  k.create('/foo')

it works fine... the node is re-created, all is happy.

However, if I try to use a state_listener callback to automatically re-register any paths that we had registered on our first connection, it fails. In fact, it doesn't really fail; it hangs. This only happens if we try to re-register the paths from within the state listener. If we do it outside of that callback (manually), it works fine. Here is a silly code snippet that causes the problem:

> from kazoo.client import KazooClient
> from kazoo.client import KazooState
> import logging
> log = logging.getLogger()
> format = '[%(name)s-%(thread)d-%(funcName)s]: (%(levelname)s) %(message)s'
> log.setLevel(logging.DEBUG)
> formatter = logging.Formatter(format)
> handler = logging.StreamHandler()
> handler.setFormatter(formatter)
> log.addHandler(handler)
> 
> registered_nodes = {}
> 
> def register_node(node, data=None):
>     if node in registered_nodes:
>         if data == registered_nodes[node]:
>             log.debug('Already registered [%s] in data provider.' % node)
>             return
>     log.debug('Registering [%s] in data provider.' % node)
>     _zk.create(node, ephemeral=True, makepath=True)
>     registered_nodes[node] = data
> 
> def _re_register_nodes(nodes):
>     for node in nodes.iteritems():
>         register_node(node[0], data=node[1])
> 
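> def _state_listener(state):
>     # Both names are module-level; without 'global' the assignments below
>     # would create locals and the registry would never be reset.
>     global registered_nodes, CONNECTION_STATE
>     log.warning('Zookeeper connection state changed: %s' % state)
>     if state == KazooState.SUSPENDED:
>         CONNECTION_STATE=False
>     elif state == KazooState.LOST:
>         CONNECTION_STATE=False
>     else:
>         CONNECTION_STATE=True
>         # Snapshot the known nodes, reset the registry, then re-create them.
>         nodes = registered_nodes
>         registered_nodes = {}
>         _re_register_nodes(nodes)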
> 
> log.setLevel(logging.DEBUG) 
> 
> registered_nodes = {}
> 
> _zk = KazooClient(hosts='localhost:2182')
> _zk.start()
> _zk.add_listener(_state_listener)
> _state_listener(_zk.state)
> 
> # now register a node
> register_node('/abc/a')
> _zk.stop()
> _zk.start()


If you run this in a Python shell, your path will not be re-registered after the second _zk.start(). Instead, the _state_listener() function hangs when it calls _re_register_nodes(), which hangs on its first call to register_node(), which in turn hangs at _zk.create(). Obviously the above code is a stripped-down version of what we're working with, but it replicates the problem. This could just be a gap in my understanding of how the add_listener callback works, but I'm a bit confused here.

Long term, the goal is to have our object able to handle a disconnect and gracefully re-generate any paths that had been disconnected during the connection loss. Ironically Kazoo handles this already with its 'watcher' recipe. It just doesn't have the same kind of thing for any paths we create with KazooClient.create().

--Matt

Re: Kazoo 'state listener' issue...

Posted by Alan Cabrera <li...@toolazydogs.com>.
On Dec 21, 2012, at 10:00 PM, Matt Wise wrote:

> I just want to circle back on this issue. The root cause was that the Kazoo thread that triggers the 'state listener' callback holds a lock while the callbacks run and does not release it until they have finished. If your callback then tries to use this thread to make a ZooKeeper call (e.g., get, set, create, delete), it waits on that lock and deadlocks immediately.

Ahh, calling user callbacks while holding a lock.  That would do it.  :)

> To handle this scenario they have a 'spawn()' function in the Zookeeper thread handler that you can use:
> 
>>            self._zk.handler.spawn(self._re_establish_registrations)
> 
> This spawns a thread and immediately returns, allowing the Zookeeper thread to finish its callbacks and release the lock. Thanks a ton to Ben B. for helping me track that down.

Yeah, Ben is awesome.


Regards,
Alan



Re: Kazoo 'state listener' issue...

Posted by Matt Wise <ma...@nextdoor.com>.
I just want to circle back on this issue. The root cause was that the Kazoo thread that triggers the 'state listener' callback holds a lock while the callbacks run and does not release it until they have finished. If your callback then tries to use this thread to make a ZooKeeper call (e.g., get, set, create, delete), it waits on that lock and deadlocks immediately.
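
The mechanism can be modelled with nothing but the standard library. This is only a toy, not Kazoo's actual code: the Connection class, its _lock, and the 0.2-second timeout are all assumptions made for illustration. The listener is invoked while a non-reentrant lock is held, so an operation that needs the same lock cannot proceed from inside the listener.

```python
import threading


class Connection:
    """Toy stand-in for a client that calls listeners while holding a lock."""

    def __init__(self):
        self._lock = threading.Lock()  # non-reentrant on purpose
        self._listeners = []

    def add_listener(self, func):
        self._listeners.append(func)

    def create(self, path, timeout=0.2):
        # A blocking client would wait here forever; the timeout only keeps
        # this demonstration from hanging.
        if not self._lock.acquire(timeout=timeout):
            return False
        try:
            return True
        finally:
            self._lock.release()

    def fire_state_change(self, state):
        # Listeners run while the lock is held, as described above.
        with self._lock:
            for func in self._listeners:
                func(state)


results = []
conn = Connection()
conn.add_listener(lambda state: results.append(conn.create('/abc/a')))
conn.fire_state_change('CONNECTED')

print(results)                # [False]: create() timed out inside the listener
print(conn.create('/abc/a'))  # True: outside the listener the lock is free
```

Without the timeout, the call inside the listener would simply never return, which matches the hang seen in the reproduction.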

To handle this scenario they have a 'spawn()' function in the Zookeeper thread handler that you can use:

>             self._zk.handler.spawn(self._re_establish_registrations)

This spawns a thread and immediately returns, allowing the Zookeeper thread to finish its callbacks and release the lock. Thanks a ton to Ben B. for helping me track that down.
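
A minimal sketch of how that could be packaged, assuming a client object that offers add_listener(), exists(), create() and handler.spawn() the way KazooClient does. The NodeRegistry class, its method names, and the connected_state parameter are hypothetical names chosen for this example, not part of Kazoo. The listener itself only schedules work; the ZooKeeper calls happen later in the spawned function.

```python
import threading


class NodeRegistry:
    """Remember ephemeral paths and re-create them after a reconnect.

    Hypothetical helper: 'client' is assumed to behave like a KazooClient.
    """

    def __init__(self, client, connected_state):
        self._client = client
        # Passed in by the caller, e.g. KazooState.CONNECTED.
        self._connected_state = connected_state
        self._nodes = {}
        self._nodes_lock = threading.Lock()
        client.add_listener(self._state_listener)

    def register(self, path, data=b''):
        self._client.create(path, data, ephemeral=True, makepath=True)
        with self._nodes_lock:
            self._nodes[path] = data

    def _state_listener(self, state):
        # Return quickly: never call create() from inside the listener.
        if state == self._connected_state:
            self._client.handler.spawn(self._re_register)

    def _re_register(self):
        # Runs outside the listener, so client calls are safe here.
        with self._nodes_lock:
            nodes = dict(self._nodes)
        for path, data in nodes.items():
            # After a short suspension the node may still exist.
            if not self._client.exists(path):
                self._client.create(path, data, ephemeral=True, makepath=True)
```

With a real client this might be wired up as NodeRegistry(zk, KazooState.CONNECTED) after zk.start(). The exists() check followed by create() is not atomic, so production code would likely also want to handle the case where the node appears in between.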

--Matt



Re: Kazoo 'state listener' issue...

Posted by Alan Cabrera <li...@toolazydogs.com>.
Without really digging into this I'll toss in my initial observation.

Calling zk while still being inside a zk callback seems a bit dangerous.  I would have a queue and event thread and have work from the callbacks feed this queue which would be executed inside the event thread.
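
A rough sketch of that queue-and-event-thread idea, using only the standard library. The CallbackWorker class and its submit() and close() methods are hypothetical names for this example. Callbacks only enqueue work and return immediately; a dedicated thread drains the queue and runs the work in order.

```python
import logging
import queue
import threading


class CallbackWorker:
    """Run work submitted from callbacks on a separate, dedicated thread."""

    def __init__(self):
        self._tasks = queue.Queue()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def submit(self, func, *args):
        # Safe to call from inside a callback: it only enqueues and returns.
        self._tasks.put((func, args))

    def _run(self):
        while True:
            func, args = self._tasks.get()
            if func is None:  # sentinel queued by close()
                return
            try:
                func(*args)
            except Exception:
                # Keep the worker alive if one piece of work fails.
                logging.exception('Queued callback work failed')

    def close(self):
        # Work queued before the sentinel still runs, because the queue is FIFO.
        self._tasks.put((None, ()))
        self._thread.join()


worker = CallbackWorker()
seen = []


def state_listener(state):
    # Stand-in for a zk state listener: defer the real work to the worker.
    worker.submit(seen.append, state)


state_listener('CONNECTED')
state_listener('LOST')
worker.close()
print(seen)  # ['CONNECTED', 'LOST']
```

In a real program the submitted function would be the one that talks to zk, so those calls always run on the worker thread rather than inside the callback.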


Regards,
Alan



Re: Kazoo 'state listener' issue...

Posted by Matt Wise <ma...@nextdoor.com>.
Just to clarify, if you go and change test() into:

>     def test(self):
>         # now register a node
>         self.register_node('/abc/a')
>         self._zk.stop()
>         self._zk.start()
>         self.register_node('/abc/a')
>         self._zk.get_children('/abc')
> 

and then remove these lines from the _state_listener() method:
> 
>>            for node in nodes.iteritems():
>>                self.register_node(node[0], data=node[1])
> 

then it works perfectly: no hang, nothing. It seems that register_node() cannot be called from within the state listener. Why?

On Dec 9, 2012, at 2:26 PM, Matt Wise <ma...@nextdoor.com> wrote:

> Hrmm here's a cleaner way to reproduce the issue:
> 
> test.py:
>> from kazoo.client import KazooClient
>> from kazoo.client import KazooState
>> from kazoo.handlers.threading import TimeoutError
>> from kazoo.handlers.gevent import SequentialGeventHandler
>> import logging
>> 
>> 
>> class Test(object):
>>    def __init__(self):
>>        self.log = logging.getLogger()
>>        format = 'zk_watcher[%(name)s-%(thread)d-%(funcName)s: (%(levelname)s) %(message)s'
>>        self.log.setLevel(logging.INFO)
>>        formatter = logging.Formatter(format)
>>        handler = logging.StreamHandler()
>>        handler.setFormatter(formatter)
>>        self.log.addHandler(handler)
>> 
>>        self.registered_nodes = {}
>> 
>>        self.log.setLevel(logging.DEBUG)
>> 
>>        self._zk = KazooClient(hosts='localhost:2182', handler=SequentialGeventHandler())
>>        self._zk.start()
>>        self._zk.add_listener(self._state_listener)
>>        self._state_listener(self._zk.state)
>> 
>>    def test(self):
>>        # now register a node
>>        self.register_node('/abc/a')
>>        self._zk.stop()
>>        self._zk.start()
>>        self._zk.get_children('/abc')
>> 
>>    def register_node(self, node, data=None):
>>        if node in self.registered_nodes:
>>            if data == self.registered_nodes[node]:
>>                self.log.debug('Already registered [%s] in data provider.' % node)
>>                return
>>        self.log.debug('Registering [%s] in data provider.' % node)
>>        self._zk.create(node, ephemeral=True, makepath=True)
>>        self.registered_nodes[node] = data
>> 
>> 
>>    def _state_listener(self,state):
>>        self.log.warning('Zookeeper connection state changed: %s' % state)
>>        if state == KazooState.SUSPENDED:
>>            self.CONNECTION_STATE=False
>>        elif state == KazooState.LOST:
>>            self.CONNECTION_STATE=False
>>        else:
>>            self.CONNECTION_STATE=True
>>            nodes = {}
>>            print self.registered_nodes
>>            try:
>>                nodes = self.registered_nodes
>>            except:
>>                pass
>>            self.registered_nodes = {}
>>            for node in nodes.iteritems():
>>                self.register_node(node[0], data=node[1])
> 
> python
>>>> import test
>>>> k = test.Test()
>>>> k.test()
> 
> (watch it hang ... )
> 
> 


Re: Kazoo 'state listener' issue...

Posted by Matt Wise <ma...@nextdoor.com>.
Hmm, here's a cleaner way to reproduce the issue:

test.py:
> from kazoo.client import KazooClient
> from kazoo.client import KazooState
> from kazoo.handlers.threading import TimeoutError
> from kazoo.handlers.gevent import SequentialGeventHandler
> import logging
> 
> 
> class Test(object):
>     def __init__(self):
>         self.log = logging.getLogger()
>         log_format = 'zk_watcher[%(name)s-%(thread)d-%(funcName)s]: (%(levelname)s) %(message)s'
>         formatter = logging.Formatter(log_format)
>         handler = logging.StreamHandler()
>         handler.setFormatter(formatter)
>         self.log.addHandler(handler)
>         # Log everything, including Kazoo's own debug output.
>         self.log.setLevel(logging.DEBUG)
> 
>         self.registered_nodes = {}
> 
>         self._zk = KazooClient(hosts='localhost:2182', handler=SequentialGeventHandler())
>         self._zk.start()
>         self._zk.add_listener(self._state_listener)
>         self._state_listener(self._zk.state)
> 
>     def test(self):
>         # now register a node
>         self.register_node('/abc/a')
>         self._zk.stop()
>         self._zk.start()
>         self._zk.get_children('/abc')
> 
>     def register_node(self, node, data=None):
>         if node in self.registered_nodes:
>             if data == self.registered_nodes[node]:
>                 self.log.debug('Already registered [%s] in data provider.' % node)
>                 return
>         self.log.debug('Registering [%s] in data provider.' % node)
>         self._zk.create(node, ephemeral=True, makepath=True)
>         self.registered_nodes[node] = data
> 
> 
>     def _state_listener(self,state):
>         self.log.warning('Zookeeper connection state changed: %s' % state)
>         if state == KazooState.SUSPENDED:
>             self.CONNECTION_STATE=False
>         elif state == KazooState.LOST:
>             self.CONNECTION_STATE=False
>         else:
>             self.CONNECTION_STATE=True
>             # Snapshot the nodes registered before the reconnect, then
>             # re-create each one. The create() inside register_node() is
>             # where the hang occurs.
>             nodes = self.registered_nodes
>             self.registered_nodes = {}
>             print(nodes)
>             for path, data in nodes.items():
>                 self.register_node(path, data=data)

python
>>> import test
>>> k = test.Test()
>>> k.test()

(watch it hang ... )
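
One possible explanation, stated here as an assumption rather than something confirmed in this thread: if the client invokes state listeners inline on the same thread that also completes pending requests, then a blocking create() made from inside the listener waits on a response that this thread can never deliver. The toy model below uses only the Python standard library and is not Kazoo code; ToyConnection, fire_listener and run are hypothetical names made up for the illustration. It shows the inline call timing out while the same call handed off to a separate thread succeeds.

```python
import queue
import threading


class ToyConnection:
    """Toy stand-in for a client whose single loop thread both runs
    listener callbacks and completes pending requests. Not Kazoo code."""

    def __init__(self):
        self._events = queue.Queue()
        self._thread = threading.Thread(target=self._loop, daemon=True)
        self._thread.start()

    def _loop(self):
        while True:
            kind, payload = self._events.get()
            if kind == "state":
                payload()        # listener runs inline on the loop thread
            elif kind == "request":
                payload.set()    # mark the pending request as completed

    def fire_listener(self, listener):
        self._events.put(("state", listener))

    def create(self, timeout):
        """Blocking request; returns True if the loop completed it in time."""
        done = threading.Event()
        self._events.put(("request", done))
        return done.wait(timeout)


def run(spawn_from_listener):
    conn = ToyConnection()
    result = {}
    finished = threading.Event()

    def work():
        result["ok"] = conn.create(timeout=0.5)
        finished.set()

    def listener():
        if spawn_from_listener:
            # Hand the blocking work to another thread and return at once.
            threading.Thread(target=work, daemon=True).start()
        else:
            # Blocks the very thread that has to complete the request.
            work()

    conn.fire_listener(listener)
    finished.wait(2)
    return result["ok"]


inline_ok = run(spawn_from_listener=False)
spawned_ok = run(spawn_from_listener=True)
print(inline_ok, spawned_ok)
```

A timeout is used here only so the demonstration terminates; without one, the inline variant would wait forever, which matches the hang described above.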


On Dec 9, 2012, at 1:22 PM, Matt Wise <ma...@nextdoor.com> wrote:

> I've got a weird connection issue playing around with Kazoo... If I do something simple like:
> 
>> k = KazooClient()
>> k.start()
>> k.create('/foo')
>> k.stop()
>> k.start()
>> k.create('/foo')
> 
> it works fine... the node is re-created, all is happy.
> 
> However, if I try to use a state_listener callback to automatically re-register any paths that we had registered on our first connection, it fails. In fact, it doesn't really fail... it hangs. This only happens if we try to re-register the paths from within the state listener. If we do it outside of that callback (manually), it works fine. Silly code snippet that will cause the problem:
> 
>> from kazoo.client import KazooClient
>> from kazoo.client import KazooState
>> import logging
>> log = logging.getLogger()
>> format = '[%(name)s-%(thread)d-%(funcName)s: (%(levelname)s) %(message)s'
>> log.setLevel(logging.DEBUG)
>> formatter = logging.Formatter(format)
>> handler = logging.StreamHandler()
>> handler.setFormatter(formatter)
>> log.addHandler(handler)
>> 
>> registered_nodes = {}
>> 
>> def register_node(node, data=None):
>>    if node in registered_nodes:
>>        if data == registered_nodes[node]:
>>            log.debug('Already registered [%s] in data provider.' % node)
>>            return
>>    log.debug('Registering [%s] in data provider.' % node)
>>    _zk.create(node, ephemeral=True, makepath=True)
>>    registered_nodes[node] = data
>> 
>> def _re_register_nodes(nodes):
>>    for node in nodes.iteritems():
>>        register_node(node[0], data=node[1])
>> 
>> def _state_listener(state):
>>    log.warning('Zookeeper connection state changed: %s' % state)
>>    if state == KazooState.SUSPENDED:
>>        CONNECTION_STATE=False
>>    elif state == KazooState.LOST:
>>        CONNECTION_STATE=False
>>    else:
>>        CONNECTION_STATE=True
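>>        # Snapshot what was registered, then clear the registry so that
>>        # register_node() does not skip these paths as already registered.
>>        nodes = dict(registered_nodes)
>>        registered_nodes.clear()
>>        _re_register_nodes(nodes)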
>> 
>> log.setLevel(logging.DEBUG) 
>> 
>> registered_nodes = {}
>> 
>> _zk = KazooClient(hosts='localhost:2182')
>> _zk.start()
>> _zk.add_listener(_state_listener)
>> _state_listener(_zk.state)
>> 
>> # now register a node
>> register_node('/abc/a')
>> _zk.stop()
>> _zk.start()
> 
> 
> If you run this in a python shell, after the _zk.start(), your path will not reregister... instead, the _state_listener() method will basically hang when it calls _re_register_nodes(). The _re_register_nodes() method hangs on the first attempt to call register_node(), which will hang at the _zk.create. Obviously the above code is a bastardized stripped down version of what we're working with, but it replicates the problem. This could just be a problem with my understanding of how the add_listener callback works.. but I'm a bit confused here.
> 
> Long term, the goal is to have our object able to handle a disconnect and gracefully re-generate any paths that had been disconnected during the connection loss. Ironically Kazoo handles this already with its 'watcher' recipe. It just doesn't have the same kind of thing for any paths we create with KazooClient.create().
> 
> --Matt
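
For the long-term goal described above, a rough sketch of one way to structure the bookkeeping: remember every path that was created, and replay the list on reconnect from a separate thread so the state listener itself returns immediately. NodeRegistry, create_fn and on_connected are hypothetical names invented for this example; create_fn stands in for whatever wrapper ends up calling the client's create(), and the sketch does not talk to ZooKeeper at all.

```python
import threading


class NodeRegistry:
    """Remembers created paths so they can be replayed after a reconnect."""

    def __init__(self, create_fn):
        # create_fn(path, data) is a hypothetical callable, for example a
        # thin wrapper around the real client's create().
        self._create = create_fn
        self._nodes = {}
        self._lock = threading.Lock()

    def register(self, path, data=None):
        self._create(path, data)
        with self._lock:
            self._nodes[path] = data

    def replay(self):
        # Copy under the lock, then create outside it so that a slow
        # create call does not block concurrent register() calls.
        with self._lock:
            snapshot = dict(self._nodes)
        for path, data in snapshot.items():
            self._create(path, data)

    def on_connected(self):
        """Meant to be called from the state listener; does not block."""
        worker = threading.Thread(target=self.replay, daemon=True)
        worker.start()
        return worker


created = []
registry = NodeRegistry(lambda path, data: created.append((path, data)))
registry.register('/abc/a')
registry.register('/abc/b', data='x')
registry.on_connected().join()   # join only so the demo can inspect the result
print(created)
```

In real use the replay would also have to cope with paths that still exist on the server, which this sketch leaves out.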