You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Насыров Ренат <re...@yandex.ru> on 2016/02/16 16:15:41 UTC

Rebalancing during the long-running tasks

Hello!

I'm trying to use kafka for long-running tasks processing. The tasks can be very short (less than a second) or very long (about 10 minutes). I've got one consumer group for the single queue, and one or more consumers. Sometimes consumers manage to commit their offsets before rebalancing, sometimes not (and fail). Accordning to this document ( https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design ), in worst case (when all the consumers are on very long tasks) it goes as follows:

1) Consumers get long tasks from the queue.
2) Consumers performing their long-running tasks.
3) Session timeout happens.
4) Group coordinator performs a rebalance; the current generation number is increased.
5) Consumers complete their long-running tasks and commit.
6) GroupCoordinator returns IllegalGeneration errors to consumers and does not allow to commit the offsets.
7) Consumers reconnect and get the very messages from the previous generation, thus stucking in forever loop.

Suggestions:

1) Commit first, then process. Inacceptable in my case because it leads to at-most-once semantics.
2) Increase session timeout limit. Not desired because task duration can negatively affect the effectiveness of rebalance.

Is there any proper way to complete long-running tasks?

Re: Rebalancing during the long-running tasks

Posted by Damian Guy <da...@gmail.com>.
Hi,

I had the same issue and managed to work around it by simulating a
heartbeat to kafka. It works really well, i.e., we have had zero issues
since it was implemented

I have somthing like this:


void process() {
   records = consumer.poll(timeout)
   dispatcher.dispatch(records)
   while(!dispatcher.isDone(heartbeatInterval, TimeUnit.MILLISECONDS) {
     heartbeat()
   }
}

void heartbeat() {
  consumer.pause(getCurrentAssignment())
  consumer.poll(10)
  consumer.resume(getCurrentAssignment())
}

TopicPartition[] getCurrentAssignment() {
   return consumer.assignment().toArray(new TopicPartition[0])
}


On 16 February 2016 at 16:20, Ben Stopford <be...@confluent.io> wrote:

> I think you’ll find some useful context in this KIP Jason wrote. It’s
> pretty good.
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-41%3A+KafkaConsumer+Max+Records
> <
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-41:+KafkaConsumer+Max+Records
> >
>
>
> > On 16 Feb 2016, at 07:15, Насыров Ренат <re...@yandex.ru> wrote:
> >
> > Hello!
> >
> > I'm trying to use kafka for long-running tasks processing. The tasks can
> be very short (less than a second) or very long (about 10 minutes). I've
> got one consumer group for the single queue, and one or more consumers.
> Sometimes consumers manage to commit their offsets before rebalancing,
> sometimes not (and fail). Accordning to this document (
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
> ), in worst case (when all the consumers are on very long tasks) it goes as
> follows:
> >
> > 1) Consumers get long tasks from the queue.
> > 2) Consumers performing their long-running tasks.
> > 3) Session timeout happens.
> > 4) Group coordinator performs a rebalance; the current generation number
> is increased.
> > 5) Consumers complete their long-running tasks and commit.
> > 6) GroupCoordinator returns IllegalGeneration errors to consumers and
> does not allow to commit the offsets.
> > 7) Consumers reconnect and get the very messages from the previous
> generation, thus stucking in forever loop.
> >
> > Suggestions:
> >
> > 1) Commit first, then process. Inacceptable in my case because it leads
> to at-most-once semantics.
> > 2) Increase session timeout limit. Not desired because task duration can
> negatively affect the effectiveness of rebalance.
> >
> > Is there any proper way to complete long-running tasks?
>
>

Re: Rebalancing during the long-running tasks

Posted by Насыров Ренат <re...@yandex.ru>.
Great proposal, indeed. If I understand right, Jason suggests committing messages one-by-one instead of ingesting the whole batch. I have nothing against processing the big bunch of tasks from poll() and committing after every single processed task, but in my case the very single task is too heavy. I'm afraid I cannot benefit from this proposal.

16.02.2016, 19:20, "Ben Stopford" <be...@confluent.io>:
> I think you’ll find some useful context in this KIP Jason wrote. It’s pretty good.
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-41%3A+KafkaConsumer+Max+Records <https://cwiki.apache.org/confluence/display/KAFKA/KIP-41:+KafkaConsumer+Max+Records>
>
>>  On 16 Feb 2016, at 07:15, Насыров Ренат <re...@yandex.ru> wrote:
>>
>>  Hello!
>>
>>  I'm trying to use kafka for long-running tasks processing. The tasks can be very short (less than a second) or very long (about 10 minutes). I've got one consumer group for the single queue, and one or more consumers. Sometimes consumers manage to commit their offsets before rebalancing, sometimes not (and fail). Accordning to this document ( https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design ), in worst case (when all the consumers are on very long tasks) it goes as follows:
>>
>>  1) Consumers get long tasks from the queue.
>>  2) Consumers performing their long-running tasks.
>>  3) Session timeout happens.
>>  4) Group coordinator performs a rebalance; the current generation number is increased.
>>  5) Consumers complete their long-running tasks and commit.
>>  6) GroupCoordinator returns IllegalGeneration errors to consumers and does not allow to commit the offsets.
>>  7) Consumers reconnect and get the very messages from the previous generation, thus stucking in forever loop.
>>
>>  Suggestions:
>>
>>  1) Commit first, then process. Inacceptable in my case because it leads to at-most-once semantics.
>>  2) Increase session timeout limit. Not desired because task duration can negatively affect the effectiveness of rebalance.
>>
>>  Is there any proper way to complete long-running tasks?

-- 
С уважением,
Ренат Насыров

Re: Rebalancing during the long-running tasks

Posted by Ben Stopford <be...@confluent.io>.
I think you’ll find some useful context in this KIP Jason wrote. It’s pretty good. 

https://cwiki.apache.org/confluence/display/KAFKA/KIP-41%3A+KafkaConsumer+Max+Records <https://cwiki.apache.org/confluence/display/KAFKA/KIP-41:+KafkaConsumer+Max+Records>


> On 16 Feb 2016, at 07:15, Насыров Ренат <re...@yandex.ru> wrote:
> 
> Hello!
> 
> I'm trying to use kafka for long-running tasks processing. The tasks can be very short (less than a second) or very long (about 10 minutes). I've got one consumer group for the single queue, and one or more consumers. Sometimes consumers manage to commit their offsets before rebalancing, sometimes not (and fail). Accordning to this document ( https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design ), in worst case (when all the consumers are on very long tasks) it goes as follows:
> 
> 1) Consumers get long tasks from the queue.
> 2) Consumers performing their long-running tasks.
> 3) Session timeout happens.
> 4) Group coordinator performs a rebalance; the current generation number is increased.
> 5) Consumers complete their long-running tasks and commit.
> 6) GroupCoordinator returns IllegalGeneration errors to consumers and does not allow to commit the offsets.
> 7) Consumers reconnect and get the very messages from the previous generation, thus stucking in forever loop.
> 
> Suggestions:
> 
> 1) Commit first, then process. Inacceptable in my case because it leads to at-most-once semantics.
> 2) Increase session timeout limit. Not desired because task duration can negatively affect the effectiveness of rebalance.
> 
> Is there any proper way to complete long-running tasks?