Posted to user@flink.apache.org by liangzai <li...@163.com> on 2022/06/19 02:36:19 UTC

Re: New KafkaSource API: Change in default behavior regarding starting offset

How do I unsubscribe from this mailing list?



---- Replied Message ----
From: bastien dine <ba...@gmail.com>
Date: 06/15/2022 17:50
To: Martijn Visser <ma...@apache.org>
Cc: Jing Ge <ji...@ververica.com>, user <us...@flink.apache.org>
Subject: Re: New KafkaSource API: Change in default behavior regarding starting offset
Hello Martijn,
Hello Martijn,


Thanks for the link to the release note, especially:
"When resuming from the savepoint, please use setStartingOffsets(OffsetsInitializer.committedOffsets()) in the new KafkaSourceBuilder to transfer the offsets to the new source."
So earliest is the new default.
We do use .committedOffsets - it is the default in our custom KafkaSource builder, to make sure we do not re-read all the previous data (earliest).
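For reference, our builder boils down to something like this (just a sketch, not our actual builder; the servers, topic and group id are placeholders):

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

// Start from the offsets committed by the consumer group,
// instead of the new default (earliest).
KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("localhost:9092")      // placeholder
        .setTopics("input-topic")                   // placeholder
        .setGroupId("my-consumer-group")            // placeholder
        .setStartingOffsets(OffsetsInitializer.committedOffsets())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();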


What bothers me is just this change in the default starting offset behavior from FlinkKafkaConsumer to KafkaSource (it can lead to mistakes).
In fact, we sometimes drop some of our Kafka source state on purpose so we read again from the Kafka committed offset, but maybe nobody else does that ^^


Anyway, thanks for the pointer to the release note!


Best Regards,


------------------


Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io





On Wed, Jun 15, 2022 at 10:58, Martijn Visser <ma...@apache.org> wrote:

Hi Bastien,


When the FlinkKafkaConsumer was deprecated in 1.14.0, the release notes included instructions on how to migrate from FlinkKafkaConsumer to KafkaSource [1]. Looking at the Kafka connector documentation [2], there is a section on how to upgrade to the latest connector version that I think is outdated. I'm leaning towards copying the migration instructions into the general documentation. Do you think that would suffice?


Best regards,


Martijn


[1] https://nightlies.apache.org/flink/flink-docs-master/release-notes/flink-1.14/#deprecate-flinkkafkaconsumer
[2] https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#upgrading-to-the-latest-connector-version


On Wed, Jun 15, 2022 at 09:22, bastien dine <ba...@gmail.com> wrote:

Hello Jing,


setStartFromGroupOffsets() was the method in the old Kafka consumer API; it was removed in 1.15, so the source code is no longer in master.
Yes, I know about the new offsets initializer: committed offsets with earliest as a fallback can be used to get the same behavior as before.
I just wanted to know whether this is an intentional behavior change or whether I am missing something.
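For the archive, the fallback variant I mean is roughly this (a sketch, assuming the 1.15 KafkaSource API):

import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

// Same behaviour as the old FlinkKafkaConsumer default: use the group's
// committed offsets, and fall back to earliest if none are committed yet.
OffsetsInitializer startFromGroupOffsets =
        OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST);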






Bastien DINE
Freelance
Data Architect / Software Engineer / Sysadmin
http://bastiendine.io

   


On Tue, Jun 14, 2022 at 23:08, Jing Ge <ji...@ververica.com> wrote:

Hi Bastien,


Thanks for asking. I didn't find any call to setStartFromGroupOffsets() within Flink on the master branch. Could you please point to the code where the committed offset is used as the default?


W.r.t. the new KafkaSource, if OffsetsInitializer.committedOffsets()
is used, an exception will be thrown at runtime when there is no committed offset, which is useful if the user intends to read from the committed offset but something is wrong. It might feel weird as a default, because an exception would be thrown whenever users start new jobs with default settings.
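To illustrate (a rough sketch of the strict variant, not a recommendation):

import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

// Strict variant: if the consumer group has no committed offset for a
// partition, the job fails at runtime instead of silently reading from
// earliest or latest.
OffsetsInitializer strict = OffsetsInitializer.committedOffsets();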


Best regards,
Jing


On Tue, Jun 14, 2022 at 4:15 PM bastien dine <ba...@gmail.com> wrote:

Hello everyone,


Does someone know why the starting offset behaviour has changed in the new KafkaSource?


It now starts from earliest (see the code in KafkaSourceBuilder); the doc says:
"If offsets initializer is not specified, OffsetsInitializer.earliest() will be used by default." from: https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#starting-offset


Before, in the old FlinkKafkaConsumer, it started from the committed offset (i.e. the setStartFromGroupOffsets() method),


which matches this behaviour in the new KafkaSource: OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)
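To make the difference concrete, roughly (a sketch; servers, topic and group id are placeholders, and the old connector requires a pre-1.15 Flink):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

// Old connector (removed in 1.15): committed group offsets were the default start.
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");   // placeholder
props.setProperty("group.id", "my-consumer-group");         // placeholder
FlinkKafkaConsumer<String> oldConsumer =
        new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), props);
oldConsumer.setStartFromGroupOffsets();   // explicit here, but it was also the default

// New connector: without setStartingOffsets(...), KafkaSourceBuilder
// defaults to OffsetsInitializer.earliest() instead.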


This change can lead to big trouble if users pay no attention to this point when migrating from the old FlinkKafkaConsumer to the new KafkaSource.


Regards,
Bastien


------------------


Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io

Re: 退订/unsubscribe

Posted by Jing Ge <ji...@ververica.com>.
In order to unsubscribe, please send an email to
user-unsubscribe@flink.apache.org

Thanks

Best regards,
Jing


From: liangzai <li...@163.com>
Date: Sun, Jun 19, 2022 at 4:37 AM
Subject: Re: New KafkaSource API: Change in default behavior regarding
starting offset
To: bastien dine <ba...@gmail.com>
Cc: Martijn Visser <ma...@apache.org>, Jing Ge <ji...@ververica.com>,
user <us...@flink.apache.org>


How do I unsubscribe from this mailing list?



Re: New KafkaSource API: Change in default behavior regarding starting offset

Posted by Shengkai Fang <fs...@gmail.com>.
Hi,

Please use English on the user mailing list. If you want to unsubscribe from
the mailing list, you can send an email to user-unsubscribe@flink.apache.org.

Best,
Shengkai

liangzai <li...@163.com> wrote on Sun, Jun 19, 2022 at 10:36:

> How do I unsubscribe from this mailing list?