Posted to user@zookeeper.apache.org by Stuart White <st...@gmail.com> on 2009/01/08 16:15:20 UTC

Distributed queue: how to ensure no lost items?

I'm interested in using ZooKeeper to provide a distributed
producer/consumer queue for my distributed application.

Of course I've been studying the recipes provided for queues, barriers, etc...

My question is: how can I prevent packets of work from being lost if a
process crashes?

For example, following the distributed queue recipe, when a consumer
takes an item from the queue, it removes the first "item" znode under
the "queue" znode.  But, if the consumer immediately crashes after
removing the item from the queue, that item is lost.

Is there a recipe or recommended approach to ensure that no queue
items are lost in the event of process failure?

Thanks!

Re: Distributed queue: how to ensure no lost items?

Posted by Flavio Junqueira <fp...@yahoo-inc.com>.
You can't simply leave an element in the queue until a consumer  
finishes processing it, otherwise multiple consumers may end up  
processing it. What about the following:

- Use a failure detector to detect which consumers are up;
- Before removing an element from the queue, a consumer creates a
znode (it can't be ephemeral) flagging that the consumer is processing
that element, which I'll call "/processing/pr";
- Once the consumer is done with the element, it removes
"/processing/pr";
- There is a watchdog client that periodically checks whether the list of
elements being processed contains any element being processed by a
crashed consumer. In that case, this client places the element back in
the queue. This process needs to be careful not to generate duplicates,
as it is possible that the consumer crashes after it has created
"/processing/pr" but before it has had time to delete the element from
the queue.

I'm not sure if there is a more efficient solution because now you  
need to implement a failure detection mechanism through zookeeper (or  
externally if you prefer) and an extra watchdog process.
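
A rough sketch of the scheme above, using an in-memory stand-in rather than a real ZooKeeper client. The names Store, take, finish and watchdog, and the "alive" set that stands in for the failure detector, are all assumptions made for illustration; none of them is a ZooKeeper API. The sketch is single-threaded, so it does not show how several consumers would contend for the same element.

```python
# In-memory stand-in for the znodes involved; NOT a real ZooKeeper client.
class Store:
    def __init__(self):
        self.queue = []        # element names, oldest first ("/queue" children)
        self.processing = {}   # element name -> consumer id ("/processing" children)


def take(store, consumer):
    """Flag the head element as being processed, then remove it from the queue."""
    if not store.queue:
        return None
    item = store.queue[0]
    store.processing[item] = consumer   # step 1: create the persistent flag
    store.queue.pop(0)                  # step 2: remove from the queue
    return item


def finish(store, item):
    """The consumer is done with the element: remove its flag."""
    store.processing.pop(item, None)


def watchdog(store, alive):
    """Requeue elements whose consumer is not reported alive by the failure detector."""
    for item, consumer in list(store.processing.items()):
        if consumer in alive:
            continue
        # The consumer may have crashed between step 1 and step 2, in which
        # case the element is still queued; do not add a duplicate.
        if item not in store.queue:
            store.queue.insert(0, item)
        del store.processing[item]


store = Store()
store.queue = ["item-1", "item-2"]
taken = take(store, "consumer-a")      # consumer-a takes item-1, then crashes
watchdog(store, alive={"consumer-b"})  # the watchdog notices and requeues it
print(store.queue)
```

The membership check inside watchdog is the duplicate guard mentioned in the last bullet: the flag is created before the element is removed, so a crash between the two steps leaves the element both flagged and still queued.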

-Flavio



Re: Maximum number of children

Posted by Mahadev Konar <ma...@yahoo-inc.com>.
Thanks Joshua. 

mahadev


On 1/13/09 10:43 AM, "Joshua Tuberville" <Jo...@eharmony.com>
wrote:

> Thanks to everyone for proposed schemes and I created ZOOKEEPER-272 per your
> request Mahadev.
> 
> Joshua


RE: Maximum number of children

Posted by Joshua Tuberville <Jo...@eharmony.com>.
Thanks to everyone for proposed schemes and I created ZOOKEEPER-272 per your request Mahadev.

Joshua


-----Original Message-----
From: Mahadev Konar [mailto:mahadev@yahoo-inc.com] 
Sent: Monday, January 12, 2009 7:04 PM
To: zookeeper-user@hadoop.apache.org
Subject: Re: Maximum number of children

I was going to suggest bucketing with predefined hashes.
/root/template/data/hashbucket/hash

For the issue raised by Joshua regarding the length of the output from the
server --
This is a bug. We seem to allow any number of children (< int) of a node, and
the getChildren call fails to return the children. This leads to a chicken
and egg problem of how to get rid of the nodes if you do not know them.

Here we aren't saving anything, since the server has already processed the
request and sent us the data. We should get rid of this hard-coded limit. I
am not sure why we had this limit.

Can you open a jira for this Joshua?

thanks
mahadev




Re: Maximum number of children

Posted by Mahadev Konar <ma...@yahoo-inc.com>.
I was going to suggest bucketing with predefined hashes.
/root/template/data/hashbucket/hash
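
One way the bucketing could be sketched, assuming a fixed number of buckets chosen up front and the /root/template/date layout from the original question. NUM_BUCKETS, bucket_path, the use of MD5 and the sample values are all assumptions for illustration; the thread does not specify any of them.

```python
import hashlib

NUM_BUCKETS = 256  # assumed bucket count; not specified in the thread


def bucket_path(template, date, email):
    """Return /root/<template>/<date>/<bucket>/<hash> for an email address."""
    digest = hashlib.md5(email.lower().encode("utf-8")).hexdigest()
    bucket = int(digest, 16) % NUM_BUCKETS
    return "/root/{}/{}/{:02x}/{}".format(template, date, bucket, digest)


path = bucket_path("welcome", "2009-01-12", "someone@example.com")
print(path)
```

With 256 buckets and evenly distributed hashes, 2 million entries come to roughly 7,800 names per bucket, so each child listing stays small, and the bucket names are known in advance when it is time to prune.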

For the issue raised by Joshua regarding the length of the output from the
server -- 
This is a bug. We seem to allow any number of children (< int) of a node, and
the getChildren call fails to return the children. This leads to a chicken
and egg problem of how to get rid of the nodes if you do not know them.

Here we aren't saving anything, since the server has already processed the
request and sent us the data. We should get rid of this hard-coded limit. I
am not sure why we had this limit.

Can you open a jira for this Joshua?

thanks
mahadev


On 1/12/09 5:39 PM, "Stu Hood" <st...@mailtrust.com> wrote:

> To continue with your current design, you could create a trie based on shared
> hash prefixes.
> 
> /root/template/date/ 1a5e67/2b45dc
> /root/template/date/ 1a5e67/3d4a1f
> /root/template/date/ 3d4a1f/1a5e67
> /root/template/date/ 3d4a1f/2b45dc
> 
> Alternatively, you could use what the maildir mail storage format uses:
> /root/template/date/ eh/eharmony.com/jo/joshuatuberville
> 
> Just check with the second one that all of the characters you support in email
> addresses are supported in znode names.
> 
> Thanks,
> Stu


RE: Maximum number of children

Posted by Stu Hood <st...@mailtrust.com>.
To continue with your current design, you could create a trie based on shared hash prefixes.

/root/template/date/ 1a5e67/2b45dc
/root/template/date/ 1a5e67/3d4a1f
/root/template/date/ 3d4a1f/1a5e67
/root/template/date/ 3d4a1f/2b45dc

Alternatively, you could use what the maildir mail storage format uses:
/root/template/date/ eh/eharmony.com/jo/joshuatuberville

Just check with the second one that all of the characters you support in email addresses are supported in znode names.
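
Both layouts can be sketched as simple path-building helpers. trie_path, maildir_path and the six-character prefix length are assumptions for illustration, and the maildir-style helper does no checking of which characters are allowed in znode names.

```python
def trie_path(base, email_hash, prefix_len=6):
    """Split a hex hash into a prefix level and a remainder level."""
    return "{}/{}/{}".format(base, email_hash[:prefix_len], email_hash[prefix_len:])


def maildir_path(base, email):
    """Nest by two-letter prefixes of the domain and of the local part."""
    local, domain = email.lower().rsplit("@", 1)
    return "{}/{}/{}/{}/{}".format(base, domain[:2], domain, local[:2], local)


base = "/root/template/date"
print(trie_path(base, "1a5e672b45dc"))
print(maildir_path(base, "joshuatuberville@eharmony.com"))
```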

Thanks,
Stu


-----Original Message-----
From: "Joshua Tuberville" <Jo...@eharmony.com>
Sent: Monday, January 12, 2009 7:53pm
To: "'zookeeper-user@hadoop.apache.org'" <zo...@hadoop.apache.org>
Subject: Maximum number of children

Hello,

We are attempting to use ZooKeeper to coordinate daily email thresholds.  To do this we created a node hierarchy of

/root/template/date/email_hash

The idea being that we only send the template to an email address once per day.  This is intended to support millions of email hashes per day.  From the ZooKeeper perspective we just attempt a create; if it succeeds we proceed, and if we get a node-exists exception we stop processing.  This seems to operate fine for over 2 million email hashes so far in testing.  However, we also want to prune all previous days' nodes to conserve memory.  We have run into a hard limit while using the getChildren method for a given /root/template/date.  If the List of children exceeds the hardcoded 4,194,304-byte limit, ClientCnxn$SendThread.readLength() throws an exception on line 490.  So we have an issue: we cannot delete a node that has children, nor is it possible to delete a node that has children whose total names exceed 4 MB.

Any feedback or guidance is appreciated.

Joshua Tuberville



Maximum number of children

Posted by Joshua Tuberville <Jo...@eharmony.com>.
Hello,

We are attempting to use ZooKeeper to coordinate daily email thresholds.  To do this we created a node hierarchy of

/root/template/date/email_hash

The idea being that we only send the template to an email address once per day.  This is intended to support millions of email hashes per day.  From the ZooKeeper perspective we just attempt a create; if it succeeds we proceed, and if we get a node-exists exception we stop processing.  This seems to operate fine for over 2 million email hashes so far in testing.  However, we also want to prune all previous days' nodes to conserve memory.  We have run into a hard limit while using the getChildren method for a given /root/template/date.  If the List of children exceeds the hardcoded 4,194,304-byte limit, ClientCnxn$SendThread.readLength() throws an exception on line 490.  So we have an issue: we cannot delete a node that has children, nor is it possible to delete a node that has children whose total names exceed 4 MB.
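
The create-and-check step described above can be sketched with an in-memory stand-in. FakeTree, NodeExistsError and should_send are hypothetical names used only for illustration; a real client would issue a create against the ensemble and catch its node-exists error instead.

```python
class NodeExistsError(Exception):
    """Stand-in for the 'node exists' error a create would raise."""


class FakeTree:
    """In-memory stand-in for the znode tree; NOT a real ZooKeeper client."""

    def __init__(self):
        self.nodes = set()

    def create(self, path):
        if path in self.nodes:
            raise NodeExistsError(path)
        self.nodes.add(path)


def should_send(tree, template, date, email_hash):
    """True the first time a hash is seen for this template and date."""
    try:
        tree.create("/root/{}/{}/{}".format(template, date, email_hash))
        return True
    except NodeExistsError:
        return False


tree = FakeTree()
first = should_send(tree, "welcome", "2009-01-12", "1a5e67")
second = should_send(tree, "welcome", "2009-01-12", "1a5e67")
next_day = should_send(tree, "welcome", "2009-01-13", "1a5e67")
print(first, second, next_day)
```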

Any feedback or guidance is appreciated.

Joshua Tuberville

RE: Distributed queue: how to ensure no lost items?

Posted by Benjamin Reed <br...@yahoo-inc.com>.
That is a good point. You could put a child znode of queue-X that contains the processing history, such as who tried to process it and what time they started.
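
A minimal sketch of what such a history might hold, assuming it is kept as a list of attempt records. record_attempt, possibly_processed and the field names are hypothetical, and the list merely stands in for the data of a child znode under queue-X.

```python
import time


def record_attempt(history, consumer, now=None):
    """Append who is trying to process the element and when they started."""
    started = time.time() if now is None else now
    history.append({"consumer": consumer, "started": started})


def possibly_processed(history):
    """Any earlier attempt means the element may already have been processed."""
    return len(history) > 0


history = []   # stands in for the data of a child znode under queue-X
before_first = possibly_processed(history)
record_attempt(history, "consumer-a", now=1231433100.0)
before_second = possibly_processed(history)
record_attempt(history, "consumer-b", now=1231433160.0)
print(before_first, before_second, [h["consumer"] for h in history])
```

A consumer that finds an earlier attempt in the history knows to double check before processing the element again.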

ben

________________________________________
From: Hiram Chirino [chirino@gmail.com]
Sent: Monday, January 12, 2009 8:48 AM
To: zookeeper-user@hadoop.apache.org
Subject: Re: Distributed queue: how to ensure no lost items?

At least once is generally the case in queuing systems unless you can
do a distributed transaction with your consumer.  What comes in handy
in an at-least-once case is letting the consumer know that a message
may have 'potentially' already been processed.  That way he can double
check first before he goes off and processes the message again.  But
adding that info in ZK might be more expensive than doing the double
check every time in the consumer anyway.


Re: Distributed queue: how to ensure no lost items?

Posted by Hiram Chirino <ch...@gmail.com>.
At least once is generally the case in queuing systems unless you can
do a distributed transaction with your consumer.  What comes in handy
in an at-least-once case is letting the consumer know that a message
may have 'potentially' already been processed.  That way he can double
check first before he goes off and processes the message again.  But
adding that info in ZK might be more expensive than doing the double
check every time in the consumer anyway.

On Thu, Jan 8, 2009 at 11:42 AM, Benjamin Reed <br...@yahoo-inc.com> wrote:
> We should expand that section. The current queue recipe guarantees that things are consumed at most once. To guarantee at least once, the consumer creates an ephemeral node queue-X-inprocess to indicate that the node is being processed. Once the queue element has been processed, the consumer deletes queue-X and queue-X-inprocess (in that order).
>
> Using an ephemeral node means that if a consumer crashes, the *-inprocess node will be deleted, allowing the queue elements it was working on to be consumed by someone else. Putting the *-inprocess nodes at the same level as the queue-X nodes allows the consumer to get the list of queue elements and the inprocess flags with the same getChildren call. The *-inprocess flag ensures that only one consumer is processing a given item. By deleting queue-X before queue-X-inprocess we make sure that no other consumer will see queue-X as available for consumption after it is processed and before it is deleted.
>
> This is at least once because the consumer has a race condition: the consumer may process the item and then crash before it can delete the corresponding queue-X node.
>
> ben



-- 
Regards,
Hiram

Blog: http://hiramchirino.com

Open Source SOA
http://open.iona.com

RE: Distributed queue: how to ensure no lost items?

Posted by Benjamin Reed <br...@yahoo-inc.com>.
We should expand that section. The current queue recipe guarantees that things are consumed at most once. To guarantee at least once, the consumer creates an ephemeral node queue-X-inprocess to indicate that the node is being processed. Once the queue element has been processed, the consumer deletes queue-X and queue-X-inprocess (in that order).

Using an ephemeral node means that if a consumer crashes, the *-inprocess node will be deleted, allowing the queue elements it was working on to be consumed by someone else. Putting the *-inprocess nodes at the same level as the queue-X nodes allows the consumer to get the list of queue elements and the inprocess flags with the same getChildren call. The *-inprocess flag ensures that only one consumer is processing a given item. By deleting queue-X before queue-X-inprocess we make sure that no other consumer will see queue-X as available for consumption after it is processed and before it is deleted.

This is at least once because the consumer has a race condition: the consumer may process the item and then crash before it can delete the corresponding queue-X node.
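
The recipe can be sketched against an in-memory stand-in. FakeZk, claim, complete and the session ids are assumptions for illustration, not a real client API; ephemeral nodes are modelled as children tagged with a session id and dropped when that session expires. Element names are sorted as plain strings here, which only orders correctly for equal-width sequence numbers.

```python
class FakeZk:
    """In-memory stand-in for one queue znode's children; NOT a real client."""

    def __init__(self):
        self.children = {}  # child name -> owning session id, or None if persistent

    def create(self, name, session=None):
        if name in self.children:
            return False           # a real create would fail with "node exists"
        self.children[name] = session
        return True

    def delete(self, name):
        self.children.pop(name, None)

    def expire(self, session):
        """A crashed consumer's session ends: drop its ephemeral nodes."""
        self.children = {n: s for n, s in self.children.items() if s != session}


def claim(zk, session):
    """Claim the first queue-X element that has no queue-X-inprocess flag."""
    names = sorted(zk.children)    # one listing shows both elements and flags
    for name in names:
        if name.endswith("-inprocess") or name + "-inprocess" in names:
            continue
        if zk.create(name + "-inprocess", session):
            return name
    return None


def complete(zk, name):
    zk.delete(name)                  # delete the element first...
    zk.delete(name + "-inprocess")   # ...then its flag


zk = FakeZk()
zk.create("queue-1")
zk.create("queue-2")
a = claim(zk, "session-a")   # consumer A claims queue-1
b = claim(zk, "session-b")   # consumer B skips it and claims queue-2
zk.expire("session-a")       # A crashes before finishing
c = claim(zk, "session-b")   # queue-1 is claimable again
complete(zk, c)
complete(zk, b)
print(a, b, c, sorted(zk.children))
```

In this run queue-1 is handed out twice, once to each session. If consumer A had already finished the work before crashing, that second delivery would be the duplicate that makes the recipe at least once rather than exactly once.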

ben

-----Original Message-----
From: Stuart White [mailto:stuart.white1@gmail.com] 
Sent: Thursday, January 08, 2009 7:15 AM
To: zookeeper-user@hadoop.apache.org
Subject: Distributed queue: how to ensure no lost items?

I'm interested in using ZooKeeper to provide a distributed
producer/consumer queue for my distributed application.

Of course I've been studying the recipes provided for queues, barriers, etc...

My question is: how can I prevent packets of work from being lost if a
process crashes?

For example, following the distributed queue recipe, when a consumer
takes an item from the queue, it removes the first "item" znode under
the "queue" znode.  But, if the consumer immediately crashes after
removing the item from the queue, that item is lost.

Is there a recipe or recommended approach to ensure that no queue
items are lost in the event of process failure?

Thanks!