Posted to users@kafka.apache.org by Paul Mackles <pm...@adobe.com> on 2016/11/07 17:53:13 UTC

consumer client pause/resume/rebalance

Using the v0.9.0.1 consumer API, I recently learned that paused partitions can unexpectedly become unpaused during a rebalance. I also found an old thread from the mailing list which corroborates this behavior:


http://grokbase.com/t/kafka/users/161wgzckze/new-consumer-pause-reset-behaviour


While I can maintain the partition state myself, it seems like it would be a lot easier if this were either handled internally by the consumer API (i.e. re-pause the partitions that were previously paused before resuming) and/or if the partition state were made available to the RebalanceListener.


I did not find any existing tickets in JIRA related to this so I am wondering if this is a valid bug/enhancement or if someone found a decent workaround. All of the consumer API examples that I have found do not appear to handle this scenario.


Here is the code snippet from the client I have been working on:


consumer.pause(consumer.assignment().toArray(EMPTYTPARRAY));

while (!isWritable()) {
  // WARNING: if there is a rebalance, this call may return some records!!!
  consumer.poll(0);
  Uninterruptibles.sleepUninterruptibly(pauseWait, TimeUnit.MILLISECONDS);
}

consumer.resume(consumer.assignment().toArray(EMPTYTPARRAY));
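
For reference, maintaining the partition state outside the consumer could look roughly like the sketch below. It is only an illustration under assumptions: PauseTracker and its method names are hypothetical, the type parameter P stands in for TopicPartition, and nothing here is part of the Kafka API. The idea is to record pause intent next to each pause()/resume() call and, when a new assignment arrives, compute which of the still-owned partitions should be paused again.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

/**
 * Remembers which partitions the application wants paused, independently of
 * the consumer, so that intent can be re-applied after a rebalance.
 * P is a placeholder for Kafka's TopicPartition (hypothetical generic type).
 */
final class PauseTracker<P> {
    private final Set<P> wantPaused = new HashSet<>();

    /** Record intent; meant to be called next to consumer.pause(...). */
    void markPaused(Collection<P> partitions) {
        wantPaused.addAll(partitions);
    }

    /** Clear intent; meant to be called next to consumer.resume(...). */
    void markResumed(Collection<P> partitions) {
        wantPaused.removeAll(partitions);
    }

    /**
     * Meant to be called with the new assignment after a rebalance. Forgets
     * partitions this consumer no longer owns and returns a copy of the
     * ones that should be paused again.
     */
    Set<P> partitionsToRepause(Collection<P> newAssignment) {
        wantPaused.retainAll(newAssignment);
        return new HashSet<>(wantPaused);
    }
}
```

In a real client the returned set would presumably be handed to consumer.pause(...) from ConsumerRebalanceListener.onPartitionsAssigned. Note that this only restores partitions the consumer still owns; newly assigned partitions start out unpaused, which may or may not be what a loop like the one above wants.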


Thanks,

Paul




Re: consumer client pause/resume/rebalance

Posted by Gwen Shapira <gw...@confluent.io>.
Yeah, we should mention that in the javadoc. Want to send a PR?

I like the "worse is better" philosophy - although we occasionally
choose a complex implementation for the sake of simpler APIs (especially
when it comes to admin configurations).

On Tue, Nov 8, 2016 at 2:34 AM, Paul Mackles <pm...@adobe.com> wrote:
> Hi Gwen - Makes sense. The way you explain it actually reminds me a little of the "worse is better" philosophy: https://www.jwz.org/doc/worse-is-better.html
>
>
> Perhaps a mention in the javadoc for pause() and/or ConsumerRebalanceListener would be sufficient.



-- 
Gwen Shapira
Product Manager | Confluent
650.450.2760 | @gwenshap
Follow us: Twitter | blog

Re: consumer client pause/resume/rebalance

Posted by Paul Mackles <pm...@adobe.com>.
Hi Gwen - Makes sense. The way you explain it actually reminds me a little of the "worse is better" philosophy: https://www.jwz.org/doc/worse-is-better.html


Perhaps a mention in the javadoc for pause() and/or ConsumerRebalanceListener would be sufficient.

________________________________
From: Gwen Shapira <gw...@confluent.io>
Sent: Monday, November 07, 2016 3:34:39 PM
To: Users
Subject: Re: consumer client pause/resume/rebalance

I think the current behavior is fairly reasonable. Following a
rebalance the entire state of the consumer changes - you may get an
entirely new set of partitions. A common use-case for pause is to
allow a consumer to keep polling and avoid getting new events while it
is retrying to process existing events - well, following a rebalance,
it is possible that another consumer owns the partition, is already
re-processing these events and the entire state needs to be reset.

I usually recommend that developers treat a rebalance as a restart (since
you are getting a whole new set of partitions) and just follow
whatever process you'd follow to set up after a restart. Since pauses
don't survive restarts, I wouldn't expect them to survive a rebalance
either.

I hope this helps explain the behavior?

Re: consumer client pause/resume/rebalance

Posted by Gwen Shapira <gw...@confluent.io>.
I think the current behavior is fairly reasonable. Following a
rebalance the entire state of the consumer changes - you may get an
entirely new set of partitions. A common use-case for pause is to
allow a consumer to keep polling and avoid getting new events while it
is retrying to process existing events - well, following a rebalance,
it is possible that another consumer owns the partition, is already
re-processing these events and the entire state needs to be reset.

I usually recommend that developers treat a rebalance as a restart (since
you are getting a whole new set of partitions) and just follow
whatever process you'd follow to set up after a restart. Since pauses
don't survive restarts, I wouldn't expect them to survive a rebalance
either.

I hope this helps explain the behavior?
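
To make the "treat rebalance as a restart" idea concrete, here is a minimal sketch, assuming the application can ask at any time whether its downstream is writable (as isWritable() does in the snippet quoted in this thread). PauseTarget, RestartStyleSetup and downstreamWritable are hypothetical names invented for illustration, and P stands in for TopicPartition; none of this is Kafka API. The same setup routine runs at startup and after every assignment, and it derives the pause decision from current conditions instead of carrying state over from the previous assignment.

```java
import java.util.Collection;
import java.util.function.BooleanSupplier;

/** Hypothetical stand-in for the pause part of the consumer API. */
interface PauseTarget<P> {
    void pause(Collection<P> partitions);
}

/** Runs the same setup after every assignment, as if the consumer had just started. */
final class RestartStyleSetup<P> {
    private final PauseTarget<P> consumer;
    private final BooleanSupplier downstreamWritable;

    RestartStyleSetup(PauseTarget<P> consumer, BooleanSupplier downstreamWritable) {
        this.consumer = consumer;
        this.downstreamWritable = downstreamWritable;
    }

    /** Meant to be called at startup and whenever a new assignment arrives. */
    void onAssigned(Collection<P> assigned) {
        // Nothing is carried over from the previous assignment: the pause
        // decision depends only on whether the downstream is writable now.
        if (!assigned.isEmpty() && !downstreamWritable.getAsBoolean()) {
            consumer.pause(assigned);
        }
    }
}
```

With this shape there is no per-partition pause bookkeeping to keep in sync; the trade-off is that the application has to be able to recompute its desired state from scratch whenever the assignment changes.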
