Posted to users@kafka.apache.org by Daniel Fanjul <da...@gmail.com> on 2016/03/31 12:13:28 UTC

consumer too fast

Hi all,

My problem: If the consumer fetches too much data and the processing of the
records is not fast enough, commit() fails because there was a rebalance.

I cannot reduce 'max.partition.fetch.bytes' because there might be large
messages.

I don't want to increase 'session.timeout.ms' either, because then it would
take too long to detect failures.

I understand that the new consumer API only sends heartbeats and manages
rebalances during calls to poll(). But if I call poll(0), there is still a
chance it will return even more data. So I keep the heartbeats going, but I
may accumulate too much data, eventually leading to an OutOfMemoryError.

I would like something like this:
foreach record in consumer.poll() {
  process(record)
  consumer.doHeartBeatsAndRebalanceSoKeepMeStillAlive()
}

Is this possible?

Re: consumer too fast

Posted by Martin Skøtt <ma...@falconsocial.com>.
Keyboard error, my previous mail went out half-finished. Something along
these lines:

records = consumer.poll()
foreach record:
  process record
  add to commit map
  if records processed > threshold:
    commit map

Take care to make sure everything has been committed before calling poll()
again: the consumer advances its position in memory regardless of commits,
so a later commit or a rebalance could skip over the uncommitted records
from the previous poll.
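
In real Java it could look roughly like this (an untested sketch; process(),
COMMIT_THRESHOLD and the already-subscribed consumer are placeholders for
your own code and tuning):

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

final int COMMIT_THRESHOLD = 100; // tune to taste
Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
int processed = 0;

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(1000);
    for (ConsumerRecord<String, String> record : records) {
        process(record); // your processing
        // the committed offset is the offset of the *next* record to read, hence +1
        toCommit.put(new TopicPartition(record.topic(), record.partition()),
                     new OffsetAndMetadata(record.offset() + 1));
        if (++processed >= COMMIT_THRESHOLD) {
            consumer.commitSync(toCommit);
            toCommit.clear();
            processed = 0;
        }
    }
    // flush the remainder before the next poll(), see the caveat above
    if (!toCommit.isEmpty()) {
        consumer.commitSync(toCommit);
        toCommit.clear();
        processed = 0;
    }
}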

Martin



Re: consumer too fast

Posted by Martin Skøtt <ma...@falconsocial.com>.
We have recently had great success with committing records in smaller
batches between calls to poll(). Something along these lines:

records = consumer.poll()
foreach record:
  process record

Re: consumer too fast

Posted by Daniel Fanjul <da...@gmail.com>.
OK, thank you very much. Pausing all assigned partitions should work for
us; I will try it.
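
Roughly what I have in mind (an untested sketch against the 0.9 client,
where pause()/resume() take varargs; processAll(), the executor and the
already-subscribed consumer are our own code):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

ExecutorService executor = Executors.newSingleThreadExecutor();

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(1000);
    if (records.isEmpty())
        continue;

    // stop fetching, but keep calling poll() so heartbeats go out
    TopicPartition[] assigned =
        consumer.assignment().toArray(new TopicPartition[0]);
    consumer.pause(assigned);

    Future<?> done = executor.submit(() -> processAll(records));
    while (!done.isDone()) {
        // returns no records while paused, but keeps the session alive
        consumer.poll(100);
    }

    consumer.resume(assigned);
    consumer.commitSync();
}

If a rebalance happens while we are paused we would have to re-pause the
new assignment, but this is the general shape.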


Re: consumer too fast

Posted by Manikumar Reddy <ma...@gmail.com>.
Hi,

1. A new config property, "max.poll.records", is being introduced in the
upcoming 0.10 release. It can be used to control the number of records
returned by each call to poll().

2. You can use a combination of an ExecutorService (or a separate
processing thread) and the pause()/resume() API to avoid unwanted
rebalances.

Some of these options are discussed here:
http://users.kafka.apache.narkive.com/4vvhuBZO/low-latency-high-message-size-variance

Example code is here:
https://github.com/omkreddy/kafka-examples/blob/master/consumer/src/main/java/kafka/examples/consumer/advanced/AdvancedConsumer.java
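
For example, once you are on 0.10, something like this (all values are
placeholders):

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("enable.auto.commit", "false");
// keep the per-partition fetch size large enough for the big messages...
props.put("max.partition.fetch.bytes", "10485760");
// ...but cap how many records a single poll() returns
props.put("max.poll.records", "100");

KafkaConsumer<String, String> consumer =
    new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());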
