Posted to users@kafka.apache.org by Elias Levy <fe...@gmail.com> on 2016/07/01 01:01:56 UTC

Heartbeating during long processing times

What is the officially recommended method to heartbeat using the new Java
consumer during long message processing times?

I thought I could accomplish this by setting max.poll.records to 1 in the
client, calling consumer.pause(consumer.assignment()) when starting to
process a record, calling consumer.resume(consumer.paused()) when done
processing a record and committing its offset, and calling consumer.poll(0)
intermittently while processing the record.

Testing shows that consumer.poll(0) will return records, rather than
returning null or an empty ConsumerRecords.
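The pause/poll/resume pattern described in the question can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not a recommended implementation. It assumes `max.poll.records=1` and `enable.auto.commit=false` are already configured. The slow work runs on a separate thread so that the consumer thread can keep polling, and the `process` method is a hypothetical stand-in for the real work. The sketch targets a current client, where `poll(Duration.ZERO)` replaces the 0.10.0-era `poll(0)`. Because a poll can still return records (the behavior reported above), the sketch rewinds any such partition with `seek` and pauses again. That rewind is an added safeguard, not part of the original description. The method takes the `Consumer` interface so it can also be exercised with Kafka's `MockConsumer`.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

class PausePollResumeWorker {

    // Hypothetical stand-in for the application's slow per-record work.
    static void process(ConsumerRecord<String, String> record) throws InterruptedException {
        Thread.sleep(300);
    }

    // Handles one record; assumes max.poll.records=1 and enable.auto.commit=false.
    static void handle(Consumer<String, String> consumer, ExecutorService pool,
                       ConsumerRecord<String, String> record) throws Exception {
        // Stop fetching from every assigned partition while this record is processed.
        consumer.pause(consumer.assignment());
        // Run the slow work on another thread; the consumer is only touched from this thread.
        Future<?> work = pool.submit(() -> {
            process(record);
            return null;
        });
        while (true) {
            try {
                // Waits briefly; a processing failure surfaces here as ExecutionException.
                work.get(100, TimeUnit.MILLISECONDS);
                break;
            } catch (TimeoutException stillRunning) {
                // Intermittent poll: in pre-KIP-62 clients this is what sends heartbeats.
                ConsumerRecords<String, String> unexpected = consumer.poll(Duration.ZERO);
                // A rebalance inside poll() can leave partitions unpaused, so records may
                // come back. Rewind each such partition so those records are fetched again later.
                for (TopicPartition p : unexpected.partitions()) {
                    consumer.seek(p, unexpected.records(p).get(0).offset());
                }
                // Pause again, including any partitions added by the rebalance.
                consumer.pause(consumer.assignment());
            }
        }
        // Commit the offset of the next record to read, then resume fetching.
        TopicPartition tp = new TopicPartition(record.topic(), record.partition());
        consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(record.offset() + 1)));
        consumer.resume(consumer.paused());
    }
}
```

If a rebalance during one of the intermediate polls revokes the record's partition, `commitSync` can throw `CommitFailedException`; the sketch leaves that case to the caller.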

Re: Heartbeating during long processing times

Posted by Elias Levy <fe...@gmail.com>.
Shikhar,

Thanks for pointing me to KIP-62.  Once implemented, it will make workers
that take a long time processing messages a lot simpler to implement.
Until then, we have to continue using the pause/poll/resume pattern.  That
said, as far as I can tell, this pattern has not been well documented.

It appears the issue I observed is the result of consumer rebalancing. When
a consumer with paused partitions calls poll to trigger a heartbeat, the
client will process any pending consumer rebalances.  The rebalance will
potentially result in the addition of newly assigned unpaused partitions.
Worse, already assigned partitions that were paused and that continue to
be assigned to the client after the rebalance become unpaused. I consider
this a bug in the client.  Paused partitions
should not be unpaused during a rebalance if they continue to be assigned
to the client.  So pause/poll/resume is not sufficient for a worker that
handles messages with long processing times.  One must also implement a
ConsumerRebalanceListener that pauses all assigned partitions if the
consumer is in the middle of processing a message.
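A minimal sketch of the ConsumerRebalanceListener described above, assuming the worker records whether it is mid-record in a flag. The class name `RepauseOnRebalance` and its `setProcessing` method are hypothetical. The listener is invoked on the consumer's own thread from inside `poll()`, after the new assignment takes effect, so pausing there re-applies the pause before that poll hands back records.

```java
import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

// Hypothetical listener: re-pauses the whole assignment after a rebalance
// whenever the worker is in the middle of processing a record.
class RepauseOnRebalance implements ConsumerRebalanceListener {
    private final Consumer<?, ?> consumer;
    // Set to true before a long-running record starts, false once it is committed.
    private final AtomicBoolean processing = new AtomicBoolean(false);

    RepauseOnRebalance(Consumer<?, ?> consumer) {
        this.consumer = consumer;
    }

    void setProcessing(boolean inProgress) {
        processing.set(inProgress);
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // No pause bookkeeping is needed here; a real worker might commit offsets instead.
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        if (processing.get()) {
            // Pause the full assignment, not just the argument, so that partitions
            // kept across the rebalance are covered as well as newly added ones.
            consumer.pause(consumer.assignment());
        }
    }
}
```

The listener would be passed when subscribing, e.g. `consumer.subscribe(Collections.singletonList("events"), listener)` with a placeholder topic name, and the worker would call `setProcessing(true)` just before pausing for a record and `setProcessing(false)` after resuming.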



On Fri, Jul 1, 2016 at 11:52 AM, Shikhar Bhushan <sh...@confluent.io>
wrote:

> Hi Elias,
>
> KIP-62
> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread>
> has a discussion of current options, and the improvements that are coming.
>
> Best,
>
> Shikhar

Re: Heartbeating during long processing times

Posted by Shikhar Bhushan <sh...@confluent.io>.
Hi Elias,

KIP-62
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread>
has a discussion of current options, and the improvements that are coming.

Best,

Shikhar
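For comparison, KIP-62 shipped in Apache Kafka 0.10.1.0. The consumer sends heartbeats from a background thread, and a new `max.poll.interval.ms` setting bounds how long the application may go between `poll()` calls before its partitions are reassigned. With such a client, a long-running worker can often drop the pause/poll/resume loop and instead set `max.poll.interval.ms` above its worst-case processing time. The sketch below only builds the consumer `Properties`; the method name and the 10-minute value are illustrative assumptions, not recommendations.

```java
import java.util.Properties;

class Kip62ConsumerConfig {

    // Hypothetical helper: consumer settings for a worker whose per-record
    // processing can take minutes, on a client that includes KIP-62.
    static Properties longProcessingConsumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Commit explicitly after each record has been processed.
        props.setProperty("enable.auto.commit", "false");
        // One record per poll(), so each poll() covers a single unit of work.
        props.setProperty("max.poll.records", "1");
        // Maximum time between poll() calls; assumed here to exceed the slowest record.
        props.setProperty("max.poll.interval.ms", String.valueOf(10 * 60 * 1000));
        return props;
    }
}
```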
