Posted to users@kafka.apache.org by Clark Breyman <cl...@breyman.com> on 2014/01/29 20:51:01 UTC

High Level Consumer delivery semantics

I'm wrestling through the at-least-once/at-most-once semantics of my
application and was hoping for some confirmation of the semantics. I'm not
sure I can classify the high level consumer as either type.

False ack scenario:
- Thread A: call next() on the ConsumerIterator, advancing the
PartitionTopicInfo offset
- Thread B: commitOffsets() flushes the offset of the incomplete message to ZK
- Thread A: fail processing (e.g. kill -9)

False retry scenario:
- Thread A: call next() & successfully process, kill -9 before
commitOffsets either in thread or in parallel.

Is this right or am I missing something (likely)? Seems like the semantics
are essentially approximately once.
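
The two scenarios above can be traced with a toy model. This is a sketch in Python, not the Kafka API: ToyConnector, false_ack and false_retry are hypothetical names, and the only behaviour assumed is what the message describes, namely that next() advances the in-memory offset before processing and that commitOffsets() commits the offsets of all streams.

```python
# Toy model only: these classes imitate the behaviour described in the thread;
# they are not the Kafka API.

class ToyConnector:
    """Tracks one in-memory consume offset per partition plus a committed copy."""

    def __init__(self, partitions):
        self.consume_offset = {p: 0 for p in partitions}  # advanced by next()
        self.committed = {p: 0 for p in partitions}       # stands in for ZK

    def next(self, partition):
        """Return the next message offset and advance the in-memory offset
        before the caller has processed anything."""
        offset = self.consume_offset[partition]
        self.consume_offset[partition] = offset + 1
        return offset

    def commit_offsets(self):
        """Commit the in-memory offset of every partition, not just the caller's."""
        self.committed = dict(self.consume_offset)


def false_ack():
    conn = ToyConnector(["A", "B"])
    processed = []
    msg = conn.next("A")       # thread A fetches message 0, not yet processed
    conn.commit_offsets()      # thread B commits; A's offset 1 is now in "ZK"
    # kill -9 here: message 0 never reaches `processed`
    restart_at = conn.committed["A"]
    return msg, processed, restart_at


def false_retry():
    conn = ToyConnector(["A"])
    processed = []
    msg = conn.next("A")
    processed.append(msg)      # processing succeeds
    # kill -9 here, before commit_offsets() runs
    restart_at = conn.committed["A"]
    return msg, processed, restart_at
```

In the first trace the restart offset is past a message that was never processed (a loss); in the second it is at a message that was already processed (a duplicate).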

Re: High Level Consumer delivery semantics

Posted by Clark Breyman <cl...@breyman.com>.
Guozhang,

Thanks. I'm thinking not of threads crashing but of processes/VMs/networks
disappearing. It's a lights-out design, so that if any of the
computers/routers catch fire I can still sleep.

I think I can get what I want by spinning up N ZookeeperConsumerConnectors
on the topic each with one thread rather than multiple threads on a single
consumer. That eliminates the need for synchronization and gives me
parallelism. Am I correct?

Thanks again,
C
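
The one-connector-per-thread idea can be sketched as a toy model. The thread does not confirm that the real ZookeeperConsumerConnector behaves this way, so everything here is an assumption: SingleStreamConsumer and run_all are hypothetical names, each consumer owns its offsets exclusively, and commits are issued manually after processing.

```python
import threading

# Toy model only, not the Kafka API. Each consumer owns its own offsets,
# so a commit can never cover another thread's unprocessed message.

class SingleStreamConsumer:
    """Hypothetical stand-in for one connector that owns exactly one stream."""

    def __init__(self, messages):
        self.messages = messages
        self.consume_offset = 0   # advanced at fetch time
        self.committed = 0        # stands in for the offset stored in ZK
        self.processed = []

    def run(self, crash_after=None):
        while self.consume_offset < len(self.messages):
            msg = self.messages[self.consume_offset]
            self.consume_offset += 1
            if crash_after is not None and len(self.processed) == crash_after:
                return            # simulated kill -9 between fetch and processing
            self.processed.append(msg)
            self.committed = self.consume_offset  # commits only this consumer's offset


def run_all(partitions, crash_after=None):
    """Run one consumer per partition, each on its own thread."""
    consumers = [SingleStreamConsumer(p) for p in partitions]
    threads = [threading.Thread(target=c.run, args=(crash_after,)) for c in consumers]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return consumers
```

In this model the committed offset always equals the number of processed messages, even when the in-memory offset has already moved one past it at the time of the crash.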


On Wed, Jan 29, 2014 at 4:00 PM, Guozhang Wang <wa...@gmail.com> wrote:

> Hi Clark,
>
> 1. This is true; you need to synchronize these consumer threads when
> calling commitOffsets().
>
> 2. If you are asking what if the consumer thread crashed after
>
> currentTopicInfo.resetConsumeOffset(consumedOffset)
>
> within the next() call, then on restart it will have lost these in-memory
> offsets and will read the offset from ZK, which will be smaller than the
> in-memory value. That still leads to duplicates but not data loss.
>
> Guozhang

Re: High Level Consumer delivery semantics

Posted by Guozhang Wang <wa...@gmail.com>.
Hi Clark,

1. This is true; you need to synchronize these consumer threads when
calling commitOffsets().

2. If you are asking what if the consumer thread crashed after

currentTopicInfo.resetConsumeOffset(consumedOffset)

within the next() call, then on restart it will have lost these in-memory
offsets and will read the offset from ZK, which will be smaller than the
in-memory value. That still leads to duplicates but not data loss.

Guozhang
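
One way to read point 1 is that a commit should run only while no thread holds a fetched but unprocessed message. A minimal sketch of such a gate in Python follows. CommitGate and demo are hypothetical names, and the real connector is replaced by two counter lists, so this shows the synchronization pattern rather than anything in Kafka itself.

```python
import threading

# Sketch of the synchronization pattern only; not the Kafka API.

class CommitGate:
    """Lets a commit run only while no fetched message is still unprocessed."""

    def __init__(self):
        self._cond = threading.Condition()
        self._in_flight = 0
        self._committing = False

    def begin_message(self):
        """Call before fetching; blocks while a commit is in progress."""
        with self._cond:
            while self._committing:
                self._cond.wait()
            self._in_flight += 1

    def end_message(self):
        """Call once processing of the fetched message has finished."""
        with self._cond:
            self._in_flight -= 1
            self._cond.notify_all()

    def commit(self, do_commit):
        """Wait until nothing is in flight, then run do_commit exclusively."""
        with self._cond:
            while self._committing:      # one committer at a time
                self._cond.wait()
            self._committing = True
            while self._in_flight > 0:   # drain messages already fetched
                self._cond.wait()
            try:
                do_commit()
            finally:
                self._committing = False
                self._cond.notify_all()


def demo(num_workers=4, messages_per_worker=50):
    gate = CommitGate()
    consume_offset = [0] * num_workers   # advanced at fetch time, like next()
    processed = [0] * num_workers        # advanced when processing finishes
    violations = []

    def commit_snapshot():
        # Inside the gate, every fetched message has also been processed.
        if consume_offset != processed:
            violations.append((list(consume_offset), list(processed)))

    def worker(i):
        for n in range(messages_per_worker):
            gate.begin_message()
            consume_offset[i] += 1       # the fetch advances the offset
            processed[i] += 1            # processing completes
            gate.end_message()
            if n % 10 == 0:
                gate.commit(commit_snapshot)

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(num_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return violations, processed
```

The trade-off is that every commit briefly stalls all consumer threads, which is the cost of committing a shared set of offsets safely.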


On Wed, Jan 29, 2014 at 12:31 PM, Clark Breyman <cl...@breyman.com> wrote:

> Guozhang,
>
> That makes sense except for the following:
>
> - the ZookeeperConsumerConnector.commitOffsets() method commits the current
> value of PartitionTopicInfo.consumeOffset for all of the active streams.
>
> - the ConsumerIterator in the streams advances the value of
> PartitionTopicInfo.consumeOffset *before* next() returns, not after the
> processing on that message is complete.
>
> If you have multiple threads consuming, thread A calling commitOffsets()
> may commit thread B's retrieved but unprocessed message, no?



-- 
-- Guozhang

Re: High Level Consumer delivery semantics

Posted by Clark Breyman <cl...@breyman.com>.
Guozhang,

That makes sense except for the following:

- the ZookeeperConsumerConnector.commitOffsets() method commits the current
value of PartitionTopicInfo.consumeOffset for all of the active streams.

- the ConsumerIterator in the streams advances the value of
PartitionTopicInfo.consumeOffset *before* next() returns, not after the
processing on that message is complete.

If you have multiple threads consuming, thread A calling commitOffsets()
may commit thread B's retrieved but unprocessed message, no?


On Wed, Jan 29, 2014 at 12:20 PM, Guozhang Wang <wa...@gmail.com> wrote:

> Hi Clark,
>
> In practice, the client app code needs to always commit the offset after it
> has processed the messages, and hence only the second case can happen,
> leading to "at least once".
>
> Guozhang

Re: High Level Consumer delivery semantics

Posted by Guozhang Wang <wa...@gmail.com>.
Hi Clark,

In practice, the client app code needs to always commit the offset after it
has processed the messages, and hence only the second case can happen,
leading to "at least once".

Guozhang
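
The "commit after processing" rule can be checked exhaustively on a toy model. The sketch below is Python and not the Kafka API: consume_with_crash and delivered_after_one_crash are hypothetical names, a single thread commits after every message, and a crash may fall before any fetch, process, or commit step.

```python
# Toy model only: a single-threaded consumer that commits after each message.

def consume_with_crash(messages, committed, crash_step):
    """Consume from offset `committed`, committing after each message.

    Steps are counted as fetch, process, commit, fetch, ... A crash at
    `crash_step` stops the run before that step executes; None means no crash.
    Returns the messages processed in this run and the last committed offset.
    """
    processed = []
    offset = committed
    step = 0
    while offset < len(messages):
        for action in ("fetch", "process", "commit"):
            if step == crash_step:
                return processed, committed
            if action == "process":
                processed.append(messages[offset])
            elif action == "commit":
                committed = offset + 1
            step += 1
        offset += 1
    return processed, committed


def delivered_after_one_crash(messages, crash_step):
    """Run once with a crash, then restart from the committed offset."""
    first, committed = consume_with_crash(messages, 0, crash_step)
    second, _ = consume_with_crash(messages, committed, None)
    return first + second
```

For every crash point no message is lost, and a duplicate appears only when the crash falls between processing a message and committing it, which matches the "at least once" description.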


On Wed, Jan 29, 2014 at 11:51 AM, Clark Breyman <cl...@breyman.com> wrote:

> I'm wrestling through the at-least-once/at-most-once semantics of my
> application and was hoping for some confirmation of the semantics. I'm not
> sure I can classify the high level consumer as either type.
>
> False ack scenario:
> - Thread A: call next() on the ConsumerIterator, advancing the
> PartitionTopicInfo offset
> - Thread B: commitOffsets() flushes the offset of the incomplete message to ZK
> - Thread A: fail processing (e.g. kill -9)
>
> False retry scenario:
> - Thread A: call next() & successfully process, kill -9 before
> commitOffsets either in thread or in parallel.
>
> Is this right or am I missing something (likely)? Seems like the semantics
> are essentially approximately once.
>



-- 
-- Guozhang