You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@zookeeper.apache.org by guru singh <gr...@gmail.com> on 2012/05/04 13:06:24 UTC
Watch not sent immediately?
Hi,
Sorry if the subject is not appropriately titled.
I'm trying to implement a redis-failover solution using zookeeper.
I've been working with the python binding for zk
Basically, I have a znode called /master with a watch set on it, so
that whenever the master changes, self.master is updated.
There is another znode called /errors, a watch is set on this via
get_children to errors_watcher function.
My code is supposed to loop continuously and create a child znode under
/errors whenever an error is detected.
The function errors_watcher counts the number of children of znode
/errors; if the count exceeds a certain limit, it writes a new master
'ip:port' to the znode /master, which triggers master_watcher and
updates self.master. I use Python's threading.Condition() to block on
certain operations. For instance, when znode /master is initially
created, I wait() for master_watcher to be called; it updates
self.master and releases the lock. This works as expected. The
problem is that when znode /master is changed from within
errors_watcher and I wait() there for master_watcher to update
self.master and release the lock, the code just keeps waiting:
master_watcher is never called. However, if I don't wait after
setting znode /master from within errors_watcher, master_watcher is
called and it updates self.master.
It would be really helpful if somebody could point out what's wrong. Is
it zk, or is my understanding of threading.Condition() incorrect?
Or both :)
Thanks for your help
This code snippet below, simulates the problem.
class ZKtest:
    """Toy redis-failover client driven by ZooKeeper watches.

    ``/master`` holds the current master as an ``'ip:port'`` string and is
    watched by ``master_watcher``.  ``/errors`` receives one sequential child
    per simulated error, and its children are watched by ``errors_watcher``.
    Once more than ``MAX_ERRORS`` children exist, ``errors_watcher`` writes
    ``FAILOVER_MASTER`` to ``/master``.

    Shared state (``connected``, ``master``) is guarded by ``conn_cv``.
    Watcher callbacks are delivered by the client library on a single
    thread, so a callback must never block waiting for another callback:
    the thread it would be waiting on is the one it is occupying.
    """

    INITIAL_MASTER = '127.0.0.1:6379'
    FAILOVER_MASTER = '127.0.0.1:6380'
    # Fail over once the number of children of /errors exceeds this.
    MAX_ERRORS = 5

    def __init__(self, zk_server):
        """Store connection parameters; no connection is made until run().

        :param zk_server: ZooKeeper connect string, e.g. ``'127.0.0.1:2181'``.
        """
        # Keep a reference to the log stream so its owner can close it.
        self.log_stream = open('zk.log', 'w')
        zk.set_log_stream(self.log_stream)
        self.master = None
        self.zk_server = zk_server
        self.connected = False
        self.handle = None
        self.conn_cv = threading.Condition()

    def global_watcher(self, handle, event, state, path):
        """Session watcher: track whether the session is connected."""
        with self.conn_cv:
            print('global watcher called')
            # The session watcher can also fire for non-connected states
            # (e.g. expiry); only CONNECTED_STATE means the handle is usable.
            self.connected = (state == zk.CONNECTED_STATE)
            self.conn_cv.notifyAll()

    def master_watcher(self, handle, event, state, path):
        """Re-read the watched node, re-arm the watch, publish the value."""
        # `with` guarantees the lock is released even if zk.get raises;
        # otherwise every later callback and run() would deadlock.
        with self.conn_cv:
            print('master watcher called')
            # Passing the watcher again re-registers it for the next change.
            master = zk.get(self.handle, path, self.master_watcher)[0]
            self.master = master
            print('Master is %s' % master)
            self.conn_cv.notifyAll()

    def errors_watcher(self, handle, event, state, path):
        """Count /errors children; past the threshold, write a new master."""
        with self.conn_cv:
            print('error watcher called')
            children = zk.get_children(self.handle, '/errors',
                                       self.errors_watcher)
            errors = len(children)
            print('Current errors %d' % errors)
            if errors > self.MAX_ERRORS:
                print('Set new master, update znode /master')
                zk.set(self.handle, '/master', self.FAILOVER_MASTER)
                # Do NOT wait() here for master_watcher.  Callbacks are
                # delivered one at a time on the client's single callback
                # thread, which this callback is occupying, so master_watcher
                # can only run after we return.  Waiting here never returns.
                # master_watcher will update self.master and notify run().
            self.conn_cv.notifyAll()

    def create_znodes(self):
        """Create or reset /master and /errors, and arm both watches.

        Blocks until master_watcher has published the initial master.
        """
        with self.conn_cv:
            # exists() arms master_watcher for both the create and the
            # update path below.
            stat = zk.exists(self.handle, '/master', self.master_watcher)
            if not stat:
                print('Creating znode /master')
                zk.create(self.handle, '/master', self.INITIAL_MASTER,
                          [ZOO_OPEN_ACL_UNSAFE])
            else:
                print('Updating znode /master')
                zk.set(self.handle, '/master', self.INITIAL_MASTER,
                       stat['version'])
            # wait() also returns when some other callback notifies (e.g.
            # global_watcher), so loop until master_watcher set the value.
            while self.master is None:
                self.conn_cv.wait()
            print(self.master)

            if not zk.exists(self.handle, '/errors'):
                print('Creating znode /errors')
                zk.create(self.handle, '/errors', 'Errors follow',
                          [ZOO_OPEN_ACL_UNSAFE])
            else:
                print('Purge previous errors')
                for child in zk.get_children(self.handle, '/errors'):
                    zk.delete(self.handle, '/errors/' + child)
            # Arm the children watch on /errors; the list itself is unused.
            zk.get_children(self.handle, '/errors', self.errors_watcher)

    def run(self):
        """Connect, set up the znodes, and inject errors until failover."""
        with self.conn_cv:
            # Create the handle exactly once.  Re-initialising on each retry
            # leaked the previous handle, and a handle created without
            # global_watcher could never set self.connected.
            self.handle = zk.init(self.zk_server, self.global_watcher)
            while not self.connected:
                print('Not Connected, waiting up to 5s')
                self.conn_cv.wait(5)
            self.create_znodes()
            while self.master != self.FAILOVER_MASTER:
                print('Current Master %s' % self.master)
                # Simulate an error; errors_watcher notifies us once it has
                # counted the new child.
                zk.create(self.handle, '/errors/', 'Error!',
                          [ZOO_OPEN_ACL_UNSAFE], zk.SEQUENCE)
                self.conn_cv.wait()
if __name__ == '__main__':
    # Script entry point: connect to a local ZooKeeper and run the demo.
    ZKtest('127.0.0.1:2181').run()
Re: Watch not sent immediately?
Posted by Patrick Hunt <ph...@apache.org>.
Good point, Jordan. I added a JIRA issue for this:
https://issues.apache.org/jira/browse/ZOOKEEPER-1464
Patrick
On Wed, May 9, 2012 at 1:27 PM, Jordan Zimmerman <jz...@netflix.com> wrote:
> Interesting - this issue has come up several times with Curator users. I
> ended up writing a Tech Note on it.
>
> https://github.com/Netflix/curator/wiki/Tech-Note-1
>
>
> -JZ
>
> On 5/9/12 1:23 PM, "Patrick Hunt" <ph...@apache.org> wrote:
>
>>I believe the issue is that there is a single thread updating
>>watchers. If you block that thread then the event can't be delivered.
>>
>>Patrick
>>
>>On Fri, May 4, 2012 at 4:06 AM, guru singh <gr...@gmail.com> wrote:
>>> Hi,
>>>
>>> Sorry if the subject is not appropriately titled.
>>>
>>> I'm trying to implement a redis-failover solution using zookeeper.
>>> I've been working with the python binding for zk
>>> Basically, I have a znode called /master, a watch is set on this so
>>> that, whenever master changes, self.master is upated
>>> There is another znode called /errors, a watch is set on this via
>>> get_children to errors_watcher function.
>>> My code is supposed to continuously loop and create a childe znode on
>>> /errors, whenever an error is detected.
>>> The function errors_watcher, counts the number of children for znode
>>> /errors, if it exceeds a certain length, it writes a new master
>>> 'ip:port' to the znode /master, this calls the master watcher and
>>> updates self.master. I use python's threading.Condition() to block for
>>> certain operations, for instance initially when znode /master is
>>> created, I wait() for master_watcher to be called which updates
>>> self.master and releases the lock. This works as expected, however the
>>> problem is that when znode /master is changed from within
>>> errors_watcher, if I wait() for master_watcher to be called, updating
>>> self.master and then releasing the lock. The code just keeps waiting,
>>> the master_watcher is never called. However, if I don't wait after
>>> setting znode /master from within errors_watcher, master_watcher is
>>> called and it updates self.master.
>>>
>>> It'll be really helpful if somebody could point out what's wrong? Is
>>> it zk or is my understanding of threading.Condition() incorrect?
>>> Or both :)
>>> Thanks for your help
>>>
>>> This code snippet below, simulates the problem.
>>>
>>> class ZKtest:
>>>
>>> def __init__(self,zk_server):
>>> zk.set_log_stream(open('zk.log','w'))
>>> self.master = None
>>> self.zk_server = zk_server
>>> self.connected = False
>>> self.conn_cv = threading.Condition()
>>>
>>> def global_watcher(self,handle,event,state,path):
>>> self.conn_cv.acquire()
>>> print 'global watcher called'
>>> self.connected = True
>>> self.conn_cv.notifyAll()
>>> self.conn_cv.release()
>>>
>>> def master_watcher(self,handle,event,state,path):
>>> self.conn_cv.acquire()
>>> print 'master watcher called'
>>> master = zk.get(self.handle,path,self.master_watcher)[0]
>>> self.master = master
>>> print 'Master is %s' %(master)
>>> self.conn_cv.notifyAll()
>>> self.conn_cv.release()
>>>
>>> def errors_watcher(self,handle,event,state,path):
>>> self.conn_cv.acquire()
>>> print 'error watcher called'
>>> errors =
>>>len(zk.get_children(self.handle,'/errors',self.errors_watcher))
>>> print 'Current errors %d' %(errors)
>>> if errors > 5 :
>>> print 'Set new master, update znode /master'
>>> zk.set(self.handle,'/master','127.0.0.1:6380')
>>> #self.conn_cv.wait() <-- Why doesn't this return??
>>> self.conn_cv.notifyAll()
>>> self.conn_cv.release()
>>>
>>>
>>> def create_znodes(self):
>>> self.conn_cv.acquire()
>>> master = zk.exists(self.handle,'/master',self.master_watcher)
>>> if not master:
>>> print 'Creating znode /master'
>>> zk.create(self.handle,'/master','127.0.0.1:6379',
>>> [ZOO_OPEN_ACL_UNSAFE])
>>> else :
>>> print 'Updating znode /master'
>>>
>>>zk.set(self.handle,'/master','127.0.0.1:6379',master['version'])
>>> self.conn_cv.wait() # wait until master_watcher has updated
>>> self.master, this returns after master_watcher is called
>>> print self.master # should not be None, since master_watcher
>>>updates it
>>> errors = zk.exists(self.handle,'/errors')
>>> if not errors:
>>> print 'Creating znode /errors'
>>> zk.create(self.handle,'/errors','Errors follow',
>>> [ZOO_OPEN_ACL_UNSAFE])
>>> else :
>>> print 'Purge previous errors'
>>> for err in zk.get_children(self.handle,'/errors'):
>>> zk.delete(self.handle,'/errors/'+err)
>>> err = zk.get_children(self.handle,'/errors',self.errors_watcher)
>>> # set a watch for children of znode /errors
>>> self.conn_cv.release()
>>>
>>>
>>> def run(self):
>>> self.conn_cv.acquire()
>>> self.handle = zk.init(self.zk_server,self.global_watcher)
>>> if not self.connected:
>>> while not self.connected :
>>> print 'Not Connected, retry in 5'
>>> self.conn_cv.wait(5)
>>> self.handle = zk.init(self.zk_server)
>>> self.create_znodes()
>>> while self.master != '127.0.0.1:6380':
>>> print 'Current Master %s' %(self.master)
>>> # simulate errors, until master is not 127.0.0.1:6380
>>>
>>>zk.create(self.handle,'/errors/','Error!',[ZOO_OPEN_ACL_UNSAFE],
>>> zk.SEQUENCE)
>>> self.conn_cv.wait()
>>> self.conn_cv.release()
>>>
>>>
>>> if __name__ == '__main__' :
>>> zkt = ZKtest('127.0.0.1:2181')
>>> zkt.run()
>>
>
Re: Watch not sent immediately?
Posted by Jordan Zimmerman <jz...@netflix.com>.
Interesting - this issue has come up several times with Curator users. I
ended up writing a Tech Note on it.
https://github.com/Netflix/curator/wiki/Tech-Note-1
-JZ
On 5/9/12 1:23 PM, "Patrick Hunt" <ph...@apache.org> wrote:
>I believe the issue is that there is a single thread updating
>watchers. If you block that thread then the event can't be delivered.
>
>Patrick
>
>On Fri, May 4, 2012 at 4:06 AM, guru singh <gr...@gmail.com> wrote:
>> Hi,
>>
>> Sorry if the subject is not appropriately titled.
>>
>> I'm trying to implement a redis-failover solution using zookeeper.
>> I've been working with the python binding for zk
>> Basically, I have a znode called /master, a watch is set on this so
>> that, whenever master changes, self.master is upated
>> There is another znode called /errors, a watch is set on this via
>> get_children to errors_watcher function.
>> My code is supposed to continuously loop and create a childe znode on
>> /errors, whenever an error is detected.
>> The function errors_watcher, counts the number of children for znode
>> /errors, if it exceeds a certain length, it writes a new master
>> 'ip:port' to the znode /master, this calls the master watcher and
>> updates self.master. I use python's threading.Condition() to block for
>> certain operations, for instance initially when znode /master is
>> created, I wait() for master_watcher to be called which updates
>> self.master and releases the lock. This works as expected, however the
>> problem is that when znode /master is changed from within
>> errors_watcher, if I wait() for master_watcher to be called, updating
>> self.master and then releasing the lock. The code just keeps waiting,
>> the master_watcher is never called. However, if I don't wait after
>> setting znode /master from within errors_watcher, master_watcher is
>> called and it updates self.master.
>>
>> It'll be really helpful if somebody could point out what's wrong? Is
>> it zk or is my understanding of threading.Condition() incorrect?
>> Or both :)
>> Thanks for your help
>>
>> This code snippet below, simulates the problem.
>>
>> class ZKtest:
>>
>> def __init__(self,zk_server):
>> zk.set_log_stream(open('zk.log','w'))
>> self.master = None
>> self.zk_server = zk_server
>> self.connected = False
>> self.conn_cv = threading.Condition()
>>
>> def global_watcher(self,handle,event,state,path):
>> self.conn_cv.acquire()
>> print 'global watcher called'
>> self.connected = True
>> self.conn_cv.notifyAll()
>> self.conn_cv.release()
>>
>> def master_watcher(self,handle,event,state,path):
>> self.conn_cv.acquire()
>> print 'master watcher called'
>> master = zk.get(self.handle,path,self.master_watcher)[0]
>> self.master = master
>> print 'Master is %s' %(master)
>> self.conn_cv.notifyAll()
>> self.conn_cv.release()
>>
>> def errors_watcher(self,handle,event,state,path):
>> self.conn_cv.acquire()
>> print 'error watcher called'
>> errors =
>>len(zk.get_children(self.handle,'/errors',self.errors_watcher))
>> print 'Current errors %d' %(errors)
>> if errors > 5 :
>> print 'Set new master, update znode /master'
>> zk.set(self.handle,'/master','127.0.0.1:6380')
>> #self.conn_cv.wait() <-- Why doesn't this return??
>> self.conn_cv.notifyAll()
>> self.conn_cv.release()
>>
>>
>> def create_znodes(self):
>> self.conn_cv.acquire()
>> master = zk.exists(self.handle,'/master',self.master_watcher)
>> if not master:
>> print 'Creating znode /master'
>> zk.create(self.handle,'/master','127.0.0.1:6379',
>> [ZOO_OPEN_ACL_UNSAFE])
>> else :
>> print 'Updating znode /master'
>>
>>zk.set(self.handle,'/master','127.0.0.1:6379',master['version'])
>> self.conn_cv.wait() # wait until master_watcher has updated
>> self.master, this returns after master_watcher is called
>> print self.master # should not be None, since master_watcher
>>updates it
>> errors = zk.exists(self.handle,'/errors')
>> if not errors:
>> print 'Creating znode /errors'
>> zk.create(self.handle,'/errors','Errors follow',
>> [ZOO_OPEN_ACL_UNSAFE])
>> else :
>> print 'Purge previous errors'
>> for err in zk.get_children(self.handle,'/errors'):
>> zk.delete(self.handle,'/errors/'+err)
>> err = zk.get_children(self.handle,'/errors',self.errors_watcher)
>> # set a watch for children of znode /errors
>> self.conn_cv.release()
>>
>>
>> def run(self):
>> self.conn_cv.acquire()
>> self.handle = zk.init(self.zk_server,self.global_watcher)
>> if not self.connected:
>> while not self.connected :
>> print 'Not Connected, retry in 5'
>> self.conn_cv.wait(5)
>> self.handle = zk.init(self.zk_server)
>> self.create_znodes()
>> while self.master != '127.0.0.1:6380':
>> print 'Current Master %s' %(self.master)
>> # simulate errors, until master is not 127.0.0.1:6380
>>
>>zk.create(self.handle,'/errors/','Error!',[ZOO_OPEN_ACL_UNSAFE],
>> zk.SEQUENCE)
>> self.conn_cv.wait()
>> self.conn_cv.release()
>>
>>
>> if __name__ == '__main__' :
>> zkt = ZKtest('127.0.0.1:2181')
>> zkt.run()
>
Re: Watch not sent immediately?
Posted by Patrick Hunt <ph...@apache.org>.
I believe the issue is that there is a single thread updating
watchers. If you block that thread then the event can't be delivered.
Patrick
On Fri, May 4, 2012 at 4:06 AM, guru singh <gr...@gmail.com> wrote:
> Hi,
>
> Sorry if the subject is not appropriately titled.
>
> I'm trying to implement a redis-failover solution using zookeeper.
> I've been working with the python binding for zk
> Basically, I have a znode called /master, a watch is set on this so
> that, whenever master changes, self.master is upated
> There is another znode called /errors, a watch is set on this via
> get_children to errors_watcher function.
> My code is supposed to continuously loop and create a childe znode on
> /errors, whenever an error is detected.
> The function errors_watcher, counts the number of children for znode
> /errors, if it exceeds a certain length, it writes a new master
> 'ip:port' to the znode /master, this calls the master watcher and
> updates self.master. I use python's threading.Condition() to block for
> certain operations, for instance initially when znode /master is
> created, I wait() for master_watcher to be called which updates
> self.master and releases the lock. This works as expected, however the
> problem is that when znode /master is changed from within
> errors_watcher, if I wait() for master_watcher to be called, updating
> self.master and then releasing the lock. The code just keeps waiting,
> the master_watcher is never called. However, if I don't wait after
> setting znode /master from within errors_watcher, master_watcher is
> called and it updates self.master.
>
> It'll be really helpful if somebody could point out what's wrong? Is
> it zk or is my understanding of threading.Condition() incorrect?
> Or both :)
> Thanks for your help
>
> This code snippet below, simulates the problem.
>
> class ZKtest:
>
> def __init__(self,zk_server):
> zk.set_log_stream(open('zk.log','w'))
> self.master = None
> self.zk_server = zk_server
> self.connected = False
> self.conn_cv = threading.Condition()
>
> def global_watcher(self,handle,event,state,path):
> self.conn_cv.acquire()
> print 'global watcher called'
> self.connected = True
> self.conn_cv.notifyAll()
> self.conn_cv.release()
>
> def master_watcher(self,handle,event,state,path):
> self.conn_cv.acquire()
> print 'master watcher called'
> master = zk.get(self.handle,path,self.master_watcher)[0]
> self.master = master
> print 'Master is %s' %(master)
> self.conn_cv.notifyAll()
> self.conn_cv.release()
>
> def errors_watcher(self,handle,event,state,path):
> self.conn_cv.acquire()
> print 'error watcher called'
> errors = len(zk.get_children(self.handle,'/errors',self.errors_watcher))
> print 'Current errors %d' %(errors)
> if errors > 5 :
> print 'Set new master, update znode /master'
> zk.set(self.handle,'/master','127.0.0.1:6380')
> #self.conn_cv.wait() <-- Why doesn't this return??
> self.conn_cv.notifyAll()
> self.conn_cv.release()
>
>
> def create_znodes(self):
> self.conn_cv.acquire()
> master = zk.exists(self.handle,'/master',self.master_watcher)
> if not master:
> print 'Creating znode /master'
> zk.create(self.handle,'/master','127.0.0.1:6379',
> [ZOO_OPEN_ACL_UNSAFE])
> else :
> print 'Updating znode /master'
> zk.set(self.handle,'/master','127.0.0.1:6379',master['version'])
> self.conn_cv.wait() # wait until master_watcher has updated
> self.master, this returns after master_watcher is called
> print self.master # should not be None, since master_watcher updates it
> errors = zk.exists(self.handle,'/errors')
> if not errors:
> print 'Creating znode /errors'
> zk.create(self.handle,'/errors','Errors follow',
> [ZOO_OPEN_ACL_UNSAFE])
> else :
> print 'Purge previous errors'
> for err in zk.get_children(self.handle,'/errors'):
> zk.delete(self.handle,'/errors/'+err)
> err = zk.get_children(self.handle,'/errors',self.errors_watcher)
> # set a watch for children of znode /errors
> self.conn_cv.release()
>
>
> def run(self):
> self.conn_cv.acquire()
> self.handle = zk.init(self.zk_server,self.global_watcher)
> if not self.connected:
> while not self.connected :
> print 'Not Connected, retry in 5'
> self.conn_cv.wait(5)
> self.handle = zk.init(self.zk_server)
> self.create_znodes()
> while self.master != '127.0.0.1:6380':
> print 'Current Master %s' %(self.master)
> # simulate errors, until master is not 127.0.0.1:6380
> zk.create(self.handle,'/errors/','Error!',[ZOO_OPEN_ACL_UNSAFE],
> zk.SEQUENCE)
> self.conn_cv.wait()
> self.conn_cv.release()
>
>
> if __name__ == '__main__' :
> zkt = ZKtest('127.0.0.1:2181')
> zkt.run()