You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Jin Yi <el...@gmail.com> on 2020/02/20 22:14:13 UTC

How does Flink manage the kafka offset

Hi there,

We are running apache beam application with flink being the runner.

We use the KafkaIO connector to read from topics:
https://beam.apache.org/releases/javadoc/2.19.0/

and we have the following configuration, which enables auto commit of
offset, no checkpointing is enabled, and it is performing element wise
processing.

So we run our application in Flink Job Cluster mode, and if I run the same
job twice, meaning start 2 flink job clusters, then I see message being
processed twice.

My understanding is, upon startup, Flink Job Manager will contact kafka to
get the offset for each partition for this consume group, and distribute
the task to task managers, and it does not use kafka to manage the consumer
group.

and when the 2nd job cluster starts up, it does the same thing, so the 1st
job cluster is not aware of there are new consumers from the same consume
group have joined.

But if I add more task managers to the same job cluster, then job manager
is aware of more consumers from this consume group has joined, and it will
rebalance the partition consumption if needed.

Is my understanding correct?

Thanks a lot!
Eleanore

Map<String, Object> consumerConfig = ImmutableMap.<String, Object>builder()
.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup)
.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
.put(ConsumerConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG")
.build();

return KafkaIO.<String, JsonNode>read()
.withBootstrapServers(kafkaSettings.getBootstrapServers())
.withTopic(topic)
.withKeyDeserializer(KeyDeserializer.class)
.withValueDeserializerAndCoder(getDeserializer(encoding), new
JsonNodeCoder<>())
.withConsumerConfigUpdates(consumerConfig)
.withoutMetadata();

Re: How does Flink manage the kafka offset

Posted by Jin Yi <el...@gmail.com>.
Hi Benchao,

Thanks a lot!
Eleanore

On Thu, Feb 20, 2020 at 4:30 PM Benchao Li <li...@gmail.com> wrote:

> Hi Jin,
>
> See below inline replies:
>
> My understanding is, upon startup, Flink Job Manager will contact kafka to
>> get the offset for each partition for this consume group, and distribute
>> the task to task managers, and it does not use kafka to manage the consumer
>> group.
>
>
> Generally, yes. If you are not using checkpoint, and starting from
> group-offsets, Flink will read offset from Kafka at startup.
>
> and when the 2nd job cluster starts up, it does the same thing, so the 1st
>> job cluster is not aware of there are new consumers from the same consume
>> group have joined.
>
>
> Yes.
>
> But if I add more task managers to the same job cluster, then job manager
>> is aware of more consumers from this consume group has joined, and it will
>> rebalance the partition consumption if needed.
>
>
> No. Flink does not rebalance the partitions when new task managers joined
> cluster. It only did so when job restarts and job parallelism changes.
>
> Hope it helps.
>
> Jin Yi <el...@gmail.com> 于2020年2月21日周五 上午6:14写道:
>
>> Hi there,
>>
>> We are running apache beam application with flink being the runner.
>>
>> We use the KafkaIO connector to read from topics:
>> https://beam.apache.org/releases/javadoc/2.19.0/
>>
>> and we have the following configuration, which enables auto commit of
>> offset, no checkpointing is enabled, and it is performing element wise
>> processing.
>>
>> So we run our application in Flink Job Cluster mode, and if I run the
>> same job twice, meaning start 2 flink job clusters, then I see message
>> being processed twice.
>>
>> My understanding is, upon startup, Flink Job Manager will contact kafka
>> to get the offset for each partition for this consume group, and distribute
>> the task to task managers, and it does not use kafka to manage the consumer
>> group.
>>
>> and when the 2nd job cluster starts up, it does the same thing, so the
>> 1st job cluster is not aware of there are new consumers from the same
>> consume group have joined.
>>
>> But if I add more task managers to the same job cluster, then job manager
>> is aware of more consumers from this consume group has joined, and it will
>> rebalance the partition consumption if needed.
>>
>> Is my understanding correct?
>>
>> Thanks a lot!
>> Eleanore
>>
>> Map<String, Object> consumerConfig = ImmutableMap.<String,
>> Object>builder()
>> .put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup)
>> .put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
>> .put(ConsumerConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG")
>> .build();
>>
>> return KafkaIO.<String, JsonNode>read()
>> .withBootstrapServers(kafkaSettings.getBootstrapServers())
>> .withTopic(topic)
>> .withKeyDeserializer(KeyDeserializer.class)
>> .withValueDeserializerAndCoder(getDeserializer(encoding), new
>> JsonNodeCoder<>())
>> .withConsumerConfigUpdates(consumerConfig)
>> .withoutMetadata();
>>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenchao@gmail.com; libenchao@pku.edu.cn
>
>

Re: How does Flink manage the kafka offset

Posted by Jin Yi <el...@gmail.com>.
Hi Benchao,

Thanks a lot!
Eleanore

On Thu, Feb 20, 2020 at 4:30 PM Benchao Li <li...@gmail.com> wrote:

> Hi Jin,
>
> See below inline replies:
>
> My understanding is, upon startup, Flink Job Manager will contact kafka to
>> get the offset for each partition for this consume group, and distribute
>> the task to task managers, and it does not use kafka to manage the consumer
>> group.
>
>
> Generally, yes. If you are not using checkpoint, and starting from
> group-offsets, Flink will read offset from Kafka at startup.
>
> and when the 2nd job cluster starts up, it does the same thing, so the 1st
>> job cluster is not aware of there are new consumers from the same consume
>> group have joined.
>
>
> Yes.
>
> But if I add more task managers to the same job cluster, then job manager
>> is aware of more consumers from this consume group has joined, and it will
>> rebalance the partition consumption if needed.
>
>
> No. Flink does not rebalance the partitions when new task managers joined
> cluster. It only did so when job restarts and job parallelism changes.
>
> Hope it helps.
>
> Jin Yi <el...@gmail.com> 于2020年2月21日周五 上午6:14写道:
>
>> Hi there,
>>
>> We are running apache beam application with flink being the runner.
>>
>> We use the KafkaIO connector to read from topics:
>> https://beam.apache.org/releases/javadoc/2.19.0/
>>
>> and we have the following configuration, which enables auto commit of
>> offset, no checkpointing is enabled, and it is performing element wise
>> processing.
>>
>> So we run our application in Flink Job Cluster mode, and if I run the
>> same job twice, meaning start 2 flink job clusters, then I see message
>> being processed twice.
>>
>> My understanding is, upon startup, Flink Job Manager will contact kafka
>> to get the offset for each partition for this consume group, and distribute
>> the task to task managers, and it does not use kafka to manage the consumer
>> group.
>>
>> and when the 2nd job cluster starts up, it does the same thing, so the
>> 1st job cluster is not aware of there are new consumers from the same
>> consume group have joined.
>>
>> But if I add more task managers to the same job cluster, then job manager
>> is aware of more consumers from this consume group has joined, and it will
>> rebalance the partition consumption if needed.
>>
>> Is my understanding correct?
>>
>> Thanks a lot!
>> Eleanore
>>
>> Map<String, Object> consumerConfig = ImmutableMap.<String,
>> Object>builder()
>> .put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup)
>> .put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
>> .put(ConsumerConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG")
>> .build();
>>
>> return KafkaIO.<String, JsonNode>read()
>> .withBootstrapServers(kafkaSettings.getBootstrapServers())
>> .withTopic(topic)
>> .withKeyDeserializer(KeyDeserializer.class)
>> .withValueDeserializerAndCoder(getDeserializer(encoding), new
>> JsonNodeCoder<>())
>> .withConsumerConfigUpdates(consumerConfig)
>> .withoutMetadata();
>>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenchao@gmail.com; libenchao@pku.edu.cn
>
>

Re: How does Flink manage the kafka offset

Posted by Benchao Li <li...@gmail.com>.
Hi Jin,

See below inline replies:

My understanding is, upon startup, Flink Job Manager will contact kafka to
> get the offset for each partition for this consume group, and distribute
> the task to task managers, and it does not use kafka to manage the consumer
> group.


Generally, yes. If you are not using checkpoint, and starting from
group-offsets, Flink will read offset from Kafka at startup.

and when the 2nd job cluster starts up, it does the same thing, so the 1st
> job cluster is not aware of there are new consumers from the same consume
> group have joined.


Yes.

But if I add more task managers to the same job cluster, then job manager
> is aware of more consumers from this consume group has joined, and it will
> rebalance the partition consumption if needed.


No. Flink does not rebalance the partitions when new task managers joined
cluster. It only did so when job restarts and job parallelism changes.

Hope it helps.

Jin Yi <el...@gmail.com> 于2020年2月21日周五 上午6:14写道:

> Hi there,
>
> We are running apache beam application with flink being the runner.
>
> We use the KafkaIO connector to read from topics:
> https://beam.apache.org/releases/javadoc/2.19.0/
>
> and we have the following configuration, which enables auto commit of
> offset, no checkpointing is enabled, and it is performing element wise
> processing.
>
> So we run our application in Flink Job Cluster mode, and if I run the same
> job twice, meaning start 2 flink job clusters, then I see message being
> processed twice.
>
> My understanding is, upon startup, Flink Job Manager will contact kafka to
> get the offset for each partition for this consume group, and distribute
> the task to task managers, and it does not use kafka to manage the consumer
> group.
>
> and when the 2nd job cluster starts up, it does the same thing, so the 1st
> job cluster is not aware of there are new consumers from the same consume
> group have joined.
>
> But if I add more task managers to the same job cluster, then job manager
> is aware of more consumers from this consume group has joined, and it will
> rebalance the partition consumption if needed.
>
> Is my understanding correct?
>
> Thanks a lot!
> Eleanore
>
> Map<String, Object> consumerConfig = ImmutableMap.<String, Object>builder()
> .put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup)
> .put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
> .put(ConsumerConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG")
> .build();
>
> return KafkaIO.<String, JsonNode>read()
> .withBootstrapServers(kafkaSettings.getBootstrapServers())
> .withTopic(topic)
> .withKeyDeserializer(KeyDeserializer.class)
> .withValueDeserializerAndCoder(getDeserializer(encoding), new
> JsonNodeCoder<>())
> .withConsumerConfigUpdates(consumerConfig)
> .withoutMetadata();
>


-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenchao@gmail.com; libenchao@pku.edu.cn

Re: How does Flink manage the kafka offset

Posted by Benchao Li <li...@gmail.com>.
Hi Jin,

See below inline replies:

My understanding is, upon startup, Flink Job Manager will contact kafka to
> get the offset for each partition for this consume group, and distribute
> the task to task managers, and it does not use kafka to manage the consumer
> group.


Generally, yes. If you are not using checkpoint, and starting from
group-offsets, Flink will read offset from Kafka at startup.

and when the 2nd job cluster starts up, it does the same thing, so the 1st
> job cluster is not aware of there are new consumers from the same consume
> group have joined.


Yes.

But if I add more task managers to the same job cluster, then job manager
> is aware of more consumers from this consume group has joined, and it will
> rebalance the partition consumption if needed.


No. Flink does not rebalance the partitions when new task managers joined
cluster. It only did so when job restarts and job parallelism changes.

Hope it helps.

Jin Yi <el...@gmail.com> 于2020年2月21日周五 上午6:14写道:

> Hi there,
>
> We are running apache beam application with flink being the runner.
>
> We use the KafkaIO connector to read from topics:
> https://beam.apache.org/releases/javadoc/2.19.0/
>
> and we have the following configuration, which enables auto commit of
> offset, no checkpointing is enabled, and it is performing element wise
> processing.
>
> So we run our application in Flink Job Cluster mode, and if I run the same
> job twice, meaning start 2 flink job clusters, then I see message being
> processed twice.
>
> My understanding is, upon startup, Flink Job Manager will contact kafka to
> get the offset for each partition for this consume group, and distribute
> the task to task managers, and it does not use kafka to manage the consumer
> group.
>
> and when the 2nd job cluster starts up, it does the same thing, so the 1st
> job cluster is not aware of there are new consumers from the same consume
> group have joined.
>
> But if I add more task managers to the same job cluster, then job manager
> is aware of more consumers from this consume group has joined, and it will
> rebalance the partition consumption if needed.
>
> Is my understanding correct?
>
> Thanks a lot!
> Eleanore
>
> Map<String, Object> consumerConfig = ImmutableMap.<String, Object>builder()
> .put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup)
> .put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
> .put(ConsumerConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG")
> .build();
>
> return KafkaIO.<String, JsonNode>read()
> .withBootstrapServers(kafkaSettings.getBootstrapServers())
> .withTopic(topic)
> .withKeyDeserializer(KeyDeserializer.class)
> .withValueDeserializerAndCoder(getDeserializer(encoding), new
> JsonNodeCoder<>())
> .withConsumerConfigUpdates(consumerConfig)
> .withoutMetadata();
>


-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenchao@gmail.com; libenchao@pku.edu.cn