Posted to user@flink.apache.org by RKandoji <rk...@gmail.com> on 2020/01/31 00:51:03 UTC

Issue with committing Kafka offsets

Hi Team,

I'm running into the strange issue pasted below:

Committing offsets to Kafka takes longer than the checkpoint interval.
Skipping commit of previous offsets because newer complete checkpoint
offsets are available. This does not compromise Flink's checkpoint
integrity.


I read data from more than 10 different Kafka topics, and I started
noticing this issue as I integrated more FlinkKafkaConsumers, each reading
from its respective topic.
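
For reference, here is roughly how the job is wired (a simplified sketch;
topic names, broker address, group ids, and the checkpoint interval below
are placeholders, not my actual setup):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10_000); // checkpoint every 10 seconds (placeholder)

for (String topic : new String[] {"topic-01", "topic-02", "topic-03"}) {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("group.id", "job-" + topic);

    // one FlinkKafkaConsumer source per topic
    env.addSource(new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props))
       .name("source-" + topic)
       .print();
}

env.execute("multi-topic-job");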

I'm wondering: is there an upper limit on the number of Kafka consumers
(and hence Kafka topics) per job?

If not, could someone please shed some light on why this could be happening?

Thanks,
RK

Re: Issue with committing Kafka offsets

Posted by RKandoji <rk...@gmail.com>.
I had to disable auto-commit for it to work. I understand committing
offsets back to Kafka is just for monitoring purposes, so I assume it
should be safe to run it like that:

properties.put("enable.auto.commit", "false");
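
In case it helps anyone else, here are the relevant knobs in one place (a
sketch assuming the legacy FlinkKafkaConsumer API; broker address, group
id, and topic name are placeholders):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "my-group");
// disables the Kafka client's own periodic auto-commit
properties.put("enable.auto.commit", "false");

FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), properties);

// With checkpointing enabled, Flink commits offsets on checkpoint
// completion regardless of enable.auto.commit; this switch turns those
// checkpoint-time commits off as well.
consumer.setCommitOffsetsOnCheckpoints(false);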


Re: Issue with committing Kafka offsets

Posted by RKandoji <rk...@gmail.com>.
Hi,

Thanks for the response.
I'm actually using a different consumer group id for each consumer.

RK.


Re: Issue with committing Kafka offsets

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi,

There are no upper limits on the number of Kafka consumers per job.

For each one of your FlinkKafkaConsumers, are you using the same group.id?
That could explain why you are seeing higher commit times as you add more
FlinkKafkaConsumers: AFAIK, on the broker side, the commit operations for
the same consumer group are enqueued together.

As a side note, as the warning message already mentions, this does not
affect Flink's exactly-once guarantees.
If the only reason you commit offsets back to Kafka is to have a way to
monitor progress, it should be fine to define a different consumer group id
for each FlinkKafkaConsumer.
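
Roughly like this, for example (just a sketch; the broker address and the
group id naming scheme are placeholders):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

String topic = "topic-01"; // repeat for each of your topics

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
// a distinct group.id per consumer, so that commits are not enqueued
// behind each other in a single consumer group on the broker side
props.setProperty("group.id", "my-job-" + topic);

FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props);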

Hope this helps,
Gordon


Re: Issue with committing Kafka offsets

Posted by RKandoji <rk...@gmail.com>.
Can someone please help me here?

Thanks
RK

