Posted to user@flink.apache.org by Hang Ruan <ru...@gmail.com> on 2021/12/01 07:06:29 UTC

Re: How do you configure setCommitOffsetsOnCheckpoints in Flink 1.12 when using KafkaSourceBuilder?

Hi,

In versions 1.12.0 through 1.12.5, committing offsets to Kafka on checkpoints
is the default behavior of a source built with KafkaSourceBuilder whenever
checkpointing is enabled, and it cannot be changed through KafkaSourceBuilder.

FLINK-24277 <https://issues.apache.org/jira/browse/FLINK-24277> makes this
behavior configurable. The fix will be released in 1.12.6, so it does not
appear to be included in your version.

Reading the RP will help you understand the behavior.


On Wed, Dec 1, 2021 at 3:43 AM Marco Villalobos <mv...@kineteque.com> wrote:

> Thanks!
>
> However, I noticed that KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT
> does not exist in Flink 1.12.
>
> Is that property supported with the string "commit.offsets.on.checkpoints"?
>
> How do I configure that behavior so that offsets get committed on
> checkpoints in Flink 1.12 when using the KafkaSourceBuilder? Or is that the
> default behavior with checkpoints?
>
>
>
>
> On Mon, Nov 29, 2021 at 7:46 PM Hang Ruan <ru...@gmail.com> wrote:
>
>> Hi,
>>
>> Maybe you can write it like this:
>>
>>     builder.setProperty(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), "true");
>>
>> Other additional properties can be found here:
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/#additional-properties
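A minimal sketch, using only java.util.Properties, of assembling the extra properties that the 1.14 docs say can be handed to KafkaSourceBuilder via setProperties(Properties). The literal key "commit.offsets.on.checkpoint" comes from the 1.14 "Additional Properties" docs quoted later in this thread; in versions that have KafkaSourceOptions, COMMIT_OFFSETS_ON_CHECKPOINT.key() should resolve to the same string. The class and helper names are hypothetical, and per the reply at the top of this thread, 1.12.0 to 1.12.5 may not honor this key at all.

```java
import java.util.Properties;

// Hypothetical example class; not part of Flink.
class CommitOnCheckpointProps {
    // Key as listed under "Additional Properties" in the Flink 1.14 Kafka connector docs.
    static final String COMMIT_OFFSETS_ON_CHECKPOINT = "commit.offsets.on.checkpoint";

    // Hypothetical helper: collects the extra KafkaSource properties in one place.
    static Properties kafkaSourceProps(String groupId, boolean commitOnCheckpoint) {
        Properties props = new Properties();
        // Kafka stores committed offsets per consumer group, so commits need a group.id.
        props.setProperty("group.id", groupId);
        // Properties values are strings, so the flag is passed as "true" or "false".
        props.setProperty(COMMIT_OFFSETS_ON_CHECKPOINT, Boolean.toString(commitOnCheckpoint));
        return props;
    }

    public static void main(String[] args) {
        Properties props = kafkaSourceProps("my-group", true);
        // In a real job this object would be passed to KafkaSourceBuilder#setProperties(props).
        System.out.println(props.getProperty(COMMIT_OFFSETS_ON_CHECKPOINT)); // true
    }
}
```

Keeping the key as a plain string constant lets the same code compile against 1.12, where KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT does not exist.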
>>
>> On Tue, Nov 30, 2021 at 11:08 AM Marco Villalobos <mv...@kineteque.com> wrote:
>>
>>> Thank you for the information. That still does not answer my question,
>>> though. How do I configure Flink 1.12 with the KafkaSourceBuilder so that
>>> the consumer commits offsets back to Kafka on checkpoints?
>>>
>>> FlinkKafkaConsumer has the method
>>> FlinkKafkaConsumer#setCommitOffsetsOnCheckpoints(boolean) for this.
>>>
>>> But now that I am using KafkaSourceBuilder, how do I configure that
>>> behavior so that offsets get committed on checkpoints?  Or is that the
>>> default behavior with checkpoints?
>>>
>>> -Marco
>>>
>>> On Mon, Nov 29, 2021 at 5:58 PM Caizhi Weng <ts...@gmail.com>
>>> wrote:
>>>
>>>> Hi!
>>>>
>>>> The Flink 1.14 release notes cover this. See [1].
>>>>
>>>> [1]
>>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/release-notes/flink-1.14/#deprecate-flinkkafkaconsumer
>>>>
>>>> On Tue, Nov 30, 2021 at 7:12 AM Marco Villalobos <mv...@kineteque.com> wrote:
>>>>
>>>>> Hi everybody,
>>>>>
>>>>> I am using Flink 1.12 and migrating my code from using
>>>>> FlinkKafkaConsumer to using the KafkaSourceBuilder.
>>>>>
>>>>> FlinkKafkaConsumer has the method
>>>>>
>>>>>> /**
>>>>>>  * Specifies whether or not the consumer should commit offsets back to Kafka on checkpoints.
>>>>>>  * This setting will only have effect if checkpointing is enabled for the job. If checkpointing isn't
>>>>>>  * enabled, only the "auto.commit.enable" (for 0.8) / "enable.auto.commit" (for 0.9+) property
>>>>>>  * settings will be used.
>>>>>>  */
>>>>>
>>>>>
>>>>> How do I set up that parameter when using the KafkaSourceBuilder? If I
>>>>> already have checkpointing configured, is it necessary to set up "commit
>>>>> offsets on checkpoints"?
>>>>>
>>>>> The Flink 1.12 documentation does not discuss this topic, and the
>>>>> Flink 1.14 documentation says little about it.
>>>>>
>>>>> For example, the Flink 1.14 documentation states:
>>>>>
>>>>> Additional Properties
>>>>>> In addition to properties described above, you can set arbitrary
>>>>>> properties for KafkaSource and KafkaConsumer by using
>>>>>> setProperties(Properties) and setProperty(String, String). KafkaSource has
>>>>>> following options for configuration:
>>>>>> commit.offsets.on.checkpoint specifies whether to commit consuming
>>>>>> offsets to Kafka brokers on checkpoint
>>>>>
>>>>>
>>>>> And the 1.12 documentation states:
>>>>>
>>>>> With Flink’s checkpointing enabled, the Flink Kafka Consumer will
>>>>>> consume records from a topic and periodically checkpoint all its Kafka
>>>>>> offsets, together with the state of other operations. In case of a job
>>>>>> failure, Flink will restore the streaming program to the state of the
>>>>>> latest checkpoint and re-consume the records from Kafka, starting from the
>>>>>> offsets that were stored in the checkpoint.
>>>>>> The interval of drawing checkpoints therefore defines how much the
>>>>>> program may have to go back at most, in case of a failure. To use fault
>>>>>> tolerant Kafka Consumers, checkpointing of the topology needs to be enabled
>>>>>> in the job.
>>>>>> If checkpointing is disabled, the Kafka consumer will periodically
>>>>>> commit the offsets to Zookeeper.
>>>>>
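A toy, count-based model of the recovery behavior described in this passage: the source snapshots the offset of the next record to read every `interval` records and, after a failure, resumes from the last snapshot. Real Flink checkpoints are triggered on a time interval rather than a record count, so this is a simplification, and the class and method names are hypothetical; it only shows why the checkpoint interval bounds how far the job has to go back.

```java
// Toy model only: checkpoints are taken every `interval` records (Flink uses time).
class ReplaySimulation {
    // Records are numbered from 0; records 0..failAt-1 were processed before the
    // failure. Each checkpoint stores the offset of the next record to read.
    static long replayedRecords(long interval, long failAt) {
        long lastCheckpointedOffset = (failAt / interval) * interval;
        // On restore, consumption restarts at the checkpointed offset, so every record
        // between it and the failure point is read from Kafka again.
        return failAt - lastCheckpointedOffset;
    }

    // Worst-case replay over every failure point in [0, horizon).
    static long maxReplay(long interval, long horizon) {
        long worst = 0;
        for (long failAt = 0; failAt < horizon; failAt++) {
            worst = Math.max(worst, replayedRecords(interval, failAt));
        }
        return worst;
    }

    public static void main(String[] args) {
        System.out.println(replayedRecords(100, 250)); // 50
        System.out.println(maxReplay(100, 1_000));     // 99
    }
}
```

The worst case is always one record short of a full interval, which is the "how much the program may have to go back at most" bound from the documentation, expressed in records instead of time.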
>>>>>
>>>>> Thank you.
>>>>>
>>>>> Marco
>>>>>
>>>>>
>>>>>

Re: How do you configure setCommitOffsetsOnCheckpoints in Flink 1.12 when using KafkaSourceBuilder?

Posted by Hang Ruan <ru...@gmail.com>.
Sorry, I misspelled it; I meant the PR. Here it is:
https://github.com/apache/flink/pull/17276

On Wed, Dec 1, 2021 at 9:18 PM Marco Villalobos <mv...@kineteque.com> wrote:

> Thank you. One last question.  What is an RP? Where can I read it?
>
> Marco

Re: How do you configure setCommitOffsetsOnCheckpoints in Flink 1.12 when using KafkaSourceBuilder?

Posted by Marco Villalobos <mv...@kineteque.com>.
Thank you. One last question.  What is an RP? Where can I read it?

Marco

> On Nov 30, 2021, at 11:06 PM, Hang Ruan <ru...@gmail.com> wrote:
> 
> Hi,
> 
> In 1.12.0-1.12.5 versions, committing offset to kafka when the checkpoint is open is the default behavior in KafkaSourceBuilder. And it can not be changed in KafkaSourceBuilder. 
> 
> By this FLINK-24277 <https://issues.apache.org/jira/browse/FLINK-24277>, we could change the behavior. This problem will be fixed in 1.12.6. It seems not to be contained in your version.  
> 
> Reading the RP will be helpful for you to understand the behavior.
>  
> 
> Marco Villalobos <mvillalobos@kineteque.com <ma...@kineteque.com>> 于2021年12月1日周三 上午3:43写道:
> Thanks! 
> 
> However, I noticed that KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT does not exist in Flink 1.12.
> 
> Is that property supported with the string "commit.offsets.on.checkpoints"?
> 
> How do I configure that behavior so that offsets get committed on checkpoints in Flink 1.12 when using the KafkaSourceBuilder? Or is that the default behavior with checkpoints?
> 
> 
> 
> 
> On Mon, Nov 29, 2021 at 7:46 PM Hang Ruan <ruanhang1993@gmail.com <ma...@gmail.com>> wrote:
> Hi, 
> 
> Maybe you can write like this : builder.setProperty(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), "true");
> 
> Other additional properties could be found here : https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/#additional-properties <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/#additional-properties>
> Marco Villalobos <mvillalobos@kineteque.com <ma...@kineteque.com>> 于2021年11月30日周二 上午11:08写道:
> Thank you for the information.  That still does not answer my question though.  How do I configure Flink in 1.12 using the KafkaSourceBuilder so that consumer should commit offsets back to Kafka on checkpoints?
> 
> FlinkKafkaConsumer#setCommitOffsetsOnCheckpoints(boolean) has this method. 
> 
> But now that I am using KafkaSourceBuilder, how do I configure that behavior so that offsets get committed on checkpoints?  Or is that the default behavior with checkpoints?
> 
> -Marco
> 
> On Mon, Nov 29, 2021 at 5:58 PM Caizhi Weng <tsreaper96@gmail.com <ma...@gmail.com>> wrote:
> Hi!
> 
> Flink 1.14 release note states about this. See [1].
> 
> [1] https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/release-notes/flink-1.14/#deprecate-flinkkafkaconsumer <https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/release-notes/flink-1.14/#deprecate-flinkkafkaconsumer>
> Marco Villalobos <mvillalobos@kineteque.com <ma...@kineteque.com>> 于2021年11月30日周二 上午7:12写道:
> Hi everybody,
> 
> I am using Flink 1.12 and migrating my code from using FlinkKafkaConsumer to using the KafkaSourceBuilder.
> 
> FlinkKafkaConsumer has the method 
> 
> /**
>  * Specifies whether or not the consumer should commit offsets back to Kafka on checkpoints.
>  * This setting will only have effect if checkpointing is enabled for the job. If checkpointing isn't
>  * enabled, only the "auto.commit.enable" (for 0.8) / "enable.auto.commit" (for 0.9+) property
>  * settings will be used.
> */
> FlinkKafkaConsumer#setCommitOffsetsOnCheckpoints(boolean)
> 
> How do I setup that parameter when using the KafkaSourceBuilder? If I already have checkpointing configured, is it necessary to setup "commit offsets on checkpoints"?
> 
> The Flink 1.12 documentation does not discuss this topic, and the Flink 1.14 documentation says little about it.
> 
>  For example, the Flink 1.14 documentation states:
> 
> Additional Properties
> In addition to properties described above, you can set arbitrary properties for KafkaSource and KafkaConsumer by using setProperties(Properties) and setProperty(String, String). KafkaSource has following options for configuration:
> commit.offsets.on.checkpoint specifies whether to commit consuming offsets to Kafka brokers on checkpoint
> 
> And the 1.12 documentation states:
> 
> With Flink’s checkpointing enabled, the Flink Kafka Consumer will consume records from a topic and periodically checkpoint all its Kafka offsets, together with the state of other operations. In case of a job failure, Flink will restore the streaming program to the state of the latest checkpoint and re-consume the records from Kafka, starting from the offsets that were stored in the checkpoint.
> The interval of drawing checkpoints therefore defines how much the program may have to go back at most, in case of a failure. To use fault tolerant Kafka Consumers, checkpointing of the topology needs to be enabled in the job.
> If checkpointing is disabled, the Kafka consumer will periodically commit the offsets to Zookeeper.
> 
> Thank you.
> 
> Marco
> 
>