Posted to users@kafka.apache.org by Yifan Ying <na...@gmail.com> on 2016/01/28 22:18:52 UTC

How to set Timeout for KafkaConsumer.poll()

Hi All,

I was using the new Kafka Consumer to fetch messages in this way:

while (true) {
    ConsumerRecords<Object, T> records = kafkaConsumer.poll(Long.MAX_VALUE);
    // do nothing if records are empty
    ....
}

Then I realized that blocking until new messages are fetched might add a
little overhead, so I looked into the KafkaConsumer code to figure out a
reasonable timeout.

do {
    Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);
    if (!records.isEmpty()) {
        // if data is available, then return it, but first send off the
        // next round of fetches to enable pipelining while the user is
        // handling the fetched records.
        fetcher.initFetches(metadata.fetch());
        client.poll(0);
        return new ConsumerRecords<>(records);
    }

    long elapsed = time.milliseconds() - start;
    remaining = timeout - elapsed;
} while (remaining > 0);

It seems that even if I set a much lower timeout, like 1000 ms, my code will
still keep fetching messages, since I use while (true) and do nothing with
an empty record set. So the only difference between a high timeout and a
low one is whether the code loops in the while loop I wrote or in the one
inside poll(). In terms of connections to Kafka, a low or a high timeout
won't make much difference in my case.
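
A rough way to check this reasoning without a broker is a small simulation.
The sketch below is not the Kafka client: FakeConsumer, its simulated clock,
and the 500 ms cap on a single fetch wait are all assumptions made up for
illustration. Only the shape of the do/while loop follows the code quoted
above.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Simulation only: FakeConsumer is a hypothetical stand-in, not the real KafkaConsumer.
final class PollTimeoutSketch {

    static final class FakeConsumer {
        private static final long MAX_BLOCK_MS = 500; // assumed cap on one simulated fetch wait
        private final long arrivalMs;                 // simulated time at which one record arrives
        private long nowMs = 0;                       // simulated clock, keeps the run deterministic
        private boolean delivered = false;
        int pollOnceCalls = 0;

        FakeConsumer(long arrivalMs) {
            this.arrivalMs = arrivalMs;
        }

        long nowMs() {
            return nowMs;
        }

        // One simulated fetch attempt: waits at most min(remaining, MAX_BLOCK_MS).
        private List<String> pollOnce(long remaining) {
            pollOnceCalls++;
            long wait = Math.min(remaining, MAX_BLOCK_MS);
            if (!delivered && arrivalMs <= nowMs + wait) {
                nowMs = Math.max(nowMs, arrivalMs);
                delivered = true;
                return Collections.singletonList("record");
            }
            nowMs += wait;
            return Collections.emptyList();
        }

        // Same shape as the do/while loop quoted from the consumer source.
        List<String> poll(long timeout) {
            long start = nowMs;
            long remaining = timeout;
            do {
                List<String> records = pollOnce(remaining);
                if (!records.isEmpty()) {
                    return records;
                }
                long elapsed = nowMs - start;
                remaining = timeout - elapsed;
            } while (remaining > 0);
            return Collections.emptyList();
        }
    }

    // Runs the application loop until the record arrives.
    // Returns {simulated delivery time, empty poll() returns, pollOnce() calls}.
    static long[] run(long timeout, long arrivalMs) {
        FakeConsumer consumer = new FakeConsumer(arrivalMs);
        long emptyReturns = 0;
        while (true) {
            List<String> records = consumer.poll(timeout);
            if (!records.isEmpty()) {
                return new long[] {consumer.nowMs(), emptyReturns, consumer.pollOnceCalls};
            }
            emptyReturns++; // nothing to do with an empty result, just loop again
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run(1000L, 2500L)));
        System.out.println(Arrays.toString(run(Long.MAX_VALUE, 2500L)));
    }
}
```

In this simulation both runs deliver the record at the same simulated time
and make the same number of pollOnce() calls. The only difference is how
many times poll() returns empty to the outer loop: twice with the 1000 ms
timeout, never with Long.MAX_VALUE.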

I may be misunderstanding the code completely. Can anyone shed some light
on this topic?

Thanks.

-- 
Yifan

Re: How to set Timeout for KafkaConsumer.poll()

Posted by Jason Gustafson <ja...@confluent.io>.
Hey Yifan,

In terms of how the consumer works internally, there's not a big difference
between using a long timeout and a short one. Which you choose really
depends on the needs of your application. Typically people use a short
timeout in order to be able to break from the loop with a boolean flag, but
you might also do so if you have some periodic task to execute. I usually
prefer using a long timeout and breaking from the loop using the wakeup()
API.
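
The two shutdown styles can be sketched side by side. To keep the example
self-contained, it uses stand-ins rather than the Kafka client: StubConsumer
and WakeupSignal are hypothetical names playing the roles of KafkaConsumer
and the WakeupException that a woken-up poll() throws. The 100 ms and 250 ms
values are arbitrary.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Stand-in types only; none of this is the Kafka client API.
final class ShutdownSketch {

    // Hypothetical analogue of the exception thrown by a woken-up poll().
    static final class WakeupSignal extends RuntimeException { }

    // Hypothetical consumer backed by an in-memory queue.
    static final class StubConsumer {
        private static final Object WAKEUP = new Object(); // sentinel queued by wakeup()
        private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();

        void deliver(String record) {
            queue.add(record);
        }

        // Callable from another thread. In this stub, records queued before
        // the wakeup are still returned first.
        void wakeup() {
            queue.add(WAKEUP);
        }

        // Blocks up to timeoutMs; returns null if nothing arrived in time.
        String poll(long timeoutMs) {
            try {
                Object item = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
                if (item == WAKEUP) {
                    throw new WakeupSignal();
                }
                return (String) item;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
    }

    // Style 1: short timeout, so the flag is re-checked at least every 100 ms.
    static int runWithFlag(StubConsumer consumer, AtomicBoolean running) {
        int handled = 0;
        while (running.get()) {
            String record = consumer.poll(100);
            if (record != null) {
                handled++;
            }
        }
        return handled;
    }

    // Style 2: long timeout, loop exits only when another thread calls wakeup().
    static int runWithWakeup(StubConsumer consumer) {
        int handled = 0;
        try {
            while (true) {
                String record = consumer.poll(Long.MAX_VALUE);
                if (record != null) {
                    handled++;
                }
            }
        } catch (WakeupSignal expected) {
            // normal shutdown path
        }
        return handled;
    }

    // Runs an action on a background thread after a delay.
    private static void after(long delayMs, Runnable action) {
        Thread t = new Thread(() -> {
            try {
                Thread.sleep(delayMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            action.run();
        });
        t.setDaemon(true);
        t.start();
    }

    // Returns {records handled with the flag style, records handled with the wakeup style}.
    static int[] demo() {
        StubConsumer first = new StubConsumer();
        first.deliver("r1");
        AtomicBoolean running = new AtomicBoolean(true);
        after(250, () -> running.set(false));
        int withFlag = runWithFlag(first, running);

        StubConsumer second = new StubConsumer();
        second.deliver("r1");
        second.deliver("r2");
        after(250, second::wakeup);
        int withWakeup = runWithWakeup(second);
        return new int[] {withFlag, withWakeup};
    }
}
```

With the flag style, shutdown latency is bounded by the poll timeout, and
the loop also gets a regular chance to run a periodic task. With the wakeup
style, the loop never spins on empty results and stops as soon as the other
thread signals it.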

-Jason

Re: How to set Timeout for KafkaConsumer.poll()

Posted by Guozhang Wang <wa...@gmail.com>.
That is right, for now.

On Thu, Jan 28, 2016 at 2:05 PM, Yifan Ying <na...@gmail.com> wrote:

> Thanks, Jason and Guozhang. I guess the conclusion is that, if there is no
> other logic to execute other than poll(), a long timeout and a short
> timeout have no big difference.
>
> Guozhang, if I understand right, even with this issue, I don't think a long
> timeout and a short timeout will be much different cuz both will block till
> the broker is up, right?

-- 
-- Guozhang

Re: How to set Timeout for KafkaConsumer.poll()

Posted by Yifan Ying <na...@gmail.com>.
Thanks, Jason and Guozhang. I guess the conclusion is that, if there is no
logic to execute other than poll(), a long timeout and a short timeout
make no big difference.

Guozhang, if I understand right, even with this issue, I don't think a long
timeout and a short timeout will behave much differently, because both will
block until the broker is up, right?

-- 
Yifan

Re: How to set Timeout for KafkaConsumer.poll()

Posted by Guozhang Wang <wa...@gmail.com>.
Hi Yifan,

There are some cases that could cause a consumer to block longer than the
specified poll timeout. For example, if Kafka is not up and running at all,
the consumer will block until it is connected to a broker. Some more
details are here: https://issues.apache.org/jira/browse/KAFKA-1894.

We are working on solving that issue so that if no Kafka broker is
connectable within the poll timeout, poll() will return within that period
and indicate the error. Does that solve your case?

Guozhang

