Posted to users@kafka.apache.org by Kyle Banker <ky...@gmail.com> on 2014/06/13 23:01:51 UTC

Reliable Message Commits

I'm using Kafka 0.8.1.1.

I have a simple goal: use the high-level consumer to consume a message from
Kafka, publish the message to a different system, and then commit the
message in Kafka. Based on my reading of the docs and the mailing list, it
seems like this isn't so easy to achieve. Here is my current understanding:

First, I have to disable auto-commit. If the consumer automatically
commits, then I may lose messages if, for example, my process dies after
consuming but before publishing my message.

Next, if my app is multi-threaded, I need to either

a) use a separate consumer per thread (memory-intensive, hard on Zookeeper)
or
b) use a single consumer and assign a KafkaStream to each thread. Then,
when I want to commit, first synchronize all threads using a barrier.

First question: is this correct so far?


Still, it appears that rebalancing may be a problem. In particular, this
sequence of events:

1. I'm consuming from a stream tied to two partitions, A and B.
2. I consume a message, M, from partition A.
3. Partition A gets assigned to a different consumer.
4. I choose not to commit M or my process fails.

Second question: When the partition is reassigned, will the message that I
consumed be automatically committed? If so, then there's no way to get the
reliability I want.


Third question: How do the folks at LinkedIn handle this overall use case?
What about other users?

It seems to me that a lot of the complexity here could be easily addressed
by changing the way in which a partition's message pointer is advanced.
That is, when I consume message M, advance the pointer to message (M - 1)
rather than to M. In other words, calling iterator.next() would imply that
the previously consumed message may be safely committed. If this were the
case, I could simply enable auto-commit and be happy.
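
For concreteness, a rough sketch of option (b) against the 0.8 high-level
consumer API is below. It is only an illustration: the topic name, group id,
thread count, and publishToOtherSystem() helper are placeholders, auto-commit
is disabled, and a CyclicBarrier gates every commit because commitOffsets()
commits the connector's position for all of its streams at once.

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class BarrierCommitConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");   // placeholder
        props.put("group.id", "reliable-publisher");         // placeholder
        props.put("auto.commit.enable", "false");            // manual commits only

        final ConsumerConnector connector =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        final int threads = 4;                                // placeholder
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
            connector.createMessageStreams(Collections.singletonMap("my-topic", threads));

        // Every thread must reach the barrier before the commit runs, because
        // commitOffsets() commits the positions of all streams on this connector.
        final CyclicBarrier barrier = new CyclicBarrier(threads, new Runnable() {
            public void run() {
                connector.commitOffsets();
            }
        });

        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (final KafkaStream<byte[], byte[]> stream : streams.get("my-topic")) {
            pool.submit(new Runnable() {
                public void run() {
                    for (MessageAndMetadata<byte[], byte[]> record : stream) {
                        publishToOtherSystem(record.message()); // placeholder publish
                        try {
                            barrier.await(); // commit happens once all threads arrive
                        } catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                    }
                }
            });
        }
        // (shutdown handling omitted for brevity)
    }

    static void publishToOtherSystem(byte[] payload) {
        // stand-in for publishing to the downstream system
    }
}

Committing after every message is a simplification; in practice the commits
would be batched, and the barrier only makes progress if every stream keeps
receiving messages.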

RE: Reliable Message Commits

Posted by Jagbir Hooda <js...@hotmail.com>.
I think a duplicate message is the right behavior for both patterns:

iter.next(); process(message); CRASH; consumer.commit();
iter.peek(); process(message); CRASH; iter.next(); CRASH; consumer.commit();

The only difference is fewer lines of code for the first pattern.
Jagbir


Re: Reliable Message Commits

Posted by Guozhang Wang <wa...@gmail.com>.
If there is a crash between

process(message);

and

iter.next(); //consumes the message now

you will still have duplicates upon restart.
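
To spell out why peeking does not change anything, here is the peek-based
pattern from Achanta's message (which appears further down in this archive),
annotated with the two crash windows; consumer.commit() stands for
commitOffsets() on the connector:

message = iter.peek();   // look at the message without consuming it
process(message);        // e.g. publish to the downstream system
// CRASH here: nothing has been committed, so the message is reprocessed
// after restart -> a duplicate, exactly as with the plain next() pattern
iter.next();             // consumes the message now
// CRASH here: consumed locally but still not committed, so the consumer
// restarts from the last committed offset -> again a duplicate
consumer.commit();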

Guozhang


-- 
-- Guozhang

Re: Reliable Message Commits

Posted by Achanta Vamsi Subhash <ac...@flipkart.com>.
What about this pattern:

message = iter.peek(); //gets the message
process(message);
iter.next(); //consumes the message now
consumer.commit();




-- 
Regards
Vamsi Subhash

Re: Reliable Message Commits

Posted by Kyle Banker <ky...@gmail.com>.
Thanks for the advice, Guozhang.

Jagbir: I'll report back on my progress. I intend to have quite a few
threads across many machines. We'll see how well it performs with a whole
high-level consumer per thread.


Re: Reliable Message Commits

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Kyle,

For your first question, the first option would be preferable: it may use a
little bit more memory and generate more ZK writes. In 0.9, though, the
offsets will be stored in the Kafka brokers instead of ZK, so you will no
longer bombard ZK.

For the third question, the usage pattern we designed for manual commits is:

message = iter.next();
process(message);
consumer.commit();

Thus if one crashes between process(message) and consumer.commit(), you do
incur duplicates, but you will not get any data loss in this case. If you
are more tolerant of data loss than duplicates, you can do:

message = iter.next();
consumer.commit();
process(message);
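
For concreteness, the two orderings might look roughly like this against the
0.8 high-level consumer (a sketch only: it assumes a connector created with
auto.commit.enable=false and a single KafkaStream, since commitOffsets()
commits the position of every stream on the connector):

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class CommitPatterns {

    // At-least-once: process first, commit second. A crash between the two
    // replays the message on restart (duplicates possible, no data loss).
    static void consumeAtLeastOnce(ConsumerConnector connector,
                                   KafkaStream<byte[], byte[]> stream) {
        ConsumerIterator<byte[], byte[]> iter = stream.iterator();
        while (iter.hasNext()) {
            MessageAndMetadata<byte[], byte[]> message = iter.next();
            process(message.message());   // e.g. publish to the other system
            connector.commitOffsets();    // commit only after processing succeeds
        }
    }

    // At-most-once: commit first, process second. A crash between the two
    // drops the message (no duplicates, but possible data loss).
    static void consumeAtMostOnce(ConsumerConnector connector,
                                  KafkaStream<byte[], byte[]> stream) {
        ConsumerIterator<byte[], byte[]> iter = stream.iterator();
        while (iter.hasNext()) {
            MessageAndMetadata<byte[], byte[]> message = iter.next();
            connector.commitOffsets();
            process(message.message());
        }
    }

    static void process(byte[] payload) {
        // stand-in for the real processing / publish step
    }
}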


Guozhang


-- 
-- Guozhang

Re: Reliable Message Commits

Posted by Jagbir <js...@hotmail.com>.
Hi Kyle, 

Thanks for the update. I'm wondering if you found an answer to your N-1 commit
question. If auto-commit happened only at iterator.next(), and only for the
N-1 message, then client code could be much simpler and more reliable, as you
mentioned. I'm also looking forward to any post in this regard.
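
As far as I can tell there is no such mode in 0.8.1.1, but the N-1 semantics
can be approximated on the client side with manual commits: commit only when
the next message is requested, so that next() effectively acknowledges message
N-1. A purely hypothetical sketch (the class name is made up, and it assumes a
single stream consumed by a single thread, because commitOffsets() commits
everything the connector owns):

import kafka.consumer.ConsumerIterator;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

// Returning message N from next() also commits message N-1, i.e. asking
// for the next message acknowledges the previous one.
public class CommitPreviousIterator {

    private final ConsumerConnector connector;
    private final ConsumerIterator<byte[], byte[]> iter;
    private boolean havePrevious = false;

    public CommitPreviousIterator(ConsumerConnector connector,
                                  ConsumerIterator<byte[], byte[]> iter) {
        this.connector = connector;
        this.iter = iter;
    }

    public boolean hasNext() {
        return iter.hasNext();
    }

    public MessageAndMetadata<byte[], byte[]> next() {
        if (havePrevious) {
            // The caller is asking for message N, so message N-1 is done;
            // the connector's consumed position already covers N-1.
            connector.commitOffsets();
        }
        MessageAndMetadata<byte[], byte[]> message = iter.next();
        havePrevious = true;
        return message;
    }
}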

Jagbir 


-- 
Sent from my Android phone with K-9 Mail. Please excuse my brevity.

Re: Reliable Message Commits

Posted by Kyle Banker <ky...@gmail.com>.
I think I've discovered the answer to my second question: according to the
code in ZookeeperConsumerConnector.scala, a rebalance derives its offsets
from what's already in Zookeeper. Therefore, uncommitted but consumed
messages from a given partition will be replayed when the partition is
reassigned.
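
Since the outcome on reassignment is replay rather than loss, duplicates are
what the downstream system has to absorb. A hedged sketch of one common way to
handle that (nothing built into 0.8, and the class is hypothetical): derive an
idempotency key from the message's coordinates, so a redelivered copy
overwrites the earlier write instead of creating a new record.

import kafka.message.MessageAndMetadata;

// Hypothetical helper: a stable identifier built from a message's
// coordinates, so the downstream system can treat a redelivered copy of
// the same message as an overwrite rather than a new record.
public class MessageKeys {

    public static String idempotencyKey(MessageAndMetadata<byte[], byte[]> m) {
        return m.topic() + "-" + m.partition() + "-" + m.offset();
    }
}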

