Posted to users@kafka.apache.org by Jason Rosenberg <jb...@squareup.com> on 2013/10/16 01:16:43 UTC

Handling consumer rebalance when implementing synchronous auto-offset commit

I'm looking at implementing a synchronous auto offset commit solution.
People have discussed the need for this in previous threads. Basically, in
my consumer loop, I want to make sure a message has actually been processed
before allowing its offset to be committed. But I don't want to commit on
every message, since that would be too expensive. So, I want to use
'auto.commit.interval.ms' to periodically call commitOffsets, but only
after a message has been processed and before the next message has been
issued via a call to 'next()' on the ConsumerIterator.
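A minimal sketch of the commit policy described above, not a definitive
implementation. It assumes 'auto.commit.enable' is set to false, a single
consumer thread per connector, and that the Runnable passed in wraps the
connector's commitOffsets call. The class name IntervalCommitter, its
method names, and the injected clock are hypothetical and exist only for
illustration.

```java
import java.util.function.LongSupplier;

/**
 * Sketch only: commits at most once per interval, and only at a point where
 * every message handed out so far has been fully processed.
 */
final class IntervalCommitter {
    private final Runnable commitAction;  // assumption: wraps connector.commitOffsets()
    private final long intervalMs;        // plays the role of auto.commit.interval.ms
    private final LongSupplier clock;     // injectable so the timing can be tested
    private long lastCommitMs;
    private boolean uncommittedWork;

    IntervalCommitter(Runnable commitAction, long intervalMs, LongSupplier clock) {
        this.commitAction = commitAction;
        this.intervalMs = intervalMs;
        this.clock = clock;
        this.lastCommitMs = clock.getAsLong();
    }

    /**
     * Call after a message has been processed and before the next call to
     * next(). Returns true if a commit was issued.
     */
    boolean messageProcessed() {
        uncommittedWork = true;
        if (clock.getAsLong() - lastCommitMs < intervalMs) {
            return false;
        }
        flush();
        return true;
    }

    /** Commits if anything was processed since the last commit. */
    void flush() {
        if (!uncommittedWork) {
            return;
        }
        commitAction.run();
        uncommittedWork = false;
        lastCommitMs = clock.getAsLong();
    }
}
```

In a consumer loop this would be called once per iteration, right after the
processing step. Because the check only runs when a message has just been
processed, a quiet topic delays the commit until the next message arrives;
a separate timer thread would avoid that, at the cost of needing to
coordinate with the processing thread.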

The built-in 'auto.commit.enable' feature unfortunately allows commits to
happen for any message that has been returned via ConsumerIterator.next().
So if the consumer goes down before actually processing the message, or if
it hangs indefinitely for some reason, the message's offset can be
committed before the message has actually been consumed successfully.

I think there are issues with trying to implement this on top of the
high-level consumer API. First, I need to worry about multiple threads
consuming from the same connector (so for now I'm limiting this to a
single thread).

Also, when shutting down the connector, I need to make sure any
processed-but-uncommitted messages are committed before allowing the
connector to shut down. That seems easy enough to handle.
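The shutdown ordering described above could be sketched as follows. The
OffsetConnector interface is a hypothetical stand-in for just the two
connector calls involved (commitOffsets and shutdown), and GracefulShutdown
is an illustrative name. The sketch assumes the consumer loop has already
stopped calling next() before stop() is invoked.

```java
/** Hypothetical stand-in for the two connector calls this sketch needs. */
interface OffsetConnector {
    void commitOffsets();
    void shutdown();
}

final class GracefulShutdown {
    private GracefulShutdown() {}

    /**
     * Commits processed-but-uncommitted work first, then shuts down.
     * The connector is shut down even if the final commit throws.
     */
    static void stop(OffsetConnector connector, boolean hasUncommittedWork) {
        try {
            if (hasUncommittedWork) {
                connector.commitOffsets();
            }
        } finally {
            connector.shutdown();
        }
    }
}
```

The try/finally is a design choice: a failed final commit then costs some
duplicate consumption on restart instead of leaving the connector running.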

One thing I'm more concerned with is what happens when there's a consumer
rebalance. Looking at the ZookeeperConsumerConnector code, it seems there
are explicit calls to commitOffsets during the rebalance. I'm not sure how
to handle that from the high-level API (and do I need to worry about that?).

Thanks for any insight.

Jason

Re: Handling consumer rebalance when implementing synchronous auto-offset commit

Posted by Jason Rosenberg <jb...@squareup.com>.
awesome


On Fri, Oct 18, 2013 at 1:10 AM, Joel Koshy <jj...@gmail.com> wrote:

> We should be able to get this in after 0.8.1 and probably before the client
> rewrite.
>
> Thanks,
>
> Joel

Re: Handling consumer rebalance when implementing synchronous auto-offset commit

Posted by Joel Koshy <jj...@gmail.com>.
We should be able to get this in after 0.8.1 and probably before the client
rewrite.

Thanks,

Joel

On Wednesday, October 16, 2013, Jason Rosenberg wrote:

> This looks great.   What is the time frame for this effort?
>
> Jason

Re: Handling consumer rebalance when implementing synchronous auto-offset commit

Posted by Jason Rosenberg <jb...@squareup.com>.
This looks great. What is the time frame for this effort?

Jason


On Wed, Oct 16, 2013 at 2:19 PM, Joel Koshy <jj...@gmail.com> wrote:

> Btw, after we complete KAFKA-1000 (offset management in Kafka) it
> should be reasonable to commit offsets on every message as long as the
> optional metadata portion of the offset commit request is small/empty.
>
> Thanks,
>
> Joel

Re: Handling consumer rebalance when implementing synchronous auto-offset commit

Posted by Joel Koshy <jj...@gmail.com>.
Btw, after we complete KAFKA-1000 (offset management in Kafka) it
should be reasonable to commit offsets on every message as long as the
optional metadata portion of the offset commit request is small/empty.

Thanks,

Joel


On Wed, Oct 16, 2013 at 10:35 AM, Jason Rosenberg <jb...@squareup.com> wrote:
> That would be great.  Additionally, in the new api, it would be awesome
> augment the default auto-commit functionality to allow client code to mark
> a message for commit only after processing a message successfully!

Re: Handling consumer rebalance when implementing synchronous auto-offset commit

Posted by Jason Rosenberg <jb...@squareup.com>.
That would be great. Additionally, in the new API, it would be awesome to
augment the default auto-commit functionality to allow client code to mark
a message for commit only after processing it successfully!


On Wed, Oct 16, 2013 at 7:52 AM, Jun Rao <ju...@gmail.com> wrote:

> For manual offset commits, it will be useful to have some kind of API that
> informs the client when a rebalance is going to happen. We can think about
> this when we do the client rewrite.
>
> Thanks,
>
> Jun

Re: Handling consumer rebalance when implementing synchronous auto-offset commit

Posted by Jun Rao <ju...@gmail.com>.
For manual offset commits, it will be useful to have some kind of API that
informs the client when a rebalance is going to happen. We can think about
this when we do the client rewrite.
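To make the idea concrete, here is one shape such a notification could
take. Everything in this sketch is hypothetical: RebalanceListener,
ToyConnector and CommitOnRebalance are illustrative names, not part of the
API discussed in this thread, and the toy connector only records the order
of calls.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical callback: nothing like this exists in the API discussed here. */
interface RebalanceListener {
    void beforePartitionsReleased();
}

/** Toy connector that warns its listeners before giving up its partitions. */
final class ToyConnector {
    private final List<RebalanceListener> listeners = new ArrayList<>();
    private final List<String> log = new ArrayList<>();

    void addListener(RebalanceListener listener) {
        listeners.add(listener);
    }

    void commitOffsets() {
        log.add("commit");
    }

    void rebalance() {
        for (RebalanceListener listener : listeners) {
            listener.beforePartitionsReleased();
        }
        log.add("release");
    }

    List<String> log() {
        return log;
    }
}

/** Client side: commit processed-but-uncommitted work when warned. */
final class CommitOnRebalance implements RebalanceListener {
    private final ToyConnector connector;
    private boolean uncommittedWork;

    CommitOnRebalance(ToyConnector connector) {
        this.connector = connector;
    }

    void messageProcessed() {
        uncommittedWork = true;
    }

    @Override
    public void beforePartitionsReleased() {
        if (uncommittedWork) {
            connector.commitOffsets();
            uncommittedWork = false;
        }
    }
}
```

In a real connector the callback would likely run on a different thread
from the one processing messages, so the flag and the commit would need
synchronization with the consumer loop; the sketch leaves that out.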

Thanks,

Jun


On Tue, Oct 15, 2013 at 9:21 PM, Jason Rosenberg <jb...@squareup.com> wrote:

> Jun,
>
> Yes, sorry, I think that was the basis for my question.   When auto commit
> is enabled, special care is taken to make sure things are auto-committed
> during a rebalance.  This is needed because when a topic moves off of a
> consumer thread (since it is being rebalanced to another one), it's as if
> that topic is being shutdown on that connector, and any not-yet-committed
> messages need to be committed before letting  go of the topic.
>
> So, my question is around trying to understand if there's a way I can
> reproduce similar functionality using my own sync auto commit
> implementation (and I'm not sure there is).  It seems that when there's a
> rebalance, all processed but not-yet-committed offsets will not be
> committed, and thus there will be no way to prevent pretty massive
> duplicate consumption on a rebalance.  Is that about right?  Or is there
> someway around this that I'm not seeing?
>
> The auto-commit functionality that's builtin is so close to being all that
> anyone would need, except it has a glaring weakness, in that it will cause
> messages to be lost from time to time, and so I don't know that it will
> meet the needs of trying to have reliable delivery (with duplicates ok).
>
> Jason

Re: Handling consumer rebalance when implementing synchronous auto-offset commit

Posted by Jason Rosenberg <jb...@squareup.com>.
Jun,

Yes, sorry, I think that was the basis for my question. When auto commit
is enabled, special care is taken to make sure things are auto-committed
during a rebalance. This is needed because when a topic moves off a
consumer thread (since it is being rebalanced to another one), it's as if
that topic is being shut down on that connector, and any not-yet-committed
messages need to be committed before letting go of the topic.

So, my question is whether there's a way I can reproduce similar
functionality using my own sync auto commit implementation (and I'm not
sure there is). It seems that when there's a rebalance, all processed but
not-yet-committed offsets will not be committed, and thus there will be no
way to prevent pretty massive duplicate consumption on a rebalance. Is
that about right? Or is there some way around this that I'm not seeing?

The built-in auto-commit functionality is so close to being all that
anyone would need, except it has a glaring weakness: it will cause
messages to be lost from time to time, so I don't know that it will meet
the needs of reliable delivery (with duplicates OK).

Jason


On Tue, Oct 15, 2013 at 9:00 PM, Jun Rao <ju...@gmail.com> wrote:

> If auto commit is disabled, the consumer connector won't call commitOffsets
> during rebalancing.
>
> Thanks,
>
> Jun

Re: Handling consumer rebalance when implementing synchronous auto-offset commit

Posted by Jun Rao <ju...@gmail.com>.
If auto commit is disabled, the consumer connector won't call commitOffsets
during rebalancing.

Thanks,

Jun


On Tue, Oct 15, 2013 at 4:16 PM, Jason Rosenberg <jb...@squareup.com> wrote:

> I'm looking at implementing a synchronous auto offset commit solution.
>  People have discussed the need for this in previous
> threads......Basically, in my consumer loop, I want to make sure a message
> has been actually processed before allowing it's offset to be committed.
>  But I don't want to commit on every message, since that would be too
> expensive.  So, I want to use the 'auto.commit.interval.ms' to
> periodically
> call commitOffsets, but only after a message is processed, but not after
> the next message has been issued via a call to 'next()' on the
> ConsumerIterator.
>
> The builtin 'auto.commit.enable' feature unfortunately allows commits to
> happen on any message that has been returned via ConsumerIterator.next().
>  But if the consumer goes down before actually processing the message, or
> if it hangs indefinitely for some reason, then this message will get
> committed before it has actually been consumed successfully.
>
> I think there are issues with trying to implement this on top of the
> high-level consumer api.  First, I need to worry about multiple threads
> consuming in the same connector (so for now I'm limiting this to support
> only 1 thread).
>
> Also, when shutting down the connector, I need to make sure any pending
> messages are committed before allowing the connector to shutdown.  So, that
> seems easy enough to handle.
>
> One thing I'm more concerned with, is what happens when there's a consumer
> rebalance.  Looking at the ZookeeperConsumerConnector code, it seems there
> are explicit calls to commitOffsets during the rebalance.  I'm not sure how
> to handle that from the high-level api (and do I need to worry about
> that?).
>
> Thanks for any insight.
>
> Jason
>