Posted to users@kafka.apache.org by Joey Echeverria <jo...@rocana.com> on 2016/07/26 19:13:57 UTC

Handling long commits during a rebalance

We've been playing around with the new Consumer API and have hit an
unfortunate bump in the road. When our onPartitionsRevoked() callback
is called we'd like to be able to commit any data that we were
processing to stable storage so we can then commit the offsets back to
Kafka. This way we don't throw away in-progress work. The problem is
that onPartitionsRevoked() is called from the same thread running
poll() which means that heartbeats are paused while the callback is
processing. Our commits sometimes take a few minutes which means that
we'll lose our Kafka session and the partitions we were trying to
commit will get reassigned. The end result of that is more duplicates
in our stored data.
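Roughly, what we're trying to do in the callback looks like the sketch below. FlushableSink and its flush() are hypothetical stand-ins for our stable-storage commit; the consumer calls are the standard new-consumer API, and the sketch assumes kafka-clients is on the classpath:

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Sketch: flush in-progress work to stable storage, then commit offsets,
// when partitions are revoked. FlushableSink is a hypothetical stand-in
// for the downstream store.
public class CommitOnRevokeListener implements ConsumerRebalanceListener {
    private final KafkaConsumer<?, ?> consumer;
    private final FlushableSink sink; // hypothetical downstream store

    public CommitOnRevokeListener(KafkaConsumer<?, ?> consumer, FlushableSink sink) {
        this.consumer = consumer;
        this.sink = sink;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // This runs on the same thread as poll(), so no heartbeats are sent
        // while it blocks -- if flush() takes longer than the session
        // timeout, the session expires and the partitions get reassigned.
        sink.flush(); // may take minutes

        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (TopicPartition tp : partitions) {
            // position() is the offset of the next record to consume
            offsets.put(tp, new OffsetAndMetadata(consumer.position(tp)));
        }
        consumer.commitSync(offsets);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {}

    public interface FlushableSink { void flush(); }
}
```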

Has anyone else encountered this? We're probably going to just do a
rollback rather than a commit in the callback to return quickly, but I
wanted to check to see if there was something we were missing.

-Joey

Re: Handling long commits during a rebalance

Posted by Joey Echeverria <jo...@rocana.com>.
Thanks Craig and Ewen!

-Joey


Re: Handling long commits during a rebalance

Posted by craig w <co...@gmail.com>.
In my case, no. When a rebalance occurs, the work being performed can't be
"paused" and picked up again later; it has to be started again. So when
onPartitionsRevoked occurs, the blocking queue is cleared. Again, this may
not be ideal, but it works for this use case.
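A Kafka-free sketch of the clear-on-revoke idea (Record and the class name are made up for illustration; the real code would hang this off the rebalance callback):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Work that cannot be paused is simply discarded when partitions are
// revoked; the records will be re-fetched after the rebalance (and may
// show up downstream as duplicates).
public class ClearOnRevoke {
    static final class Record {
        final long offset;
        Record(long offset) { this.offset = offset; }
    }

    private final BlockingQueue<Record> pending = new LinkedBlockingQueue<>();

    void enqueue(Record r) { pending.add(r); }

    // Called from the rebalance callback; returns how many records were
    // dropped so the caller can log the expected rework.
    int onRevoke() {
        List<Record> dropped = new ArrayList<>();
        return pending.drainTo(dropped);
    }
}
```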

-- 

https://github.com/mindscratch
https://www.google.com/+CraigWickesser
https://twitter.com/mind_scratch
https://twitter.com/craig_links

Re: Handling long commits during a rebalance

Posted by Ewen Cheslack-Postava <ew...@confluent.io>.
Joey,

You'll probably want to look into
https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread
That should address the minutes-long pauses in onPartitionsRevoked() that
you're referring to. If you need a workaround in the meantime, you'll
want to increase your session timeout to cover the amount of time it takes
to commit/rebalance.
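As a sketch of that interim workaround (the key names are the real consumer configs; the five-minute value is just an example, and request.timeout.ms has to stay larger than the session timeout):

```java
import java.util.Properties;

public class SessionTimeoutConfig {
    // Raise the session timeout so it covers the worst-case commit time in
    // onPartitionsRevoked(). The broker-side group.max.session.timeout.ms
    // must be at least this large for the consumer to be allowed to join.
    public static Properties consumerProps() {
        Properties props = new Properties();
        props.put("session.timeout.ms", "300000");  // > worst-case commit time
        props.put("request.timeout.ms", "305000");  // must exceed session.timeout.ms
        return props;
    }
}
```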

-Ewen

-- 
Thanks,
Ewen

Re: Handling long commits during a rebalance

Posted by Joey Echeverria <jo...@rocana.com>.
That's the direction we're looking at for normal commit processing,
but how do you handle commits during a rebalance?

Namely, do you initiate a commit during a call to onPartitionsRevoked?

-Joey


Re: Handling long commits during a rebalance

Posted by craig w <co...@gmail.com>.
We had to have one thread use the Consumer to poll and get records. It
would then put them into a blocking queue (in memory), pause our
subscription, and have separate threads pull work from the blocking queue.
Meanwhile, the thread with the consumer would keep calling "poll" (getting
no data back because we paused our subscriptions). Once the blocking queue
was empty, the subscriptions would be resumed and "poll" would fetch data
again.

We then had to have an object that the worker threads could use to "commit"
their offsets as they completed a task. The consumer thread would check to
see if there were offsets that needed to be committed, and if so it would
commit them.
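The shared commit object is roughly this shape (simplified to plain strings for partitions; in real code it would key on TopicPartition and the consumer thread would pass the drained snapshot to commitSync):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Worker threads record the offsets of tasks they finish; the consumer's
// poll loop periodically drains the map and commits the result.
public class PendingCommits {
    private final ConcurrentHashMap<String, Long> completed = new ConcurrentHashMap<>();

    // Called by worker threads when a task finishes; keep the highest
    // offset seen per partition.
    public void markCompleted(String partition, long offset) {
        completed.merge(partition, offset, Math::max);
    }

    // Called by the consumer thread between polls; returns offset + 1 per
    // partition, following Kafka's "next offset to read" commit convention.
    public Map<String, Long> drain() {
        Map<String, Long> snapshot = new HashMap<>();
        for (String p : completed.keySet()) {
            Long off = completed.remove(p);
            if (off != null) snapshot.put(p, off + 1);
        }
        return snapshot;
    }
}
```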

It was a bunch of work to get it working and it's still not perfect, but
it's getting the job done.

-craig
