You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Lars Skjærven <la...@gmail.com> on 2021/09/15 09:25:42 UTC

KafkaSource builder and checkpointing with parallelism > kafka partitions

Using KafkaSource builder with a job parallelism larger than the number of
kafka partitions, the job is unable to checkpoint.

With a job parallelism of 4, 3 of the tasks are marked as FINISHED for the
kafka topic with one partition. For this reason checkpointing seems to be
disabled.

When using FlinkKafkaConsumer (instead of KafkaSource builder) we don't see
this behavior, and all 4 tasks have status RUNNING.

Is there any way of using KafkaSource builder ang get the same behavior as
FlinkKafkaConsumer for the number of tasks being used ?

Code with KafkaSource.builder:

    val metadataSource = KafkaSource.builder[Metadata]()
      .setBootstrapServers("kafka-server")
      .setGroupId("my-group")
      .setTopics("my-topic")
      .setDeserializer(new MetadataDeserializationSchema)
      .setStartingOffsets(OffsetsInitializer.earliest())
      .build()

Code with FlinkKafkaConsumer:
    val metadataSource = new FlinkKafkaConsumer[Metadata](
      "my-topic",
      new MetadataDeserializationSchema,
      "my-server)
      .setStartFromEarliest()

Thanks in advance,
Lars

Re: KafkaSource builder and checkpointing with parallelism > kafka partitions

Posted by Lars Skjærven <la...@gmail.com>.
Thanks for the feedback.

> May I ask why you have less partitions than the parallelism? I would be
happy to learn more about your use-case to better understand the
> motivation.

The use case is that topic A, contains just a few messages with product
metadata that rarely gets updated, while topic B contains user interactions
with the products (and many more messages). For topic A we thought that one
partition will be sufficient to keep the metadata, while we have 32
partitions for topic B. Due to the load on topic B, we're use a parallelism
of 2-8.

Thanks,
Lars



On Thu, Sep 16, 2021 at 9:09 AM Fabian Paul <fa...@ververica.com>
wrote:

> Hi all,
>
> The problem you are seeing Lars is somewhat intended behaviour,
> unfortunately. With the batch/stream unification every Kafka partition is
> treated
> as kind of workload assignment. If one subtask receives a signal that
> there is no workload anymore it goes into the FINISHED state.
> As already pointed this restriction will lift in the near future.
>
> I went through the code and I think in your case you can configure the
> following configuration [1] which should show an equal behaviour than the
> old source. This will prevent the enumerator from sending a final signal
> to the subtasks and they will not go into finished state.
>
> May I ask why you have less partitions than the parallelism? I would be
> happy to learn more about your use-case to better understand the
> motivation.
>
> Best,
> Fabian
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#dynamic-partition-discovery

Re: KafkaSource builder and checkpointing with parallelism > kafka partitions

Posted by Fabian Paul <fa...@ververica.com>.
Hi all,

The problem you are seeing Lars is somewhat intended behaviour, unfortunately. With the batch/stream unification every Kafka partition is treated 
as kind of workload assignment. If one subtask receives a signal that there is no workload anymore it goes into the FINISHED state.
As already pointed this restriction will lift in the near future.

I went through the code and I think in your case you can configure the following configuration [1] which should show an equal behaviour than the
old source. This will prevent the enumerator from sending a final signal to the subtasks and they will not go into finished state.

May I ask why you have less partitions than the parallelism? I would be happy to learn more about your use-case to better understand the
motivation.

Best,
Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#dynamic-partition-discovery

Re: KafkaSource builder and checkpointing with parallelism > kafka partitions

Posted by David Morávek <dm...@apache.org>.
If we are shutting down any sources of unbounded jobs that run on Flink
versions without FLIP-147 (available in 1.14) [1], that Matthias has
mentioned, than it's IMO a bug, because it effectively breaks
checkpointing. Fabian, can you please verify whether this is an intended
behavior?

In the meantime, another workaround could be simply keeping the source
parallelism lower or equal the number of Kafka partitions.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished

Best,
D.

On Wed, Sep 15, 2021 at 9:00 PM Lars Skjærven <la...@gmail.com> wrote:

> Got it. So the workaround for now (1.13.2) is to fall back to
> FlinkKafkaConsumer if I read you correctly.
> Thanks
> L
>
> On Wed, Sep 15, 2021 at 2:58 PM Matthias Pohl <ma...@ververica.com>
> wrote:
>
>> Hi Lars,
>> I guess you are looking
>> for execution.checkpointing.checkpoints-after-tasks-finish.enabled [1].
>> This configuration parameter is going to be introduced in the upcoming
>> Flink 1.14 release.
>>
>> Best,
>> Matthias
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#execution-checkpointing-checkpoints-after-tasks-finish-enabled
>>
>> On Wed, Sep 15, 2021 at 11:26 AM Lars Skjærven <la...@gmail.com> wrote:
>>
>>> Using KafkaSource builder with a job parallelism larger than the number
>>> of kafka partitions, the job is unable to checkpoint.
>>>
>>> With a job parallelism of 4, 3 of the tasks are marked as FINISHED for
>>> the kafka topic with one partition. For this reason checkpointing seems to
>>> be disabled.
>>>
>>> When using FlinkKafkaConsumer (instead of KafkaSource builder) we don't
>>> see this behavior, and all 4 tasks have status RUNNING.
>>>
>>> Is there any way of using KafkaSource builder ang get the same behavior
>>> as FlinkKafkaConsumer for the number of tasks being used ?
>>>
>>> Code with KafkaSource.builder:
>>>
>>>     val metadataSource = KafkaSource.builder[Metadata]()
>>>       .setBootstrapServers("kafka-server")
>>>       .setGroupId("my-group")
>>>       .setTopics("my-topic")
>>>       .setDeserializer(new MetadataDeserializationSchema)
>>>       .setStartingOffsets(OffsetsInitializer.earliest())
>>>       .build()
>>>
>>> Code with FlinkKafkaConsumer:
>>>     val metadataSource = new FlinkKafkaConsumer[Metadata](
>>>       "my-topic",
>>>       new MetadataDeserializationSchema,
>>>       "my-server)
>>>       .setStartFromEarliest()
>>>
>>> Thanks in advance,
>>> Lars
>>>
>>

Re: KafkaSource builder and checkpointing with parallelism > kafka partitions

Posted by Lars Skjærven <la...@gmail.com>.
Got it. So the workaround for now (1.13.2) is to fall back to
FlinkKafkaConsumer if I read you correctly.
Thanks
L

On Wed, Sep 15, 2021 at 2:58 PM Matthias Pohl <ma...@ververica.com>
wrote:

> Hi Lars,
> I guess you are looking
> for execution.checkpointing.checkpoints-after-tasks-finish.enabled [1].
> This configuration parameter is going to be introduced in the upcoming
> Flink 1.14 release.
>
> Best,
> Matthias
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#execution-checkpointing-checkpoints-after-tasks-finish-enabled
>
> On Wed, Sep 15, 2021 at 11:26 AM Lars Skjærven <la...@gmail.com> wrote:
>
>> Using KafkaSource builder with a job parallelism larger than the number
>> of kafka partitions, the job is unable to checkpoint.
>>
>> With a job parallelism of 4, 3 of the tasks are marked as FINISHED for
>> the kafka topic with one partition. For this reason checkpointing seems to
>> be disabled.
>>
>> When using FlinkKafkaConsumer (instead of KafkaSource builder) we don't
>> see this behavior, and all 4 tasks have status RUNNING.
>>
>> Is there any way of using KafkaSource builder ang get the same behavior
>> as FlinkKafkaConsumer for the number of tasks being used ?
>>
>> Code with KafkaSource.builder:
>>
>>     val metadataSource = KafkaSource.builder[Metadata]()
>>       .setBootstrapServers("kafka-server")
>>       .setGroupId("my-group")
>>       .setTopics("my-topic")
>>       .setDeserializer(new MetadataDeserializationSchema)
>>       .setStartingOffsets(OffsetsInitializer.earliest())
>>       .build()
>>
>> Code with FlinkKafkaConsumer:
>>     val metadataSource = new FlinkKafkaConsumer[Metadata](
>>       "my-topic",
>>       new MetadataDeserializationSchema,
>>       "my-server)
>>       .setStartFromEarliest()
>>
>> Thanks in advance,
>> Lars
>>
>

Re: KafkaSource builder and checkpointing with parallelism > kafka partitions

Posted by Matthias Pohl <ma...@ververica.com>.
Hi Lars,
I guess you are looking
for execution.checkpointing.checkpoints-after-tasks-finish.enabled [1].
This configuration parameter is going to be introduced in the upcoming
Flink 1.14 release.

Best,
Matthias

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#execution-checkpointing-checkpoints-after-tasks-finish-enabled

On Wed, Sep 15, 2021 at 11:26 AM Lars Skjærven <la...@gmail.com> wrote:

> Using KafkaSource builder with a job parallelism larger than the number of
> kafka partitions, the job is unable to checkpoint.
>
> With a job parallelism of 4, 3 of the tasks are marked as FINISHED for the
> kafka topic with one partition. For this reason checkpointing seems to be
> disabled.
>
> When using FlinkKafkaConsumer (instead of KafkaSource builder) we don't
> see this behavior, and all 4 tasks have status RUNNING.
>
> Is there any way of using KafkaSource builder ang get the same behavior as
> FlinkKafkaConsumer for the number of tasks being used ?
>
> Code with KafkaSource.builder:
>
>     val metadataSource = KafkaSource.builder[Metadata]()
>       .setBootstrapServers("kafka-server")
>       .setGroupId("my-group")
>       .setTopics("my-topic")
>       .setDeserializer(new MetadataDeserializationSchema)
>       .setStartingOffsets(OffsetsInitializer.earliest())
>       .build()
>
> Code with FlinkKafkaConsumer:
>     val metadataSource = new FlinkKafkaConsumer[Metadata](
>       "my-topic",
>       new MetadataDeserializationSchema,
>       "my-server)
>       .setStartFromEarliest()
>
> Thanks in advance,
> Lars
>