You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by Steven Wu <st...@gmail.com> on 2022/05/02 02:26:21 UTC

Re: Source alignment for Iceberg

add dev@ group to the thread as Thomas suggested

Arvid,

The scenario 3 (Dynamic assignment + temporary no split) in the FLIP-180
(idleness) can happen to Iceberg source alignment, as readers can be
temporarily starved due to the holdback by the enumerator when assigning
new splits upon request.

Totally agree that we should decouple this discussion with the FLIP-217,
which addresses the split level watermark alignment problem as a follow-up
of FLIP-182

Becket,

Yes, currently Iceberg source implemented the alignment leveraging the
dynamic split assignment from FLIP-27 design. Basically, the enumerator can
hold back split assignments to readers when necessary. Everything are
centralized in the enumerator: (1) watermark extraction and aggregation (2)
alignment decision and execution

The motivation of this discussion is to see if Iceberg source can leverage
some of the watermark alignment solutions (like FLIP-182) from Flink
framework. E.g., as mentioned in the doc, Iceberg source can potentially
leverage the FLIP-182 framework to do the watermark extraction and
aggregation. For the alignment decision and execution, we can keep them in
the centralized enumerator.

Thanks,
Steven

On Thu, Apr 28, 2022 at 2:05 AM Becket Qin <be...@gmail.com> wrote:

> Hi Steven,
>
> Thanks for pulling me into this thread. I think the timestamp
> alignment use case here is a good example of what FLIP-27 was designed for.
>
> Technically speaking, Iceberg source can already implement the timestamp
> alignment in the Flink new source even without FLIP-182. However, I
> understand the rationale here because timestamp alignment is also trying to
> orchestrate the consumption of splits. However, it looks like FLIP-182 was
> not designed in a way that it can be easily extended for other use cases.
> It may probably worth thinking of a more general mechanism to answer the
> following questions:
>
> 1. What information whose source of truth is the Flink framework should be
> exposed to the SplitEnumerator and SourceReader? And how?
> 2. What control actions in the Flink framework are worth exposing to the
> SplitEnumerators and SourceReaders? And how?
>
> In the context of timestamp alignment, the first question is more
> relevant. For example, instead of hardcode the ReportWatermarkEvent
> handling logic in the SourceCoordinator, should we expose this to the
> SplitEnumerator? So basically there will be some information, such as
> subtask local watermark, whose source of truth is Flink runtime, but useful
> to the user provided pluggables.
>
> I think there are a few control flow patterns to make a complete design:
>
> a. Framework space information (e.g. watermark) --> User space Pluggables
> (e.g. SplitEnumerator) --> User space Actions (e.g. Pause reading a split).
> b. Framework space information (e.g. task failure) --> User space
> pluggables (e.g. SplitEnumerator) --> Framework space actions (e.g. exit
> the job)
> c. User space information (e.g. a custom workload metric) --> User space
> pluggables (e.g. SplitEnumerator) --> User space actions (e.g. rebalance
> the workload across the source readers).
> d. Use space information (e.g. a custom stopping event in the stream) -->
> User space pluggables (e.g. SplitEnumerator) --> Framework space actions
> (e.g. stop the job).
>
> So basically for any user provided pluggables, the input information may
> either come from another user provided logic or from the framework, and
> after receiving that information, the pluggable may either want the
> framework or another pluggable to take an action. So this gives the above 4
> combinations.
>
> In our case, when the pluggables are SplitEnumerator and SourceReader, the
> control flows that only involve user space actions are fully supported. But
> it seems that when it comes to control flows involving framework space
> information, some of the information has not been exposed to the pluggable,
> and some framework actions might also be missing.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Thu, Apr 28, 2022 at 3:44 PM Arvid Heise <ar...@apache.org> wrote:
>
>> Hi folks,
>>
>> quick input from my side. I think this is from the implementation
>> perspective what Piotr and I had in mind for a global min watermark that
>> helps in idleness cases. See also point 3 in
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-180%3A+Adjust+StreamStatus+and+Idleness+definition
>> .
>>
>> Basically, we would like to empower source enumerators to determine the
>> global min watermark for all source readers factoring in even future
>> splits. Not all sources can supply that information (think of a general
>> file source) but most should be able to. Basically, Flink should know for a
>> given source at a given point in time what the min watermark across all
>> source subtasks is.
>>
>> Here is some background:
>> In the context of idleness, we can deterministically advance the
>> watermark. In the pre-FLIP-27 era, we had heuristic approaches in sources
>> to switch to idleness and thus allow watermarks to increase in cases where
>> fewer splits than source tasks are available. However, for sources with
>> dynamic split discovery that actually yields incorrect results. Think of a
>> Kinesis consumer where a shard is split. Then a previously idle source
>> subtask may receive a new split with time t0 as the lowest timestamp. Since
>> the source subtask did not participate in the global watermark generation
>> (because it was idle), the previously emitted watermark may be past t0 and
>> thus results in late records potentially being discarded. A rerun of the
>> same pipeline on historic data would not render the source subtask idle and
>> not result in late records. The solution was to not render source subtasks
>> automatically idle by the framework if there are no spits. That leads to
>> confusion for Kafka users with static topic subscription where #splits <
>> #parallelism stalls pipelines because the watermark is not advancing. Here,
>> your sketched solution can be transferred to KafkaSource to let Flink know
>> that min global watermark on a static assignment is determined by the
>> slowest partition. Hence, all idle readers emit that min global watermark
>> and the user sees progress.
>> This whole idea is related to FLIP-182 watermark alignment but I'd go
>> with another FLIP as the goal is quite different even though the
>> implementation overlaps.
>>
>> Now Iceberg seems to use the same information to actually pause the
>> consumption of files and create some kind of orderness guarantees as far as
>> I understood. This probably can be applied to any source with dynamic split
>> discovery. However, I wouldn't mix up the concepts and hence I appreciate
>> you not chiming into the FLIP-182 and ff. threads. The goal of FLIP-182 is
>> to pause readers while consuming a split, while your approach pauses
>> readers before processing another split. So it feels more closely related
>> to the global min watermark - so it could either be part of that FLIP or a
>> FLIP of its own. Afaik API changes should actually happen only on the
>> enumerator side both for your ideas and for global min watermark.
>>
>> Best,
>>
>> Arvid
>>
>> On Wed, Apr 27, 2022 at 7:31 PM Thomas Weise <th...@apache.org> wrote:
>>
>>> Hi Steven,
>>>
>>> Would it be better to bring this as a separate thread related to Iceberg
>>> source to the dev@ list? I think this could benefit from broader input?
>>>
>>> Thanks
>>>
>>> On Wed, Apr 27, 2022 at 9:36 AM Steven Wu <st...@gmail.com> wrote:
>>>
>>>> + Becket and Sebastian
>>>>
>>>> It is also related to the split level watermark alignment discussion
>>>> thread. Because it is already very long, I don't want to further complicate
>>>> the ongoing discussion there. But I can move the discussion to that
>>>> existing thread if that is preferred.
>>>>
>>>>
>>>> On Tue, Apr 26, 2022 at 10:03 PM Steven Wu <st...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> We are thinking about how to align with the Flink community and
>>>>> leverage the FLIP-182 watermark alignment in the Iceberg source. I put some
>>>>> context in this google doc. Would love to get hear your thoughts on this.
>>>>>
>>>>>
>>>>> https://docs.google.com/document/d/1zfwF8e5LszazcOzmUAOeOtpM9v8dKEPlY_BRFSmI3us/edit#
>>>>>
>>>>> Thanks,
>>>>> Steven
>>>>>
>>>>

Re: Source alignment for Iceberg

Posted by Becket Qin <be...@gmail.com>.
Hey Steven,

Your conclusion at this point sounds reasonable to me. That being said, I
think we need to consider a bit more about the extensibility of Flink in
the future. I would be happy to drive some efforts in that direction. So
later on, the timestamp alignment of Iceberg may be able to leverage some
of the capabilities in the framework.

Thanks again for the detailed discussion!

Cheers,

Jiangjie (Becket) Qin

On Sat, May 7, 2022 at 11:15 AM Steven Wu <st...@gmail.com> wrote:

> might be the same as => might NOT be the same as
>
> On Fri, May 6, 2022 at 8:13 PM Steven Wu <st...@gmail.com> wrote:
>
> > The conclusion of this discussion could be that we don't see much value
> in
> > leveraging FLIP-182 with Iceberg source. That would totally be fine.
> >
> > For me, one big sticking point is the alignment timestamp for the
> > (Iceberg) source might be the same as the Flink application watermark.
> >
> > On Thu, May 5, 2022 at 9:53 PM Piotr Nowojski <pi...@gmail.com>
> > wrote:
> >
> >> Option 1 sounds reasonable but I would be tempted to wait for a second
> >> motivational use case before generalizing the framework. However I
> wouldn’t
> >> oppose this extension if others feel it’s useful and good thing to do
> >>
> >> Piotrek
> >>
> >> > Wiadomość napisana przez Becket Qin <be...@gmail.com> w dniu
> >> 06.05.2022, o godz. 03:50:
> >> >
> >> > I think the key point here is essentially what information should
> Flink
> >> > expose to the user pluggables. Apparently split / local task watermark
> >> is
> >> > something many user pluggables would be interested in. Right now it is
> >> > calculated by the Flink framework but not exposed to the users space,
> >> i.e.
> >> > SourceReader / SplitEnumerator. So it looks at least we can offer this
> >> > information in some way so users can leverage that information to do
> >> > things.
> >> >
> >> > That said, I am not sure if this would help in the Iceberg alignment
> >> case.
> >> > Because at this point, FLIP-182 reports source reader watermarks
> >> > periodically, which may not align with the RequestSplitEvent. So if we
> >> > really want to leverage the FLIP-182 mechanism here, I see a few ways,
> >> just
> >> > to name two of them:
> >> > 1. we can expose the source reader watermark in the
> >> SourceReaderContext, so
> >> > the source readers can put the local watermark in a custom operator
> >> event.
> >> > This will effectively bypass the existing RequestSplitEvent. Or we can
> >> also
> >> > extend the RequestSplitEvent to add an additional info field of byte[]
> >> > type, so users can piggy-back additional information there, be it
> >> watermark
> >> > or other stuff.
> >> > 2. Simply piggy-back the local watermark in the RequestSplitEvent and
> >> pass
> >> > that info to the SplitEnumerator as well.
> >> >
> >> > If we are going to do this, personally I'd prefer the first way, as it
> >> > provides a mechanism to allow future extension. So it would be easier
> to
> >> > expose other framework information to the user space in the future.
> >> >
> >> > Thanks,
> >> >
> >> > Jiangjie (Becket) Qin
> >> >
> >> >
> >> >
> >> >> On Fri, May 6, 2022 at 6:15 AM Thomas Weise <th...@apache.org> wrote:
> >> >>
> >> >>> On Wed, May 4, 2022 at 11:03 AM Steven Wu <st...@gmail.com>
> >> wrote:
> >> >>> Any opinion on different timestamp for source alignment (vs Flink
> >> >> application watermark)? For Iceberg source, we might want to enforce
> >> >> alignment on kafka timestamp but Flink application watermark may use
> >> event
> >> >> time field from payload.
> >> >>
> >> >> I imagine that more generally the question is alignment based on the
> >> >> iceberg partition/file metadata vs. individual rows? I think that
> >> >> should work as long as there is a guarantee for out of orderness
> >> >> within the split?
> >> >>
> >> >> Thomas
> >> >>
> >> >>>
> >> >>> Thanks,
> >> >>> Steven
> >> >>>
> >> >>> On Wed, May 4, 2022 at 7:02 AM Becket Qin <be...@gmail.com>
> >> wrote:
> >> >>>>
> >> >>>> Hey Piotr,
> >> >>>>
> >> >>>> I think the mechanism FLIP-182 provided is a reasonable default
> one,
> >> >> which
> >> >>>> ensures the watermarks are only drifted by an upper bound. However,
> >> >>>> admittedly there are also other strategies for different purposes.
> >> >>>>
> >> >>>> In the Iceberg case, I am not sure if a static strictly allowed
> >> >> watermark
> >> >>>> drift is desired. The source might just want to finish reading the
> >> >> assigned
> >> >>>> splits as fast as possible. And it is OK to have a drift of "one
> >> split",
> >> >>>> instead of a fixed time period.
> >> >>>>
> >> >>>> As another example, if there are some fast readers whose splits are
> >> >> always
> >> >>>> throttled, while the other slow readers are struggling to keep up
> >> with
> >> >> the
> >> >>>> rest of the splits, the split enumerator may decide to reassign the
> >> slow
> >> >>>> splits so all the readers have something to read. This would need
> the
> >> >>>> SplitEnumerator to be aware of the watermark progress on each
> reader.
> >> >> So it
> >> >>>> seems useful to expose the WatermarkAlignmentEvent information to
> the
> >> >>>> SplitEnumerator as well.
> >> >>>>
> >> >>>> Thanks,
> >> >>>>
> >> >>>> Jiangjie (Becket) Qin
> >> >>>>
> >> >>>>
> >> >>>>
> >> >>>> On Tue, May 3, 2022 at 7:58 PM Piotr Nowojski <
> pnowojski@apache.org>
> >> >> wrote:
> >> >>>>
> >> >>>>> Hi Steven,
> >> >>>>>
> >> >>>>> Isn't this redundant to FLIP-182 and FLIP-217? Can not Iceberg
> just
> >> >> emit
> >> >>>>> all splits and let FLIP-182/FLIP-217 handle the watermark
> alignment
> >> >> and
> >> >>>>> block the splits that are too much into the future? I can see this
> >> >> being an
> >> >>>>> issue if the existence of too many blocked splits is occupying too
> >> >> many
> >> >>>>> resources.
> >> >>>>>
> >> >>>>> If that's the case, indeed SourceCoordinator/SplitEnumerator would
> >> >> have to
> >> >>>>> decide on some basis how many and which splits to assign in what
> >> >> order. But
> >> >>>>> in that case I'm not sure how much you could use from FLIP-182 and
> >> >>>>> FLIP-217. They seem somehow orthogonal to me, operating on
> different
> >> >>>>> levels. FLIP-182 and FLIP-217 are working with whatever splits
> have
> >> >> already
> >> >>>>> been generated and assigned. You could leverage FLIP-182 and
> >> FLIP-217
> >> >> and
> >> >>>>> take care of only the problem to limit the number of parallel
> active
> >> >>>>> splits. And here I'm not sure if it would be worth generalising a
> >> >> solution
> >> >>>>> across different connectors.
> >> >>>>>
> >> >>>>> Regarding the global watermark, I made a related comment sometime
> >> ago
> >> >>>>> about it [1]. It sounds to me like you also need to solve this
> >> >> problem,
> >> >>>>> otherwise Iceberg users will encounter late records in case of
> some
> >> >> race
> >> >>>>> conditions between assigning new splits and completions of older.
> >> >>>>>
> >> >>>>> Best,
> >> >>>>> Piotrek
> >> >>>>>
> >> >>>>> [1]
> >> >>>>>
> >> >>
> >>
> https://issues.apache.org/jira/browse/FLINK-21871?focusedCommentId=17495545&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17495545
> >> >>>>>
> >> >>>>> pon., 2 maj 2022 o 04:26 Steven Wu <st...@gmail.com>
> >> napisał(a):
> >> >>>>>
> >> >>>>>> add dev@ group to the thread as Thomas suggested
> >> >>>>>>
> >> >>>>>> Arvid,
> >> >>>>>>
> >> >>>>>> The scenario 3 (Dynamic assignment + temporary no split) in the
> >> >> FLIP-180
> >> >>>>>> (idleness) can happen to Iceberg source alignment, as readers can
> >> be
> >> >>>>>> temporarily starved due to the holdback by the enumerator when
> >> >> assigning
> >> >>>>>> new splits upon request.
> >> >>>>>>
> >> >>>>>> Totally agree that we should decouple this discussion with the
> >> >> FLIP-217,
> >> >>>>>> which addresses the split level watermark alignment problem as a
> >> >> follow-up
> >> >>>>>> of FLIP-182
> >> >>>>>>
> >> >>>>>> Becket,
> >> >>>>>>
> >> >>>>>> Yes, currently Iceberg source implemented the alignment
> leveraging
> >> >> the
> >> >>>>>> dynamic split assignment from FLIP-27 design. Basically, the
> >> >> enumerator
> >> >>>>>> can
> >> >>>>>> hold back split assignments to readers when necessary. Everything
> >> are
> >> >>>>>> centralized in the enumerator: (1) watermark extraction and
> >> >> aggregation
> >> >>>>>> (2)
> >> >>>>>> alignment decision and execution
> >> >>>>>>
> >> >>>>>> The motivation of this discussion is to see if Iceberg source can
> >> >> leverage
> >> >>>>>> some of the watermark alignment solutions (like FLIP-182) from
> >> Flink
> >> >>>>>> framework. E.g., as mentioned in the doc, Iceberg source can
> >> >> potentially
> >> >>>>>> leverage the FLIP-182 framework to do the watermark extraction
> and
> >> >>>>>> aggregation. For the alignment decision and execution, we can
> keep
> >> >> them in
> >> >>>>>> the centralized enumerator.
> >> >>>>>>
> >> >>>>>> Thanks,
> >> >>>>>> Steven
> >> >>>>>>
> >> >>>>>> On Thu, Apr 28, 2022 at 2:05 AM Becket Qin <becket.qin@gmail.com
> >
> >> >> wrote:
> >> >>>>>>
> >> >>>>>>> Hi Steven,
> >> >>>>>>>
> >> >>>>>>> Thanks for pulling me into this thread. I think the timestamp
> >> >>>>>>> alignment use case here is a good example of what FLIP-27 was
> >> >> designed
> >> >>>>>> for.
> >> >>>>>>>
> >> >>>>>>> Technically speaking, Iceberg source can already implement the
> >> >> timestamp
> >> >>>>>>> alignment in the Flink new source even without FLIP-182.
> However,
> >> I
> >> >>>>>>> understand the rationale here because timestamp alignment is
> also
> >> >>>>>> trying to
> >> >>>>>>> orchestrate the consumption of splits. However, it looks like
> >> >> FLIP-182
> >> >>>>>> was
> >> >>>>>>> not designed in a way that it can be easily extended for other
> use
> >> >>>>>> cases.
> >> >>>>>>> It may probably worth thinking of a more general mechanism to
> >> >> answer the
> >> >>>>>>> following questions:
> >> >>>>>>>
> >> >>>>>>> 1. What information whose source of truth is the Flink framework
> >> >> should
> >> >>>>>> be
> >> >>>>>>> exposed to the SplitEnumerator and SourceReader? And how?
> >> >>>>>>> 2. What control actions in the Flink framework are worth
> exposing
> >> >> to the
> >> >>>>>>> SplitEnumerators and SourceReaders? And how?
> >> >>>>>>>
> >> >>>>>>> In the context of timestamp alignment, the first question is
> more
> >> >>>>>>> relevant. For example, instead of hardcode the
> >> ReportWatermarkEvent
> >> >>>>>>> handling logic in the SourceCoordinator, should we expose this
> to
> >> >> the
> >> >>>>>>> SplitEnumerator? So basically there will be some information,
> such
> >> >> as
> >> >>>>>>> subtask local watermark, whose source of truth is Flink runtime,
> >> >> but
> >> >>>>>> useful
> >> >>>>>>> to the user provided pluggables.
> >> >>>>>>>
> >> >>>>>>> I think there are a few control flow patterns to make a complete
> >> >> design:
> >> >>>>>>>
> >> >>>>>>> a. Framework space information (e.g. watermark) --> User space
> >> >>>>>> Pluggables
> >> >>>>>>> (e.g. SplitEnumerator) --> User space Actions (e.g. Pause
> reading
> >> a
> >> >>>>>> split).
> >> >>>>>>> b. Framework space information (e.g. task failure) --> User
> space
> >> >>>>>>> pluggables (e.g. SplitEnumerator) --> Framework space actions
> >> >> (e.g. exit
> >> >>>>>>> the job)
> >> >>>>>>> c. User space information (e.g. a custom workload metric) -->
> User
> >> >> space
> >> >>>>>>> pluggables (e.g. SplitEnumerator) --> User space actions (e.g.
> >> >> rebalance
> >> >>>>>>> the workload across the source readers).
> >> >>>>>>> d. Use space information (e.g. a custom stopping event in the
> >> >> stream)
> >> >>>>>> -->
> >> >>>>>>> User space pluggables (e.g. SplitEnumerator) --> Framework space
> >> >> actions
> >> >>>>>>> (e.g. stop the job).
> >> >>>>>>>
> >> >>>>>>> So basically for any user provided pluggables, the input
> >> >> information may
> >> >>>>>>> either come from another user provided logic or from the
> >> >> framework, and
> >> >>>>>>> after receiving that information, the pluggable may either want
> >> the
> >> >>>>>>> framework or another pluggable to take an action. So this gives
> >> the
> >> >>>>>> above 4
> >> >>>>>>> combinations.
> >> >>>>>>>
> >> >>>>>>> In our case, when the pluggables are SplitEnumerator and
> >> >> SourceReader,
> >> >>>>>> the
> >> >>>>>>> control flows that only involve user space actions are fully
> >> >> supported.
> >> >>>>>> But
> >> >>>>>>> it seems that when it comes to control flows involving framework
> >> >> space
> >> >>>>>>> information, some of the information has not been exposed to the
> >> >>>>>> pluggable,
> >> >>>>>>> and some framework actions might also be missing.
> >> >>>>>>>
> >> >>>>>>> Thanks,
> >> >>>>>>>
> >> >>>>>>> Jiangjie (Becket) Qin
> >> >>>>>>>
> >> >>>>>>> On Thu, Apr 28, 2022 at 3:44 PM Arvid Heise <ar...@apache.org>
> >> >> wrote:
> >> >>>>>>>
> >> >>>>>>>> Hi folks,
> >> >>>>>>>>
> >> >>>>>>>> quick input from my side. I think this is from the
> implementation
> >> >>>>>>>> perspective what Piotr and I had in mind for a global min
> >> >> watermark
> >> >>>>>> that
> >> >>>>>>>> helps in idleness cases. See also point 3 in
> >> >>>>>>>>
> >> >>>>>>
> >> >>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-180%3A+Adjust+StreamStatus+and+Idleness+definition
> >> >>>>>>>> .
> >> >>>>>>>>
> >> >>>>>>>> Basically, we would like to empower source enumerators to
> >> >> determine the
> >> >>>>>>>> global min watermark for all source readers factoring in even
> >> >> future
> >> >>>>>>>> splits. Not all sources can supply that information (think of a
> >> >> general
> >> >>>>>>>> file source) but most should be able to. Basically, Flink
> should
> >> >> know
> >> >>>>>> for a
> >> >>>>>>>> given source at a given point in time what the min watermark
> >> >> across all
> >> >>>>>>>> source subtasks is.
> >> >>>>>>>>
> >> >>>>>>>> Here is some background:
> >> >>>>>>>> In the context of idleness, we can deterministically advance
> the
> >> >>>>>>>> watermark. In the pre-FLIP-27 era, we had heuristic approaches
> in
> >> >>>>>> sources
> >> >>>>>>>> to switch to idleness and thus allow watermarks to increase in
> >> >> cases
> >> >>>>>> where
> >> >>>>>>>> fewer splits than source tasks are available. However, for
> >> >> sources with
> >> >>>>>>>> dynamic split discovery that actually yields incorrect results.
> >> >> Think
> >> >>>>>> of a
> >> >>>>>>>> Kinesis consumer where a shard is split. Then a previously idle
> >> >> source
> >> >>>>>>>> subtask may receive a new split with time t0 as the lowest
> >> >> timestamp.
> >> >>>>>> Since
> >> >>>>>>>> the source subtask did not participate in the global watermark
> >> >>>>>> generation
> >> >>>>>>>> (because it was idle), the previously emitted watermark may be
> >> >> past t0
> >> >>>>>> and
> >> >>>>>>>> thus results in late records potentially being discarded. A
> rerun
> >> >> of
> >> >>>>>> the
> >> >>>>>>>> same pipeline on historic data would not render the source
> >> subtask
> >> >>>>>> idle and
> >> >>>>>>>> not result in late records. The solution was to not render
> source
> >> >>>>>> subtasks
> >> >>>>>>>> automatically idle by the framework if there are no spits. That
> >> >> leads
> >> >>>>>> to
> >> >>>>>>>> confusion for Kafka users with static topic subscription where
> >> >> #splits
> >> >>>>>> <
> >> >>>>>>>> #parallelism stalls pipelines because the watermark is not
> >> >> advancing.
> >> >>>>>> Here,
> >> >>>>>>>> your sketched solution can be transferred to KafkaSource to let
> >> >> Flink
> >> >>>>>> know
> >> >>>>>>>> that min global watermark on a static assignment is determined
> by
> >> >> the
> >> >>>>>>>> slowest partition. Hence, all idle readers emit that min global
> >> >>>>>> watermark
> >> >>>>>>>> and the user sees progress.
> >> >>>>>>>> This whole idea is related to FLIP-182 watermark alignment but
> >> >> I'd go
> >> >>>>>>>> with another FLIP as the goal is quite different even though
> the
> >> >>>>>>>> implementation overlaps.
> >> >>>>>>>>
> >> >>>>>>>> Now Iceberg seems to use the same information to actually pause
> >> >> the
> >> >>>>>>>> consumption of files and create some kind of orderness
> guarantees
> >> >> as
> >> >>>>>> far as
> >> >>>>>>>> I understood. This probably can be applied to any source with
> >> >> dynamic
> >> >>>>>> split
> >> >>>>>>>> discovery. However, I wouldn't mix up the concepts and hence I
> >> >>>>>> appreciate
> >> >>>>>>>> you not chiming into the FLIP-182 and ff. threads. The goal of
> >> >>>>>> FLIP-182 is
> >> >>>>>>>> to pause readers while consuming a split, while your approach
> >> >> pauses
> >> >>>>>>>> readers before processing another split. So it feels more
> closely
> >> >>>>>> related
> >> >>>>>>>> to the global min watermark - so it could either be part of
> that
> >> >> FLIP
> >> >>>>>> or a
> >> >>>>>>>> FLIP of its own. Afaik API changes should actually happen only
> on
> >> >> the
> >> >>>>>>>> enumerator side both for your ideas and for global min
> watermark.
> >> >>>>>>>>
> >> >>>>>>>> Best,
> >> >>>>>>>>
> >> >>>>>>>> Arvid
> >> >>>>>>>>
> >> >>>>>>>> On Wed, Apr 27, 2022 at 7:31 PM Thomas Weise <th...@apache.org>
> >> >> wrote:
> >> >>>>>>>>
> >> >>>>>>>>> Hi Steven,
> >> >>>>>>>>>
> >> >>>>>>>>> Would it be better to bring this as a separate thread related
> to
> >> >>>>>> Iceberg
> >> >>>>>>>>> source to the dev@ list? I think this could benefit from
> >> broader
> >> >>>>>> input?
> >> >>>>>>>>>
> >> >>>>>>>>> Thanks
> >> >>>>>>>>>
> >> >>>>>>>>> On Wed, Apr 27, 2022 at 9:36 AM Steven Wu <
> stevenz3wu@gmail.com
> >> >
> >> >>>>>> wrote:
> >> >>>>>>>>>
> >> >>>>>>>>>> + Becket and Sebastian
> >> >>>>>>>>>>
> >> >>>>>>>>>> It is also related to the split level watermark alignment
> >> >> discussion
> >> >>>>>>>>>> thread. Because it is already very long, I don't want to
> >> further
> >> >>>>>> complicate
> >> >>>>>>>>>> the ongoing discussion there. But I can move the discussion
> to
> >> >> that
> >> >>>>>>>>>> existing thread if that is preferred.
> >> >>>>>>>>>>
> >> >>>>>>>>>>
> >> >>>>>>>>>> On Tue, Apr 26, 2022 at 10:03 PM Steven Wu <
> >> >> stevenz3wu@gmail.com>
> >> >>>>>>>>>> wrote:
> >> >>>>>>>>>>
> >> >>>>>>>>>>> Hi all,
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> We are thinking about how to align with the Flink community
> >> and
> >> >>>>>>>>>>> leverage the FLIP-182 watermark alignment in the Iceberg
> >> >> source. I
> >> >>>>>> put some
> >> >>>>>>>>>>> context in this google doc. Would love to get hear your
> >> >> thoughts on
> >> >>>>>> this.
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>
> >> >>>>>>
> >> >>
> >>
> https://docs.google.com/document/d/1zfwF8e5LszazcOzmUAOeOtpM9v8dKEPlY_BRFSmI3us/edit#
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> Thanks,
> >> >>>>>>>>>>> Steven
> >> >>>>>>>>>>>
> >> >>>>>>>>>>
> >> >>>>>>
> >> >>>>>
> >> >>
> >>
> >
>

Re: Source alignment for Iceberg

Posted by Steven Wu <st...@gmail.com>.
might be the same as => might NOT be the same as

On Fri, May 6, 2022 at 8:13 PM Steven Wu <st...@gmail.com> wrote:

> The conclusion of this discussion could be that we don't see much value in
> leveraging FLIP-182 with Iceberg source. That would totally be fine.
>
> For me, one big sticking point is the alignment timestamp for the
> (Iceberg) source might be the same as the Flink application watermark.
>
> On Thu, May 5, 2022 at 9:53 PM Piotr Nowojski <pi...@gmail.com>
> wrote:
>
>> Option 1 sounds reasonable but I would be tempted to wait for a second
>> motivational use case before generalizing the framework. However I wouldn’t
>> oppose this extension if others feel it’s useful and good thing to do
>>
>> Piotrek
>>
>> > Wiadomość napisana przez Becket Qin <be...@gmail.com> w dniu
>> 06.05.2022, o godz. 03:50:
>> >
>> > I think the key point here is essentially what information should Flink
>> > expose to the user pluggables. Apparently split / local task watermark
>> is
>> > something many user pluggables would be interested in. Right now it is
>> > calculated by the Flink framework but not exposed to the users space,
>> i.e.
>> > SourceReader / SplitEnumerator. So it looks at least we can offer this
>> > information in some way so users can leverage that information to do
>> > things.
>> >
>> > That said, I am not sure if this would help in the Iceberg alignment
>> case.
>> > Because at this point, FLIP-182 reports source reader watermarks
>> > periodically, which may not align with the RequestSplitEvent. So if we
>> > really want to leverage the FLIP-182 mechanism here, I see a few ways,
>> just
>> > to name two of them:
>> > 1. we can expose the source reader watermark in the
>> SourceReaderContext, so
>> > the source readers can put the local watermark in a custom operator
>> event.
>> > This will effectively bypass the existing RequestSplitEvent. Or we can
>> also
>> > extend the RequestSplitEvent to add an additional info field of byte[]
>> > type, so users can piggy-back additional information there, be it
>> watermark
>> > or other stuff.
>> > 2. Simply piggy-back the local watermark in the RequestSplitEvent and
>> pass
>> > that info to the SplitEnumerator as well.
>> >
>> > If we are going to do this, personally I'd prefer the first way, as it
>> > provides a mechanism to allow future extension. So it would be easier to
>> > expose other framework information to the user space in the future.
>> >
>> > Thanks,
>> >
>> > Jiangjie (Becket) Qin
>> >
>> >
>> >
>> >> On Fri, May 6, 2022 at 6:15 AM Thomas Weise <th...@apache.org> wrote:
>> >>
>> >>> On Wed, May 4, 2022 at 11:03 AM Steven Wu <st...@gmail.com>
>> wrote:
>> >>> Any opinion on different timestamp for source alignment (vs Flink
>> >> application watermark)? For Iceberg source, we might want to enforce
>> >> alignment on kafka timestamp but Flink application watermark may use
>> event
>> >> time field from payload.
>> >>
>> >> I imagine that more generally the question is alignment based on the
>> >> iceberg partition/file metadata vs. individual rows? I think that
>> >> should work as long as there is a guarantee for out of orderness
>> >> within the split?
>> >>
>> >> Thomas
>> >>
>> >>>
>> >>> Thanks,
>> >>> Steven
>> >>>
>> >>> On Wed, May 4, 2022 at 7:02 AM Becket Qin <be...@gmail.com>
>> wrote:
>> >>>>
>> >>>> Hey Piotr,
>> >>>>
>> >>>> I think the mechanism FLIP-182 provided is a reasonable default one,
>> >> which
>> >>>> ensures the watermarks are only drifted by an upper bound. However,
>> >>>> admittedly there are also other strategies for different purposes.
>> >>>>
>> >>>> In the Iceberg case, I am not sure if a static strictly allowed
>> >> watermark
>> >>>> drift is desired. The source might just want to finish reading the
>> >> assigned
>> >>>> splits as fast as possible. And it is OK to have a drift of "one
>> split",
>> >>>> instead of a fixed time period.
>> >>>>
>> >>>> As another example, if there are some fast readers whose splits are
>> >> always
>> >>>> throttled, while the other slow readers are struggling to keep up
>> with
>> >> the
>> >>>> rest of the splits, the split enumerator may decide to reassign the
>> slow
>> >>>> splits so all the readers have something to read. This would need the
>> >>>> SplitEnumerator to be aware of the watermark progress on each reader.
>> >> So it
>> >>>> seems useful to expose the WatermarkAlignmentEvent information to the
>> >>>> SplitEnumerator as well.
>> >>>>
>> >>>> Thanks,
>> >>>>
>> >>>> Jiangjie (Becket) Qin
>> >>>>
>> >>>>
>> >>>>
>> >>>> On Tue, May 3, 2022 at 7:58 PM Piotr Nowojski <pn...@apache.org>
>> >> wrote:
>> >>>>
>> >>>>> Hi Steven,
>> >>>>>
>> >>>>> Isn't this redundant to FLIP-182 and FLIP-217? Can not Iceberg just
>> >> emit
>> >>>>> all splits and let FLIP-182/FLIP-217 handle the watermark alignment
>> >> and
>> >>>>> block the splits that are too much into the future? I can see this
>> >> being an
>> >>>>> issue if the existence of too many blocked splits is occupying too
>> >> many
>> >>>>> resources.
>> >>>>>
>> >>>>> If that's the case, indeed SourceCoordinator/SplitEnumerator would
>> >> have to
>> >>>>> decide on some basis how many and which splits to assign in what
>> >> order. But
>> >>>>> in that case I'm not sure how much you could use from FLIP-182 and
>> >>>>> FLIP-217. They seem somehow orthogonal to me, operating on different
>> >>>>> levels. FLIP-182 and FLIP-217 are working with whatever splits have
>> >> already
>> >>>>> been generated and assigned. You could leverage FLIP-182 and
>> FLIP-217
>> >> and
>> >>>>> take care of only the problem to limit the number of parallel active
>> >>>>> splits. And here I'm not sure if it would be worth generalising a
>> >> solution
>> >>>>> across different connectors.
>> >>>>>
>> >>>>> Regarding the global watermark, I made a related comment sometime
>> ago
>> >>>>> about it [1]. It sounds to me like you also need to solve this
>> >> problem,
>> >>>>> otherwise Iceberg users will encounter late records in case of some
>> >> race
>> >>>>> conditions between assigning new splits and completions of older.
>> >>>>>
>> >>>>> Best,
>> >>>>> Piotrek
>> >>>>>
>> >>>>> [1]
>> >>>>>
>> >>
>> https://issues.apache.org/jira/browse/FLINK-21871?focusedCommentId=17495545&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17495545
>> >>>>>
>> >>>>> pon., 2 maj 2022 o 04:26 Steven Wu <st...@gmail.com>
>> napisał(a):
>> >>>>>
>> >>>>>> add dev@ group to the thread as Thomas suggested
>> >>>>>>
>> >>>>>> Arvid,
>> >>>>>>
>> >>>>>> The scenario 3 (Dynamic assignment + temporary no split) in the
>> >> FLIP-180
>> >>>>>> (idleness) can happen to Iceberg source alignment, as readers can
>> be
>> >>>>>> temporarily starved due to the holdback by the enumerator when
>> >> assigning
>> >>>>>> new splits upon request.
>> >>>>>>
>> >>>>>> Totally agree that we should decouple this discussion with the
>> >> FLIP-217,
>> >>>>>> which addresses the split level watermark alignment problem as a
>> >> follow-up
>> >>>>>> of FLIP-182
>> >>>>>>
>> >>>>>> Becket,
>> >>>>>>
>> >>>>>> Yes, currently Iceberg source implemented the alignment leveraging
>> >> the
>> >>>>>> dynamic split assignment from FLIP-27 design. Basically, the
>> >> enumerator
>> >>>>>> can
>> >>>>>> hold back split assignments to readers when necessary. Everything
>> are
>> >>>>>> centralized in the enumerator: (1) watermark extraction and
>> >> aggregation
>> >>>>>> (2)
>> >>>>>> alignment decision and execution
>> >>>>>>
>> >>>>>> The motivation of this discussion is to see if Iceberg source can
>> >> leverage
>> >>>>>> some of the watermark alignment solutions (like FLIP-182) from
>> Flink
>> >>>>>> framework. E.g., as mentioned in the doc, Iceberg source can
>> >> potentially
>> >>>>>> leverage the FLIP-182 framework to do the watermark extraction and
>> >>>>>> aggregation. For the alignment decision and execution, we can keep
>> >> them in
>> >>>>>> the centralized enumerator.
>> >>>>>>
>> >>>>>> Thanks,
>> >>>>>> Steven
>> >>>>>>
>> >>>>>> On Thu, Apr 28, 2022 at 2:05 AM Becket Qin <be...@gmail.com>
>> >> wrote:
>> >>>>>>
>> >>>>>>> Hi Steven,
>> >>>>>>>
>> >>>>>>> Thanks for pulling me into this thread. I think the timestamp
>> >>>>>>> alignment use case here is a good example of what FLIP-27 was
>> >> designed
>> >>>>>> for.
>> >>>>>>>
>> >>>>>>> Technically speaking, Iceberg source can already implement the
>> >> timestamp
>> >>>>>>> alignment in the Flink new source even without FLIP-182. However,
>> I
>> >>>>>>> understand the rationale here because timestamp alignment is also
>> >>>>>> trying to
>> >>>>>>> orchestrate the consumption of splits. However, it looks like
>> >> FLIP-182
>> >>>>>> was
>> >>>>>>> not designed in a way that it can be easily extended for other use
>> >>>>>> cases.
>> >>>>>>> It may probably worth thinking of a more general mechanism to
>> >> answer the
>> >>>>>>> following questions:
>> >>>>>>>
>> >>>>>>> 1. What information whose source of truth is the Flink framework
>> >> should
>> >>>>>> be
>> >>>>>>> exposed to the SplitEnumerator and SourceReader? And how?
>> >>>>>>> 2. What control actions in the Flink framework are worth exposing
>> >> to the
>> >>>>>>> SplitEnumerators and SourceReaders? And how?
>> >>>>>>>
>> >>>>>>> In the context of timestamp alignment, the first question is more
>> >>>>>>> relevant. For example, instead of hardcode the
>> ReportWatermarkEvent
>> >>>>>>> handling logic in the SourceCoordinator, should we expose this to
>> >> the
>> >>>>>>> SplitEnumerator? So basically there will be some information, such
>> >> as
>> >>>>>>> subtask local watermark, whose source of truth is Flink runtime,
>> >> but
>> >>>>>> useful
>> >>>>>>> to the user provided pluggables.
>> >>>>>>>
>> >>>>>>> I think there are a few control flow patterns to make a complete
>> >> design:
>> >>>>>>>
>> >>>>>>> a. Framework space information (e.g. watermark) --> User space
>> >>>>>> Pluggables
>> >>>>>>> (e.g. SplitEnumerator) --> User space Actions (e.g. Pause reading
>> a
>> >>>>>> split).
>> >>>>>>> b. Framework space information (e.g. task failure) --> User space
>> >>>>>>> pluggables (e.g. SplitEnumerator) --> Framework space actions
>> >> (e.g. exit
>> >>>>>>> the job)
>> >>>>>>> c. User space information (e.g. a custom workload metric) --> User
>> >> space
>> >>>>>>> pluggables (e.g. SplitEnumerator) --> User space actions (e.g.
>> >> rebalance
>> >>>>>>> the workload across the source readers).
>> >>>>>>> d. Use space information (e.g. a custom stopping event in the
>> >> stream)
>> >>>>>> -->
>> >>>>>>> User space pluggables (e.g. SplitEnumerator) --> Framework space
>> >> actions
>> >>>>>>> (e.g. stop the job).
>> >>>>>>>
>> >>>>>>> So basically for any user provided pluggables, the input
>> >> information may
>> >>>>>>> either come from another user provided logic or from the
>> >> framework, and
>> >>>>>>> after receiving that information, the pluggable may either want
>> the
>> >>>>>>> framework or another pluggable to take an action. So this gives
>> the
>> >>>>>> above 4
>> >>>>>>> combinations.
>> >>>>>>>
>> >>>>>>> In our case, when the pluggables are SplitEnumerator and
>> >> SourceReader,
>> >>>>>> the
>> >>>>>>> control flows that only involve user space actions are fully
>> >> supported.
>> >>>>>> But
>> >>>>>>> it seems that when it comes to control flows involving framework
>> >> space
>> >>>>>>> information, some of the information has not been exposed to the
>> >>>>>> pluggable,
>> >>>>>>> and some framework actions might also be missing.
>> >>>>>>>
>> >>>>>>> Thanks,
>> >>>>>>>
>> >>>>>>> Jiangjie (Becket) Qin
>> >>>>>>>
>> >>>>>>> On Thu, Apr 28, 2022 at 3:44 PM Arvid Heise <ar...@apache.org>
>> >> wrote:
>> >>>>>>>
>> >>>>>>>> Hi folks,
>> >>>>>>>>
>> >>>>>>>> quick input from my side. I think this is from the implementation
>> >>>>>>>> perspective what Piotr and I had in mind for a global min
>> >> watermark
>> >>>>>> that
>> >>>>>>>> helps in idleness cases. See also point 3 in
>> >>>>>>>>
>> >>>>>>
>> >>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-180%3A+Adjust+StreamStatus+and+Idleness+definition
>> >>>>>>>> .
>> >>>>>>>>
>> >>>>>>>> Basically, we would like to empower source enumerators to
>> >> determine the
>> >>>>>>>> global min watermark for all source readers factoring in even
>> >> future
>> >>>>>>>> splits. Not all sources can supply that information (think of a
>> >> general
>> >>>>>>>> file source) but most should be able to. Basically, Flink should
>> >> know
>> >>>>>> for a
>> >>>>>>>> given source at a given point in time what the min watermark
>> >> across all
>> >>>>>>>> source subtasks is.
>> >>>>>>>>
>> >>>>>>>> Here is some background:
>> >>>>>>>> In the context of idleness, we can deterministically advance the
>> >>>>>>>> watermark. In the pre-FLIP-27 era, we had heuristic approaches in
>> >>>>>> sources
>> >>>>>>>> to switch to idleness and thus allow watermarks to increase in
>> >> cases
>> >>>>>> where
>> >>>>>>>> fewer splits than source tasks are available. However, for
>> >> sources with
>> >>>>>>>> dynamic split discovery that actually yields incorrect results.
>> >> Think
>> >>>>>> of a
>> >>>>>>>> Kinesis consumer where a shard is split. Then a previously idle
>> >> source
>> >>>>>>>> subtask may receive a new split with time t0 as the lowest
>> >> timestamp.
>> >>>>>> Since
>> >>>>>>>> the source subtask did not participate in the global watermark
>> >>>>>> generation
>> >>>>>>>> (because it was idle), the previously emitted watermark may be
>> >> past t0
>> >>>>>> and
>> >>>>>>>> thus results in late records potentially being discarded. A rerun
>> >> of
>> >>>>>> the
>> >>>>>>>> same pipeline on historic data would not render the source
>> subtask
>> >>>>>> idle and
>> >>>>>>>> not result in late records. The solution was to not render source
>> >>>>>> subtasks
>> >>>>>>>> automatically idle by the framework if there are no spits. That
>> >> leads
>> >>>>>> to
>> >>>>>>>> confusion for Kafka users with static topic subscription where
>> >> #splits
>> >>>>>> <
>> >>>>>>>> #parallelism stalls pipelines because the watermark is not
>> >> advancing.
>> >>>>>> Here,
>> >>>>>>>> your sketched solution can be transferred to KafkaSource to let
>> >> Flink
>> >>>>>> know
>> >>>>>>>> that min global watermark on a static assignment is determined by
>> >> the
>> >>>>>>>> slowest partition. Hence, all idle readers emit that min global
>> >>>>>> watermark
>> >>>>>>>> and the user sees progress.
>> >>>>>>>> This whole idea is related to FLIP-182 watermark alignment but
>> >> I'd go
>> >>>>>>>> with another FLIP as the goal is quite different even though the
>> >>>>>>>> implementation overlaps.
>> >>>>>>>>
>> >>>>>>>> Now Iceberg seems to use the same information to actually pause
>> >> the
>> >>>>>>>> consumption of files and create some kind of orderness guarantees
>> >> as
>> >>>>>> far as
>> >>>>>>>> I understood. This probably can be applied to any source with
>> >> dynamic
>> >>>>>> split
>> >>>>>>>> discovery. However, I wouldn't mix up the concepts and hence I
>> >>>>>> appreciate
>> >>>>>>>> you not chiming into the FLIP-182 and ff. threads. The goal of
>> >>>>>> FLIP-182 is
>> >>>>>>>> to pause readers while consuming a split, while your approach
>> >> pauses
>> >>>>>>>> readers before processing another split. So it feels more closely
>> >>>>>> related
>> >>>>>>>> to the global min watermark - so it could either be part of that
>> >> FLIP
>> >>>>>> or a
>> >>>>>>>> FLIP of its own. Afaik API changes should actually happen only on
>> >> the
>> >>>>>>>> enumerator side both for your ideas and for global min watermark.
>> >>>>>>>>
>> >>>>>>>> Best,
>> >>>>>>>>
>> >>>>>>>> Arvid
>> >>>>>>>>
>> >>>>>>>> On Wed, Apr 27, 2022 at 7:31 PM Thomas Weise <th...@apache.org>
>> >> wrote:
>> >>>>>>>>
>> >>>>>>>>> Hi Steven,
>> >>>>>>>>>
>> >>>>>>>>> Would it be better to bring this as a separate thread related to
>> >>>>>> Iceberg
>> >>>>>>>>> source to the dev@ list? I think this could benefit from
>> broader
>> >>>>>> input?
>> >>>>>>>>>
>> >>>>>>>>> Thanks
>> >>>>>>>>>
>> >>>>>>>>> On Wed, Apr 27, 2022 at 9:36 AM Steven Wu <stevenz3wu@gmail.com
>> >
>> >>>>>> wrote:
>> >>>>>>>>>
>> >>>>>>>>>> + Becket and Sebastian
>> >>>>>>>>>>
>> >>>>>>>>>> It is also related to the split level watermark alignment
>> >> discussion
>> >>>>>>>>>> thread. Because it is already very long, I don't want to
>> further
>> >>>>>> complicate
>> >>>>>>>>>> the ongoing discussion there. But I can move the discussion to
>> >> that
>> >>>>>>>>>> existing thread if that is preferred.
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>> On Tue, Apr 26, 2022 at 10:03 PM Steven Wu <
>> >> stevenz3wu@gmail.com>
>> >>>>>>>>>> wrote:
>> >>>>>>>>>>
>> >>>>>>>>>>> Hi all,
>> >>>>>>>>>>>
>> >>>>>>>>>>> We are thinking about how to align with the Flink community
>> and
>> >>>>>>>>>>> leverage the FLIP-182 watermark alignment in the Iceberg
>> >> source. I
>> >>>>>> put some
>> >>>>>>>>>>> context in this google doc. Would love to get hear your
>> >> thoughts on
>> >>>>>> this.
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>
>> >>
>> https://docs.google.com/document/d/1zfwF8e5LszazcOzmUAOeOtpM9v8dKEPlY_BRFSmI3us/edit#
>> >>>>>>>>>>>
>> >>>>>>>>>>> Thanks,
>> >>>>>>>>>>> Steven
>> >>>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>
>> >>>>>
>> >>
>>
>

Re: Source alignment for Iceberg

Posted by Steven Wu <st...@gmail.com>.
The conclusion of this discussion could be that we don't see much value in
leveraging FLIP-182 with Iceberg source. That would totally be fine.

For me, one big sticking point is the alignment timestamp for the (Iceberg)
source might be the same as the Flink application watermark.

On Thu, May 5, 2022 at 9:53 PM Piotr Nowojski <pi...@gmail.com>
wrote:

> Option 1 sounds reasonable but I would be tempted to wait for a second
> motivational use case before generalizing the framework. However I wouldn’t
> oppose this extension if others feel it’s useful and good thing to do
>
> Piotrek
>
> > Wiadomość napisana przez Becket Qin <be...@gmail.com> w dniu
> 06.05.2022, o godz. 03:50:
> >
> > I think the key point here is essentially what information should Flink
> > expose to the user pluggables. Apparently split / local task watermark is
> > something many user pluggables would be interested in. Right now it is
> > calculated by the Flink framework but not exposed to the users space,
> i.e.
> > SourceReader / SplitEnumerator. So it looks at least we can offer this
> > information in some way so users can leverage that information to do
> > things.
> >
> > That said, I am not sure if this would help in the Iceberg alignment
> case.
> > Because at this point, FLIP-182 reports source reader watermarks
> > periodically, which may not align with the RequestSplitEvent. So if we
> > really want to leverage the FLIP-182 mechanism here, I see a few ways,
> just
> > to name two of them:
> > 1. we can expose the source reader watermark in the SourceReaderContext,
> so
> > the source readers can put the local watermark in a custom operator
> event.
> > This will effectively bypass the existing RequestSplitEvent. Or we can
> also
> > extend the RequestSplitEvent to add an additional info field of byte[]
> > type, so users can piggy-back additional information there, be it
> watermark
> > or other stuff.
> > 2. Simply piggy-back the local watermark in the RequestSplitEvent and
> pass
> > that info to the SplitEnumerator as well.
> >
> > If we are going to do this, personally I'd prefer the first way, as it
> > provides a mechanism to allow future extension. So it would be easier to
> > expose other framework information to the user space in the future.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> >
> >
> >> On Fri, May 6, 2022 at 6:15 AM Thomas Weise <th...@apache.org> wrote:
> >>
> >>> On Wed, May 4, 2022 at 11:03 AM Steven Wu <st...@gmail.com>
> wrote:
> >>> Any opinion on different timestamp for source alignment (vs Flink
> >> application watermark)? For Iceberg source, we might want to enforce
> >> alignment on kafka timestamp but Flink application watermark may use
> event
> >> time field from payload.
> >>
> >> I imagine that more generally the question is alignment based on the
> >> iceberg partition/file metadata vs. individual rows? I think that
> >> should work as long as there is a guarantee for out of orderness
> >> within the split?
> >>
> >> Thomas
> >>
> >>>
> >>> Thanks,
> >>> Steven
> >>>
> >>> On Wed, May 4, 2022 at 7:02 AM Becket Qin <be...@gmail.com>
> wrote:
> >>>>
> >>>> Hey Piotr,
> >>>>
> >>>> I think the mechanism FLIP-182 provided is a reasonable default one,
> >> which
> >>>> ensures the watermarks are only drifted by an upper bound. However,
> >>>> admittedly there are also other strategies for different purposes.
> >>>>
> >>>> In the Iceberg case, I am not sure if a static strictly allowed
> >> watermark
> >>>> drift is desired. The source might just want to finish reading the
> >> assigned
> >>>> splits as fast as possible. And it is OK to have a drift of "one
> split",
> >>>> instead of a fixed time period.
> >>>>
> >>>> As another example, if there are some fast readers whose splits are
> >> always
> >>>> throttled, while the other slow readers are struggling to keep up with
> >> the
> >>>> rest of the splits, the split enumerator may decide to reassign the
> slow
> >>>> splits so all the readers have something to read. This would need the
> >>>> SplitEnumerator to be aware of the watermark progress on each reader.
> >> So it
> >>>> seems useful to expose the WatermarkAlignmentEvent information to the
> >>>> SplitEnumerator as well.
> >>>>
> >>>> Thanks,
> >>>>
> >>>> Jiangjie (Becket) Qin
> >>>>
> >>>>
> >>>>
> >>>> On Tue, May 3, 2022 at 7:58 PM Piotr Nowojski <pn...@apache.org>
> >> wrote:
> >>>>
> >>>>> Hi Steven,
> >>>>>
> >>>>> Isn't this redundant to FLIP-182 and FLIP-217? Can not Iceberg just
> >> emit
> >>>>> all splits and let FLIP-182/FLIP-217 handle the watermark alignment
> >> and
> >>>>> block the splits that are too much into the future? I can see this
> >> being an
> >>>>> issue if the existence of too many blocked splits is occupying too
> >> many
> >>>>> resources.
> >>>>>
> >>>>> If that's the case, indeed SourceCoordinator/SplitEnumerator would
> >> have to
> >>>>> decide on some basis how many and which splits to assign in what
> >> order. But
> >>>>> in that case I'm not sure how much you could use from FLIP-182 and
> >>>>> FLIP-217. They seem somehow orthogonal to me, operating on different
> >>>>> levels. FLIP-182 and FLIP-217 are working with whatever splits have
> >> already
> >>>>> been generated and assigned. You could leverage FLIP-182 and FLIP-217
> >> and
> >>>>> take care of only the problem to limit the number of parallel active
> >>>>> splits. And here I'm not sure if it would be worth generalising a
> >> solution
> >>>>> across different connectors.
> >>>>>
> >>>>> Regarding the global watermark, I made a related comment sometime ago
> >>>>> about it [1]. It sounds to me like you also need to solve this
> >> problem,
> >>>>> otherwise Iceberg users will encounter late records in case of some
> >> race
> >>>>> conditions between assigning new splits and completions of older.
> >>>>>
> >>>>> Best,
> >>>>> Piotrek
> >>>>>
> >>>>> [1]
> >>>>>
> >>
> https://issues.apache.org/jira/browse/FLINK-21871?focusedCommentId=17495545&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17495545
> >>>>>
> >>>>> pon., 2 maj 2022 o 04:26 Steven Wu <st...@gmail.com>
> napisał(a):
> >>>>>
> >>>>>> add dev@ group to the thread as Thomas suggested
> >>>>>>
> >>>>>> Arvid,
> >>>>>>
> >>>>>> The scenario 3 (Dynamic assignment + temporary no split) in the
> >> FLIP-180
> >>>>>> (idleness) can happen to Iceberg source alignment, as readers can be
> >>>>>> temporarily starved due to the holdback by the enumerator when
> >> assigning
> >>>>>> new splits upon request.
> >>>>>>
> >>>>>> Totally agree that we should decouple this discussion with the
> >> FLIP-217,
> >>>>>> which addresses the split level watermark alignment problem as a
> >> follow-up
> >>>>>> of FLIP-182
> >>>>>>
> >>>>>> Becket,
> >>>>>>
> >>>>>> Yes, currently Iceberg source implemented the alignment leveraging
> >> the
> >>>>>> dynamic split assignment from FLIP-27 design. Basically, the
> >> enumerator
> >>>>>> can
> >>>>>> hold back split assignments to readers when necessary. Everything
> are
> >>>>>> centralized in the enumerator: (1) watermark extraction and
> >> aggregation
> >>>>>> (2)
> >>>>>> alignment decision and execution
> >>>>>>
> >>>>>> The motivation of this discussion is to see if Iceberg source can
> >> leverage
> >>>>>> some of the watermark alignment solutions (like FLIP-182) from Flink
> >>>>>> framework. E.g., as mentioned in the doc, Iceberg source can
> >> potentially
> >>>>>> leverage the FLIP-182 framework to do the watermark extraction and
> >>>>>> aggregation. For the alignment decision and execution, we can keep
> >> them in
> >>>>>> the centralized enumerator.
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Steven
> >>>>>>
> >>>>>> On Thu, Apr 28, 2022 at 2:05 AM Becket Qin <be...@gmail.com>
> >> wrote:
> >>>>>>
> >>>>>>> Hi Steven,
> >>>>>>>
> >>>>>>> Thanks for pulling me into this thread. I think the timestamp
> >>>>>>> alignment use case here is a good example of what FLIP-27 was
> >> designed
> >>>>>> for.
> >>>>>>>
> >>>>>>> Technically speaking, Iceberg source can already implement the
> >> timestamp
> >>>>>>> alignment in the Flink new source even without FLIP-182. However, I
> >>>>>>> understand the rationale here because timestamp alignment is also
> >>>>>> trying to
> >>>>>>> orchestrate the consumption of splits. However, it looks like
> >> FLIP-182
> >>>>>> was
> >>>>>>> not designed in a way that it can be easily extended for other use
> >>>>>> cases.
> >>>>>>> It may probably worth thinking of a more general mechanism to
> >> answer the
> >>>>>>> following questions:
> >>>>>>>
> >>>>>>> 1. What information whose source of truth is the Flink framework
> >> should
> >>>>>> be
> >>>>>>> exposed to the SplitEnumerator and SourceReader? And how?
> >>>>>>> 2. What control actions in the Flink framework are worth exposing
> >> to the
> >>>>>>> SplitEnumerators and SourceReaders? And how?
> >>>>>>>
> >>>>>>> In the context of timestamp alignment, the first question is more
> >>>>>>> relevant. For example, instead of hardcode the ReportWatermarkEvent
> >>>>>>> handling logic in the SourceCoordinator, should we expose this to
> >> the
> >>>>>>> SplitEnumerator? So basically there will be some information, such
> >> as
> >>>>>>> subtask local watermark, whose source of truth is Flink runtime,
> >> but
> >>>>>> useful
> >>>>>>> to the user provided pluggables.
> >>>>>>>
> >>>>>>> I think there are a few control flow patterns to make a complete
> >> design:
> >>>>>>>
> >>>>>>> a. Framework space information (e.g. watermark) --> User space
> >>>>>> Pluggables
> >>>>>>> (e.g. SplitEnumerator) --> User space Actions (e.g. Pause reading a
> >>>>>> split).
> >>>>>>> b. Framework space information (e.g. task failure) --> User space
> >>>>>>> pluggables (e.g. SplitEnumerator) --> Framework space actions
> >> (e.g. exit
> >>>>>>> the job)
> >>>>>>> c. User space information (e.g. a custom workload metric) --> User
> >> space
> >>>>>>> pluggables (e.g. SplitEnumerator) --> User space actions (e.g.
> >> rebalance
> >>>>>>> the workload across the source readers).
> >>>>>>> d. Use space information (e.g. a custom stopping event in the
> >> stream)
> >>>>>> -->
> >>>>>>> User space pluggables (e.g. SplitEnumerator) --> Framework space
> >> actions
> >>>>>>> (e.g. stop the job).
> >>>>>>>
> >>>>>>> So basically for any user provided pluggables, the input
> >> information may
> >>>>>>> either come from another user provided logic or from the
> >> framework, and
> >>>>>>> after receiving that information, the pluggable may either want the
> >>>>>>> framework or another pluggable to take an action. So this gives the
> >>>>>> above 4
> >>>>>>> combinations.
> >>>>>>>
> >>>>>>> In our case, when the pluggables are SplitEnumerator and
> >> SourceReader,
> >>>>>> the
> >>>>>>> control flows that only involve user space actions are fully
> >> supported.
> >>>>>> But
> >>>>>>> it seems that when it comes to control flows involving framework
> >> space
> >>>>>>> information, some of the information has not been exposed to the
> >>>>>> pluggable,
> >>>>>>> and some framework actions might also be missing.
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>>
> >>>>>>> Jiangjie (Becket) Qin
> >>>>>>>
> >>>>>>> On Thu, Apr 28, 2022 at 3:44 PM Arvid Heise <ar...@apache.org>
> >> wrote:
> >>>>>>>
> >>>>>>>> Hi folks,
> >>>>>>>>
> >>>>>>>> quick input from my side. I think this is from the implementation
> >>>>>>>> perspective what Piotr and I had in mind for a global min
> >> watermark
> >>>>>> that
> >>>>>>>> helps in idleness cases. See also point 3 in
> >>>>>>>>
> >>>>>>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-180%3A+Adjust+StreamStatus+and+Idleness+definition
> >>>>>>>> .
> >>>>>>>>
> >>>>>>>> Basically, we would like to empower source enumerators to
> >> determine the
> >>>>>>>> global min watermark for all source readers factoring in even
> >> future
> >>>>>>>> splits. Not all sources can supply that information (think of a
> >> general
> >>>>>>>> file source) but most should be able to. Basically, Flink should
> >> know
> >>>>>> for a
> >>>>>>>> given source at a given point in time what the min watermark
> >> across all
> >>>>>>>> source subtasks is.
> >>>>>>>>
> >>>>>>>> Here is some background:
> >>>>>>>> In the context of idleness, we can deterministically advance the
> >>>>>>>> watermark. In the pre-FLIP-27 era, we had heuristic approaches in
> >>>>>> sources
> >>>>>>>> to switch to idleness and thus allow watermarks to increase in
> >> cases
> >>>>>> where
> >>>>>>>> fewer splits than source tasks are available. However, for
> >> sources with
> >>>>>>>> dynamic split discovery that actually yields incorrect results.
> >> Think
> >>>>>> of a
> >>>>>>>> Kinesis consumer where a shard is split. Then a previously idle
> >> source
> >>>>>>>> subtask may receive a new split with time t0 as the lowest
> >> timestamp.
> >>>>>> Since
> >>>>>>>> the source subtask did not participate in the global watermark
> >>>>>> generation
> >>>>>>>> (because it was idle), the previously emitted watermark may be
> >> past t0
> >>>>>> and
> >>>>>>>> thus results in late records potentially being discarded. A rerun
> >> of
> >>>>>> the
> >>>>>>>> same pipeline on historic data would not render the source subtask
> >>>>>> idle and
> >>>>>>>> not result in late records. The solution was to not render source
> >>>>>> subtasks
> >>>>>>>> automatically idle by the framework if there are no spits. That
> >> leads
> >>>>>> to
> >>>>>>>> confusion for Kafka users with static topic subscription where
> >> #splits
> >>>>>> <
> >>>>>>>> #parallelism stalls pipelines because the watermark is not
> >> advancing.
> >>>>>> Here,
> >>>>>>>> your sketched solution can be transferred to KafkaSource to let
> >> Flink
> >>>>>> know
> >>>>>>>> that min global watermark on a static assignment is determined by
> >> the
> >>>>>>>> slowest partition. Hence, all idle readers emit that min global
> >>>>>> watermark
> >>>>>>>> and the user sees progress.
> >>>>>>>> This whole idea is related to FLIP-182 watermark alignment but
> >> I'd go
> >>>>>>>> with another FLIP as the goal is quite different even though the
> >>>>>>>> implementation overlaps.
> >>>>>>>>
> >>>>>>>> Now Iceberg seems to use the same information to actually pause
> >> the
> >>>>>>>> consumption of files and create some kind of orderness guarantees
> >> as
> >>>>>> far as
> >>>>>>>> I understood. This probably can be applied to any source with
> >> dynamic
> >>>>>> split
> >>>>>>>> discovery. However, I wouldn't mix up the concepts and hence I
> >>>>>> appreciate
> >>>>>>>> you not chiming into the FLIP-182 and ff. threads. The goal of
> >>>>>> FLIP-182 is
> >>>>>>>> to pause readers while consuming a split, while your approach
> >> pauses
> >>>>>>>> readers before processing another split. So it feels more closely
> >>>>>> related
> >>>>>>>> to the global min watermark - so it could either be part of that
> >> FLIP
> >>>>>> or a
> >>>>>>>> FLIP of its own. Afaik API changes should actually happen only on
> >> the
> >>>>>>>> enumerator side both for your ideas and for global min watermark.
> >>>>>>>>
> >>>>>>>> Best,
> >>>>>>>>
> >>>>>>>> Arvid
> >>>>>>>>
> >>>>>>>> On Wed, Apr 27, 2022 at 7:31 PM Thomas Weise <th...@apache.org>
> >> wrote:
> >>>>>>>>
> >>>>>>>>> Hi Steven,
> >>>>>>>>>
> >>>>>>>>> Would it be better to bring this as a separate thread related to
> >>>>>> Iceberg
> >>>>>>>>> source to the dev@ list? I think this could benefit from broader
> >>>>>> input?
> >>>>>>>>>
> >>>>>>>>> Thanks
> >>>>>>>>>
> >>>>>>>>> On Wed, Apr 27, 2022 at 9:36 AM Steven Wu <st...@gmail.com>
> >>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> + Becket and Sebastian
> >>>>>>>>>>
> >>>>>>>>>> It is also related to the split level watermark alignment
> >> discussion
> >>>>>>>>>> thread. Because it is already very long, I don't want to further
> >>>>>> complicate
> >>>>>>>>>> the ongoing discussion there. But I can move the discussion to
> >> that
> >>>>>>>>>> existing thread if that is preferred.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On Tue, Apr 26, 2022 at 10:03 PM Steven Wu <
> >> stevenz3wu@gmail.com>
> >>>>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hi all,
> >>>>>>>>>>>
> >>>>>>>>>>> We are thinking about how to align with the Flink community and
> >>>>>>>>>>> leverage the FLIP-182 watermark alignment in the Iceberg
> >> source. I
> >>>>>> put some
> >>>>>>>>>>> context in this google doc. Would love to get hear your
> >> thoughts on
> >>>>>> this.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>
> >>
> https://docs.google.com/document/d/1zfwF8e5LszazcOzmUAOeOtpM9v8dKEPlY_BRFSmI3us/edit#
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks,
> >>>>>>>>>>> Steven
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>
> >>>>>
> >>
>

Re: Source alignment for Iceberg

Posted by Piotr Nowojski <pi...@gmail.com>.
Option 1 sounds reasonable but I would be tempted to wait for a second motivational use case before generalizing the framework. However I wouldn’t oppose this extension if others feel it’s useful and good thing to do

Piotrek

> Wiadomość napisana przez Becket Qin <be...@gmail.com> w dniu 06.05.2022, o godz. 03:50:
> 
> I think the key point here is essentially what information should Flink
> expose to the user pluggables. Apparently split / local task watermark is
> something many user pluggables would be interested in. Right now it is
> calculated by the Flink framework but not exposed to the users space, i.e.
> SourceReader / SplitEnumerator. So it looks at least we can offer this
> information in some way so users can leverage that information to do
> things.
> 
> That said, I am not sure if this would help in the Iceberg alignment case.
> Because at this point, FLIP-182 reports source reader watermarks
> periodically, which may not align with the RequestSplitEvent. So if we
> really want to leverage the FLIP-182 mechanism here, I see a few ways, just
> to name two of them:
> 1. we can expose the source reader watermark in the SourceReaderContext, so
> the source readers can put the local watermark in a custom operator event.
> This will effectively bypass the existing RequestSplitEvent. Or we can also
> extend the RequestSplitEvent to add an additional info field of byte[]
> type, so users can piggy-back additional information there, be it watermark
> or other stuff.
> 2. Simply piggy-back the local watermark in the RequestSplitEvent and pass
> that info to the SplitEnumerator as well.
> 
> If we are going to do this, personally I'd prefer the first way, as it
> provides a mechanism to allow future extension. So it would be easier to
> expose other framework information to the user space in the future.
> 
> Thanks,
> 
> Jiangjie (Becket) Qin
> 
> 
> 
>> On Fri, May 6, 2022 at 6:15 AM Thomas Weise <th...@apache.org> wrote:
>> 
>>> On Wed, May 4, 2022 at 11:03 AM Steven Wu <st...@gmail.com> wrote:
>>> Any opinion on different timestamp for source alignment (vs Flink
>> application watermark)? For Iceberg source, we might want to enforce
>> alignment on kafka timestamp but Flink application watermark may use event
>> time field from payload.
>> 
>> I imagine that more generally the question is alignment based on the
>> iceberg partition/file metadata vs. individual rows? I think that
>> should work as long as there is a guarantee for out of orderness
>> within the split?
>> 
>> Thomas
>> 
>>> 
>>> Thanks,
>>> Steven
>>> 
>>> On Wed, May 4, 2022 at 7:02 AM Becket Qin <be...@gmail.com> wrote:
>>>> 
>>>> Hey Piotr,
>>>> 
>>>> I think the mechanism FLIP-182 provided is a reasonable default one,
>> which
>>>> ensures the watermarks are only drifted by an upper bound. However,
>>>> admittedly there are also other strategies for different purposes.
>>>> 
>>>> In the Iceberg case, I am not sure if a static strictly allowed
>> watermark
>>>> drift is desired. The source might just want to finish reading the
>> assigned
>>>> splits as fast as possible. And it is OK to have a drift of "one split",
>>>> instead of a fixed time period.
>>>> 
>>>> As another example, if there are some fast readers whose splits are
>> always
>>>> throttled, while the other slow readers are struggling to keep up with
>> the
>>>> rest of the splits, the split enumerator may decide to reassign the slow
>>>> splits so all the readers have something to read. This would need the
>>>> SplitEnumerator to be aware of the watermark progress on each reader.
>> So it
>>>> seems useful to expose the WatermarkAlignmentEvent information to the
>>>> SplitEnumerator as well.
>>>> 
>>>> Thanks,
>>>> 
>>>> Jiangjie (Becket) Qin
>>>> 
>>>> 
>>>> 
>>>> On Tue, May 3, 2022 at 7:58 PM Piotr Nowojski <pn...@apache.org>
>> wrote:
>>>> 
>>>>> Hi Steven,
>>>>> 
>>>>> Isn't this redundant to FLIP-182 and FLIP-217? Can not Iceberg just
>> emit
>>>>> all splits and let FLIP-182/FLIP-217 handle the watermark alignment
>> and
>>>>> block the splits that are too much into the future? I can see this
>> being an
>>>>> issue if the existence of too many blocked splits is occupying too
>> many
>>>>> resources.
>>>>> 
>>>>> If that's the case, indeed SourceCoordinator/SplitEnumerator would
>> have to
>>>>> decide on some basis how many and which splits to assign in what
>> order. But
>>>>> in that case I'm not sure how much you could use from FLIP-182 and
>>>>> FLIP-217. They seem somehow orthogonal to me, operating on different
>>>>> levels. FLIP-182 and FLIP-217 are working with whatever splits have
>> already
>>>>> been generated and assigned. You could leverage FLIP-182 and FLIP-217
>> and
>>>>> take care of only the problem to limit the number of parallel active
>>>>> splits. And here I'm not sure if it would be worth generalising a
>> solution
>>>>> across different connectors.
>>>>> 
>>>>> Regarding the global watermark, I made a related comment sometime ago
>>>>> about it [1]. It sounds to me like you also need to solve this
>> problem,
>>>>> otherwise Iceberg users will encounter late records in case of some
>> race
>>>>> conditions between assigning new splits and completions of older.
>>>>> 
>>>>> Best,
>>>>> Piotrek
>>>>> 
>>>>> [1]
>>>>> 
>> https://issues.apache.org/jira/browse/FLINK-21871?focusedCommentId=17495545&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17495545
>>>>> 
>>>>> pon., 2 maj 2022 o 04:26 Steven Wu <st...@gmail.com> napisał(a):
>>>>> 
>>>>>> add dev@ group to the thread as Thomas suggested
>>>>>> 
>>>>>> Arvid,
>>>>>> 
>>>>>> The scenario 3 (Dynamic assignment + temporary no split) in the
>> FLIP-180
>>>>>> (idleness) can happen to Iceberg source alignment, as readers can be
>>>>>> temporarily starved due to the holdback by the enumerator when
>> assigning
>>>>>> new splits upon request.
>>>>>> 
>>>>>> Totally agree that we should decouple this discussion with the
>> FLIP-217,
>>>>>> which addresses the split level watermark alignment problem as a
>> follow-up
>>>>>> of FLIP-182
>>>>>> 
>>>>>> Becket,
>>>>>> 
>>>>>> Yes, currently Iceberg source implemented the alignment leveraging
>> the
>>>>>> dynamic split assignment from FLIP-27 design. Basically, the
>> enumerator
>>>>>> can
>>>>>> hold back split assignments to readers when necessary. Everything are
>>>>>> centralized in the enumerator: (1) watermark extraction and
>> aggregation
>>>>>> (2)
>>>>>> alignment decision and execution
>>>>>> 
>>>>>> The motivation of this discussion is to see if Iceberg source can
>> leverage
>>>>>> some of the watermark alignment solutions (like FLIP-182) from Flink
>>>>>> framework. E.g., as mentioned in the doc, Iceberg source can
>> potentially
>>>>>> leverage the FLIP-182 framework to do the watermark extraction and
>>>>>> aggregation. For the alignment decision and execution, we can keep
>> them in
>>>>>> the centralized enumerator.
>>>>>> 
>>>>>> Thanks,
>>>>>> Steven
>>>>>> 
>>>>>> On Thu, Apr 28, 2022 at 2:05 AM Becket Qin <be...@gmail.com>
>> wrote:
>>>>>> 
>>>>>>> Hi Steven,
>>>>>>> 
>>>>>>> Thanks for pulling me into this thread. I think the timestamp
>>>>>>> alignment use case here is a good example of what FLIP-27 was
>> designed
>>>>>> for.
>>>>>>> 
>>>>>>> Technically speaking, Iceberg source can already implement the
>> timestamp
>>>>>>> alignment in the Flink new source even without FLIP-182. However, I
>>>>>>> understand the rationale here because timestamp alignment is also
>>>>>> trying to
>>>>>>> orchestrate the consumption of splits. However, it looks like
>> FLIP-182
>>>>>> was
>>>>>>> not designed in a way that it can be easily extended for other use
>>>>>> cases.
>>>>>>> It may probably worth thinking of a more general mechanism to
>> answer the
>>>>>>> following questions:
>>>>>>> 
>>>>>>> 1. What information whose source of truth is the Flink framework
>> should
>>>>>> be
>>>>>>> exposed to the SplitEnumerator and SourceReader? And how?
>>>>>>> 2. What control actions in the Flink framework are worth exposing
>> to the
>>>>>>> SplitEnumerators and SourceReaders? And how?
>>>>>>> 
>>>>>>> In the context of timestamp alignment, the first question is more
>>>>>>> relevant. For example, instead of hardcode the ReportWatermarkEvent
>>>>>>> handling logic in the SourceCoordinator, should we expose this to
>> the
>>>>>>> SplitEnumerator? So basically there will be some information, such
>> as
>>>>>>> subtask local watermark, whose source of truth is Flink runtime,
>> but
>>>>>> useful
>>>>>>> to the user provided pluggables.
>>>>>>> 
>>>>>>> I think there are a few control flow patterns to make a complete
>> design:
>>>>>>> 
>>>>>>> a. Framework space information (e.g. watermark) --> User space
>>>>>> Pluggables
>>>>>>> (e.g. SplitEnumerator) --> User space Actions (e.g. Pause reading a
>>>>>> split).
>>>>>>> b. Framework space information (e.g. task failure) --> User space
>>>>>>> pluggables (e.g. SplitEnumerator) --> Framework space actions
>> (e.g. exit
>>>>>>> the job)
>>>>>>> c. User space information (e.g. a custom workload metric) --> User
>> space
>>>>>>> pluggables (e.g. SplitEnumerator) --> User space actions (e.g.
>> rebalance
>>>>>>> the workload across the source readers).
>>>>>>> d. Use space information (e.g. a custom stopping event in the
>> stream)
>>>>>> -->
>>>>>>> User space pluggables (e.g. SplitEnumerator) --> Framework space
>> actions
>>>>>>> (e.g. stop the job).
>>>>>>> 
>>>>>>> So basically for any user provided pluggables, the input
>> information may
>>>>>>> either come from another user provided logic or from the
>> framework, and
>>>>>>> after receiving that information, the pluggable may either want the
>>>>>>> framework or another pluggable to take an action. So this gives the
>>>>>> above 4
>>>>>>> combinations.
>>>>>>> 
>>>>>>> In our case, when the pluggables are SplitEnumerator and
>> SourceReader,
>>>>>> the
>>>>>>> control flows that only involve user space actions are fully
>> supported.
>>>>>> But
>>>>>>> it seems that when it comes to control flows involving framework
>> space
>>>>>>> information, some of the information has not been exposed to the
>>>>>> pluggable,
>>>>>>> and some framework actions might also be missing.
>>>>>>> 
>>>>>>> Thanks,
>>>>>>> 
>>>>>>> Jiangjie (Becket) Qin
>>>>>>> 
>>>>>>> On Thu, Apr 28, 2022 at 3:44 PM Arvid Heise <ar...@apache.org>
>> wrote:
>>>>>>> 
>>>>>>>> Hi folks,
>>>>>>>> 
>>>>>>>> quick input from my side. I think this is from the implementation
>>>>>>>> perspective what Piotr and I had in mind for a global min
>> watermark
>>>>>> that
>>>>>>>> helps in idleness cases. See also point 3 in
>>>>>>>> 
>>>>>> 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-180%3A+Adjust+StreamStatus+and+Idleness+definition
>>>>>>>> .
>>>>>>>> 
>>>>>>>> Basically, we would like to empower source enumerators to
>> determine the
>>>>>>>> global min watermark for all source readers factoring in even
>> future
>>>>>>>> splits. Not all sources can supply that information (think of a
>> general
>>>>>>>> file source) but most should be able to. Basically, Flink should
>> know
>>>>>> for a
>>>>>>>> given source at a given point in time what the min watermark
>> across all
>>>>>>>> source subtasks is.
>>>>>>>> 
>>>>>>>> Here is some background:
>>>>>>>> In the context of idleness, we can deterministically advance the
>>>>>>>> watermark. In the pre-FLIP-27 era, we had heuristic approaches in
>>>>>> sources
>>>>>>>> to switch to idleness and thus allow watermarks to increase in
>> cases
>>>>>> where
>>>>>>>> fewer splits than source tasks are available. However, for
>> sources with
>>>>>>>> dynamic split discovery that actually yields incorrect results.
>> Think
>>>>>> of a
>>>>>>>> Kinesis consumer where a shard is split. Then a previously idle
>> source
>>>>>>>> subtask may receive a new split with time t0 as the lowest
>> timestamp.
>>>>>> Since
>>>>>>>> the source subtask did not participate in the global watermark
>>>>>> generation
>>>>>>>> (because it was idle), the previously emitted watermark may be
>> past t0
>>>>>> and
>>>>>>>> thus results in late records potentially being discarded. A rerun
>> of
>>>>>> the
>>>>>>>> same pipeline on historic data would not render the source subtask
>>>>>> idle and
>>>>>>>> not result in late records. The solution was to not render source
>>>>>> subtasks
>>>>>>>> automatically idle by the framework if there are no spits. That
>> leads
>>>>>> to
>>>>>>>> confusion for Kafka users with static topic subscription where
>> #splits
>>>>>> <
>>>>>>>> #parallelism stalls pipelines because the watermark is not
>> advancing.
>>>>>> Here,
>>>>>>>> your sketched solution can be transferred to KafkaSource to let
>> Flink
>>>>>> know
>>>>>>>> that min global watermark on a static assignment is determined by
>> the
>>>>>>>> slowest partition. Hence, all idle readers emit that min global
>>>>>> watermark
>>>>>>>> and the user sees progress.
>>>>>>>> This whole idea is related to FLIP-182 watermark alignment but
>> I'd go
>>>>>>>> with another FLIP as the goal is quite different even though the
>>>>>>>> implementation overlaps.
>>>>>>>> 
>>>>>>>> Now Iceberg seems to use the same information to actually pause
>> the
>>>>>>>> consumption of files and create some kind of orderness guarantees
>> as
>>>>>> far as
>>>>>>>> I understood. This probably can be applied to any source with
>> dynamic
>>>>>> split
>>>>>>>> discovery. However, I wouldn't mix up the concepts and hence I
>>>>>> appreciate
>>>>>>>> you not chiming into the FLIP-182 and ff. threads. The goal of
>>>>>> FLIP-182 is
>>>>>>>> to pause readers while consuming a split, while your approach
>> pauses
>>>>>>>> readers before processing another split. So it feels more closely
>>>>>> related
>>>>>>>> to the global min watermark - so it could either be part of that
>> FLIP
>>>>>> or a
>>>>>>>> FLIP of its own. Afaik API changes should actually happen only on
>> the
>>>>>>>> enumerator side both for your ideas and for global min watermark.
>>>>>>>> 
>>>>>>>> Best,
>>>>>>>> 
>>>>>>>> Arvid
>>>>>>>> 
>>>>>>>> On Wed, Apr 27, 2022 at 7:31 PM Thomas Weise <th...@apache.org>
>> wrote:
>>>>>>>> 
>>>>>>>>> Hi Steven,
>>>>>>>>> 
>>>>>>>>> Would it be better to bring this as a separate thread related to
>>>>>> Iceberg
>>>>>>>>> source to the dev@ list? I think this could benefit from broader
>>>>>> input?
>>>>>>>>> 
>>>>>>>>> Thanks
>>>>>>>>> 
>>>>>>>>> On Wed, Apr 27, 2022 at 9:36 AM Steven Wu <st...@gmail.com>
>>>>>> wrote:
>>>>>>>>> 
>>>>>>>>>> + Becket and Sebastian
>>>>>>>>>> 
>>>>>>>>>> It is also related to the split level watermark alignment
>> discussion
>>>>>>>>>> thread. Because it is already very long, I don't want to further
>>>>>> complicate
>>>>>>>>>> the ongoing discussion there. But I can move the discussion to
>> that
>>>>>>>>>> existing thread if that is preferred.
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> On Tue, Apr 26, 2022 at 10:03 PM Steven Wu <
>> stevenz3wu@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>> 
>>>>>>>>>>> Hi all,
>>>>>>>>>>> 
>>>>>>>>>>> We are thinking about how to align with the Flink community and
>>>>>>>>>>> leverage the FLIP-182 watermark alignment in the Iceberg
>> source. I
>>>>>> put some
>>>>>>>>>>> context in this google doc. Would love to get hear your
>> thoughts on
>>>>>> this.
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>> 
>> https://docs.google.com/document/d/1zfwF8e5LszazcOzmUAOeOtpM9v8dKEPlY_BRFSmI3us/edit#
>>>>>>>>>>> 
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Steven
>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>> 
>>>>> 
>> 

Re: Source alignment for Iceberg

Posted by Becket Qin <be...@gmail.com>.
I think the key point here is essentially what information should Flink
expose to the user pluggables. Apparently split / local task watermark is
something many user pluggables would be interested in. Right now it is
calculated by the Flink framework but not exposed to the users space, i.e.
SourceReader / SplitEnumerator. So it looks at least we can offer this
information in some way so users can leverage that information to do
things.

That said, I am not sure if this would help in the Iceberg alignment case.
Because at this point, FLIP-182 reports source reader watermarks
periodically, which may not align with the RequestSplitEvent. So if we
really want to leverage the FLIP-182 mechanism here, I see a few ways, just
to name two of them:
1. we can expose the source reader watermark in the SourceReaderContext, so
the source readers can put the local watermark in a custom operator event.
This will effectively bypass the existing RequestSplitEvent. Or we can also
extend the RequestSplitEvent to add an additional info field of byte[]
type, so users can piggy-back additional information there, be it watermark
or other stuff.
2. Simply piggy-back the local watermark in the RequestSplitEvent and pass
that info to the SplitEnumerator as well.

If we are going to do this, personally I'd prefer the first way, as it
provides a mechanism to allow future extension. So it would be easier to
expose other framework information to the user space in the future.

Thanks,

Jiangjie (Becket) Qin



On Fri, May 6, 2022 at 6:15 AM Thomas Weise <th...@apache.org> wrote:

> On Wed, May 4, 2022 at 11:03 AM Steven Wu <st...@gmail.com> wrote:
> > Any opinion on different timestamp for source alignment (vs Flink
> application watermark)? For Iceberg source, we might want to enforce
> alignment on kafka timestamp but Flink application watermark may use event
> time field from payload.
>
> I imagine that more generally the question is alignment based on the
> iceberg partition/file metadata vs. individual rows? I think that
> should work as long as there is a guarantee for out of orderness
> within the split?
>
> Thomas
>
> >
> > Thanks,
> > Steven
> >
> > On Wed, May 4, 2022 at 7:02 AM Becket Qin <be...@gmail.com> wrote:
> >>
> >> Hey Piotr,
> >>
> >> I think the mechanism FLIP-182 provided is a reasonable default one,
> which
> >> ensures the watermarks are only drifted by an upper bound. However,
> >> admittedly there are also other strategies for different purposes.
> >>
> >> In the Iceberg case, I am not sure if a static strictly allowed
> watermark
> >> drift is desired. The source might just want to finish reading the
> assigned
> >> splits as fast as possible. And it is OK to have a drift of "one split",
> >> instead of a fixed time period.
> >>
> >> As another example, if there are some fast readers whose splits are
> always
> >> throttled, while the other slow readers are struggling to keep up with
> the
> >> rest of the splits, the split enumerator may decide to reassign the slow
> >> splits so all the readers have something to read. This would need the
> >> SplitEnumerator to be aware of the watermark progress on each reader.
> So it
> >> seems useful to expose the WatermarkAlignmentEvent information to the
> >> SplitEnumerator as well.
> >>
> >> Thanks,
> >>
> >> Jiangjie (Becket) Qin
> >>
> >>
> >>
> >> On Tue, May 3, 2022 at 7:58 PM Piotr Nowojski <pn...@apache.org>
> wrote:
> >>
> >> > Hi Steven,
> >> >
> >> > Isn't this redundant to FLIP-182 and FLIP-217? Can not Iceberg just
> emit
> >> > all splits and let FLIP-182/FLIP-217 handle the watermark alignment
> and
> >> > block the splits that are too much into the future? I can see this
> being an
> >> > issue if the existence of too many blocked splits is occupying too
> many
> >> > resources.
> >> >
> >> > If that's the case, indeed SourceCoordinator/SplitEnumerator would
> have to
> >> > decide on some basis how many and which splits to assign in what
> order. But
> >> > in that case I'm not sure how much you could use from FLIP-182 and
> >> > FLIP-217. They seem somehow orthogonal to me, operating on different
> >> > levels. FLIP-182 and FLIP-217 are working with whatever splits have
> already
> >> > been generated and assigned. You could leverage FLIP-182 and FLIP-217
> and
> >> > take care of only the problem to limit the number of parallel active
> >> > splits. And here I'm not sure if it would be worth generalising a
> solution
> >> > across different connectors.
> >> >
> >> > Regarding the global watermark, I made a related comment sometime ago
> >> > about it [1]. It sounds to me like you also need to solve this
> problem,
> >> > otherwise Iceberg users will encounter late records in case of some
> race
> >> > conditions between assigning new splits and completions of older.
> >> >
> >> > Best,
> >> > Piotrek
> >> >
> >> > [1]
> >> >
> https://issues.apache.org/jira/browse/FLINK-21871?focusedCommentId=17495545&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17495545
> >> >
> >> > pon., 2 maj 2022 o 04:26 Steven Wu <st...@gmail.com> napisał(a):
> >> >
> >> >> add dev@ group to the thread as Thomas suggested
> >> >>
> >> >> Arvid,
> >> >>
> >> >> The scenario 3 (Dynamic assignment + temporary no split) in the
> FLIP-180
> >> >> (idleness) can happen to Iceberg source alignment, as readers can be
> >> >> temporarily starved due to the holdback by the enumerator when
> assigning
> >> >> new splits upon request.
> >> >>
> >> >> Totally agree that we should decouple this discussion with the
> FLIP-217,
> >> >> which addresses the split level watermark alignment problem as a
> follow-up
> >> >> of FLIP-182
> >> >>
> >> >> Becket,
> >> >>
> >> >> Yes, currently Iceberg source implemented the alignment leveraging
> the
> >> >> dynamic split assignment from FLIP-27 design. Basically, the
> enumerator
> >> >> can
> >> >> hold back split assignments to readers when necessary. Everything are
> >> >> centralized in the enumerator: (1) watermark extraction and
> aggregation
> >> >> (2)
> >> >> alignment decision and execution
> >> >>
> >> >> The motivation of this discussion is to see if Iceberg source can
> leverage
> >> >> some of the watermark alignment solutions (like FLIP-182) from Flink
> >> >> framework. E.g., as mentioned in the doc, Iceberg source can
> potentially
> >> >> leverage the FLIP-182 framework to do the watermark extraction and
> >> >> aggregation. For the alignment decision and execution, we can keep
> them in
> >> >> the centralized enumerator.
> >> >>
> >> >> Thanks,
> >> >> Steven
> >> >>
> >> >> On Thu, Apr 28, 2022 at 2:05 AM Becket Qin <be...@gmail.com>
> wrote:
> >> >>
> >> >> > Hi Steven,
> >> >> >
> >> >> > Thanks for pulling me into this thread. I think the timestamp
> >> >> > alignment use case here is a good example of what FLIP-27 was
> designed
> >> >> for.
> >> >> >
> >> >> > Technically speaking, Iceberg source can already implement the
> timestamp
> >> >> > alignment in the Flink new source even without FLIP-182. However, I
> >> >> > understand the rationale here because timestamp alignment is also
> >> >> trying to
> >> >> > orchestrate the consumption of splits. However, it looks like
> FLIP-182
> >> >> was
> >> >> > not designed in a way that it can be easily extended for other use
> >> >> cases.
> >> >> > It may probably worth thinking of a more general mechanism to
> answer the
> >> >> > following questions:
> >> >> >
> >> >> > 1. What information whose source of truth is the Flink framework
> should
> >> >> be
> >> >> > exposed to the SplitEnumerator and SourceReader? And how?
> >> >> > 2. What control actions in the Flink framework are worth exposing
> to the
> >> >> > SplitEnumerators and SourceReaders? And how?
> >> >> >
> >> >> > In the context of timestamp alignment, the first question is more
> >> >> > relevant. For example, instead of hardcode the ReportWatermarkEvent
> >> >> > handling logic in the SourceCoordinator, should we expose this to
> the
> >> >> > SplitEnumerator? So basically there will be some information, such
> as
> >> >> > subtask local watermark, whose source of truth is Flink runtime,
> but
> >> >> useful
> >> >> > to the user provided pluggables.
> >> >> >
> >> >> > I think there are a few control flow patterns to make a complete
> design:
> >> >> >
> >> >> > a. Framework space information (e.g. watermark) --> User space
> >> >> Pluggables
> >> >> > (e.g. SplitEnumerator) --> User space Actions (e.g. Pause reading a
> >> >> split).
> >> >> > b. Framework space information (e.g. task failure) --> User space
> >> >> > pluggables (e.g. SplitEnumerator) --> Framework space actions
> (e.g. exit
> >> >> > the job)
> >> >> > c. User space information (e.g. a custom workload metric) --> User
> space
> >> >> > pluggables (e.g. SplitEnumerator) --> User space actions (e.g.
> rebalance
> >> >> > the workload across the source readers).
> >> >> > d. Use space information (e.g. a custom stopping event in the
> stream)
> >> >> -->
> >> >> > User space pluggables (e.g. SplitEnumerator) --> Framework space
> actions
> >> >> > (e.g. stop the job).
> >> >> >
> >> >> > So basically for any user provided pluggables, the input
> information may
> >> >> > either come from another user provided logic or from the
> framework, and
> >> >> > after receiving that information, the pluggable may either want the
> >> >> > framework or another pluggable to take an action. So this gives the
> >> >> above 4
> >> >> > combinations.
> >> >> >
> >> >> > In our case, when the pluggables are SplitEnumerator and
> SourceReader,
> >> >> the
> >> >> > control flows that only involve user space actions are fully
> supported.
> >> >> But
> >> >> > it seems that when it comes to control flows involving framework
> space
> >> >> > information, some of the information has not been exposed to the
> >> >> pluggable,
> >> >> > and some framework actions might also be missing.
> >> >> >
> >> >> > Thanks,
> >> >> >
> >> >> > Jiangjie (Becket) Qin
> >> >> >
> >> >> > On Thu, Apr 28, 2022 at 3:44 PM Arvid Heise <ar...@apache.org>
> wrote:
> >> >> >
> >> >> >> Hi folks,
> >> >> >>
> >> >> >> quick input from my side. I think this is from the implementation
> >> >> >> perspective what Piotr and I had in mind for a global min
> watermark
> >> >> that
> >> >> >> helps in idleness cases. See also point 3 in
> >> >> >>
> >> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-180%3A+Adjust+StreamStatus+and+Idleness+definition
> >> >> >> .
> >> >> >>
> >> >> >> Basically, we would like to empower source enumerators to
> determine the
> >> >> >> global min watermark for all source readers factoring in even
> future
> >> >> >> splits. Not all sources can supply that information (think of a
> general
> >> >> >> file source) but most should be able to. Basically, Flink should
> know
> >> >> for a
> >> >> >> given source at a given point in time what the min watermark
> across all
> >> >> >> source subtasks is.
> >> >> >>
> >> >> >> Here is some background:
> >> >> >> In the context of idleness, we can deterministically advance the
> >> >> >> watermark. In the pre-FLIP-27 era, we had heuristic approaches in
> >> >> sources
> >> >> >> to switch to idleness and thus allow watermarks to increase in
> cases
> >> >> where
> >> >> >> fewer splits than source tasks are available. However, for
> sources with
> >> >> >> dynamic split discovery that actually yields incorrect results.
> Think
> >> >> of a
> >> >> >> Kinesis consumer where a shard is split. Then a previously idle
> source
> >> >> >> subtask may receive a new split with time t0 as the lowest
> timestamp.
> >> >> Since
> >> >> >> the source subtask did not participate in the global watermark
> >> >> generation
> >> >> >> (because it was idle), the previously emitted watermark may be
> past t0
> >> >> and
> >> >> >> thus results in late records potentially being discarded. A rerun
> of
> >> >> the
> >> >> >> same pipeline on historic data would not render the source subtask
> >> >> idle and
> >> >> >> not result in late records. The solution was to not render source
> >> >> subtasks
> >> >> >> automatically idle by the framework if there are no spits. That
> leads
> >> >> to
> >> >> >> confusion for Kafka users with static topic subscription where
> #splits
> >> >> <
> >> >> >> #parallelism stalls pipelines because the watermark is not
> advancing.
> >> >> Here,
> >> >> >> your sketched solution can be transferred to KafkaSource to let
> Flink
> >> >> know
> >> >> >> that min global watermark on a static assignment is determined by
> the
> >> >> >> slowest partition. Hence, all idle readers emit that min global
> >> >> watermark
> >> >> >> and the user sees progress.
> >> >> >> This whole idea is related to FLIP-182 watermark alignment but
> I'd go
> >> >> >> with another FLIP as the goal is quite different even though the
> >> >> >> implementation overlaps.
> >> >> >>
> >> >> >> Now Iceberg seems to use the same information to actually pause
> the
> >> >> >> consumption of files and create some kind of orderness guarantees
> as
> >> >> far as
> >> >> >> I understood. This probably can be applied to any source with
> dynamic
> >> >> split
> >> >> >> discovery. However, I wouldn't mix up the concepts and hence I
> >> >> appreciate
> >> >> >> you not chiming into the FLIP-182 and ff. threads. The goal of
> >> >> FLIP-182 is
> >> >> >> to pause readers while consuming a split, while your approach
> pauses
> >> >> >> readers before processing another split. So it feels more closely
> >> >> related
> >> >> >> to the global min watermark - so it could either be part of that
> FLIP
> >> >> or a
> >> >> >> FLIP of its own. Afaik API changes should actually happen only on
> the
> >> >> >> enumerator side both for your ideas and for global min watermark.
> >> >> >>
> >> >> >> Best,
> >> >> >>
> >> >> >> Arvid
> >> >> >>
> >> >> >> On Wed, Apr 27, 2022 at 7:31 PM Thomas Weise <th...@apache.org>
> wrote:
> >> >> >>
> >> >> >>> Hi Steven,
> >> >> >>>
> >> >> >>> Would it be better to bring this as a separate thread related to
> >> >> Iceberg
> >> >> >>> source to the dev@ list? I think this could benefit from broader
> >> >> input?
> >> >> >>>
> >> >> >>> Thanks
> >> >> >>>
> >> >> >>> On Wed, Apr 27, 2022 at 9:36 AM Steven Wu <st...@gmail.com>
> >> >> wrote:
> >> >> >>>
> >> >> >>>> + Becket and Sebastian
> >> >> >>>>
> >> >> >>>> It is also related to the split level watermark alignment
> discussion
> >> >> >>>> thread. Because it is already very long, I don't want to further
> >> >> complicate
> >> >> >>>> the ongoing discussion there. But I can move the discussion to
> that
> >> >> >>>> existing thread if that is preferred.
> >> >> >>>>
> >> >> >>>>
> >> >> >>>> On Tue, Apr 26, 2022 at 10:03 PM Steven Wu <
> stevenz3wu@gmail.com>
> >> >> >>>> wrote:
> >> >> >>>>
> >> >> >>>>> Hi all,
> >> >> >>>>>
> >> >> >>>>> We are thinking about how to align with the Flink community and
> >> >> >>>>> leverage the FLIP-182 watermark alignment in the Iceberg
> source. I
> >> >> put some
> >> >> >>>>> context in this google doc. Would love to get hear your
> thoughts on
> >> >> this.
> >> >> >>>>>
> >> >> >>>>>
> >> >> >>>>>
> >> >>
> https://docs.google.com/document/d/1zfwF8e5LszazcOzmUAOeOtpM9v8dKEPlY_BRFSmI3us/edit#
> >> >> >>>>>
> >> >> >>>>> Thanks,
> >> >> >>>>> Steven
> >> >> >>>>>
> >> >> >>>>
> >> >>
> >> >
>

Re: Source alignment for Iceberg

Posted by Thomas Weise <th...@apache.org>.
On Wed, May 4, 2022 at 11:03 AM Steven Wu <st...@gmail.com> wrote:
> Any opinion on different timestamp for source alignment (vs Flink application watermark)? For Iceberg source, we might want to enforce alignment on kafka timestamp but Flink application watermark may use event time field from payload.

I imagine that more generally the question is alignment based on the
iceberg partition/file metadata vs. individual rows? I think that
should work as long as there is a guarantee for out of orderness
within the split?

Thomas

>
> Thanks,
> Steven
>
> On Wed, May 4, 2022 at 7:02 AM Becket Qin <be...@gmail.com> wrote:
>>
>> Hey Piotr,
>>
>> I think the mechanism FLIP-182 provided is a reasonable default one, which
>> ensures the watermarks are only drifted by an upper bound. However,
>> admittedly there are also other strategies for different purposes.
>>
>> In the Iceberg case, I am not sure if a static strictly allowed watermark
>> drift is desired. The source might just want to finish reading the assigned
>> splits as fast as possible. And it is OK to have a drift of "one split",
>> instead of a fixed time period.
>>
>> As another example, if there are some fast readers whose splits are always
>> throttled, while the other slow readers are struggling to keep up with the
>> rest of the splits, the split enumerator may decide to reassign the slow
>> splits so all the readers have something to read. This would need the
>> SplitEnumerator to be aware of the watermark progress on each reader. So it
>> seems useful to expose the WatermarkAlignmentEvent information to the
>> SplitEnumerator as well.
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>>
>>
>> On Tue, May 3, 2022 at 7:58 PM Piotr Nowojski <pn...@apache.org> wrote:
>>
>> > Hi Steven,
>> >
>> > Isn't this redundant to FLIP-182 and FLIP-217? Can not Iceberg just emit
>> > all splits and let FLIP-182/FLIP-217 handle the watermark alignment and
>> > block the splits that are too much into the future? I can see this being an
>> > issue if the existence of too many blocked splits is occupying too many
>> > resources.
>> >
>> > If that's the case, indeed SourceCoordinator/SplitEnumerator would have to
>> > decide on some basis how many and which splits to assign in what order. But
>> > in that case I'm not sure how much you could use from FLIP-182 and
>> > FLIP-217. They seem somehow orthogonal to me, operating on different
>> > levels. FLIP-182 and FLIP-217 are working with whatever splits have already
>> > been generated and assigned. You could leverage FLIP-182 and FLIP-217 and
>> > take care of only the problem to limit the number of parallel active
>> > splits. And here I'm not sure if it would be worth generalising a solution
>> > across different connectors.
>> >
>> > Regarding the global watermark, I made a related comment sometime ago
>> > about it [1]. It sounds to me like you also need to solve this problem,
>> > otherwise Iceberg users will encounter late records in case of some race
>> > conditions between assigning new splits and completions of older.
>> >
>> > Best,
>> > Piotrek
>> >
>> > [1]
>> > https://issues.apache.org/jira/browse/FLINK-21871?focusedCommentId=17495545&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17495545
>> >
>> > pon., 2 maj 2022 o 04:26 Steven Wu <st...@gmail.com> napisał(a):
>> >
>> >> add dev@ group to the thread as Thomas suggested
>> >>
>> >> Arvid,
>> >>
>> >> The scenario 3 (Dynamic assignment + temporary no split) in the FLIP-180
>> >> (idleness) can happen to Iceberg source alignment, as readers can be
>> >> temporarily starved due to the holdback by the enumerator when assigning
>> >> new splits upon request.
>> >>
>> >> Totally agree that we should decouple this discussion with the FLIP-217,
>> >> which addresses the split level watermark alignment problem as a follow-up
>> >> of FLIP-182
>> >>
>> >> Becket,
>> >>
>> >> Yes, currently Iceberg source implemented the alignment leveraging the
>> >> dynamic split assignment from FLIP-27 design. Basically, the enumerator
>> >> can
>> >> hold back split assignments to readers when necessary. Everything are
>> >> centralized in the enumerator: (1) watermark extraction and aggregation
>> >> (2)
>> >> alignment decision and execution
>> >>
>> >> The motivation of this discussion is to see if Iceberg source can leverage
>> >> some of the watermark alignment solutions (like FLIP-182) from Flink
>> >> framework. E.g., as mentioned in the doc, Iceberg source can potentially
>> >> leverage the FLIP-182 framework to do the watermark extraction and
>> >> aggregation. For the alignment decision and execution, we can keep them in
>> >> the centralized enumerator.
>> >>
>> >> Thanks,
>> >> Steven
>> >>
>> >> On Thu, Apr 28, 2022 at 2:05 AM Becket Qin <be...@gmail.com> wrote:
>> >>
>> >> > Hi Steven,
>> >> >
>> >> > Thanks for pulling me into this thread. I think the timestamp
>> >> > alignment use case here is a good example of what FLIP-27 was designed
>> >> for.
>> >> >
>> >> > Technically speaking, Iceberg source can already implement the timestamp
>> >> > alignment in the Flink new source even without FLIP-182. However, I
>> >> > understand the rationale here because timestamp alignment is also
>> >> trying to
>> >> > orchestrate the consumption of splits. However, it looks like FLIP-182
>> >> was
>> >> > not designed in a way that it can be easily extended for other use
>> >> cases.
>> >> > It may probably worth thinking of a more general mechanism to answer the
>> >> > following questions:
>> >> >
>> >> > 1. What information whose source of truth is the Flink framework should
>> >> be
>> >> > exposed to the SplitEnumerator and SourceReader? And how?
>> >> > 2. What control actions in the Flink framework are worth exposing to the
>> >> > SplitEnumerators and SourceReaders? And how?
>> >> >
>> >> > In the context of timestamp alignment, the first question is more
>> >> > relevant. For example, instead of hardcode the ReportWatermarkEvent
>> >> > handling logic in the SourceCoordinator, should we expose this to the
>> >> > SplitEnumerator? So basically there will be some information, such as
>> >> > subtask local watermark, whose source of truth is Flink runtime, but
>> >> useful
>> >> > to the user provided pluggables.
>> >> >
>> >> > I think there are a few control flow patterns to make a complete design:
>> >> >
>> >> > a. Framework space information (e.g. watermark) --> User space
>> >> Pluggables
>> >> > (e.g. SplitEnumerator) --> User space Actions (e.g. Pause reading a
>> >> split).
>> >> > b. Framework space information (e.g. task failure) --> User space
>> >> > pluggables (e.g. SplitEnumerator) --> Framework space actions (e.g. exit
>> >> > the job)
>> >> > c. User space information (e.g. a custom workload metric) --> User space
>> >> > pluggables (e.g. SplitEnumerator) --> User space actions (e.g. rebalance
>> >> > the workload across the source readers).
>> >> > d. Use space information (e.g. a custom stopping event in the stream)
>> >> -->
>> >> > User space pluggables (e.g. SplitEnumerator) --> Framework space actions
>> >> > (e.g. stop the job).
>> >> >
>> >> > So basically for any user provided pluggables, the input information may
>> >> > either come from another user provided logic or from the framework, and
>> >> > after receiving that information, the pluggable may either want the
>> >> > framework or another pluggable to take an action. So this gives the
>> >> above 4
>> >> > combinations.
>> >> >
>> >> > In our case, when the pluggables are SplitEnumerator and SourceReader,
>> >> the
>> >> > control flows that only involve user space actions are fully supported.
>> >> But
>> >> > it seems that when it comes to control flows involving framework space
>> >> > information, some of the information has not been exposed to the
>> >> pluggable,
>> >> > and some framework actions might also be missing.
>> >> >
>> >> > Thanks,
>> >> >
>> >> > Jiangjie (Becket) Qin
>> >> >
>> >> > On Thu, Apr 28, 2022 at 3:44 PM Arvid Heise <ar...@apache.org> wrote:
>> >> >
>> >> >> Hi folks,
>> >> >>
>> >> >> quick input from my side. I think this is from the implementation
>> >> >> perspective what Piotr and I had in mind for a global min watermark
>> >> that
>> >> >> helps in idleness cases. See also point 3 in
>> >> >>
>> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-180%3A+Adjust+StreamStatus+and+Idleness+definition
>> >> >> .
>> >> >>
>> >> >> Basically, we would like to empower source enumerators to determine the
>> >> >> global min watermark for all source readers factoring in even future
>> >> >> splits. Not all sources can supply that information (think of a general
>> >> >> file source) but most should be able to. Basically, Flink should know
>> >> for a
>> >> >> given source at a given point in time what the min watermark across all
>> >> >> source subtasks is.
>> >> >>
>> >> >> Here is some background:
>> >> >> In the context of idleness, we can deterministically advance the
>> >> >> watermark. In the pre-FLIP-27 era, we had heuristic approaches in
>> >> sources
>> >> >> to switch to idleness and thus allow watermarks to increase in cases
>> >> where
>> >> >> fewer splits than source tasks are available. However, for sources with
>> >> >> dynamic split discovery that actually yields incorrect results. Think
>> >> of a
>> >> >> Kinesis consumer where a shard is split. Then a previously idle source
>> >> >> subtask may receive a new split with time t0 as the lowest timestamp.
>> >> Since
>> >> >> the source subtask did not participate in the global watermark
>> >> generation
>> >> >> (because it was idle), the previously emitted watermark may be past t0
>> >> and
>> >> >> thus results in late records potentially being discarded. A rerun of
>> >> the
>> >> >> same pipeline on historic data would not render the source subtask
>> >> idle and
>> >> >> not result in late records. The solution was to not render source
>> >> subtasks
>> >> >> automatically idle by the framework if there are no spits. That leads
>> >> to
>> >> >> confusion for Kafka users with static topic subscription where #splits
>> >> <
>> >> >> #parallelism stalls pipelines because the watermark is not advancing.
>> >> Here,
>> >> >> your sketched solution can be transferred to KafkaSource to let Flink
>> >> know
>> >> >> that min global watermark on a static assignment is determined by the
>> >> >> slowest partition. Hence, all idle readers emit that min global
>> >> watermark
>> >> >> and the user sees progress.
>> >> >> This whole idea is related to FLIP-182 watermark alignment but I'd go
>> >> >> with another FLIP as the goal is quite different even though the
>> >> >> implementation overlaps.
>> >> >>
>> >> >> Now Iceberg seems to use the same information to actually pause the
>> >> >> consumption of files and create some kind of orderness guarantees as
>> >> far as
>> >> >> I understood. This probably can be applied to any source with dynamic
>> >> split
>> >> >> discovery. However, I wouldn't mix up the concepts and hence I
>> >> appreciate
>> >> >> you not chiming into the FLIP-182 and ff. threads. The goal of
>> >> FLIP-182 is
>> >> >> to pause readers while consuming a split, while your approach pauses
>> >> >> readers before processing another split. So it feels more closely
>> >> related
>> >> >> to the global min watermark - so it could either be part of that FLIP
>> >> or a
>> >> >> FLIP of its own. Afaik API changes should actually happen only on the
>> >> >> enumerator side both for your ideas and for global min watermark.
>> >> >>
>> >> >> Best,
>> >> >>
>> >> >> Arvid
>> >> >>
>> >> >> On Wed, Apr 27, 2022 at 7:31 PM Thomas Weise <th...@apache.org> wrote:
>> >> >>
>> >> >>> Hi Steven,
>> >> >>>
>> >> >>> Would it be better to bring this as a separate thread related to
>> >> Iceberg
>> >> >>> source to the dev@ list? I think this could benefit from broader
>> >> input?
>> >> >>>
>> >> >>> Thanks
>> >> >>>
>> >> >>> On Wed, Apr 27, 2022 at 9:36 AM Steven Wu <st...@gmail.com>
>> >> wrote:
>> >> >>>
>> >> >>>> + Becket and Sebastian
>> >> >>>>
>> >> >>>> It is also related to the split level watermark alignment discussion
>> >> >>>> thread. Because it is already very long, I don't want to further
>> >> complicate
>> >> >>>> the ongoing discussion there. But I can move the discussion to that
>> >> >>>> existing thread if that is preferred.
>> >> >>>>
>> >> >>>>
>> >> >>>> On Tue, Apr 26, 2022 at 10:03 PM Steven Wu <st...@gmail.com>
>> >> >>>> wrote:
>> >> >>>>
>> >> >>>>> Hi all,
>> >> >>>>>
>> >> >>>>> We are thinking about how to align with the Flink community and
>> >> >>>>> leverage the FLIP-182 watermark alignment in the Iceberg source. I
>> >> put some
>> >> >>>>> context in this google doc. Would love to get hear your thoughts on
>> >> this.
>> >> >>>>>
>> >> >>>>>
>> >> >>>>>
>> >> https://docs.google.com/document/d/1zfwF8e5LszazcOzmUAOeOtpM9v8dKEPlY_BRFSmI3us/edit#
>> >> >>>>>
>> >> >>>>> Thanks,
>> >> >>>>> Steven
>> >> >>>>>
>> >> >>>>
>> >>
>> >

Re: Source alignment for Iceberg

Posted by Thomas Weise <th...@apache.org>.
It seems that the iceberg source benefits from performing the
alignment in the enumerator and holding back the splits until they
actually can be processed. That is probably true for any similar
source that assigns work in smaller increments as centralizing the
"ready to process" decision in the enumerator allows for further
optimizations. Maybe there is an opportunity to come up with building
blocks for enumerators that benefit similar sources? It's probably
less compelling to sacrifice that for what appears to be limited reuse
of reader functionality?

Thomas


On Thu, May 5, 2022 at 12:55 PM Piotr Nowojski <pn...@apache.org> wrote:
>
> Hi Steven,
>
> Ok, thanks for the clarification. I'm not sure how much could be leveraged? Maybe just re-using the watermark alignment configuration? Please correct me if I'm wrong, but I think for the sole purpose of this use case, I don't see a good motivation behind expanding our APIs. Clearly this feature can be implemented already (you did it in the Iceberg connector after all).
>
> > As another example, if there are some fast readers whose splits are always throttled, while the other slow readers
> > are struggling to keep up with the rest of the splits, the split enumerator may decide to reassign the slow
> > splits so all the readers have something to read. This would need the SplitEnumerator to be aware of the
> > watermark progress on each reader. So it seems useful to expose the WatermarkAlignmentEvent information to the
> > SplitEnumerator as well.
>
> It seems like a valid potential use case. But do we have a good enough motivation to work on it right now?
>
> Piotrek
>
> czw., 5 maj 2022 o 16:21 Steven Wu <st...@gmail.com> napisał(a):
>>
>> Piotr,
>>
>> With FLIP-27, Iceberg source already implemented alignment by tracking
>> watermark and holding back split assignment when necessary.
>>
>> The purpose of this discussion is to see if Iceberg source can leverage
>> some of the watermark alignment work from Flink framework.
>>
>> Thanks,
>> Steven
>>
>> On Thu, May 5, 2022 at 1:10 AM Piotr Nowojski <pn...@apache.org> wrote:
>>
>> > Ok, I see. Thanks to both of you for the explanation.
>> >
>> > Do we need changes to Apache Flink for this feature? Can it be implemented
>> > in the Sources without changes in the framework? I presume source can
>> > access min/max watermark from the split, so as long as it also knows
>> > exactly which splits have finished, it would know which splits to hold back.
>> >
>> > Best,
>> > Piotrek
>> >
>> > śr., 4 maj 2022 o 20:03 Steven Wu <st...@gmail.com> napisał(a):
>> >
>> >> Piotr, thanks a lot for your feedback.
>> >>
>> >> > I can see this being an issue if the existence of too many blocked
>> >> splits is occupying too many resources.
>> >>
>> >> This is not desirable. Eagerly assigning many splits to a reader can
>> >> defeat the benefits of pull based dynamic split assignments. Iceberg
>> >> readers request one split at a time upon start or completion of a split.
>> >> Dynamic split assignment is better for work sharing/stealing as Becket
>> >> mentioned. Limiting number of active splits can be handled by the FLIP-27
>> >> Iceberg source and is somewhat orthogonal to watermark alignment.
>> >>
>> >> > Can not Iceberg just emit all splits and let FLIP-182/FLIP-217 handle
>> >> the watermark alignment and block the splits that are too much into the
>> >> future?
>> >>
>> >> The enumerator just assigns the next split to the requesting reader
>> >> instead of holding back the split assignment. Let the reader handle the
>> >> pause (if the file split requires alignment wait).  This strategy might
>> >> work and leverage more from the framework.
>> >>
>> >> We probably need the following to make this work
>> >> * extract watermark/timestamp only at the completion of a split (not at
>> >> record level). Because records in a file aren't probably not sorted by the
>> >> timestamp field, the pause or watermark advancement is probably better done
>> >> at file level.
>> >> * source readers checkpoint the watermark. otherwise, upon restart
>> >> readers won't be able to determine the local watermark and pause for
>> >> alignment. We don't want to emit records upon restart due to unknown
>> >> watermark info.
>> >>
>> >> All,
>> >>
>> >> Any opinion on different timestamp for source alignment (vs Flink
>> >> application watermark)? For Iceberg source, we might want to enforce
>> >> alignment on kafka timestamp but Flink application watermark may use event
>> >> time field from payload.
>> >>
>> >> Thanks,
>> >> Steven
>> >>
>> >> On Wed, May 4, 2022 at 7:02 AM Becket Qin <be...@gmail.com> wrote:
>> >>
>> >>> Hey Piotr,
>> >>>
>> >>> I think the mechanism FLIP-182 provided is a reasonable default one,
>> >>> which
>> >>> ensures the watermarks are only drifted by an upper bound. However,
>> >>> admittedly there are also other strategies for different purposes.
>> >>>
>> >>> In the Iceberg case, I am not sure if a static strictly allowed watermark
>> >>> drift is desired. The source might just want to finish reading the
>> >>> assigned
>> >>> splits as fast as possible. And it is OK to have a drift of "one split",
>> >>> instead of a fixed time period.
>> >>>
>> >>> As another example, if there are some fast readers whose splits are
>> >>> always
>> >>> throttled, while the other slow readers are struggling to keep up with
>> >>> the
>> >>> rest of the splits, the split enumerator may decide to reassign the slow
>> >>> splits so all the readers have something to read. This would need the
>> >>> SplitEnumerator to be aware of the watermark progress on each reader. So
>> >>> it
>> >>> seems useful to expose the WatermarkAlignmentEvent information to the
>> >>> SplitEnumerator as well.
>> >>>
>> >>> Thanks,
>> >>>
>> >>> Jiangjie (Becket) Qin
>> >>>
>> >>>
>> >>>
>> >>> On Tue, May 3, 2022 at 7:58 PM Piotr Nowojski <pn...@apache.org>
>> >>> wrote:
>> >>>
>> >>> > Hi Steven,
>> >>> >
>> >>> > Isn't this redundant to FLIP-182 and FLIP-217? Can not Iceberg just
>> >>> emit
>> >>> > all splits and let FLIP-182/FLIP-217 handle the watermark alignment and
>> >>> > block the splits that are too much into the future? I can see this
>> >>> being an
>> >>> > issue if the existence of too many blocked splits is occupying too many
>> >>> > resources.
>> >>> >
>> >>> > If that's the case, indeed SourceCoordinator/SplitEnumerator would
>> >>> have to
>> >>> > decide on some basis how many and which splits to assign in what
>> >>> order. But
>> >>> > in that case I'm not sure how much you could use from FLIP-182 and
>> >>> > FLIP-217. They seem somehow orthogonal to me, operating on different
>> >>> > levels. FLIP-182 and FLIP-217 are working with whatever splits have
>> >>> already
>> >>> > been generated and assigned. You could leverage FLIP-182 and FLIP-217
>> >>> and
>> >>> > take care of only the problem to limit the number of parallel active
>> >>> > splits. And here I'm not sure if it would be worth generalising a
>> >>> solution
>> >>> > across different connectors.
>> >>> >
>> >>> > Regarding the global watermark, I made a related comment sometime ago
>> >>> > about it [1]. It sounds to me like you also need to solve this problem,
>> >>> > otherwise Iceberg users will encounter late records in case of some
>> >>> race
>> >>> > conditions between assigning new splits and completions of older.
>> >>> >
>> >>> > Best,
>> >>> > Piotrek
>> >>> >
>> >>> > [1]
>> >>> >
>> >>> https://issues.apache.org/jira/browse/FLINK-21871?focusedCommentId=17495545&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17495545
>> >>> >
>> >>> > pon., 2 maj 2022 o 04:26 Steven Wu <st...@gmail.com> napisał(a):
>> >>> >
>> >>> >> add dev@ group to the thread as Thomas suggested
>> >>> >>
>> >>> >> Arvid,
>> >>> >>
>> >>> >> The scenario 3 (Dynamic assignment + temporary no split) in the
>> >>> FLIP-180
>> >>> >> (idleness) can happen to Iceberg source alignment, as readers can be
>> >>> >> temporarily starved due to the holdback by the enumerator when
>> >>> assigning
>> >>> >> new splits upon request.
>> >>> >>
>> >>> >> Totally agree that we should decouple this discussion with the
>> >>> FLIP-217,
>> >>> >> which addresses the split level watermark alignment problem as a
>> >>> follow-up
>> >>> >> of FLIP-182
>> >>> >>
>> >>> >> Becket,
>> >>> >>
>> >>> >> Yes, currently Iceberg source implemented the alignment leveraging the
>> >>> >> dynamic split assignment from FLIP-27 design. Basically, the
>> >>> enumerator
>> >>> >> can
>> >>> >> hold back split assignments to readers when necessary. Everything are
>> >>> >> centralized in the enumerator: (1) watermark extraction and
>> >>> aggregation
>> >>> >> (2)
>> >>> >> alignment decision and execution
>> >>> >>
>> >>> >> The motivation of this discussion is to see if Iceberg source can
>> >>> leverage
>> >>> >> some of the watermark alignment solutions (like FLIP-182) from Flink
>> >>> >> framework. E.g., as mentioned in the doc, Iceberg source can
>> >>> potentially
>> >>> >> leverage the FLIP-182 framework to do the watermark extraction and
>> >>> >> aggregation. For the alignment decision and execution, we can keep
>> >>> them in
>> >>> >> the centralized enumerator.
>> >>> >>
>> >>> >> Thanks,
>> >>> >> Steven
>> >>> >>
>> >>> >> On Thu, Apr 28, 2022 at 2:05 AM Becket Qin <be...@gmail.com>
>> >>> wrote:
>> >>> >>
>> >>> >> > Hi Steven,
>> >>> >> >
>> >>> >> > Thanks for pulling me into this thread. I think the timestamp
>> >>> >> > alignment use case here is a good example of what FLIP-27 was
>> >>> designed
>> >>> >> for.
>> >>> >> >
>> >>> >> > Technically speaking, Iceberg source can already implement the
>> >>> timestamp
>> >>> >> > alignment in the Flink new source even without FLIP-182. However, I
>> >>> >> > understand the rationale here because timestamp alignment is also
>> >>> >> trying to
>> >>> >> > orchestrate the consumption of splits. However, it looks like
>> >>> FLIP-182
>> >>> >> was
>> >>> >> > not designed in a way that it can be easily extended for other use
>> >>> >> cases.
>> >>> >> > It may probably worth thinking of a more general mechanism to
>> >>> answer the
>> >>> >> > following questions:
>> >>> >> >
>> >>> >> > 1. What information whose source of truth is the Flink framework
>> >>> should
>> >>> >> be
>> >>> >> > exposed to the SplitEnumerator and SourceReader? And how?
>> >>> >> > 2. What control actions in the Flink framework are worth exposing
>> >>> to the
>> >>> >> > SplitEnumerators and SourceReaders? And how?
>> >>> >> >
>> >>> >> > In the context of timestamp alignment, the first question is more
>> >>> >> > relevant. For example, instead of hardcode the ReportWatermarkEvent
>> >>> >> > handling logic in the SourceCoordinator, should we expose this to
>> >>> the
>> >>> >> > SplitEnumerator? So basically there will be some information, such
>> >>> as
>> >>> >> > subtask local watermark, whose source of truth is Flink runtime, but
>> >>> >> useful
>> >>> >> > to the user provided pluggables.
>> >>> >> >
>> >>> >> > I think there are a few control flow patterns to make a complete
>> >>> design:
>> >>> >> >
>> >>> >> > a. Framework space information (e.g. watermark) --> User space
>> >>> >> Pluggables
>> >>> >> > (e.g. SplitEnumerator) --> User space Actions (e.g. Pause reading a
>> >>> >> split).
>> >>> >> > b. Framework space information (e.g. task failure) --> User space
>> >>> >> > pluggables (e.g. SplitEnumerator) --> Framework space actions (e.g.
>> >>> exit
>> >>> >> > the job)
>> >>> >> > c. User space information (e.g. a custom workload metric) --> User
>> >>> space
>> >>> >> > pluggables (e.g. SplitEnumerator) --> User space actions (e.g.
>> >>> rebalance
>> >>> >> > the workload across the source readers).
>> >>> >> > d. Use space information (e.g. a custom stopping event in the
>> >>> stream)
>> >>> >> -->
>> >>> >> > User space pluggables (e.g. SplitEnumerator) --> Framework space
>> >>> actions
>> >>> >> > (e.g. stop the job).
>> >>> >> >
>> >>> >> > So basically for any user provided pluggables, the input
>> >>> information may
>> >>> >> > either come from another user provided logic or from the framework,
>> >>> and
>> >>> >> > after receiving that information, the pluggable may either want the
>> >>> >> > framework or another pluggable to take an action. So this gives the
>> >>> >> above 4
>> >>> >> > combinations.
>> >>> >> >
>> >>> >> > In our case, when the pluggables are SplitEnumerator and
>> >>> SourceReader,
>> >>> >> the
>> >>> >> > control flows that only involve user space actions are fully
>> >>> supported.
>> >>> >> But
>> >>> >> > it seems that when it comes to control flows involving framework
>> >>> space
>> >>> >> > information, some of the information has not been exposed to the
>> >>> >> pluggable,
>> >>> >> > and some framework actions might also be missing.
>> >>> >> >
>> >>> >> > Thanks,
>> >>> >> >
>> >>> >> > Jiangjie (Becket) Qin
>> >>> >> >
>> >>> >> > On Thu, Apr 28, 2022 at 3:44 PM Arvid Heise <ar...@apache.org>
>> >>> wrote:
>> >>> >> >
>> >>> >> >> Hi folks,
>> >>> >> >>
>> >>> >> >> quick input from my side. I think this is from the implementation
>> >>> >> >> perspective what Piotr and I had in mind for a global min watermark
>> >>> >> that
>> >>> >> >> helps in idleness cases. See also point 3 in
>> >>> >> >>
>> >>> >>
>> >>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-180%3A+Adjust+StreamStatus+and+Idleness+definition
>> >>> >> >> .
>> >>> >> >>
>> >>> >> >> Basically, we would like to empower source enumerators to
>> >>> determine the
>> >>> >> >> global min watermark for all source readers factoring in even
>> >>> future
>> >>> >> >> splits. Not all sources can supply that information (think of a
>> >>> general
>> >>> >> >> file source) but most should be able to. Basically, Flink should
>> >>> know
>> >>> >> for a
>> >>> >> >> given source at a given point in time what the min watermark
>> >>> across all
>> >>> >> >> source subtasks is.
>> >>> >> >>
>> >>> >> >> Here is some background:
>> >>> >> >> In the context of idleness, we can deterministically advance the
>> >>> >> >> watermark. In the pre-FLIP-27 era, we had heuristic approaches in
>> >>> >> sources
>> >>> >> >> to switch to idleness and thus allow watermarks to increase in
>> >>> cases
>> >>> >> where
>> >>> >> >> fewer splits than source tasks are available. However, for sources
>> >>> with
>> >>> >> >> dynamic split discovery that actually yields incorrect results.
>> >>> Think
>> >>> >> of a
>> >>> >> >> Kinesis consumer where a shard is split. Then a previously idle
>> >>> source
>> >>> >> >> subtask may receive a new split with time t0 as the lowest
>> >>> timestamp.
>> >>> >> Since
>> >>> >> >> the source subtask did not participate in the global watermark
>> >>> >> generation
>> >>> >> >> (because it was idle), the previously emitted watermark may be
>> >>> past t0
>> >>> >> and
>> >>> >> >> thus results in late records potentially being discarded. A rerun
>> >>> of
>> >>> >> the
>> >>> >> >> same pipeline on historic data would not render the source subtask
>> >>> >> idle and
>> >>> >> >> not result in late records. The solution was to not render source
>> >>> >> subtasks
>> >>> >> >> automatically idle by the framework if there are no spits. That
>> >>> leads
>> >>> >> to
>> >>> >> >> confusion for Kafka users with static topic subscription where
>> >>> #splits
>> >>> >> <
>> >>> >> >> #parallelism stalls pipelines because the watermark is not
>> >>> advancing.
>> >>> >> Here,
>> >>> >> >> your sketched solution can be transferred to KafkaSource to let
>> >>> Flink
>> >>> >> know
>> >>> >> >> that min global watermark on a static assignment is determined by
>> >>> the
>> >>> >> >> slowest partition. Hence, all idle readers emit that min global
>> >>> >> watermark
>> >>> >> >> and the user sees progress.
>> >>> >> >> This whole idea is related to FLIP-182 watermark alignment but I'd
>> >>> go
>> >>> >> >> with another FLIP as the goal is quite different even though the
>> >>> >> >> implementation overlaps.
>> >>> >> >>
>> >>> >> >> Now Iceberg seems to use the same information to actually pause the
>> >>> >> >> consumption of files and create some kind of orderness guarantees
>> >>> as
>> >>> >> far as
>> >>> >> >> I understood. This probably can be applied to any source with
>> >>> dynamic
>> >>> >> split
>> >>> >> >> discovery. However, I wouldn't mix up the concepts and hence I
>> >>> >> appreciate
>> >>> >> >> you not chiming into the FLIP-182 and ff. threads. The goal of
>> >>> >> FLIP-182 is
>> >>> >> >> to pause readers while consuming a split, while your approach
>> >>> pauses
>> >>> >> >> readers before processing another split. So it feels more closely
>> >>> >> related
>> >>> >> >> to the global min watermark - so it could either be part of that
>> >>> FLIP
>> >>> >> or a
>> >>> >> >> FLIP of its own. Afaik API changes should actually happen only on
>> >>> the
>> >>> >> >> enumerator side both for your ideas and for global min watermark.
>> >>> >> >>
>> >>> >> >> Best,
>> >>> >> >>
>> >>> >> >> Arvid
>> >>> >> >>
>> >>> >> >> On Wed, Apr 27, 2022 at 7:31 PM Thomas Weise <th...@apache.org>
>> >>> wrote:
>> >>> >> >>
>> >>> >> >>> Hi Steven,
>> >>> >> >>>
>> >>> >> >>> Would it be better to bring this as a separate thread related to
>> >>> >> Iceberg
>> >>> >> >>> source to the dev@ list? I think this could benefit from broader
>> >>> >> input?
>> >>> >> >>>
>> >>> >> >>> Thanks
>> >>> >> >>>
>> >>> >> >>> On Wed, Apr 27, 2022 at 9:36 AM Steven Wu <st...@gmail.com>
>> >>> >> wrote:
>> >>> >> >>>
>> >>> >> >>>> + Becket and Sebastian
>> >>> >> >>>>
>> >>> >> >>>> It is also related to the split level watermark alignment
>> >>> discussion
>> >>> >> >>>> thread. Because it is already very long, I don't want to further
>> >>> >> complicate
>> >>> >> >>>> the ongoing discussion there. But I can move the discussion to
>> >>> that
>> >>> >> >>>> existing thread if that is preferred.
>> >>> >> >>>>
>> >>> >> >>>>
>> >>> >> >>>> On Tue, Apr 26, 2022 at 10:03 PM Steven Wu <stevenz3wu@gmail.com
>> >>> >
>> >>> >> >>>> wrote:
>> >>> >> >>>>
>> >>> >> >>>>> Hi all,
>> >>> >> >>>>>
>> >>> >> >>>>> We are thinking about how to align with the Flink community and
>> >>> >> >>>>> leverage the FLIP-182 watermark alignment in the Iceberg
>> >>> source. I
>> >>> >> put some
>> >>> >> >>>>> context in this google doc. Would love to get hear your
>> >>> thoughts on
>> >>> >> this.
>> >>> >> >>>>>
>> >>> >> >>>>>
>> >>> >> >>>>>
>> >>> >>
>> >>> https://docs.google.com/document/d/1zfwF8e5LszazcOzmUAOeOtpM9v8dKEPlY_BRFSmI3us/edit#
>> >>> >> >>>>>
>> >>> >> >>>>> Thanks,
>> >>> >> >>>>> Steven
>> >>> >> >>>>>
>> >>> >> >>>>
>> >>> >>
>> >>> >
>> >>>
>> >>

Re: Source alignment for Iceberg

Posted by Piotr Nowojski <pn...@apache.org>.
Hi Steven,

Ok, thanks for the clarification. I'm not sure how much could be leveraged?
Maybe just re-using the watermark alignment configuration? Please correct
me if I'm wrong, but I think for the sole purpose of this use case, I don't
see a good motivation behind expanding our APIs. Clearly this feature can
be implemented already (you did it in the Iceberg connector after all).

> As another example, if there are some fast readers whose splits are
always throttled, while the other slow readers
> are struggling to keep up with the rest of the splits, the split
enumerator may decide to reassign the slow
> splits so all the readers have something to read. This would need the
SplitEnumerator to be aware of the
> watermark progress on each reader. So it seems useful to expose the
WatermarkAlignmentEvent information to the
> SplitEnumerator as well.

It seems like a valid potential use case. But do we have a good enough
motivation to work on it right now?

Piotrek

czw., 5 maj 2022 o 16:21 Steven Wu <st...@gmail.com> napisał(a):

> Piotr,
>
> With FLIP-27, Iceberg source already implemented alignment by tracking
> watermark and holding back split assignment when necessary.
>
> The purpose of this discussion is to see if Iceberg source can leverage
> some of the watermark alignment work from Flink framework.
>
> Thanks,
> Steven
>
> On Thu, May 5, 2022 at 1:10 AM Piotr Nowojski <pn...@apache.org>
> wrote:
>
> > Ok, I see. Thanks to both of you for the explanation.
> >
> > Do we need changes to Apache Flink for this feature? Can it be
> implemented
> > in the Sources without changes in the framework? I presume source can
> > access min/max watermark from the split, so as long as it also knows
> > exactly which splits have finished, it would know which splits to hold
> back.
> >
> > Best,
> > Piotrek
> >
> > śr., 4 maj 2022 o 20:03 Steven Wu <st...@gmail.com> napisał(a):
> >
> >> Piotr, thanks a lot for your feedback.
> >>
> >> > I can see this being an issue if the existence of too many blocked
> >> splits is occupying too many resources.
> >>
> >> This is not desirable. Eagerly assigning many splits to a reader can
> >> defeat the benefits of pull based dynamic split assignments. Iceberg
> >> readers request one split at a time upon start or completion of a split.
> >> Dynamic split assignment is better for work sharing/stealing as Becket
> >> mentioned. Limiting number of active splits can be handled by the
> FLIP-27
> >> Iceberg source and is somewhat orthogonal to watermark alignment.
> >>
> >> > Can not Iceberg just emit all splits and let FLIP-182/FLIP-217 handle
> >> the watermark alignment and block the splits that are too much into the
> >> future?
> >>
> >> The enumerator just assigns the next split to the requesting reader
> >> instead of holding back the split assignment. Let the reader handle the
> >> pause (if the file split requires alignment wait).  This strategy might
> >> work and leverage more from the framework.
> >>
> >> We probably need the following to make this work
> >> * extract watermark/timestamp only at the completion of a split (not at
> >> record level). Because records in a file aren't probably not sorted by
> the
> >> timestamp field, the pause or watermark advancement is probably better
> done
> >> at file level.
> >> * source readers checkpoint the watermark. otherwise, upon restart
> >> readers won't be able to determine the local watermark and pause for
> >> alignment. We don't want to emit records upon restart due to unknown
> >> watermark info.
> >>
> >> All,
> >>
> >> Any opinion on different timestamp for source alignment (vs Flink
> >> application watermark)? For Iceberg source, we might want to enforce
> >> alignment on kafka timestamp but Flink application watermark may use
> event
> >> time field from payload.
> >>
> >> Thanks,
> >> Steven
> >>
> >> On Wed, May 4, 2022 at 7:02 AM Becket Qin <be...@gmail.com> wrote:
> >>
> >>> Hey Piotr,
> >>>
> >>> I think the mechanism FLIP-182 provided is a reasonable default one,
> >>> which
> >>> ensures the watermarks are only drifted by an upper bound. However,
> >>> admittedly there are also other strategies for different purposes.
> >>>
> >>> In the Iceberg case, I am not sure if a static strictly allowed
> watermark
> >>> drift is desired. The source might just want to finish reading the
> >>> assigned
> >>> splits as fast as possible. And it is OK to have a drift of "one
> split",
> >>> instead of a fixed time period.
> >>>
> >>> As another example, if there are some fast readers whose splits are
> >>> always
> >>> throttled, while the other slow readers are struggling to keep up with
> >>> the
> >>> rest of the splits, the split enumerator may decide to reassign the
> slow
> >>> splits so all the readers have something to read. This would need the
> >>> SplitEnumerator to be aware of the watermark progress on each reader.
> So
> >>> it
> >>> seems useful to expose the WatermarkAlignmentEvent information to the
> >>> SplitEnumerator as well.
> >>>
> >>> Thanks,
> >>>
> >>> Jiangjie (Becket) Qin
> >>>
> >>>
> >>>
> >>> On Tue, May 3, 2022 at 7:58 PM Piotr Nowojski <pn...@apache.org>
> >>> wrote:
> >>>
> >>> > Hi Steven,
> >>> >
> >>> > Isn't this redundant to FLIP-182 and FLIP-217? Can not Iceberg just
> >>> emit
> >>> > all splits and let FLIP-182/FLIP-217 handle the watermark alignment
> and
> >>> > block the splits that are too much into the future? I can see this
> >>> being an
> >>> > issue if the existence of too many blocked splits is occupying too
> many
> >>> > resources.
> >>> >
> >>> > If that's the case, indeed SourceCoordinator/SplitEnumerator would
> >>> have to
> >>> > decide on some basis how many and which splits to assign in what
> >>> order. But
> >>> > in that case I'm not sure how much you could use from FLIP-182 and
> >>> > FLIP-217. They seem somehow orthogonal to me, operating on different
> >>> > levels. FLIP-182 and FLIP-217 are working with whatever splits have
> >>> already
> >>> > been generated and assigned. You could leverage FLIP-182 and FLIP-217
> >>> and
> >>> > take care of only the problem to limit the number of parallel active
> >>> > splits. And here I'm not sure if it would be worth generalising a
> >>> solution
> >>> > across different connectors.
> >>> >
> >>> > Regarding the global watermark, I made a related comment sometime ago
> >>> > about it [1]. It sounds to me like you also need to solve this
> problem,
> >>> > otherwise Iceberg users will encounter late records in case of some
> >>> race
> >>> > conditions between assigning new splits and completions of older.
> >>> >
> >>> > Best,
> >>> > Piotrek
> >>> >
> >>> > [1]
> >>> >
> >>>
> https://issues.apache.org/jira/browse/FLINK-21871?focusedCommentId=17495545&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17495545
> >>> >
> >>> > pon., 2 maj 2022 o 04:26 Steven Wu <st...@gmail.com>
> napisał(a):
> >>> >
> >>> >> add dev@ group to the thread as Thomas suggested
> >>> >>
> >>> >> Arvid,
> >>> >>
> >>> >> The scenario 3 (Dynamic assignment + temporary no split) in the
> >>> FLIP-180
> >>> >> (idleness) can happen to Iceberg source alignment, as readers can be
> >>> >> temporarily starved due to the holdback by the enumerator when
> >>> assigning
> >>> >> new splits upon request.
> >>> >>
> >>> >> Totally agree that we should decouple this discussion with the
> >>> FLIP-217,
> >>> >> which addresses the split level watermark alignment problem as a
> >>> follow-up
> >>> >> of FLIP-182
> >>> >>
> >>> >> Becket,
> >>> >>
> >>> >> Yes, currently Iceberg source implemented the alignment leveraging
> the
> >>> >> dynamic split assignment from FLIP-27 design. Basically, the
> >>> enumerator
> >>> >> can
> >>> >> hold back split assignments to readers when necessary. Everything
> are
> >>> >> centralized in the enumerator: (1) watermark extraction and
> >>> aggregation
> >>> >> (2)
> >>> >> alignment decision and execution
> >>> >>
> >>> >> The motivation of this discussion is to see if Iceberg source can
> >>> leverage
> >>> >> some of the watermark alignment solutions (like FLIP-182) from Flink
> >>> >> framework. E.g., as mentioned in the doc, Iceberg source can
> >>> potentially
> >>> >> leverage the FLIP-182 framework to do the watermark extraction and
> >>> >> aggregation. For the alignment decision and execution, we can keep
> >>> them in
> >>> >> the centralized enumerator.
> >>> >>
> >>> >> Thanks,
> >>> >> Steven
> >>> >>
> >>> >> On Thu, Apr 28, 2022 at 2:05 AM Becket Qin <be...@gmail.com>
> >>> wrote:
> >>> >>
> >>> >> > Hi Steven,
> >>> >> >
> >>> >> > Thanks for pulling me into this thread. I think the timestamp
> >>> >> > alignment use case here is a good example of what FLIP-27 was
> >>> designed
> >>> >> for.
> >>> >> >
> >>> >> > Technically speaking, Iceberg source can already implement the
> >>> timestamp
> >>> >> > alignment in the Flink new source even without FLIP-182. However,
> I
> >>> >> > understand the rationale here because timestamp alignment is also
> >>> >> trying to
> >>> >> > orchestrate the consumption of splits. However, it looks like
> >>> FLIP-182
> >>> >> was
> >>> >> > not designed in a way that it can be easily extended for other use
> >>> >> cases.
> >>> >> > It may probably worth thinking of a more general mechanism to
> >>> answer the
> >>> >> > following questions:
> >>> >> >
> >>> >> > 1. What information whose source of truth is the Flink framework
> >>> should
> >>> >> be
> >>> >> > exposed to the SplitEnumerator and SourceReader? And how?
> >>> >> > 2. What control actions in the Flink framework are worth exposing
> >>> to the
> >>> >> > SplitEnumerators and SourceReaders? And how?
> >>> >> >
> >>> >> > In the context of timestamp alignment, the first question is more
> >>> >> > relevant. For example, instead of hardcode the
> ReportWatermarkEvent
> >>> >> > handling logic in the SourceCoordinator, should we expose this to
> >>> the
> >>> >> > SplitEnumerator? So basically there will be some information, such
> >>> as
> >>> >> > subtask local watermark, whose source of truth is Flink runtime,
> but
> >>> >> useful
> >>> >> > to the user provided pluggables.
> >>> >> >
> >>> >> > I think there are a few control flow patterns to make a complete
> >>> design:
> >>> >> >
> >>> >> > a. Framework space information (e.g. watermark) --> User space
> >>> >> Pluggables
> >>> >> > (e.g. SplitEnumerator) --> User space Actions (e.g. Pause reading
> a
> >>> >> split).
> >>> >> > b. Framework space information (e.g. task failure) --> User space
> >>> >> > pluggables (e.g. SplitEnumerator) --> Framework space actions
> (e.g.
> >>> exit
> >>> >> > the job)
> >>> >> > c. User space information (e.g. a custom workload metric) --> User
> >>> space
> >>> >> > pluggables (e.g. SplitEnumerator) --> User space actions (e.g.
> >>> rebalance
> >>> >> > the workload across the source readers).
> >>> >> > d. Use space information (e.g. a custom stopping event in the
> >>> stream)
> >>> >> -->
> >>> >> > User space pluggables (e.g. SplitEnumerator) --> Framework space
> >>> actions
> >>> >> > (e.g. stop the job).
> >>> >> >
> >>> >> > So basically for any user provided pluggables, the input
> >>> information may
> >>> >> > either come from another user provided logic or from the
> framework,
> >>> and
> >>> >> > after receiving that information, the pluggable may either want
> the
> >>> >> > framework or another pluggable to take an action. So this gives
> the
> >>> >> above 4
> >>> >> > combinations.
> >>> >> >
> >>> >> > In our case, when the pluggables are SplitEnumerator and
> >>> SourceReader,
> >>> >> the
> >>> >> > control flows that only involve user space actions are fully
> >>> supported.
> >>> >> But
> >>> >> > it seems that when it comes to control flows involving framework
> >>> space
> >>> >> > information, some of the information has not been exposed to the
> >>> >> pluggable,
> >>> >> > and some framework actions might also be missing.
> >>> >> >
> >>> >> > Thanks,
> >>> >> >
> >>> >> > Jiangjie (Becket) Qin
> >>> >> >
> >>> >> > On Thu, Apr 28, 2022 at 3:44 PM Arvid Heise <ar...@apache.org>
> >>> wrote:
> >>> >> >
> >>> >> >> Hi folks,
> >>> >> >>
> >>> >> >> quick input from my side. I think this is from the implementation
> >>> >> >> perspective what Piotr and I had in mind for a global min
> watermark
> >>> >> that
> >>> >> >> helps in idleness cases. See also point 3 in
> >>> >> >>
> >>> >>
> >>>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-180%3A+Adjust+StreamStatus+and+Idleness+definition
> >>> >> >> .
> >>> >> >>
> >>> >> >> Basically, we would like to empower source enumerators to
> >>> determine the
> >>> >> >> global min watermark for all source readers factoring in even
> >>> future
> >>> >> >> splits. Not all sources can supply that information (think of a
> >>> general
> >>> >> >> file source) but most should be able to. Basically, Flink should
> >>> know
> >>> >> for a
> >>> >> >> given source at a given point in time what the min watermark
> >>> across all
> >>> >> >> source subtasks is.
> >>> >> >>
> >>> >> >> Here is some background:
> >>> >> >> In the context of idleness, we can deterministically advance the
> >>> >> >> watermark. In the pre-FLIP-27 era, we had heuristic approaches in
> >>> >> sources
> >>> >> >> to switch to idleness and thus allow watermarks to increase in
> >>> cases
> >>> >> where
> >>> >> >> fewer splits than source tasks are available. However, for
> sources
> >>> with
> >>> >> >> dynamic split discovery that actually yields incorrect results.
> >>> Think
> >>> >> of a
> >>> >> >> Kinesis consumer where a shard is split. Then a previously idle
> >>> source
> >>> >> >> subtask may receive a new split with time t0 as the lowest
> >>> timestamp.
> >>> >> Since
> >>> >> >> the source subtask did not participate in the global watermark
> >>> >> generation
> >>> >> >> (because it was idle), the previously emitted watermark may be
> >>> past t0
> >>> >> and
> >>> >> >> thus results in late records potentially being discarded. A rerun
> >>> of
> >>> >> the
> >>> >> >> same pipeline on historic data would not render the source
> subtask
> >>> >> idle and
> >>> >> >> not result in late records. The solution was to not render source
> >>> >> subtasks
> >>> >> >> automatically idle by the framework if there are no spits. That
> >>> leads
> >>> >> to
> >>> >> >> confusion for Kafka users with static topic subscription where
> >>> #splits
> >>> >> <
> >>> >> >> #parallelism stalls pipelines because the watermark is not
> >>> advancing.
> >>> >> Here,
> >>> >> >> your sketched solution can be transferred to KafkaSource to let
> >>> Flink
> >>> >> know
> >>> >> >> that min global watermark on a static assignment is determined by
> >>> the
> >>> >> >> slowest partition. Hence, all idle readers emit that min global
> >>> >> watermark
> >>> >> >> and the user sees progress.
> >>> >> >> This whole idea is related to FLIP-182 watermark alignment but
> I'd
> >>> go
> >>> >> >> with another FLIP as the goal is quite different even though the
> >>> >> >> implementation overlaps.
> >>> >> >>
> >>> >> >> Now Iceberg seems to use the same information to actually pause
> the
> >>> >> >> consumption of files and create some kind of orderness guarantees
> >>> as
> >>> >> far as
> >>> >> >> I understood. This probably can be applied to any source with
> >>> dynamic
> >>> >> split
> >>> >> >> discovery. However, I wouldn't mix up the concepts and hence I
> >>> >> appreciate
> >>> >> >> you not chiming into the FLIP-182 and ff. threads. The goal of
> >>> >> FLIP-182 is
> >>> >> >> to pause readers while consuming a split, while your approach
> >>> pauses
> >>> >> >> readers before processing another split. So it feels more closely
> >>> >> related
> >>> >> >> to the global min watermark - so it could either be part of that
> >>> FLIP
> >>> >> or a
> >>> >> >> FLIP of its own. Afaik API changes should actually happen only on
> >>> the
> >>> >> >> enumerator side both for your ideas and for global min watermark.
> >>> >> >>
> >>> >> >> Best,
> >>> >> >>
> >>> >> >> Arvid
> >>> >> >>
> >>> >> >> On Wed, Apr 27, 2022 at 7:31 PM Thomas Weise <th...@apache.org>
> >>> wrote:
> >>> >> >>
> >>> >> >>> Hi Steven,
> >>> >> >>>
> >>> >> >>> Would it be better to bring this as a separate thread related to
> >>> >> Iceberg
> >>> >> >>> source to the dev@ list? I think this could benefit from
> broader
> >>> >> input?
> >>> >> >>>
> >>> >> >>> Thanks
> >>> >> >>>
> >>> >> >>> On Wed, Apr 27, 2022 at 9:36 AM Steven Wu <stevenz3wu@gmail.com
> >
> >>> >> wrote:
> >>> >> >>>
> >>> >> >>>> + Becket and Sebastian
> >>> >> >>>>
> >>> >> >>>> It is also related to the split level watermark alignment
> >>> discussion
> >>> >> >>>> thread. Because it is already very long, I don't want to
> further
> >>> >> complicate
> >>> >> >>>> the ongoing discussion there. But I can move the discussion to
> >>> that
> >>> >> >>>> existing thread if that is preferred.
> >>> >> >>>>
> >>> >> >>>>
> >>> >> >>>> On Tue, Apr 26, 2022 at 10:03 PM Steven Wu <
> stevenz3wu@gmail.com
> >>> >
> >>> >> >>>> wrote:
> >>> >> >>>>
> >>> >> >>>>> Hi all,
> >>> >> >>>>>
> >>> >> >>>>> We are thinking about how to align with the Flink community
> and
> >>> >> >>>>> leverage the FLIP-182 watermark alignment in the Iceberg
> >>> source. I
> >>> >> put some
> >>> >> >>>>> context in this google doc. Would love to get hear your
> >>> thoughts on
> >>> >> this.
> >>> >> >>>>>
> >>> >> >>>>>
> >>> >> >>>>>
> >>> >>
> >>>
> https://docs.google.com/document/d/1zfwF8e5LszazcOzmUAOeOtpM9v8dKEPlY_BRFSmI3us/edit#
> >>> >> >>>>>
> >>> >> >>>>> Thanks,
> >>> >> >>>>> Steven
> >>> >> >>>>>
> >>> >> >>>>
> >>> >>
> >>> >
> >>>
> >>
>

Re: Source alignment for Iceberg

Posted by Steven Wu <st...@gmail.com>.
Piotr,

With FLIP-27, Iceberg source already implemented alignment by tracking
watermark and holding back split assignment when necessary.

The purpose of this discussion is to see if Iceberg source can leverage
some of the watermark alignment work from Flink framework.

Thanks,
Steven

On Thu, May 5, 2022 at 1:10 AM Piotr Nowojski <pn...@apache.org> wrote:

> Ok, I see. Thanks to both of you for the explanation.
>
> Do we need changes to Apache Flink for this feature? Can it be implemented
> in the Sources without changes in the framework? I presume source can
> access min/max watermark from the split, so as long as it also knows
> exactly which splits have finished, it would know which splits to hold back.
>
> Best,
> Piotrek
>
> śr., 4 maj 2022 o 20:03 Steven Wu <st...@gmail.com> napisał(a):
>
>> Piotr, thanks a lot for your feedback.
>>
>> > I can see this being an issue if the existence of too many blocked
>> splits is occupying too many resources.
>>
>> This is not desirable. Eagerly assigning many splits to a reader can
>> defeat the benefits of pull based dynamic split assignments. Iceberg
>> readers request one split at a time upon start or completion of a split.
>> Dynamic split assignment is better for work sharing/stealing as Becket
>> mentioned. Limiting number of active splits can be handled by the FLIP-27
>> Iceberg source and is somewhat orthogonal to watermark alignment.
>>
>> > Can not Iceberg just emit all splits and let FLIP-182/FLIP-217 handle
>> the watermark alignment and block the splits that are too much into the
>> future?
>>
>> The enumerator just assigns the next split to the requesting reader
>> instead of holding back the split assignment. Let the reader handle the
>> pause (if the file split requires alignment wait).  This strategy might
>> work and leverage more from the framework.
>>
>> We probably need the following to make this work
>> * extract watermark/timestamp only at the completion of a split (not at
>> record level). Because records in a file aren't probably not sorted by the
>> timestamp field, the pause or watermark advancement is probably better done
>> at file level.
>> * source readers checkpoint the watermark. otherwise, upon restart
>> readers won't be able to determine the local watermark and pause for
>> alignment. We don't want to emit records upon restart due to unknown
>> watermark info.
>>
>> All,
>>
>> Any opinion on different timestamp for source alignment (vs Flink
>> application watermark)? For Iceberg source, we might want to enforce
>> alignment on kafka timestamp but Flink application watermark may use event
>> time field from payload.
>>
>> Thanks,
>> Steven
>>
>> On Wed, May 4, 2022 at 7:02 AM Becket Qin <be...@gmail.com> wrote:
>>
>>> Hey Piotr,
>>>
>>> I think the mechanism FLIP-182 provided is a reasonable default one,
>>> which
>>> ensures the watermarks are only drifted by an upper bound. However,
>>> admittedly there are also other strategies for different purposes.
>>>
>>> In the Iceberg case, I am not sure if a static strictly allowed watermark
>>> drift is desired. The source might just want to finish reading the
>>> assigned
>>> splits as fast as possible. And it is OK to have a drift of "one split",
>>> instead of a fixed time period.
>>>
>>> As another example, if there are some fast readers whose splits are
>>> always
>>> throttled, while the other slow readers are struggling to keep up with
>>> the
>>> rest of the splits, the split enumerator may decide to reassign the slow
>>> splits so all the readers have something to read. This would need the
>>> SplitEnumerator to be aware of the watermark progress on each reader. So
>>> it
>>> seems useful to expose the WatermarkAlignmentEvent information to the
>>> SplitEnumerator as well.
>>>
>>> Thanks,
>>>
>>> Jiangjie (Becket) Qin
>>>
>>>
>>>
>>> On Tue, May 3, 2022 at 7:58 PM Piotr Nowojski <pn...@apache.org>
>>> wrote:
>>>
>>> > Hi Steven,
>>> >
>>> > Isn't this redundant to FLIP-182 and FLIP-217? Can not Iceberg just
>>> emit
>>> > all splits and let FLIP-182/FLIP-217 handle the watermark alignment and
>>> > block the splits that are too much into the future? I can see this
>>> being an
>>> > issue if the existence of too many blocked splits is occupying too many
>>> > resources.
>>> >
>>> > If that's the case, indeed SourceCoordinator/SplitEnumerator would
>>> have to
>>> > decide on some basis how many and which splits to assign in what
>>> order. But
>>> > in that case I'm not sure how much you could use from FLIP-182 and
>>> > FLIP-217. They seem somehow orthogonal to me, operating on different
>>> > levels. FLIP-182 and FLIP-217 are working with whatever splits have
>>> already
>>> > been generated and assigned. You could leverage FLIP-182 and FLIP-217
>>> and
>>> > take care of only the problem to limit the number of parallel active
>>> > splits. And here I'm not sure if it would be worth generalising a
>>> solution
>>> > across different connectors.
>>> >
>>> > Regarding the global watermark, I made a related comment sometime ago
>>> > about it [1]. It sounds to me like you also need to solve this problem,
>>> > otherwise Iceberg users will encounter late records in case of some
>>> race
>>> > conditions between assigning new splits and completions of older.
>>> >
>>> > Best,
>>> > Piotrek
>>> >
>>> > [1]
>>> >
>>> https://issues.apache.org/jira/browse/FLINK-21871?focusedCommentId=17495545&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17495545
>>> >
>>> > pon., 2 maj 2022 o 04:26 Steven Wu <st...@gmail.com> napisał(a):
>>> >
>>> >> add dev@ group to the thread as Thomas suggested
>>> >>
>>> >> Arvid,
>>> >>
>>> >> The scenario 3 (Dynamic assignment + temporary no split) in the
>>> FLIP-180
>>> >> (idleness) can happen to Iceberg source alignment, as readers can be
>>> >> temporarily starved due to the holdback by the enumerator when
>>> assigning
>>> >> new splits upon request.
>>> >>
>>> >> Totally agree that we should decouple this discussion with the
>>> FLIP-217,
>>> >> which addresses the split level watermark alignment problem as a
>>> follow-up
>>> >> of FLIP-182
>>> >>
>>> >> Becket,
>>> >>
>>> >> Yes, currently Iceberg source implemented the alignment leveraging the
>>> >> dynamic split assignment from FLIP-27 design. Basically, the
>>> enumerator
>>> >> can
>>> >> hold back split assignments to readers when necessary. Everything are
>>> >> centralized in the enumerator: (1) watermark extraction and
>>> aggregation
>>> >> (2)
>>> >> alignment decision and execution
>>> >>
>>> >> The motivation of this discussion is to see if Iceberg source can
>>> leverage
>>> >> some of the watermark alignment solutions (like FLIP-182) from Flink
>>> >> framework. E.g., as mentioned in the doc, Iceberg source can
>>> potentially
>>> >> leverage the FLIP-182 framework to do the watermark extraction and
>>> >> aggregation. For the alignment decision and execution, we can keep
>>> them in
>>> >> the centralized enumerator.
>>> >>
>>> >> Thanks,
>>> >> Steven
>>> >>
>>> >> On Thu, Apr 28, 2022 at 2:05 AM Becket Qin <be...@gmail.com>
>>> wrote:
>>> >>
>>> >> > Hi Steven,
>>> >> >
>>> >> > Thanks for pulling me into this thread. I think the timestamp
>>> >> > alignment use case here is a good example of what FLIP-27 was
>>> designed
>>> >> for.
>>> >> >
>>> >> > Technically speaking, Iceberg source can already implement the
>>> timestamp
>>> >> > alignment in the Flink new source even without FLIP-182. However, I
>>> >> > understand the rationale here because timestamp alignment is also
>>> >> trying to
>>> >> > orchestrate the consumption of splits. However, it looks like
>>> FLIP-182
>>> >> was
>>> >> > not designed in a way that it can be easily extended for other use
>>> >> cases.
>>> >> > It may probably worth thinking of a more general mechanism to
>>> answer the
>>> >> > following questions:
>>> >> >
>>> >> > 1. What information whose source of truth is the Flink framework
>>> should
>>> >> be
>>> >> > exposed to the SplitEnumerator and SourceReader? And how?
>>> >> > 2. What control actions in the Flink framework are worth exposing
>>> to the
>>> >> > SplitEnumerators and SourceReaders? And how?
>>> >> >
>>> >> > In the context of timestamp alignment, the first question is more
>>> >> > relevant. For example, instead of hardcode the ReportWatermarkEvent
>>> >> > handling logic in the SourceCoordinator, should we expose this to
>>> the
>>> >> > SplitEnumerator? So basically there will be some information, such
>>> as
>>> >> > subtask local watermark, whose source of truth is Flink runtime, but
>>> >> useful
>>> >> > to the user provided pluggables.
>>> >> >
>>> >> > I think there are a few control flow patterns to make a complete
>>> design:
>>> >> >
>>> >> > a. Framework space information (e.g. watermark) --> User space
>>> >> Pluggables
>>> >> > (e.g. SplitEnumerator) --> User space Actions (e.g. Pause reading a
>>> >> split).
>>> >> > b. Framework space information (e.g. task failure) --> User space
>>> >> > pluggables (e.g. SplitEnumerator) --> Framework space actions (e.g.
>>> exit
>>> >> > the job)
>>> >> > c. User space information (e.g. a custom workload metric) --> User
>>> space
>>> >> > pluggables (e.g. SplitEnumerator) --> User space actions (e.g.
>>> rebalance
>>> >> > the workload across the source readers).
>>> >> > d. Use space information (e.g. a custom stopping event in the
>>> stream)
>>> >> -->
>>> >> > User space pluggables (e.g. SplitEnumerator) --> Framework space
>>> actions
>>> >> > (e.g. stop the job).
>>> >> >
>>> >> > So basically for any user provided pluggables, the input
>>> information may
>>> >> > either come from another user provided logic or from the framework,
>>> and
>>> >> > after receiving that information, the pluggable may either want the
>>> >> > framework or another pluggable to take an action. So this gives the
>>> >> above 4
>>> >> > combinations.
>>> >> >
>>> >> > In our case, when the pluggables are SplitEnumerator and
>>> SourceReader,
>>> >> the
>>> >> > control flows that only involve user space actions are fully
>>> supported.
>>> >> But
>>> >> > it seems that when it comes to control flows involving framework
>>> space
>>> >> > information, some of the information has not been exposed to the
>>> >> pluggable,
>>> >> > and some framework actions might also be missing.
>>> >> >
>>> >> > Thanks,
>>> >> >
>>> >> > Jiangjie (Becket) Qin
>>> >> >
>>> >> > On Thu, Apr 28, 2022 at 3:44 PM Arvid Heise <ar...@apache.org>
>>> wrote:
>>> >> >
>>> >> >> Hi folks,
>>> >> >>
>>> >> >> quick input from my side. I think this is from the implementation
>>> >> >> perspective what Piotr and I had in mind for a global min watermark
>>> >> that
>>> >> >> helps in idleness cases. See also point 3 in
>>> >> >>
>>> >>
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-180%3A+Adjust+StreamStatus+and+Idleness+definition
>>> >> >> .
>>> >> >>
>>> >> >> Basically, we would like to empower source enumerators to
>>> determine the
>>> >> >> global min watermark for all source readers factoring in even
>>> future
>>> >> >> splits. Not all sources can supply that information (think of a
>>> general
>>> >> >> file source) but most should be able to. Basically, Flink should
>>> know
>>> >> for a
>>> >> >> given source at a given point in time what the min watermark
>>> across all
>>> >> >> source subtasks is.
>>> >> >>
>>> >> >> Here is some background:
>>> >> >> In the context of idleness, we can deterministically advance the
>>> >> >> watermark. In the pre-FLIP-27 era, we had heuristic approaches in
>>> >> sources
>>> >> >> to switch to idleness and thus allow watermarks to increase in
>>> cases
>>> >> where
>>> >> >> fewer splits than source tasks are available. However, for sources
>>> with
>>> >> >> dynamic split discovery that actually yields incorrect results.
>>> Think
>>> >> of a
>>> >> >> Kinesis consumer where a shard is split. Then a previously idle
>>> source
>>> >> >> subtask may receive a new split with time t0 as the lowest
>>> timestamp.
>>> >> Since
>>> >> >> the source subtask did not participate in the global watermark
>>> >> generation
>>> >> >> (because it was idle), the previously emitted watermark may be
>>> past t0
>>> >> and
>>> >> >> thus results in late records potentially being discarded. A rerun
>>> of
>>> >> the
>>> >> >> same pipeline on historic data would not render the source subtask
>>> >> idle and
>>> >> >> not result in late records. The solution was to not render source
>>> >> subtasks
>>> >> >> automatically idle by the framework if there are no spits. That
>>> leads
>>> >> to
>>> >> >> confusion for Kafka users with static topic subscription where
>>> #splits
>>> >> <
>>> >> >> #parallelism stalls pipelines because the watermark is not
>>> advancing.
>>> >> Here,
>>> >> >> your sketched solution can be transferred to KafkaSource to let
>>> Flink
>>> >> know
>>> >> >> that min global watermark on a static assignment is determined by
>>> the
>>> >> >> slowest partition. Hence, all idle readers emit that min global
>>> >> watermark
>>> >> >> and the user sees progress.
>>> >> >> This whole idea is related to FLIP-182 watermark alignment but I'd
>>> go
>>> >> >> with another FLIP as the goal is quite different even though the
>>> >> >> implementation overlaps.
>>> >> >>
>>> >> >> Now Iceberg seems to use the same information to actually pause the
>>> >> >> consumption of files and create some kind of orderness guarantees
>>> as
>>> >> far as
>>> >> >> I understood. This probably can be applied to any source with
>>> dynamic
>>> >> split
>>> >> >> discovery. However, I wouldn't mix up the concepts and hence I
>>> >> appreciate
>>> >> >> you not chiming into the FLIP-182 and ff. threads. The goal of
>>> >> FLIP-182 is
>>> >> >> to pause readers while consuming a split, while your approach
>>> pauses
>>> >> >> readers before processing another split. So it feels more closely
>>> >> related
>>> >> >> to the global min watermark - so it could either be part of that
>>> FLIP
>>> >> or a
>>> >> >> FLIP of its own. Afaik API changes should actually happen only on
>>> the
>>> >> >> enumerator side both for your ideas and for global min watermark.
>>> >> >>
>>> >> >> Best,
>>> >> >>
>>> >> >> Arvid
>>> >> >>
>>> >> >> On Wed, Apr 27, 2022 at 7:31 PM Thomas Weise <th...@apache.org>
>>> wrote:
>>> >> >>
>>> >> >>> Hi Steven,
>>> >> >>>
>>> >> >>> Would it be better to bring this as a separate thread related to
>>> >> Iceberg
>>> >> >>> source to the dev@ list? I think this could benefit from broader
>>> >> input?
>>> >> >>>
>>> >> >>> Thanks
>>> >> >>>
>>> >> >>> On Wed, Apr 27, 2022 at 9:36 AM Steven Wu <st...@gmail.com>
>>> >> wrote:
>>> >> >>>
>>> >> >>>> + Becket and Sebastian
>>> >> >>>>
>>> >> >>>> It is also related to the split level watermark alignment
>>> discussion
>>> >> >>>> thread. Because it is already very long, I don't want to further
>>> >> complicate
>>> >> >>>> the ongoing discussion there. But I can move the discussion to
>>> that
>>> >> >>>> existing thread if that is preferred.
>>> >> >>>>
>>> >> >>>>
>>> >> >>>> On Tue, Apr 26, 2022 at 10:03 PM Steven Wu <stevenz3wu@gmail.com
>>> >
>>> >> >>>> wrote:
>>> >> >>>>
>>> >> >>>>> Hi all,
>>> >> >>>>>
>>> >> >>>>> We are thinking about how to align with the Flink community and
>>> >> >>>>> leverage the FLIP-182 watermark alignment in the Iceberg
>>> source. I
>>> >> put some
>>> >> >>>>> context in this google doc. Would love to get hear your
>>> thoughts on
>>> >> this.
>>> >> >>>>>
>>> >> >>>>>
>>> >> >>>>>
>>> >>
>>> https://docs.google.com/document/d/1zfwF8e5LszazcOzmUAOeOtpM9v8dKEPlY_BRFSmI3us/edit#
>>> >> >>>>>
>>> >> >>>>> Thanks,
>>> >> >>>>> Steven
>>> >> >>>>>
>>> >> >>>>
>>> >>
>>> >
>>>
>>

Re: Source alignment for Iceberg

Posted by Piotr Nowojski <pn...@apache.org>.
Ok, I see. Thanks to both of you for the explanation.

Do we need changes to Apache Flink for this feature? Can it be implemented
in the Sources without changes in the framework? I presume source can
access min/max watermark from the split, so as long as it also knows
exactly which splits have finished, it would know which splits to hold back.

Best,
Piotrek

śr., 4 maj 2022 o 20:03 Steven Wu <st...@gmail.com> napisał(a):

> Piotr, thanks a lot for your feedback.
>
> > I can see this being an issue if the existence of too many blocked
> splits is occupying too many resources.
>
> This is not desirable. Eagerly assigning many splits to a reader can
> defeat the benefits of pull based dynamic split assignments. Iceberg
> readers request one split at a time upon start or completion of a split.
> Dynamic split assignment is better for work sharing/stealing as Becket
> mentioned. Limiting number of active splits can be handled by the FLIP-27
> Iceberg source and is somewhat orthogonal to watermark alignment.
>
> > Can not Iceberg just emit all splits and let FLIP-182/FLIP-217 handle
> the watermark alignment and block the splits that are too much into the
> future?
>
> The enumerator just assigns the next split to the requesting reader
> instead of holding back the split assignment. Let the reader handle the
> pause (if the file split requires alignment wait).  This strategy might
> work and leverage more from the framework.
>
> We probably need the following to make this work
> * extract watermark/timestamp only at the completion of a split (not at
> record level). Because records in a file aren't probably not sorted by the
> timestamp field, the pause or watermark advancement is probably better done
> at file level.
> * source readers checkpoint the watermark. otherwise, upon restart readers
> won't be able to determine the local watermark and pause for alignment. We
> don't want to emit records upon restart due to unknown watermark info.
>
> All,
>
> Any opinion on different timestamp for source alignment (vs Flink
> application watermark)? For Iceberg source, we might want to enforce
> alignment on kafka timestamp but Flink application watermark may use event
> time field from payload.
>
> Thanks,
> Steven
>
> On Wed, May 4, 2022 at 7:02 AM Becket Qin <be...@gmail.com> wrote:
>
>> Hey Piotr,
>>
>> I think the mechanism FLIP-182 provided is a reasonable default one, which
>> ensures the watermarks are only drifted by an upper bound. However,
>> admittedly there are also other strategies for different purposes.
>>
>> In the Iceberg case, I am not sure if a static strictly allowed watermark
>> drift is desired. The source might just want to finish reading the
>> assigned
>> splits as fast as possible. And it is OK to have a drift of "one split",
>> instead of a fixed time period.
>>
>> As another example, if there are some fast readers whose splits are always
>> throttled, while the other slow readers are struggling to keep up with the
>> rest of the splits, the split enumerator may decide to reassign the slow
>> splits so all the readers have something to read. This would need the
>> SplitEnumerator to be aware of the watermark progress on each reader. So
>> it
>> seems useful to expose the WatermarkAlignmentEvent information to the
>> SplitEnumerator as well.
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>>
>>
>> On Tue, May 3, 2022 at 7:58 PM Piotr Nowojski <pn...@apache.org>
>> wrote:
>>
>> > Hi Steven,
>> >
>> > Isn't this redundant to FLIP-182 and FLIP-217? Can not Iceberg just emit
>> > all splits and let FLIP-182/FLIP-217 handle the watermark alignment and
>> > block the splits that are too much into the future? I can see this
>> being an
>> > issue if the existence of too many blocked splits is occupying too many
>> > resources.
>> >
>> > If that's the case, indeed SourceCoordinator/SplitEnumerator would have
>> to
>> > decide on some basis how many and which splits to assign in what order.
>> But
>> > in that case I'm not sure how much you could use from FLIP-182 and
>> > FLIP-217. They seem somehow orthogonal to me, operating on different
>> > levels. FLIP-182 and FLIP-217 are working with whatever splits have
>> already
>> > been generated and assigned. You could leverage FLIP-182 and FLIP-217
>> and
>> > take care of only the problem to limit the number of parallel active
>> > splits. And here I'm not sure if it would be worth generalising a
>> solution
>> > across different connectors.
>> >
>> > Regarding the global watermark, I made a related comment sometime ago
>> > about it [1]. It sounds to me like you also need to solve this problem,
>> > otherwise Iceberg users will encounter late records in case of some race
>> > conditions between assigning new splits and completions of older.
>> >
>> > Best,
>> > Piotrek
>> >
>> > [1]
>> >
>> https://issues.apache.org/jira/browse/FLINK-21871?focusedCommentId=17495545&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17495545
>> >
>> > pon., 2 maj 2022 o 04:26 Steven Wu <st...@gmail.com> napisał(a):
>> >
>> >> add dev@ group to the thread as Thomas suggested
>> >>
>> >> Arvid,
>> >>
>> >> The scenario 3 (Dynamic assignment + temporary no split) in the
>> FLIP-180
>> >> (idleness) can happen to Iceberg source alignment, as readers can be
>> >> temporarily starved due to the holdback by the enumerator when
>> assigning
>> >> new splits upon request.
>> >>
>> >> Totally agree that we should decouple this discussion with the
>> FLIP-217,
>> >> which addresses the split level watermark alignment problem as a
>> follow-up
>> >> of FLIP-182
>> >>
>> >> Becket,
>> >>
>> >> Yes, currently Iceberg source implemented the alignment leveraging the
>> >> dynamic split assignment from FLIP-27 design. Basically, the enumerator
>> >> can
>> >> hold back split assignments to readers when necessary. Everything are
>> >> centralized in the enumerator: (1) watermark extraction and aggregation
>> >> (2)
>> >> alignment decision and execution
>> >>
>> >> The motivation of this discussion is to see if Iceberg source can
>> leverage
>> >> some of the watermark alignment solutions (like FLIP-182) from Flink
>> >> framework. E.g., as mentioned in the doc, Iceberg source can
>> potentially
>> >> leverage the FLIP-182 framework to do the watermark extraction and
>> >> aggregation. For the alignment decision and execution, we can keep
>> them in
>> >> the centralized enumerator.
>> >>
>> >> Thanks,
>> >> Steven
>> >>
>> >> On Thu, Apr 28, 2022 at 2:05 AM Becket Qin <be...@gmail.com>
>> wrote:
>> >>
>> >> > Hi Steven,
>> >> >
>> >> > Thanks for pulling me into this thread. I think the timestamp
>> >> > alignment use case here is a good example of what FLIP-27 was
>> designed
>> >> for.
>> >> >
>> >> > Technically speaking, Iceberg source can already implement the
>> timestamp
>> >> > alignment in the Flink new source even without FLIP-182. However, I
>> >> > understand the rationale here because timestamp alignment is also
>> >> trying to
>> >> > orchestrate the consumption of splits. However, it looks like
>> FLIP-182
>> >> was
>> >> > not designed in a way that it can be easily extended for other use
>> >> cases.
>> >> > It may probably worth thinking of a more general mechanism to answer
>> the
>> >> > following questions:
>> >> >
>> >> > 1. What information whose source of truth is the Flink framework
>> should
>> >> be
>> >> > exposed to the SplitEnumerator and SourceReader? And how?
>> >> > 2. What control actions in the Flink framework are worth exposing to
>> the
>> >> > SplitEnumerators and SourceReaders? And how?
>> >> >
>> >> > In the context of timestamp alignment, the first question is more
>> >> > relevant. For example, instead of hardcode the ReportWatermarkEvent
>> >> > handling logic in the SourceCoordinator, should we expose this to the
>> >> > SplitEnumerator? So basically there will be some information, such as
>> >> > subtask local watermark, whose source of truth is Flink runtime, but
>> >> useful
>> >> > to the user provided pluggables.
>> >> >
>> >> > I think there are a few control flow patterns to make a complete
>> design:
>> >> >
>> >> > a. Framework space information (e.g. watermark) --> User space
>> >> Pluggables
>> >> > (e.g. SplitEnumerator) --> User space Actions (e.g. Pause reading a
>> >> split).
>> >> > b. Framework space information (e.g. task failure) --> User space
>> >> > pluggables (e.g. SplitEnumerator) --> Framework space actions (e.g.
>> exit
>> >> > the job)
>> >> > c. User space information (e.g. a custom workload metric) --> User
>> space
>> >> > pluggables (e.g. SplitEnumerator) --> User space actions (e.g.
>> rebalance
>> >> > the workload across the source readers).
>> >> > d. Use space information (e.g. a custom stopping event in the stream)
>> >> -->
>> >> > User space pluggables (e.g. SplitEnumerator) --> Framework space
>> actions
>> >> > (e.g. stop the job).
>> >> >
>> >> > So basically for any user provided pluggables, the input information
>> may
>> >> > either come from another user provided logic or from the framework,
>> and
>> >> > after receiving that information, the pluggable may either want the
>> >> > framework or another pluggable to take an action. So this gives the
>> >> above 4
>> >> > combinations.
>> >> >
>> >> > In our case, when the pluggables are SplitEnumerator and
>> SourceReader,
>> >> the
>> >> > control flows that only involve user space actions are fully
>> supported.
>> >> But
>> >> > it seems that when it comes to control flows involving framework
>> space
>> >> > information, some of the information has not been exposed to the
>> >> pluggable,
>> >> > and some framework actions might also be missing.
>> >> >
>> >> > Thanks,
>> >> >
>> >> > Jiangjie (Becket) Qin
>> >> >
>> >> > On Thu, Apr 28, 2022 at 3:44 PM Arvid Heise <ar...@apache.org>
>> wrote:
>> >> >
>> >> >> Hi folks,
>> >> >>
>> >> >> quick input from my side. I think this is from the implementation
>> >> >> perspective what Piotr and I had in mind for a global min watermark
>> >> that
>> >> >> helps in idleness cases. See also point 3 in
>> >> >>
>> >>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-180%3A+Adjust+StreamStatus+and+Idleness+definition
>> >> >> .
>> >> >>
>> >> >> Basically, we would like to empower source enumerators to determine
>> the
>> >> >> global min watermark for all source readers factoring in even future
>> >> >> splits. Not all sources can supply that information (think of a
>> general
>> >> >> file source) but most should be able to. Basically, Flink should
>> know
>> >> for a
>> >> >> given source at a given point in time what the min watermark across
>> all
>> >> >> source subtasks is.
>> >> >>
>> >> >> Here is some background:
>> >> >> In the context of idleness, we can deterministically advance the
>> >> >> watermark. In the pre-FLIP-27 era, we had heuristic approaches in
>> >> sources
>> >> >> to switch to idleness and thus allow watermarks to increase in cases
>> >> where
>> >> >> fewer splits than source tasks are available. However, for sources
>> with
>> >> >> dynamic split discovery that actually yields incorrect results.
>> Think
>> >> of a
>> >> >> Kinesis consumer where a shard is split. Then a previously idle
>> source
>> >> >> subtask may receive a new split with time t0 as the lowest
>> timestamp.
>> >> Since
>> >> >> the source subtask did not participate in the global watermark
>> >> generation
>> >> >> (because it was idle), the previously emitted watermark may be past
>> t0
>> >> and
>> >> >> thus results in late records potentially being discarded. A rerun of
>> >> the
>> >> >> same pipeline on historic data would not render the source subtask
>> >> idle and
>> >> >> not result in late records. The solution was to not render source
>> >> subtasks
>> >> >> automatically idle by the framework if there are no spits. That
>> leads
>> >> to
>> >> >> confusion for Kafka users with static topic subscription where
>> #splits
>> >> <
>> >> >> #parallelism stalls pipelines because the watermark is not
>> advancing.
>> >> Here,
>> >> >> your sketched solution can be transferred to KafkaSource to let
>> Flink
>> >> know
>> >> >> that min global watermark on a static assignment is determined by
>> the
>> >> >> slowest partition. Hence, all idle readers emit that min global
>> >> watermark
>> >> >> and the user sees progress.
>> >> >> This whole idea is related to FLIP-182 watermark alignment but I'd
>> go
>> >> >> with another FLIP as the goal is quite different even though the
>> >> >> implementation overlaps.
>> >> >>
>> >> >> Now Iceberg seems to use the same information to actually pause the
>> >> >> consumption of files and create some kind of orderness guarantees as
>> >> far as
>> >> >> I understood. This probably can be applied to any source with
>> dynamic
>> >> split
>> >> >> discovery. However, I wouldn't mix up the concepts and hence I
>> >> appreciate
>> >> >> you not chiming into the FLIP-182 and ff. threads. The goal of
>> >> FLIP-182 is
>> >> >> to pause readers while consuming a split, while your approach pauses
>> >> >> readers before processing another split. So it feels more closely
>> >> related
>> >> >> to the global min watermark - so it could either be part of that
>> FLIP
>> >> or a
>> >> >> FLIP of its own. Afaik API changes should actually happen only on
>> the
>> >> >> enumerator side both for your ideas and for global min watermark.
>> >> >>
>> >> >> Best,
>> >> >>
>> >> >> Arvid
>> >> >>
>> >> >> On Wed, Apr 27, 2022 at 7:31 PM Thomas Weise <th...@apache.org>
>> wrote:
>> >> >>
>> >> >>> Hi Steven,
>> >> >>>
>> >> >>> Would it be better to bring this as a separate thread related to
>> >> Iceberg
>> >> >>> source to the dev@ list? I think this could benefit from broader
>> >> input?
>> >> >>>
>> >> >>> Thanks
>> >> >>>
>> >> >>> On Wed, Apr 27, 2022 at 9:36 AM Steven Wu <st...@gmail.com>
>> >> wrote:
>> >> >>>
>> >> >>>> + Becket and Sebastian
>> >> >>>>
>> >> >>>> It is also related to the split level watermark alignment
>> discussion
>> >> >>>> thread. Because it is already very long, I don't want to further
>> >> complicate
>> >> >>>> the ongoing discussion there. But I can move the discussion to
>> that
>> >> >>>> existing thread if that is preferred.
>> >> >>>>
>> >> >>>>
>> >> >>>> On Tue, Apr 26, 2022 at 10:03 PM Steven Wu <st...@gmail.com>
>> >> >>>> wrote:
>> >> >>>>
>> >> >>>>> Hi all,
>> >> >>>>>
>> >> >>>>> We are thinking about how to align with the Flink community and
>> >> >>>>> leverage the FLIP-182 watermark alignment in the Iceberg source.
>> I
>> >> put some
>> >> >>>>> context in this google doc. Would love to get hear your thoughts
>> on
>> >> this.
>> >> >>>>>
>> >> >>>>>
>> >> >>>>>
>> >>
>> https://docs.google.com/document/d/1zfwF8e5LszazcOzmUAOeOtpM9v8dKEPlY_BRFSmI3us/edit#
>> >> >>>>>
>> >> >>>>> Thanks,
>> >> >>>>> Steven
>> >> >>>>>
>> >> >>>>
>> >>
>> >
>>
>

Re: Source alignment for Iceberg

Posted by Steven Wu <st...@gmail.com>.
Piotr, thanks a lot for your feedback.

> I can see this being an issue if the existence of too many blocked splits
is occupying too many resources.

This is not desirable. Eagerly assigning many splits to a reader can defeat
the benefits of pull based dynamic split assignments. Iceberg readers
request one split at a time upon start or completion of a split. Dynamic
split assignment is better for work sharing/stealing as Becket mentioned.
Limiting number of active splits can be handled by the FLIP-27 Iceberg
source and is somewhat orthogonal to watermark alignment.

> Can not Iceberg just emit all splits and let FLIP-182/FLIP-217 handle the
watermark alignment and block the splits that are too much into the future?

The enumerator just assigns the next split to the requesting reader instead
of holding back the split assignment. Let the reader handle the pause (if
the file split requires alignment wait).  This strategy might work and
leverage more from the framework.

We probably need the following to make this work
* extract watermark/timestamp only at the completion of a split (not at
record level). Because records in a file aren't probably not sorted by the
timestamp field, the pause or watermark advancement is probably better done
at file level.
* source readers checkpoint the watermark. otherwise, upon restart readers
won't be able to determine the local watermark and pause for alignment. We
don't want to emit records upon restart due to unknown watermark info.

All,

Any opinion on different timestamp for source alignment (vs Flink
application watermark)? For Iceberg source, we might want to enforce
alignment on kafka timestamp but Flink application watermark may use event
time field from payload.

Thanks,
Steven

On Wed, May 4, 2022 at 7:02 AM Becket Qin <be...@gmail.com> wrote:

> Hey Piotr,
>
> I think the mechanism FLIP-182 provided is a reasonable default one, which
> ensures the watermarks are only drifted by an upper bound. However,
> admittedly there are also other strategies for different purposes.
>
> In the Iceberg case, I am not sure if a static strictly allowed watermark
> drift is desired. The source might just want to finish reading the assigned
> splits as fast as possible. And it is OK to have a drift of "one split",
> instead of a fixed time period.
>
> As another example, if there are some fast readers whose splits are always
> throttled, while the other slow readers are struggling to keep up with the
> rest of the splits, the split enumerator may decide to reassign the slow
> splits so all the readers have something to read. This would need the
> SplitEnumerator to be aware of the watermark progress on each reader. So it
> seems useful to expose the WatermarkAlignmentEvent information to the
> SplitEnumerator as well.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
>
> On Tue, May 3, 2022 at 7:58 PM Piotr Nowojski <pn...@apache.org>
> wrote:
>
> > Hi Steven,
> >
> > Isn't this redundant to FLIP-182 and FLIP-217? Can not Iceberg just emit
> > all splits and let FLIP-182/FLIP-217 handle the watermark alignment and
> > block the splits that are too much into the future? I can see this being
> an
> > issue if the existence of too many blocked splits is occupying too many
> > resources.
> >
> > If that's the case, indeed SourceCoordinator/SplitEnumerator would have
> to
> > decide on some basis how many and which splits to assign in what order.
> But
> > in that case I'm not sure how much you could use from FLIP-182 and
> > FLIP-217. They seem somehow orthogonal to me, operating on different
> > levels. FLIP-182 and FLIP-217 are working with whatever splits have
> already
> > been generated and assigned. You could leverage FLIP-182 and FLIP-217 and
> > take care of only the problem to limit the number of parallel active
> > splits. And here I'm not sure if it would be worth generalising a
> solution
> > across different connectors.
> >
> > Regarding the global watermark, I made a related comment sometime ago
> > about it [1]. It sounds to me like you also need to solve this problem,
> > otherwise Iceberg users will encounter late records in case of some race
> > conditions between assigning new splits and completions of older.
> >
> > Best,
> > Piotrek
> >
> > [1]
> >
> https://issues.apache.org/jira/browse/FLINK-21871?focusedCommentId=17495545&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17495545
> >
> > pon., 2 maj 2022 o 04:26 Steven Wu <st...@gmail.com> napisał(a):
> >
> >> add dev@ group to the thread as Thomas suggested
> >>
> >> Arvid,
> >>
> >> The scenario 3 (Dynamic assignment + temporary no split) in the FLIP-180
> >> (idleness) can happen to Iceberg source alignment, as readers can be
> >> temporarily starved due to the holdback by the enumerator when assigning
> >> new splits upon request.
> >>
> >> Totally agree that we should decouple this discussion with the FLIP-217,
> >> which addresses the split level watermark alignment problem as a
> follow-up
> >> of FLIP-182
> >>
> >> Becket,
> >>
> >> Yes, currently Iceberg source implemented the alignment leveraging the
> >> dynamic split assignment from FLIP-27 design. Basically, the enumerator
> >> can
> >> hold back split assignments to readers when necessary. Everything are
> >> centralized in the enumerator: (1) watermark extraction and aggregation
> >> (2)
> >> alignment decision and execution
> >>
> >> The motivation of this discussion is to see if Iceberg source can
> leverage
> >> some of the watermark alignment solutions (like FLIP-182) from Flink
> >> framework. E.g., as mentioned in the doc, Iceberg source can potentially
> >> leverage the FLIP-182 framework to do the watermark extraction and
> >> aggregation. For the alignment decision and execution, we can keep them
> in
> >> the centralized enumerator.
> >>
> >> Thanks,
> >> Steven
> >>
> >> On Thu, Apr 28, 2022 at 2:05 AM Becket Qin <be...@gmail.com>
> wrote:
> >>
> >> > Hi Steven,
> >> >
> >> > Thanks for pulling me into this thread. I think the timestamp
> >> > alignment use case here is a good example of what FLIP-27 was designed
> >> for.
> >> >
> >> > Technically speaking, Iceberg source can already implement the
> timestamp
> >> > alignment in the Flink new source even without FLIP-182. However, I
> >> > understand the rationale here because timestamp alignment is also
> >> trying to
> >> > orchestrate the consumption of splits. However, it looks like FLIP-182
> >> was
> >> > not designed in a way that it can be easily extended for other use
> >> cases.
> >> > It may probably worth thinking of a more general mechanism to answer
> the
> >> > following questions:
> >> >
> >> > 1. What information whose source of truth is the Flink framework
> should
> >> be
> >> > exposed to the SplitEnumerator and SourceReader? And how?
> >> > 2. What control actions in the Flink framework are worth exposing to
> the
> >> > SplitEnumerators and SourceReaders? And how?
> >> >
> >> > In the context of timestamp alignment, the first question is more
> >> > relevant. For example, instead of hardcode the ReportWatermarkEvent
> >> > handling logic in the SourceCoordinator, should we expose this to the
> >> > SplitEnumerator? So basically there will be some information, such as
> >> > subtask local watermark, whose source of truth is Flink runtime, but
> >> useful
> >> > to the user provided pluggables.
> >> >
> >> > I think there are a few control flow patterns to make a complete
> design:
> >> >
> >> > a. Framework space information (e.g. watermark) --> User space
> >> Pluggables
> >> > (e.g. SplitEnumerator) --> User space Actions (e.g. Pause reading a
> >> split).
> >> > b. Framework space information (e.g. task failure) --> User space
> >> > pluggables (e.g. SplitEnumerator) --> Framework space actions (e.g.
> exit
> >> > the job)
> >> > c. User space information (e.g. a custom workload metric) --> User
> space
> >> > pluggables (e.g. SplitEnumerator) --> User space actions (e.g.
> rebalance
> >> > the workload across the source readers).
> >> > d. Use space information (e.g. a custom stopping event in the stream)
> >> -->
> >> > User space pluggables (e.g. SplitEnumerator) --> Framework space
> actions
> >> > (e.g. stop the job).
> >> >
> >> > So basically for any user provided pluggables, the input information
> may
> >> > either come from another user provided logic or from the framework,
> and
> >> > after receiving that information, the pluggable may either want the
> >> > framework or another pluggable to take an action. So this gives the
> >> above 4
> >> > combinations.
> >> >
> >> > In our case, when the pluggables are SplitEnumerator and SourceReader,
> >> the
> >> > control flows that only involve user space actions are fully
> supported.
> >> But
> >> > it seems that when it comes to control flows involving framework space
> >> > information, some of the information has not been exposed to the
> >> pluggable,
> >> > and some framework actions might also be missing.
> >> >
> >> > Thanks,
> >> >
> >> > Jiangjie (Becket) Qin
> >> >
> >> > On Thu, Apr 28, 2022 at 3:44 PM Arvid Heise <ar...@apache.org> wrote:
> >> >
> >> >> Hi folks,
> >> >>
> >> >> quick input from my side. I think this is from the implementation
> >> >> perspective what Piotr and I had in mind for a global min watermark
> >> that
> >> >> helps in idleness cases. See also point 3 in
> >> >>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-180%3A+Adjust+StreamStatus+and+Idleness+definition
> >> >> .
> >> >>
> >> >> Basically, we would like to empower source enumerators to determine
> the
> >> >> global min watermark for all source readers factoring in even future
> >> >> splits. Not all sources can supply that information (think of a
> general
> >> >> file source) but most should be able to. Basically, Flink should know
> >> for a
> >> >> given source at a given point in time what the min watermark across
> all
> >> >> source subtasks is.
> >> >>
> >> >> Here is some background:
> >> >> In the context of idleness, we can deterministically advance the
> >> >> watermark. In the pre-FLIP-27 era, we had heuristic approaches in
> >> sources
> >> >> to switch to idleness and thus allow watermarks to increase in cases
> >> where
> >> >> fewer splits than source tasks are available. However, for sources
> with
> >> >> dynamic split discovery that actually yields incorrect results. Think
> >> of a
> >> >> Kinesis consumer where a shard is split. Then a previously idle
> source
> >> >> subtask may receive a new split with time t0 as the lowest timestamp.
> >> Since
> >> >> the source subtask did not participate in the global watermark
> >> generation
> >> >> (because it was idle), the previously emitted watermark may be past
> t0
> >> and
> >> >> thus results in late records potentially being discarded. A rerun of
> >> the
> >> >> same pipeline on historic data would not render the source subtask
> >> idle and
> >> >> not result in late records. The solution was to not render source
> >> subtasks
> >> >> automatically idle by the framework if there are no spits. That leads
> >> to
> >> >> confusion for Kafka users with static topic subscription where
> #splits
> >> <
> >> >> #parallelism stalls pipelines because the watermark is not advancing.
> >> Here,
> >> >> your sketched solution can be transferred to KafkaSource to let Flink
> >> know
> >> >> that min global watermark on a static assignment is determined by the
> >> >> slowest partition. Hence, all idle readers emit that min global
> >> watermark
> >> >> and the user sees progress.
> >> >> This whole idea is related to FLIP-182 watermark alignment but I'd go
> >> >> with another FLIP as the goal is quite different even though the
> >> >> implementation overlaps.
> >> >>
> >> >> Now Iceberg seems to use the same information to actually pause the
> >> >> consumption of files and create some kind of orderness guarantees as
> >> far as
> >> >> I understood. This probably can be applied to any source with dynamic
> >> split
> >> >> discovery. However, I wouldn't mix up the concepts and hence I
> >> appreciate
> >> >> you not chiming into the FLIP-182 and ff. threads. The goal of
> >> FLIP-182 is
> >> >> to pause readers while consuming a split, while your approach pauses
> >> >> readers before processing another split. So it feels more closely
> >> related
> >> >> to the global min watermark - so it could either be part of that FLIP
> >> or a
> >> >> FLIP of its own. Afaik API changes should actually happen only on the
> >> >> enumerator side both for your ideas and for global min watermark.
> >> >>
> >> >> Best,
> >> >>
> >> >> Arvid
> >> >>
> >> >> On Wed, Apr 27, 2022 at 7:31 PM Thomas Weise <th...@apache.org> wrote:
> >> >>
> >> >>> Hi Steven,
> >> >>>
> >> >>> Would it be better to bring this as a separate thread related to
> >> Iceberg
> >> >>> source to the dev@ list? I think this could benefit from broader
> >> input?
> >> >>>
> >> >>> Thanks
> >> >>>
> >> >>> On Wed, Apr 27, 2022 at 9:36 AM Steven Wu <st...@gmail.com>
> >> wrote:
> >> >>>
> >> >>>> + Becket and Sebastian
> >> >>>>
> >> >>>> It is also related to the split level watermark alignment
> discussion
> >> >>>> thread. Because it is already very long, I don't want to further
> >> complicate
> >> >>>> the ongoing discussion there. But I can move the discussion to that
> >> >>>> existing thread if that is preferred.
> >> >>>>
> >> >>>>
> >> >>>> On Tue, Apr 26, 2022 at 10:03 PM Steven Wu <st...@gmail.com>
> >> >>>> wrote:
> >> >>>>
> >> >>>>> Hi all,
> >> >>>>>
> >> >>>>> We are thinking about how to align with the Flink community and
> >> >>>>> leverage the FLIP-182 watermark alignment in the Iceberg source. I
> >> put some
> >> >>>>> context in this google doc. Would love to get hear your thoughts
> on
> >> this.
> >> >>>>>
> >> >>>>>
> >> >>>>>
> >>
> https://docs.google.com/document/d/1zfwF8e5LszazcOzmUAOeOtpM9v8dKEPlY_BRFSmI3us/edit#
> >> >>>>>
> >> >>>>> Thanks,
> >> >>>>> Steven
> >> >>>>>
> >> >>>>
> >>
> >
>

Re: Source alignment for Iceberg

Posted by Becket Qin <be...@gmail.com>.
Hey Piotr,

I think the mechanism FLIP-182 provided is a reasonable default one, which
ensures the watermarks are only drifted by an upper bound. However,
admittedly there are also other strategies for different purposes.

In the Iceberg case, I am not sure if a static strictly allowed watermark
drift is desired. The source might just want to finish reading the assigned
splits as fast as possible. And it is OK to have a drift of "one split",
instead of a fixed time period.

As another example, if there are some fast readers whose splits are always
throttled, while the other slow readers are struggling to keep up with the
rest of the splits, the split enumerator may decide to reassign the slow
splits so all the readers have something to read. This would need the
SplitEnumerator to be aware of the watermark progress on each reader. So it
seems useful to expose the WatermarkAlignmentEvent information to the
SplitEnumerator as well.

Thanks,

Jiangjie (Becket) Qin



On Tue, May 3, 2022 at 7:58 PM Piotr Nowojski <pn...@apache.org> wrote:

> Hi Steven,
>
> Isn't this redundant to FLIP-182 and FLIP-217? Can not Iceberg just emit
> all splits and let FLIP-182/FLIP-217 handle the watermark alignment and
> block the splits that are too much into the future? I can see this being an
> issue if the existence of too many blocked splits is occupying too many
> resources.
>
> If that's the case, indeed SourceCoordinator/SplitEnumerator would have to
> decide on some basis how many and which splits to assign in what order. But
> in that case I'm not sure how much you could use from FLIP-182 and
> FLIP-217. They seem somehow orthogonal to me, operating on different
> levels. FLIP-182 and FLIP-217 are working with whatever splits have already
> been generated and assigned. You could leverage FLIP-182 and FLIP-217 and
> take care of only the problem to limit the number of parallel active
> splits. And here I'm not sure if it would be worth generalising a solution
> across different connectors.
>
> Regarding the global watermark, I made a related comment sometime ago
> about it [1]. It sounds to me like you also need to solve this problem,
> otherwise Iceberg users will encounter late records in case of some race
> conditions between assigning new splits and completions of older.
>
> Best,
> Piotrek
>
> [1]
> https://issues.apache.org/jira/browse/FLINK-21871?focusedCommentId=17495545&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17495545
>
> pon., 2 maj 2022 o 04:26 Steven Wu <st...@gmail.com> napisał(a):
>
>> add dev@ group to the thread as Thomas suggested
>>
>> Arvid,
>>
>> The scenario 3 (Dynamic assignment + temporary no split) in the FLIP-180
>> (idleness) can happen to Iceberg source alignment, as readers can be
>> temporarily starved due to the holdback by the enumerator when assigning
>> new splits upon request.
>>
>> Totally agree that we should decouple this discussion with the FLIP-217,
>> which addresses the split level watermark alignment problem as a follow-up
>> of FLIP-182
>>
>> Becket,
>>
>> Yes, currently Iceberg source implemented the alignment leveraging the
>> dynamic split assignment from FLIP-27 design. Basically, the enumerator
>> can
>> hold back split assignments to readers when necessary. Everything are
>> centralized in the enumerator: (1) watermark extraction and aggregation
>> (2)
>> alignment decision and execution
>>
>> The motivation of this discussion is to see if Iceberg source can leverage
>> some of the watermark alignment solutions (like FLIP-182) from Flink
>> framework. E.g., as mentioned in the doc, Iceberg source can potentially
>> leverage the FLIP-182 framework to do the watermark extraction and
>> aggregation. For the alignment decision and execution, we can keep them in
>> the centralized enumerator.
>>
>> Thanks,
>> Steven
>>
>> On Thu, Apr 28, 2022 at 2:05 AM Becket Qin <be...@gmail.com> wrote:
>>
>> > Hi Steven,
>> >
>> > Thanks for pulling me into this thread. I think the timestamp
>> > alignment use case here is a good example of what FLIP-27 was designed
>> for.
>> >
>> > Technically speaking, Iceberg source can already implement the timestamp
>> > alignment in the Flink new source even without FLIP-182. However, I
>> > understand the rationale here because timestamp alignment is also
>> trying to
>> > orchestrate the consumption of splits. However, it looks like FLIP-182
>> was
>> > not designed in a way that it can be easily extended for other use
>> cases.
>> > It may probably worth thinking of a more general mechanism to answer the
>> > following questions:
>> >
>> > 1. What information whose source of truth is the Flink framework should
>> be
>> > exposed to the SplitEnumerator and SourceReader? And how?
>> > 2. What control actions in the Flink framework are worth exposing to the
>> > SplitEnumerators and SourceReaders? And how?
>> >
>> > In the context of timestamp alignment, the first question is more
>> > relevant. For example, instead of hardcode the ReportWatermarkEvent
>> > handling logic in the SourceCoordinator, should we expose this to the
>> > SplitEnumerator? So basically there will be some information, such as
>> > subtask local watermark, whose source of truth is Flink runtime, but
>> useful
>> > to the user provided pluggables.
>> >
>> > I think there are a few control flow patterns to make a complete design:
>> >
>> > a. Framework space information (e.g. watermark) --> User space
>> Pluggables
>> > (e.g. SplitEnumerator) --> User space Actions (e.g. Pause reading a
>> split).
>> > b. Framework space information (e.g. task failure) --> User space
>> > pluggables (e.g. SplitEnumerator) --> Framework space actions (e.g. exit
>> > the job)
>> > c. User space information (e.g. a custom workload metric) --> User space
>> > pluggables (e.g. SplitEnumerator) --> User space actions (e.g. rebalance
>> > the workload across the source readers).
>> > d. Use space information (e.g. a custom stopping event in the stream)
>> -->
>> > User space pluggables (e.g. SplitEnumerator) --> Framework space actions
>> > (e.g. stop the job).
>> >
>> > So basically for any user provided pluggables, the input information may
>> > either come from another user provided logic or from the framework, and
>> > after receiving that information, the pluggable may either want the
>> > framework or another pluggable to take an action. So this gives the
>> above 4
>> > combinations.
>> >
>> > In our case, when the pluggables are SplitEnumerator and SourceReader,
>> the
>> > control flows that only involve user space actions are fully supported.
>> But
>> > it seems that when it comes to control flows involving framework space
>> > information, some of the information has not been exposed to the
>> pluggable,
>> > and some framework actions might also be missing.
>> >
>> > Thanks,
>> >
>> > Jiangjie (Becket) Qin
>> >
>> > On Thu, Apr 28, 2022 at 3:44 PM Arvid Heise <ar...@apache.org> wrote:
>> >
>> >> Hi folks,
>> >>
>> >> quick input from my side. I think this is from the implementation
>> >> perspective what Piotr and I had in mind for a global min watermark
>> that
>> >> helps in idleness cases. See also point 3 in
>> >>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-180%3A+Adjust+StreamStatus+and+Idleness+definition
>> >> .
>> >>
>> >> Basically, we would like to empower source enumerators to determine the
>> >> global min watermark for all source readers factoring in even future
>> >> splits. Not all sources can supply that information (think of a general
>> >> file source) but most should be able to. Basically, Flink should know
>> for a
>> >> given source at a given point in time what the min watermark across all
>> >> source subtasks is.
>> >>
>> >> Here is some background:
>> >> In the context of idleness, we can deterministically advance the
>> >> watermark. In the pre-FLIP-27 era, we had heuristic approaches in
>> sources
>> >> to switch to idleness and thus allow watermarks to increase in cases
>> where
>> >> fewer splits than source tasks are available. However, for sources with
>> >> dynamic split discovery that actually yields incorrect results. Think
>> of a
>> >> Kinesis consumer where a shard is split. Then a previously idle source
>> >> subtask may receive a new split with time t0 as the lowest timestamp.
>> Since
>> >> the source subtask did not participate in the global watermark
>> generation
>> >> (because it was idle), the previously emitted watermark may be past t0
>> and
>> >> thus results in late records potentially being discarded. A rerun of
>> the
>> >> same pipeline on historic data would not render the source subtask
>> idle and
>> >> not result in late records. The solution was to not render source
>> subtasks
>> >> automatically idle by the framework if there are no spits. That leads
>> to
>> >> confusion for Kafka users with static topic subscription where #splits
>> <
>> >> #parallelism stalls pipelines because the watermark is not advancing.
>> Here,
>> >> your sketched solution can be transferred to KafkaSource to let Flink
>> know
>> >> that min global watermark on a static assignment is determined by the
>> >> slowest partition. Hence, all idle readers emit that min global
>> watermark
>> >> and the user sees progress.
>> >> This whole idea is related to FLIP-182 watermark alignment but I'd go
>> >> with another FLIP as the goal is quite different even though the
>> >> implementation overlaps.
>> >>
>> >> Now Iceberg seems to use the same information to actually pause the
>> >> consumption of files and create some kind of orderness guarantees as
>> far as
>> >> I understood. This probably can be applied to any source with dynamic
>> split
>> >> discovery. However, I wouldn't mix up the concepts and hence I
>> appreciate
>> >> you not chiming into the FLIP-182 and ff. threads. The goal of
>> FLIP-182 is
>> >> to pause readers while consuming a split, while your approach pauses
>> >> readers before processing another split. So it feels more closely
>> related
>> >> to the global min watermark - so it could either be part of that FLIP
>> or a
>> >> FLIP of its own. Afaik API changes should actually happen only on the
>> >> enumerator side both for your ideas and for global min watermark.
>> >>
>> >> Best,
>> >>
>> >> Arvid
>> >>
>> >> On Wed, Apr 27, 2022 at 7:31 PM Thomas Weise <th...@apache.org> wrote:
>> >>
>> >>> Hi Steven,
>> >>>
>> >>> Would it be better to bring this as a separate thread related to
>> Iceberg
>> >>> source to the dev@ list? I think this could benefit from broader
>> input?
>> >>>
>> >>> Thanks
>> >>>
>> >>> On Wed, Apr 27, 2022 at 9:36 AM Steven Wu <st...@gmail.com>
>> wrote:
>> >>>
>> >>>> + Becket and Sebastian
>> >>>>
>> >>>> It is also related to the split level watermark alignment discussion
>> >>>> thread. Because it is already very long, I don't want to further
>> complicate
>> >>>> the ongoing discussion there. But I can move the discussion to that
>> >>>> existing thread if that is preferred.
>> >>>>
>> >>>>
>> >>>> On Tue, Apr 26, 2022 at 10:03 PM Steven Wu <st...@gmail.com>
>> >>>> wrote:
>> >>>>
>> >>>>> Hi all,
>> >>>>>
>> >>>>> We are thinking about how to align with the Flink community and
>> >>>>> leverage the FLIP-182 watermark alignment in the Iceberg source. I
>> put some
>> >>>>> context in this google doc. Would love to get hear your thoughts on
>> this.
>> >>>>>
>> >>>>>
>> >>>>>
>> https://docs.google.com/document/d/1zfwF8e5LszazcOzmUAOeOtpM9v8dKEPlY_BRFSmI3us/edit#
>> >>>>>
>> >>>>> Thanks,
>> >>>>> Steven
>> >>>>>
>> >>>>
>>
>

Re: Source alignment for Iceberg

Posted by Piotr Nowojski <pn...@apache.org>.
Hi Steven,

Isn't this redundant to FLIP-182 and FLIP-217? Can not Iceberg just emit
all splits and let FLIP-182/FLIP-217 handle the watermark alignment and
block the splits that are too much into the future? I can see this being an
issue if the existence of too many blocked splits is occupying too many
resources.

If that's the case, indeed SourceCoordinator/SplitEnumerator would have to
decide on some basis how many and which splits to assign in what order. But
in that case I'm not sure how much you could use from FLIP-182 and
FLIP-217. They seem somehow orthogonal to me, operating on different
levels. FLIP-182 and FLIP-217 are working with whatever splits have already
been generated and assigned. You could leverage FLIP-182 and FLIP-217 and
take care of only the problem to limit the number of parallel active
splits. And here I'm not sure if it would be worth generalising a solution
across different connectors.

Regarding the global watermark, I made a related comment sometime ago about
it [1]. It sounds to me like you also need to solve this problem, otherwise
Iceberg users will encounter late records in case of some race conditions
between assigning new splits and completions of older.

Best,
Piotrek

[1]
https://issues.apache.org/jira/browse/FLINK-21871?focusedCommentId=17495545&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17495545

pon., 2 maj 2022 o 04:26 Steven Wu <st...@gmail.com> napisał(a):

> add dev@ group to the thread as Thomas suggested
>
> Arvid,
>
> The scenario 3 (Dynamic assignment + temporary no split) in the FLIP-180
> (idleness) can happen to Iceberg source alignment, as readers can be
> temporarily starved due to the holdback by the enumerator when assigning
> new splits upon request.
>
> Totally agree that we should decouple this discussion with the FLIP-217,
> which addresses the split level watermark alignment problem as a follow-up
> of FLIP-182
>
> Becket,
>
> Yes, currently Iceberg source implemented the alignment leveraging the
> dynamic split assignment from FLIP-27 design. Basically, the enumerator can
> hold back split assignments to readers when necessary. Everything are
> centralized in the enumerator: (1) watermark extraction and aggregation (2)
> alignment decision and execution
>
> The motivation of this discussion is to see if Iceberg source can leverage
> some of the watermark alignment solutions (like FLIP-182) from Flink
> framework. E.g., as mentioned in the doc, Iceberg source can potentially
> leverage the FLIP-182 framework to do the watermark extraction and
> aggregation. For the alignment decision and execution, we can keep them in
> the centralized enumerator.
>
> Thanks,
> Steven
>
> On Thu, Apr 28, 2022 at 2:05 AM Becket Qin <be...@gmail.com> wrote:
>
> > Hi Steven,
> >
> > Thanks for pulling me into this thread. I think the timestamp
> > alignment use case here is a good example of what FLIP-27 was designed
> for.
> >
> > Technically speaking, Iceberg source can already implement the timestamp
> > alignment in the Flink new source even without FLIP-182. However, I
> > understand the rationale here because timestamp alignment is also trying
> to
> > orchestrate the consumption of splits. However, it looks like FLIP-182
> was
> > not designed in a way that it can be easily extended for other use cases.
> > It may probably worth thinking of a more general mechanism to answer the
> > following questions:
> >
> > 1. What information whose source of truth is the Flink framework should
> be
> > exposed to the SplitEnumerator and SourceReader? And how?
> > 2. What control actions in the Flink framework are worth exposing to the
> > SplitEnumerators and SourceReaders? And how?
> >
> > In the context of timestamp alignment, the first question is more
> > relevant. For example, instead of hardcode the ReportWatermarkEvent
> > handling logic in the SourceCoordinator, should we expose this to the
> > SplitEnumerator? So basically there will be some information, such as
> > subtask local watermark, whose source of truth is Flink runtime, but
> useful
> > to the user provided pluggables.
> >
> > I think there are a few control flow patterns to make a complete design:
> >
> > a. Framework space information (e.g. watermark) --> User space Pluggables
> > (e.g. SplitEnumerator) --> User space Actions (e.g. Pause reading a
> split).
> > b. Framework space information (e.g. task failure) --> User space
> > pluggables (e.g. SplitEnumerator) --> Framework space actions (e.g. exit
> > the job)
> > c. User space information (e.g. a custom workload metric) --> User space
> > pluggables (e.g. SplitEnumerator) --> User space actions (e.g. rebalance
> > the workload across the source readers).
> > d. Use space information (e.g. a custom stopping event in the stream) -->
> > User space pluggables (e.g. SplitEnumerator) --> Framework space actions
> > (e.g. stop the job).
> >
> > So basically for any user provided pluggables, the input information may
> > either come from another user provided logic or from the framework, and
> > after receiving that information, the pluggable may either want the
> > framework or another pluggable to take an action. So this gives the
> above 4
> > combinations.
> >
> > In our case, when the pluggables are SplitEnumerator and SourceReader,
> the
> > control flows that only involve user space actions are fully supported.
> But
> > it seems that when it comes to control flows involving framework space
> > information, some of the information has not been exposed to the
> pluggable,
> > and some framework actions might also be missing.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Thu, Apr 28, 2022 at 3:44 PM Arvid Heise <ar...@apache.org> wrote:
> >
> >> Hi folks,
> >>
> >> quick input from my side. I think this is from the implementation
> >> perspective what Piotr and I had in mind for a global min watermark that
> >> helps in idleness cases. See also point 3 in
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-180%3A+Adjust+StreamStatus+and+Idleness+definition
> >> .
> >>
> >> Basically, we would like to empower source enumerators to determine the
> >> global min watermark for all source readers factoring in even future
> >> splits. Not all sources can supply that information (think of a general
> >> file source) but most should be able to. Basically, Flink should know
> for a
> >> given source at a given point in time what the min watermark across all
> >> source subtasks is.
> >>
> >> Here is some background:
> >> In the context of idleness, we can deterministically advance the
> >> watermark. In the pre-FLIP-27 era, we had heuristic approaches in
> sources
> >> to switch to idleness and thus allow watermarks to increase in cases
> where
> >> fewer splits than source tasks are available. However, for sources with
> >> dynamic split discovery that actually yields incorrect results. Think
> of a
> >> Kinesis consumer where a shard is split. Then a previously idle source
> >> subtask may receive a new split with time t0 as the lowest timestamp.
> Since
> >> the source subtask did not participate in the global watermark
> generation
> >> (because it was idle), the previously emitted watermark may be past t0
> and
> >> thus results in late records potentially being discarded. A rerun of the
> >> same pipeline on historic data would not render the source subtask idle
> and
> >> not result in late records. The solution was to not render source
> subtasks
> >> automatically idle by the framework if there are no spits. That leads to
> >> confusion for Kafka users with static topic subscription where #splits <
> >> #parallelism stalls pipelines because the watermark is not advancing.
> Here,
> >> your sketched solution can be transferred to KafkaSource to let Flink
> know
> >> that min global watermark on a static assignment is determined by the
> >> slowest partition. Hence, all idle readers emit that min global
> watermark
> >> and the user sees progress.
> >> This whole idea is related to FLIP-182 watermark alignment but I'd go
> >> with another FLIP as the goal is quite different even though the
> >> implementation overlaps.
> >>
> >> Now Iceberg seems to use the same information to actually pause the
> >> consumption of files and create some kind of orderness guarantees as
> far as
> >> I understood. This probably can be applied to any source with dynamic
> split
> >> discovery. However, I wouldn't mix up the concepts and hence I
> appreciate
> >> you not chiming into the FLIP-182 and ff. threads. The goal of FLIP-182
> is
> >> to pause readers while consuming a split, while your approach pauses
> >> readers before processing another split. So it feels more closely
> related
> >> to the global min watermark - so it could either be part of that FLIP
> or a
> >> FLIP of its own. Afaik API changes should actually happen only on the
> >> enumerator side both for your ideas and for global min watermark.
> >>
> >> Best,
> >>
> >> Arvid
> >>
> >> On Wed, Apr 27, 2022 at 7:31 PM Thomas Weise <th...@apache.org> wrote:
> >>
> >>> Hi Steven,
> >>>
> >>> Would it be better to bring this as a separate thread related to
> Iceberg
> >>> source to the dev@ list? I think this could benefit from broader
> input?
> >>>
> >>> Thanks
> >>>
> >>> On Wed, Apr 27, 2022 at 9:36 AM Steven Wu <st...@gmail.com>
> wrote:
> >>>
> >>>> + Becket and Sebastian
> >>>>
> >>>> It is also related to the split level watermark alignment discussion
> >>>> thread. Because it is already very long, I don't want to further
> complicate
> >>>> the ongoing discussion there. But I can move the discussion to that
> >>>> existing thread if that is preferred.
> >>>>
> >>>>
> >>>> On Tue, Apr 26, 2022 at 10:03 PM Steven Wu <st...@gmail.com>
> >>>> wrote:
> >>>>
> >>>>> Hi all,
> >>>>>
> >>>>> We are thinking about how to align with the Flink community and
> >>>>> leverage the FLIP-182 watermark alignment in the Iceberg source. I
> put some
> >>>>> context in this google doc. Would love to get hear your thoughts on
> this.
> >>>>>
> >>>>>
> >>>>>
> https://docs.google.com/document/d/1zfwF8e5LszazcOzmUAOeOtpM9v8dKEPlY_BRFSmI3us/edit#
> >>>>>
> >>>>> Thanks,
> >>>>> Steven
> >>>>>
> >>>>
>