You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Guven Demir <Gu...@sahibinden.com> on 2016/02/25 17:53:51 UTC

new consumer api / heartbeat, manual commit & long to process messages

hi all,

i'm having trouble processing a topic which includes paths to images which need to be downloaded and saved to disk (each takes ~3-5 seconds) and several are received on each poll

within this scenario, i'm receiving the following error:

    org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed due to group rebalance

which i assume is due to heartbeat failure and broker re-assigning the consumer's partition to another consumer

are there any recommendations for processing long to process messages?

thanks in advance,
guven



Re: new consumer api / heartbeat, manual commit & long to process messages

Posted by Jason Gustafson <ja...@confluent.io>.
Hey Guven,

A heartbeat API actually came up in the discussion of KIP-41. Ultimately we
rejected it because it led to confusing API semantics. The problem is that
heartbeat responses are used by the coordinator to tell consumers when a
rebalance is needed. But what should the user do if they call heartbeat()
and find that the group is rebalancing? If they don't stop message
processing and rejoin, then they may be kicked out of the group just as if
they had failed to heartbeat before expiration of the session timeout.
Alternatively, if we made heartbeat() blocking and let the rebalance
complete in the call itself, then the consumer may no longer be assigned
the same partitions. So either way, unless you can preempt message
processing, you may fall out of the group and pending messages will need to
be reprocessed after the rebalance completes. And if you can preempt
message processing, then you can ensure that heartbeats get sent by always
preempting the processor before the session timeout expires.

In the end, we felt that max.poll.records was a simpler option since it
gives you fine control over the poll loop and doesn't require any confusing
API changes . As long as you can put some upper bound on the processing
time, you can set max.poll.records=1 and the session timeout to whatever
the upper bound is.

However, if you have a use case where there is a very high variance in
message processing times, it may not be so helpful. In that case, the best
options I can think of at the moment are the following:

1. Move the processing to another thread. Basically the workflow would be
something like this: 1) receive records for a partition in poll(), 2)
submit them to an executor for processing, 3) pause the partition, and 4)
continue the poll loop. When the processor finishes with the records, you
can use resume() to reenable fetching. You'll have to manage offset commits
yourself since you wouldn't want to commit before the thread has actually
finished processing. You'll also have to account for the possibility of a
rebalance completing while the thread is still processing a batch (an easy
way to do this would probably be to just ignore CommitFailedException
thrown from commit).

2. This is a tad hacky, but you could take advantage of the fact that the
coordinator treats commits as heartbeats and call commitSync() periodically
while handling a batch of records. Note in this case that you should not
use the no-arg commitSync() variant which will commit the offsets for the
full batch returned from the last poll(). Instead you should pass the
offsets of the records already processed explicitly in
commitSync(Map<TopicPartition, OffsetAndMetadata>).

3. Use the consumer in "simple" mode. If you don't actually need group
coordination, then you can assign the partitions you want to read from
manually and consume them at your own rate. There is no heartbeating or
rebalancing to worry about.

-Jason

On Fri, Feb 26, 2016 at 1:20 AM, Guven Demir <Gu...@sahibinden.com>
wrote:

> thanks for the response Jason,
>
> i've already experimented with a similar solution myself, lowering
> max.partition.fetch.bytes to barely fit the largest message (2k at the
> moment)
>
> still, i've observed similar problems, which is caused by really long
> processing times, e.g. downloading a large video via a link received in the
> message
>
> it's not very feasible to increase the heartbeat timeout too much, as
> session timeout is recommened to be at least 3 times that of heartbeat
> timeout. and that is bounded by broker's group.max.session.timeout.ms,
> which i would not want to increase as it would affect all other
> topics/consumers
>
> could there be an api for triggering the heartbeat manually maybe? it can
> be argued that that would beat the purpose of a heartbeat though, it might
> be used improperly, i.e. in my case rather than sending heartbeats inside
> the download/save loop but in an empty loop waiting for the download to
> complete, which might never happen. again, sending heartbeats in
> application code might be considered tight coupling as well
>
> other than that, i will experiment with the pause() api, separate thread
> for the actual message processing and poll()'ing with all partitions paused
>
> guven
>
>
> > On 25 Feb 2016, at 20:19, Jason Gustafson <ja...@confluent.io> wrote:
> >
> > Hey Guven,
> >
> > This problem is what KIP-41 was created for:
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-41%3A+KafkaConsumer+Max+Records
> > .
> >
> > The patch for this was committed yesterday and will be included in 0.10.
> If
> > you need something in the shorter term, you could probably use the client
> > from trunk (no changes to the server are needed).
> >
> > If this is still not sufficient, I recommend looking into the pause()
> API,
> > which can facilitate asynchronous message processing in another thread.
> >
> > -Jason
> >
> > On Thu, Feb 25, 2016 at 8:53 AM, Guven Demir <Guven.Demir@sahibinden.com
> >
> > wrote:
> >
> >> hi all,
> >>
> >> i'm having trouble processing a topic which includes paths to images
> which
> >> need to be downloaded and saved to disk (each takes ~3-5 seconds) and
> >> several are received on each poll
> >>
> >> within this scenario, i'm receiving the following error:
> >>
> >>    org.apache.kafka.clients.consumer.CommitFailedException: Commit
> cannot
> >> be completed due to group rebalance
> >>
> >> which i assume is due to heartbeat failure and broker re-assigning the
> >> consumer's partition to another consumer
> >>
> >> are there any recommendations for processing long to process messages?
> >>
> >> thanks in advance,
> >> guven
> >>
> >>
> >>
>
>

Re: new consumer api / heartbeat, manual commit & long to process messages

Posted by Guven Demir <Gu...@sahibinden.com>.
thanks for the response Jason,

i've already experimented with a similar solution myself, lowering max.partition.fetch.bytes to barely fit the largest message (2k at the moment)

still, i've observed similar problems, which is caused by really long processing times, e.g. downloading a large video via a link received in the message

it's not very feasible to increase the heartbeat timeout too much, as session timeout is recommened to be at least 3 times that of heartbeat timeout. and that is bounded by broker's group.max.session.timeout.ms, which i would not want to increase as it would affect all other topics/consumers

could there be an api for triggering the heartbeat manually maybe? it can be argued that that would beat the purpose of a heartbeat though, it might be used improperly, i.e. in my case rather than sending heartbeats inside the download/save loop but in an empty loop waiting for the download to complete, which might never happen. again, sending heartbeats in application code might be considered tight coupling as well

other than that, i will experiment with the pause() api, separate thread for the actual message processing and poll()'ing with all partitions paused

guven


> On 25 Feb 2016, at 20:19, Jason Gustafson <ja...@confluent.io> wrote:
> 
> Hey Guven,
> 
> This problem is what KIP-41 was created for:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-41%3A+KafkaConsumer+Max+Records
> .
> 
> The patch for this was committed yesterday and will be included in 0.10. If
> you need something in the shorter term, you could probably use the client
> from trunk (no changes to the server are needed).
> 
> If this is still not sufficient, I recommend looking into the pause() API,
> which can facilitate asynchronous message processing in another thread.
> 
> -Jason
> 
> On Thu, Feb 25, 2016 at 8:53 AM, Guven Demir <Gu...@sahibinden.com>
> wrote:
> 
>> hi all,
>> 
>> i'm having trouble processing a topic which includes paths to images which
>> need to be downloaded and saved to disk (each takes ~3-5 seconds) and
>> several are received on each poll
>> 
>> within this scenario, i'm receiving the following error:
>> 
>>    org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot
>> be completed due to group rebalance
>> 
>> which i assume is due to heartbeat failure and broker re-assigning the
>> consumer's partition to another consumer
>> 
>> are there any recommendations for processing long to process messages?
>> 
>> thanks in advance,
>> guven
>> 
>> 
>> 


Re: new consumer api / heartbeat, manual commit & long to process messages

Posted by Jason Gustafson <ja...@confluent.io>.
Hey Guven,

This problem is what KIP-41 was created for:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-41%3A+KafkaConsumer+Max+Records
.

The patch for this was committed yesterday and will be included in 0.10. If
you need something in the shorter term, you could probably use the client
from trunk (no changes to the server are needed).

If this is still not sufficient, I recommend looking into the pause() API,
which can facilitate asynchronous message processing in another thread.

-Jason

On Thu, Feb 25, 2016 at 8:53 AM, Guven Demir <Gu...@sahibinden.com>
wrote:

> hi all,
>
> i'm having trouble processing a topic which includes paths to images which
> need to be downloaded and saved to disk (each takes ~3-5 seconds) and
> several are received on each poll
>
> within this scenario, i'm receiving the following error:
>
>     org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot
> be completed due to group rebalance
>
> which i assume is due to heartbeat failure and broker re-assigning the
> consumer's partition to another consumer
>
> are there any recommendations for processing long to process messages?
>
> thanks in advance,
> guven
>
>
>