Posted to dev@flink.apache.org by Hongshun Wang <lo...@gmail.com> on 2023/04/12 06:27:18 UTC

Re: [DISCUSS] FLIP-288:Enable Dynamic Partition Discovery by Default in Kafka Source

Hi everyone,

I have modified FLIP-288 to add a newDiscoveryOffsetsInitializer to
KafkaSourceBuilder and KafkaSourceEnumerator. Users can call
KafkaSourceBuilder#setNewDiscoveryOffsets to change the offset strategy for
new partitions.
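To make the proposed split concrete, here is a minimal Python model (not Flink code) of an enumerator that applies one OffsetsInitializer to the partitions found at startup and a second one to partitions discovered later. `DiscoveryModel`, `earliest()`, `latest()` and the sentinel values are hypothetical names for this sketch only. In the FLIP the corresponding pieces are `OffsetsInitializer` and `KafkaSourceEnumerator`, with the second initializer set through the proposed `KafkaSourceBuilder#setNewDiscoveryOffsets`.

```python
from typing import Callable, Dict, Iterable, Set

# Hypothetical stand-in for an OffsetsInitializer: maps partitions to starting
# offsets. This is NOT the real Flink API, just a model for illustration.
OffsetsInitializer = Callable[[Set[str]], Dict[str, int]]

EARLIEST = -2  # sentinel values used only in this sketch
LATEST = -1


def earliest() -> OffsetsInitializer:
    return lambda partitions: {p: EARLIEST for p in partitions}


def latest() -> OffsetsInitializer:
    return lambda partitions: {p: LATEST for p in partitions}


class DiscoveryModel:
    """Toy enumerator: one initializer for the first discovery run,
    another for every partition discovered afterwards."""

    def __init__(self, startup: OffsetsInitializer, new_discovery: OffsetsInitializer):
        self.startup = startup
        self.new_discovery = new_discovery
        self.known: Set[str] = set()
        self.first_discovery = True

    def discover(self, current: Iterable[str]) -> Dict[str, int]:
        # Only partitions not seen before need starting offsets.
        new_partitions = set(current) - self.known
        self.known |= new_partitions
        init = self.startup if self.first_discovery else self.new_discovery
        self.first_discovery = False
        return init(new_partitions)
```

With `startup=latest()` and `new_discovery=earliest()`, a partition added after startup is read from its beginning rather than from its end offset at discovery time.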

Enabling partition discovery by default and changing the offset strategy for
new partitions should both be brought to users' attention, so both changes
will be explained in the 1.18 release notes.

WDYT? CC: Ruan, Shammon, Gordon and Leonard.


Best,

Hongshun

On Fri, Mar 31, 2023 at 2:56 PM Hongshun Wang <lo...@gmail.com>
wrote:

> Hi everyone,
> Thanks for your participation.
>
> @Gordon, I looked at the several questions you raised:
>
>    1. Should we use the firstDiscovery flag or two separate
>    OffsetsInitializers? I had actually considered the latter. Following my
>    initial idea, we could provide a default EARLIEST OffsetsInitializer for
>    new partitions. However, following @Shammon's suggestion, each of
>    Flink's built-in startup OffsetsInitializers corresponds to a different
>    post-startup OffsetsInitializer.
>    2. "Future-time" TIMESTAMP OffsetsInitializer. I looked at the code
>    again, and it seems that neither @Shammon nor I had figured this out.
>    TimestampOffsetsInitializer#getPartitionOffsets has a comment: "First
>    get the current end offsets of the partitions. This is going to be used in
>    case we cannot find a suitable offset based on the timestamp, i.e., the
>    message meeting the requirement of the timestamp has not been produced to
>    Kafka yet. *In this case, we just use the latest offset*." Therefore,
>    the TimestampOffsetsInitializer always yields an offset at startup.
>    3. Clarification on coupling SPECIFIC-OFFSET startup with
>    SPECIFIC-OFFSET post-startup. The SPECIFIC-OFFSET strategy already falls
>    back to the "auto.offset.reset" position for partitions that are not
>    specified.
>
> @Gordon, @Shammon, @Leonard, the core issue we are concerned about is
> whether the offsets specified at startup may include partitions that do not
> exist yet. The previous design allowed a SPECIFIC-OFFSET startup to cover
> future partitions. However, since different strategies are now used for
> partitions discovered at startup and partitions discovered later, I think
> the offsets specified at startup should only cover partitions that have been
> confirmed to exist; otherwise, an error will be thrown. Partitions that do
> not exist yet should be handled by the post-startup OffsetsInitializer
> (default EARLIEST).
>
> Best
> Hongshun
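The startup rule proposed in the quoted message can be sketched as a simple check. This Python sketch assumes partitions are identified by plain strings such as "t-0". `validate_specified_offsets` is a hypothetical helper, and raising `ValueError` stands in for whatever error the connector would actually throw.

```python
from typing import Dict, Set


def validate_specified_offsets(specified: Dict[str, int], existing: Set[str]) -> Dict[str, int]:
    """Accept startup offsets only for partitions that already exist.

    Hypothetical helper: partitions created after startup are not listed
    here; in the proposal they are handled by the post-startup initializer
    (EARLIEST by default).
    """
    missing = sorted(set(specified) - set(existing))
    if missing:
        raise ValueError(f"offsets specified for non-existent partitions: {missing}")
    return dict(specified)
```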
>
>
> On Thu, Mar 30, 2023 at 1:43 PM Shammon FY <zj...@gmail.com> wrote:
>
>> Thanks Gordon and Leonard
>>
>> Sorry, I don't have a specific case on my side, but I see the issue as
>> follows:
>>
>> 1. Users may set an offset later than the current time, because Flink does
>> not prevent it.
>> 2. If we use EARLIEST for newly discovered partitions regardless of the
>> startup OFFSET, the behavior may differ from the previous strategy. I think
>> it's best to keep the previous strategy as long as it does not cause data
>> loss.
>> 3. I don't think supporting different OFFSETs in the FLIP will make the
>> implementation more complex.
>>
>> Of course, if we confirm that such a Timestamp OFFSET is illegal and Flink
>> validates it, then we can apply the same strategy to newly discovered
>> partitions. I think that would be nice too.
>>
>> Best,
>> Shammon FY
>>
>>
>> On Thu, Mar 30, 2023 at 12:29 PM Leonard Xu <xb...@gmail.com> wrote:
>>
>> > Thanks Hongshun and Shammon for driving the FLIP!
>> >
>> >
>> > > *2. Clarification on "future-time" TIMESTAMP OffsetsInitializer*
>> > > *3. Clarification on coupling SPECIFIC-OFFSET startup with
>> > > SPECIFIC-OFFSET post-startup*
>> >
>> > Gordon raised a good point about future TIMESTAMP and SPECIFIC-OFFSET:
>> > the timestamps/offsets of a newly added partition are undetermined when
>> > the job starts (the partition has not been created yet), so they are
>> > timestamps/offsets in the future.
>> >
>> > I have used many message queue systems, such as Kafka, Pulsar and xxMQ. In
>> > my experience, the TIMESTAMP and SPECIFIC-OFFSET startup modes are usually
>> > used to specify existing timestamps/offsets, for business scenarios such as
>> > backfilling data and re-refreshing data. At present, it's hard to imagine
>> > a user scenario that specifies a future timestamp to filter data in the
>> > current topic of a message queue system. Is it overthinking to consider
>> > future TIMESTAMP and SPECIFIC-OFFSET?
>> >
>> >
>> > Best,
>> > Leonard
>>
>
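Item 2 in Hongshun's reply quotes the fallback in `TimestampOffsetsInitializer#getPartitionOffsets`. The Python sketch below models that fallback for illustration only. `timestamp_offsets` is a hypothetical helper. Representing a partition as a list of record timestamps indexed by offset is a simplifying assumption; the real initializer queries Kafka. The sketch shows why a "future" timestamp never fails at startup: it resolves to each partition's current end offset.

```python
from typing import Dict, List


def timestamp_offsets(partitions: Dict[str, List[int]], target_ts: int) -> Dict[str, int]:
    """Toy model of the timestamp lookup with its "use the latest offset" fallback.

    Each partition is a list of record timestamps indexed by offset
    (a hypothetical representation). For each partition, pick the earliest
    offset whose timestamp is >= target_ts; if no such record has been
    produced yet, fall back to the current end offset.
    """
    result = {}
    for partition, timestamps in partitions.items():
        end_offset = len(timestamps)
        result[partition] = next(
            (offset for offset, ts in enumerate(timestamps) if ts >= target_ts),
            end_offset,
        )
    return result
```

With a target far in the future, every partition resolves to its end offset, which is the "use the latest offset" behavior described in the comment.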

Re: [DISCUSS] FLIP-288:Enable Dynamic Partition Discovery by Default in Kafka Source

Posted by Hongshun Wang <lo...@gmail.com>.
Hi Shammon,

I agree with you. Since only EARLIEST is used, it's better not to mislead
users through the interface.


Yours

Hongshun

On Tue, Apr 18, 2023 at 7:12 PM Shammon FY <zj...@gmail.com> wrote:

> Hi Hongshun
>
> Thanks for your explanation, I get your point. I reviewed the FLIP again
> and have only one minor comment, which won't block this FLIP: do we still
> need the `OffsetsInitializer newDiscoveryOffsetsInitializer` parameter in the
> constructor of `KafkaSourceEnumerator`? I think we can remove it if we always
> use EARLIEST for newly discovered partitions.
>
> Best,
> Shammon FY
>
> On Tue, Apr 18, 2023 at 4:59 PM Hongshun Wang <lo...@gmail.com>
> wrote:
>
> > Hi Shammon,
> >
> > Thank you for your advice. I have recently looked carefully into whether
> > this could be exposed in SQL DDL.
> >
> > However, after reading the relevant code more thoroughly, it appears that
> > SpecifiedOffsetsInitializer and TimestampOffsetsInitializer do not work as
> > we initially thought. In the end, I decided to only use "EARLIEST" instead
> > of letting users choose freely.
> >
> > Here is my new understanding of how SpecifiedOffsetsInitializer and
> > TimestampOffsetsInitializer actually work:
> >
> >    - *SpecifiedOffsetsInitializer*: Uses the *specified offset* for
> >    specified partitions and *EARLIEST* for unspecified partitions. A
> >    specified partition's offset should be less than the latest offset;
> >    otherwise, it starts from *EARLIEST*.
> >    - *TimestampOffsetsInitializer*: Initializes the offsets based on a
> >    timestamp. If no message meeting the timestamp requirement has been
> >    produced to Kafka yet, it just uses the *LATEST* offset.
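The SpecifiedOffsetsInitializer behavior summarized in these bullets can be captured in a small Python model. It is only a sketch of the described semantics, not the connector's implementation. `specified_offsets` is hypothetical, and each partition's (earliest, latest) offset range stands in for what the real initializer obtains from Kafka. The sketch shows why a freshly created partition usually lands on EARLIEST: either it has no specified offset, or the specified offset lies beyond its still-small latest offset.

```python
from typing import Dict, Tuple

# (earliest_offset, latest_offset) per partition: a toy stand-in for broker metadata.
Ranges = Dict[str, Tuple[int, int]]


def specified_offsets(specified: Dict[str, int], ranges: Ranges) -> Dict[str, int]:
    """Hypothetical model of the SpecifiedOffsetsInitializer behavior described above.

    - a partition with a specified offset starts from that offset;
    - a partition without one starts from EARLIEST;
    - a specified offset past the partition's latest offset also falls
      back to EARLIEST.
    """
    result = {}
    for partition, (earliest, latest) in ranges.items():
        offset = specified.get(partition)
        if offset is None or offset > latest:
            result[partition] = earliest
        else:
            result[partition] = offset
    return result
```

For example, with ranges `{"t-0": (0, 10), "t-1": (5, 8), "t-2": (0, 0)}` and specified offsets `{"t-0": 7, "t-1": 100}`, `t-2` plays the role of a newly created partition and starts from EARLIEST.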
> >
> > As a result, problems will occur when new partitions use
> > SpecifiedOffsetsInitializer or TimestampOffsetsInitializer. You can find
> > more information in the "Rejected Alternatives" section of FLIP-288, which
> > includes the relevant code and the step-by-step reasoning.
> > All of these problems can be reproduced in the current version. They have
> > probably not surfaced because users usually specify existing offsets or
> > timestamps, so in production the behavior looks like EARLIEST.
> >
> > WDYT?
> > CC: Ruan, Shammon, Gordon, Leonard and Qingsheng.
> >
> > Yours
> >
> > Hongshun
> >
> >
> >
> >
> > On Fri, Apr 14, 2023 at 5:48 PM Shammon FY <zj...@gmail.com> wrote:
> >
> > > Hi Hongshun
> > >
> > > Thanks for updating the FLIP, it all sounds good to me.
> > >
> > > I just have one comment: how does a SQL job set the new-discovery offsets
> > > initializer? I found that `DataStream` jobs can set a different offsets
> > > initializer for newly discovered partitions via
> > > `KafkaSourceBuilder.setNewDiscoveryOffsets`. Do SQL jobs need to support
> > > this feature?
> > >
> > > Best,
> > > Shammon FY

Re: [DISCUSS] FLIP-288:Enable Dynamic Partition Discovery by Default in Kafka Source

Posted by Shammon FY <zj...@gmail.com>.
Hi Hongshun

Thanks for your explanation, I get your point. I reviewed the FLIP again
and have only one minor comment, which won't block this FLIP: do we still
need the `OffsetsInitializer newDiscoveryOffsetsInitializer` parameter in the
constructor of `KafkaSourceEnumerator`? I think we can remove it if we always
use EARLIEST for newly discovered partitions.

Best,
Shammon FY

Re: [DISCUSS] FLIP-288:Enable Dynamic Partition Discovery by Default in Kafka Source

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
> I have already modified FLIP-288 to provide a
> newDiscoveryOffsetsInitializer in the KafkaSourceBuilder and
> KafkaSourceEnumerator. Users can use
> KafkaSourceBuilder#setNewDiscoveryOffsets to change the strategy for new
> partitions.

Thanks for addressing my comment, Hongshun.

> Considering these reasons and facts, I’m +1 to only use EARLIEST for newly
> discovered partitions.

Sounds good to me.


Overall, +1 to this proposal in principle (I'll formally vote on the vote
thread as well).

Thanks,
Gordon

On Tue, Apr 18, 2023 at 9:12 PM Leonard Xu <xb...@gmail.com> wrote:

> Thanks, Hongshun, for the deeper analysis of the existing KafkaSource
> implementation details. Cool!
>
> - There's no specific use case for a future TIMESTAMP or SPECIFIC-OFFSET
>   for newly discovered partitions.
> - The existing SpecifiedOffsetsInitializer uses the EARLIEST offset for
>   unspecified partitions as well as newly discovered partitions.
> - The existing TimestampOffsetsInitializer uses the LATEST offset for a
>   future timestamp. For newly discovered partitions, the LATEST offset is
>   similar to the EARLIEST offset in this case, and EARLIEST is safer because
>   it covers all records.
>
> Considering these reasons and facts, I’m +1 to only use EARLIEST for newly
> discovered partitions.
>
> The updated FLIP looks good to me. We can start a vote thread soon if
> there are no new disagreements.
>
> Best,
> Leonard

Re: [DISCUSS] FLIP-288:Enable Dynamic Partition Discovery by Default in Kafka Source

Posted by Leonard Xu <xb...@gmail.com>.
Thanks, Hongshun, for the deeper analysis of the existing KafkaSource implementation details. Cool!
- There is no specific use case for a future TIMESTAMP or SPECIFIC-OFFSET for newly discovered partitions.
- The existing SpecifiedOffsetsInitializer uses the EARLIEST offset for unspecified partitions as well as for newly discovered partitions.
- The existing TimestampOffsetsInitializer uses the LATEST offset for a future timestamp. In this case, the LATEST offset is similar to the EARLIEST offset for newly discovered partitions, and EARLIEST is safer because it covers all records.
Considering these reasons and facts, I'm +1 on using only EARLIEST for newly discovered partitions.
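To make the "EARLIEST is safer" point concrete, here is a toy Python model, not Flink or Kafka code. A partition is a hypothetical list whose index is the offset, and the hypothetical helper `records_seen` compares starting a newly discovered partition at EARLIEST versus LATEST when some records were written between the partition's creation and its discovery.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class ToyPartition:
    """Hypothetical in-memory partition: a record's offset is its list index."""
    records: List[str] = field(default_factory=list)

    def earliest(self) -> int:
        # Assumes retention has not deleted anything yet, so the log starts at 0.
        return 0

    def latest(self) -> int:
        # Log-end offset: the offset the next produced record will get.
        return len(self.records)


def records_seen(start: str, before_discovery: List[str],
                 after_discovery: List[str]) -> List[str]:
    """Records a reader would get if it starts the new partition at `start`."""
    partition = ToyPartition()
    # Records produced after the partition was created but before discovery.
    partition.records.extend(before_discovery)
    if start == "EARLIEST":
        offset = partition.earliest()
    elif start == "LATEST":
        offset = partition.latest()
    else:
        raise ValueError(f"unknown start position: {start}")
    # Records produced after the reader has been assigned the partition.
    partition.records.extend(after_discovery)
    return partition.records[offset:]


print(records_seen("EARLIEST", ["a", "b"], ["c"]))  # ['a', 'b', 'c']
print(records_seen("LATEST", ["a", "b"], ["c"]))    # ['c']
```

If nothing was written before discovery, both start positions return the same records, which is why LATEST resembles EARLIEST for a brand-new partition; any records written in the gap between creation and discovery are what EARLIEST protects.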

The updated FLIP looks good to me. We can start a vote thread soon if there are no new disagreements.

Best,
Leonard

> On Apr 18, 2023, at 4:58 PM, Hongshun Wang <lo...@gmail.com> wrote:
> 
> Hi Shammon,
> 
> Thank you for your advice. I have carefully considered whether to expose this
> in SQL DDL, and recently I studied in detail whether it is feasible.
> 
> However, after reading the corresponding code more thoroughly, it appears
> that SpecifiedOffsetsInitializer and TimestampOffsetsInitializer do not
> work as we initially thought. In the end, I have decided to use only
> "EARLIEST" instead of letting users choose freely.
> 
> Now, let me show my new understanding.
> 
> What SpecifiedOffsetsInitializer and TimestampOffsetsInitializer actually
> do:
> 
> 
>   - *SpecifiedOffsetsInitializer*: Uses the *specified offset* for specified
>   partitions and *EARLIEST* for unspecified partitions. A specified offset
>   should be less than the latest offset; otherwise, the partition starts
>   from *EARLIEST*.
>   - *TimestampOffsetsInitializer*: Initializes the offsets based on a
>   timestamp. If no message meeting the timestamp requirement has been
>   produced to Kafka yet, it uses the *LATEST* offset.
> 
> So problems can occur when a new partition uses SpecifiedOffsetsInitializer
> or TimestampOffsetsInitializer. You can find more information in the
> "Rejected Alternatives" section of FLIP-288, which includes the relevant
> code and the step-by-step reasoning. All these problems can be reproduced
> in the current version. They have probably not surfaced because users
> usually set an existing offset or timestamp, so new partitions effectively
> start from EARLIEST in production.
> 
> WDYT?
> CC: Ruan, Shammon, Gordon, Leonard and Qingsheng.
> 
> Yours
> 
> Hongshun


Re: [DISCUSS] FLIP-288:Enable Dynamic Partition Discovery by Default in Kafka Source

Posted by Hongshun Wang <lo...@gmail.com>.
Hi Shammon,

Thank you for your advice. I have carefully considered whether to expose this
in SQL DDL, and recently I studied in detail whether it is feasible.

However, after reading the corresponding code more thoroughly, it appears
that SpecifiedOffsetsInitializer and TimestampOffsetsInitializer do not
work as we initially thought. In the end, I have decided to use only
"EARLIEST" instead of letting users choose freely.

Now, let me show my new understanding.

What SpecifiedOffsetsInitializer and TimestampOffsetsInitializer actually
do:


   - *SpecifiedOffsetsInitializer*: Uses the *specified offset* for specified
   partitions and *EARLIEST* for unspecified partitions. A specified offset
   should be less than the latest offset; otherwise, the partition starts from
   *EARLIEST*.
   - *TimestampOffsetsInitializer*: Initializes the offsets based on a
   timestamp. If no message meeting the timestamp requirement has been
   produced to Kafka yet, it uses the *LATEST* offset.
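The two rules can be modelled in a few lines of Python. This is a sketch of the behaviour as described in this email, not Flink's actual `SpecifiedOffsetsInitializer`/`TimestampOffsetsInitializer` code: each partition is a hypothetical list of record timestamps whose index is the offset, EARLIEST is offset 0, and LATEST is the list length. The sketch treats an offset beyond the latest offset as out of range; exactly where Flink draws that boundary is an assumption here.

```python
from typing import Dict, List

# Hypothetical model: partition id -> record timestamps (ms); list index == offset.
Topic = Dict[int, List[int]]


def specified_offsets(topic: Topic, specified: Dict[int, int]) -> Dict[int, int]:
    """Specified offset for specified partitions, EARLIEST (0) for all others."""
    result = {}
    for pid, records in topic.items():
        latest = len(records)
        offset = specified.get(pid)
        if offset is None or offset > latest:
            # Unspecified, or beyond the latest offset: fall back to EARLIEST.
            result[pid] = 0
        else:
            result[pid] = offset
    return result


def timestamp_offsets(topic: Topic, ts: int) -> Dict[int, int]:
    """First offset whose timestamp is >= ts, or LATEST if no such record exists yet."""
    result = {}
    for pid, timestamps in topic.items():
        match = next((i for i, t in enumerate(timestamps) if t >= ts), None)
        result[pid] = len(timestamps) if match is None else match
    return result


topic = {0: [100, 200, 300], 1: [150]}
print(specified_offsets(topic, {0: 2}))   # {0: 2, 1: 0}
print(timestamp_offsets(topic, 200))      # {0: 1, 1: 1}
```

Partition 1 shows the LATEST fallback: it has no record at or after timestamp 200 yet, so it resolves to its log-end offset.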

So problems can occur when a new partition uses SpecifiedOffsetsInitializer
or TimestampOffsetsInitializer. You can find more information in the
"Rejected Alternatives" section of FLIP-288, which includes the relevant code
and the step-by-step reasoning. All these problems can be reproduced in the
current version. They have probably not surfaced because users usually set an
existing offset or timestamp, so new partitions effectively start from
EARLIEST in production.
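A hypothetical scenario helps show why these problems rarely surface. Using the TimestampOffsetsInitializer rule described in the list above (first record with timestamp >= ts, else LATEST), the toy Python helper `consumed` starts a partition that is discovered only after some records were written. With a past timestamp every record of the new partition qualifies, so it starts at EARLIEST; with a future timestamp it starts at LATEST, skipping records written before discovery while still reading later records that are also older than the timestamp. This is an illustration under those assumptions, not the analysis in the FLIP's "Rejected Alternatives" section.

```python
from typing import List


def timestamp_start(timestamps: List[int], ts: int) -> int:
    """First offset whose record timestamp is >= ts; LATEST (list length) if none."""
    return next((i for i, t in enumerate(timestamps) if t >= ts), len(timestamps))


def consumed(before_discovery: List[int], after_discovery: List[int], ts: int) -> List[int]:
    """Timestamps read from a new partition when its start offset is resolved at discovery."""
    partition = list(before_discovery)        # written before the partition is discovered
    start = timestamp_start(partition, ts)    # resolved once, at discovery time
    partition.extend(after_discovery)         # written after the reader is assigned
    return partition[start:]


# Job started at t=1000; the partition is created later and receives records.
before, after = [1500, 1600, 1700], [1800]
print(consumed(before, after, ts=500))    # past timestamp:   [1500, 1600, 1700, 1800]
print(consumed(before, after, ts=5000))   # future timestamp: [1800]
```

In the future-timestamp case, 1800 is still read even though it is older than 5000, so the effective start depends on when discovery happens rather than on the timestamp alone.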

WDYT?
CC: Ruan, Shammon, Gordon, Leonard and Qingsheng.

Yours

Hongshun




On Fri, Apr 14, 2023 at 5:48 PM Shammon FY <zj...@gmail.com> wrote:

> Hi Hongshun,
>
> Thanks for updating the FLIP; it looks good to me.
>
> I just have one question: how does a SQL job set the new-discovery offsets
> initializer? I found that `DataStream` jobs can set a different offsets
> initializer for newly discovered partitions via
> `KafkaSourceBuilder.setNewDiscoveryOffsets`. Do SQL jobs need to support
> this feature?
>
> Best,
> Shammon FY

Re: [DISCUSS] FLIP-288:Enable Dynamic Partition Discovery by Default in Kafka Source

Posted by Shammon FY <zj...@gmail.com>.
Hi Hongshun,

Thanks for updating the FLIP; it looks good to me.

I just have one question: how does a SQL job set the new-discovery offsets
initializer? I found that `DataStream` jobs can set a different offsets
initializer for newly discovered partitions via
`KafkaSourceBuilder.setNewDiscoveryOffsets`. Do SQL jobs need to support
this feature?

Best,
Shammon FY

On Wed, Apr 12, 2023 at 2:27 PM Hongshun Wang <lo...@gmail.com>
wrote:

> Hi everyone,
>
> I have already modified FLIP-288 to provide a
> newDiscoveryOffsetsInitializer in the KafkaSourceBuilder and
> KafkaSourceEnumerator. Users can use
> KafkaSourceBuilder#setNewDiscoveryOffsets to change the strategy for new
> partitions.
>
> Enabling partition discovery by default and changing the offset strategy
> for new partitions should certainly be brought to users' attention, so it
> will be explained in the 1.18 release notes.
>
> WDYT? CC: Ruan, Shammon, Gordon and Leonard.
>
>
> Best,
>
> Hongshun