Posted to user@flink.apache.org by John Smith <ja...@gmail.com> on 2022/01/31 14:40:15 UTC

Kafka Consumer Group Name not set if no checkpointing?

Hi, I'm using Flink 1.14.3 and have a job that doesn't use checkpointing. I
have noticed that the Kafka consumer group is not set.

I use Kafka Explorer to see all the consumers, and when I run the job I
don't see the consumer group. I eventually enabled checkpointing and
restarted the job, and only then saw the consumer group.

Is this expected behaviour?

Re: Kafka Consumer Group Name not set if no checkpointing?

Posted by John Smith <ja...@gmail.com>.
OK, that's fine. We are just used to having that functionality from
basically every other consumer we have, Flink or not. We monitor the
offsets for lateness, or just to look.

On Tue, 1 Feb 2022 at 03:38, Fabian Paul <fp...@apache.org> wrote:

> Hi John,
>
> You are seeing what I described in my previous mail. The KafkaSource
> only writes consumer offsets to Kafka when a checkpoint is finished
> [1]. Flink does not leverage the offsets stored in Kafka to ensure
> exactly-once processing but it writes the last read offset to Flink's
> internal state that is part of the checkpoint.
>
> Please also be aware that the offsets you see in Kafka Explorer are not
> guaranteed to reflect the latest record read by the KafkaSource, because
> the offset is committed asynchronously and we do not ensure that the
> commit succeeds.
>
> Maybe you can share with us why you want to inspect the progress of
> the KafkaSource with Kafka Explorer.
>
> Best,
> Fabian
>
>
> [1]
> https://github.com/apache/flink/blob/dea2b10502a493e9d4137e7d94d2dac85d9fa666/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java#L221

Re: Kafka Consumer Group Name not set if no checkpointing?

Posted by Fabian Paul <fp...@apache.org>.
Hi John,

You are seeing what I described in my previous mail. The KafkaSource
only writes consumer offsets to Kafka when a checkpoint is finished
[1]. Flink does not leverage the offsets stored in Kafka to ensure
exactly-once processing but it writes the last read offset to Flink's
internal state that is part of the checkpoint.

Please also be aware that the offsets you see in Kafka Explorer are not
guaranteed to reflect the latest record read by the KafkaSource, because
the offset is committed asynchronously and we do not ensure that the
commit succeeds.
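
To make the two paragraphs above concrete, here is a toy model in plain Java. It is only a sketch and not Flink code: the class OffsetCommitModel and all of its methods are hypothetical names invented for illustration. It assumes nothing beyond what is described above, namely that the read position is stored in checkpoint state and that the commit to Kafka is attempted only when a checkpoint completes, and may fail.

```java
import java.util.OptionalLong;

// Toy model only. None of these names exist in Flink or Kafka.
final class OffsetCommitModel {
    private long readPosition = 0;        // next offset the source would read
    private long checkpointedOffset = -1; // offset kept in checkpoint state
    private OptionalLong committedOffset = OptionalLong.empty(); // what Kafka tools see

    // Reading records moves only the in-memory position.
    void read(int records) {
        readPosition += records;
    }

    // A completed checkpoint always stores the position in state.
    // The commit to Kafka is best effort, so it is allowed to fail.
    void completeCheckpoint(boolean commitSucceeds) {
        checkpointedOffset = readPosition;
        if (commitSucceeds) {
            committedOffset = OptionalLong.of(readPosition);
        }
    }

    long readPosition() { return readPosition; }
    long checkpointedOffset() { return checkpointedOffset; }
    OptionalLong committedOffset() { return committedOffset; }

    public static void main(String[] args) {
        OffsetCommitModel source = new OffsetCommitModel();
        source.read(10);
        // No checkpoint yet, so nothing has been committed to Kafka.
        System.out.println("committed before checkpoint: " + source.committedOffset());
        source.completeCheckpoint(true);
        source.read(5);
        // The committed offset now trails the read position until the next checkpoint.
        System.out.println("read position: " + source.readPosition());
        System.out.println("committed after checkpoint: " + source.committedOffset());
    }
}
```

In this model a job without checkpoints never calls completeCheckpoint, so the committed offset stays empty, which matches a consumer group that never shows up in a tool that lists committed offsets.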

Maybe you can share with us why you want to inspect the progress of
the KafkaSource with Kafka Explorer.

Best,
Fabian


[1] https://github.com/apache/flink/blob/dea2b10502a493e9d4137e7d94d2dac85d9fa666/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java#L221

On Mon, Jan 31, 2022 at 8:08 PM John Smith <ja...@gmail.com> wrote:
>
> Hi, yes, see below. It only seems to show the consumer offsets if checkpointing is on. That's the only difference I can see between my two jobs. The moment I enabled it on the job, it started showing in Kafka Explorer (https://www.kafkatool.com/).
>
> return KafkaSource.<JsonObject>builder()
>         .setBootstrapServers(bootstrapServers)
>         .setTopics(topic)
>         .setValueOnlyDeserializer(new VertxJsonSchema())
>         .setGroupId(consumerGroup)
>         .setStartingOffsets(oi)
>         .setProperties(props)
>         .build();

Re: Kafka Consumer Group Name not set if no checkpointing?

Posted by John Smith <ja...@gmail.com>.
Hi, yes, see below. It only seems to show the consumer offsets if
checkpointing is on. That's the only difference I can see between my two
jobs. The moment I enabled it on the job, it started showing in Kafka
Explorer (https://www.kafkatool.com/).

return KafkaSource.<JsonObject>builder()
        .setBootstrapServers(bootstrapServers)
        .setTopics(topic)
        .setValueOnlyDeserializer(new VertxJsonSchema())
        .setGroupId(consumerGroup)
        .setStartingOffsets(oi)
        .setProperties(props)
        .build();


On Mon, 31 Jan 2022 at 12:03, Fabian Paul <fp...@apache.org> wrote:

> Hi John,
> First I would like to ask you to give us some more information about
> how you consume from Kafka with Flink. It is currently recommended to
> use the KafkaSource that subsumes the deprecated FlinkKafkaConsumer.
>
> One thing to note already is that by default Flink does not commit the
> Kafka offsets back to Kafka, because this is not needed from Flink's
> perspective. Committing is only supported on a best-effort basis when a
> checkpoint completes.
>
> I am not very familiar with Kafka Explorer but I can imagine it only
> shows the consumer group if there are actually committed offsets.
>
> Best,
> Fabian

Re: Kafka Consumer Group Name not set if no checkpointing?

Posted by Fabian Paul <fp...@apache.org>.
Hi John,
First I would like to ask you to give us some more information about
how you consume from Kafka with Flink. It is currently recommended to
use the KafkaSource that subsumes the deprecated FlinkKafkaConsumer.

One thing to note already is that by default Flink does not commit the
Kafka offsets back to Kafka, because this is not needed from Flink's
perspective. Committing is only supported on a best-effort basis when a
checkpoint completes.
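
For a job that runs without checkpointing but should still show up in offset-monitoring tools, one option may be to let the Kafka client commit offsets periodically on its own. The sketch below only builds the consumer properties. The keys group.id, enable.auto.commit and auto.commit.interval.ms are standard Kafka consumer settings; the class AutoCommitProps and its build method are hypothetical helpers for illustration. It assumes, as I understand the Flink Kafka connector documentation, that the KafkaSource falls back to the client's auto-commit when checkpointing is disabled, which is worth verifying for 1.14.3.

```java
import java.util.Properties;

// Hypothetical helper, not part of Flink or Kafka.
final class AutoCommitProps {
    // Builds consumer properties that ask the Kafka client itself to
    // commit offsets periodically, independent of Flink checkpoints.
    static Properties build(String groupId, long intervalMs) {
        Properties props = new Properties();
        props.setProperty("group.id", groupId);
        // Standard Kafka consumer settings for periodic auto-commit.
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", Long.toString(intervalMs));
        return props;
    }

    public static void main(String[] args) {
        // Example values only: group name and interval are placeholders.
        Properties props = build("my-consumer-group", 5000L);
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The resulting Properties object would be handed to the builder via setProperties(props), as in the KafkaSource snippet posted in this thread. Offsets committed this way are still only a monitoring aid and play no part in Flink's own recovery.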

I am not very familiar with Kafka Explorer but I can imagine it only
shows the consumer group if there are actually committed offsets.

Best,
Fabian

On Mon, Jan 31, 2022 at 3:41 PM John Smith <ja...@gmail.com> wrote:
>
> Hi, I'm using Flink 1.14.3 and have a job that doesn't use checkpointing. I have noticed that the Kafka consumer group is not set.
>
> I use Kafka Explorer to see all the consumers, and when I run the job I don't see the consumer group. I eventually enabled checkpointing and restarted the job, and only then saw the consumer group.
>
> Is this expected behaviour?