Posted to dev@flink.apache.org by Maximilian Michels <mx...@apache.org> on 2022/11/01 12:28:55 UTC

Re: [DISCUSS] FLIP-264 Extract BaseCoordinatorContext

Thanks Steven! My confusion stemmed from the lack of context in the FLIP.
The first version did not lay out how the refactoring would be used down
the line, e.g. by the ShuffleCoordinator. The OperatorCoordinator API is a
non-public API, and before reading the code, I wasn't even aware of how
exactly it worked or whether it would be available to regular operators (it
was originally intended for sources only).

I might seem pedantic here, but I believe the purpose of a FLIP should be to
describe the *why* behind the changes, not only the changes themselves. A
FLIP is not a formality but a tool to communicate and discuss changes. I
think we still haven't laid out the exact reasons why we are factoring out
the base class. As far as I understand now, we need the base class to handle
concurrent updates to the custom Coordinator from the runtime (sub)tasks.
Effectively, we are enforcing an actor model for processing the incoming
messages so that the OperatorCoordinator can cleanly update its state.
However, if there are no actual implementations in Flink itself that make
use of the refactoring, I wonder if it would make sense to copy this code
to the downstream implementation, e.g. the ShuffleCoordinator. Once that
implementation is part of Flink, we could of course try to consolidate
this code.
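
To make the "actor model" point concrete, here is a minimal plain-Java sketch of the idea, not Flink's implementation: events from subtasks are submitted to a single-threaded executor, so the coordinator's state is only ever mutated by one thread and needs no locks. `EventLoopCoordinator`, `handleEventFromSubtask`, and `totalEvents` are hypothetical names; `runInEventLoop`, mentioned in this thread, serves a similar purpose in `SourceCoordinator`, but its actual signature and behaviour are not reproduced here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical coordinator (not a Flink class) that serializes every state
// update through a single thread, in the spirit of an actor mailbox.
class EventLoopCoordinator implements AutoCloseable {
    // One thread processes submitted tasks strictly in submission order.
    private final ExecutorService eventLoop = Executors.newSingleThreadExecutor();
    // Mutable state; only ever read or written on the event-loop thread.
    private final Map<Integer, Long> eventsPerSubtask = new HashMap<>();

    // May be called concurrently, e.g. by RPC threads delivering subtask events.
    Future<?> handleEventFromSubtask(int subtask) {
        return eventLoop.submit(() -> eventsPerSubtask.merge(subtask, 1L, Long::sum));
    }

    // Reads go through the same loop, so they observe every event submitted before them.
    long totalEvents() throws Exception {
        return eventLoop.submit(
                () -> eventsPerSubtask.values().stream().mapToLong(Long::longValue).sum())
            .get();
    }

    @Override
    public void close() throws InterruptedException {
        eventLoop.shutdown();
        eventLoop.awaitTermination(10, TimeUnit.SECONDS);
    }
}
```

Routing reads through the same executor, instead of reading the map directly, is what keeps the state single-threaded. A real coordinator would also have to handle failures and checkpoints, which this sketch ignores.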

Considering the *how* of this, the FLIP lists methods from both
SourceCoordinator (e.g. runInEventLoop) and SourceCoordinatorContext, as
well as methods which do not appear anywhere in Flink code, e.g.
subTaskReady / subTaskNotReady / sendEventToOperator. It appears that some
of this has been extracted from a downstream implementation. It would be
great to adjust this so that it reflects the status quo in Flink.

-Max
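
The coordinator side of the statistics exchange that Steven describes in his Oct 28 message (quoted below) could look roughly like the following plain-Java sketch. It assumes string keys with per-subtask counts, subtask indices 0 to parallelism - 1, and an aggregation round that completes once every subtask has reported. `StatisticsAggregator` and `onLocalStatistics` are hypothetical names, and the `BiConsumer` is an assumed stand-in for whatever call a coordinator context offers to send an event to one subtask.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical coordinator-side logic for the statistics exchange. Plain maps
// stand in for the events a real coordinator would exchange with subtasks.
// Not thread-safe: calls are assumed to be serialized on one coordinator thread.
class StatisticsAggregator {
    private final int parallelism;
    // Stand-in for "send an event to subtask i" in a coordinator context.
    private final BiConsumer<Integer, Map<String, Long>> sendToSubtask;
    // Latest local report per subtask for the current aggregation round.
    private final Map<Integer, Map<String, Long>> reportsThisRound = new HashMap<>();

    StatisticsAggregator(int parallelism, BiConsumer<Integer, Map<String, Long>> sendToSubtask) {
        this.parallelism = parallelism;
        this.sendToSubtask = sendToSubtask;
    }

    // A subtask reports its local key counts; once every subtask has reported,
    // the counts are merged and the global result is sent to all subtasks.
    void onLocalStatistics(int subtask, Map<String, Long> localCounts) {
        reportsThisRound.put(subtask, new HashMap<>(localCounts));
        if (reportsThisRound.size() == parallelism) {
            Map<String, Long> global = aggregate();
            for (int i = 0; i < parallelism; i++) {
                sendToSubtask.accept(i, Collections.unmodifiableMap(global));
            }
            reportsThisRound.clear();
        }
    }

    // Sums the counts for each key across all local reports of this round.
    private Map<String, Long> aggregate() {
        Map<String, Long> global = new HashMap<>();
        for (Map<String, Long> local : reportsThisRound.values()) {
            local.forEach((key, count) -> global.merge(key, count, Long::sum));
        }
        return global;
    }
}
```

Waiting for all subtasks before broadcasting keeps every subtask on the same global view per round; a production design might instead aggregate incrementally or on a timer.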

On Fri, Oct 28, 2022 at 5:53 AM Steven Wu <st...@gmail.com> wrote:

> Max,
>
> Thanks a lot for the comments. We should clarify that the shuffle
> operator/coordinator is not really part of the Flink sink
> function/operator. The shuffle operator is a custom operator that can be
> inserted right before the Iceberg writer operator. It calculates the
> traffic statistics and performs a custom partition/shuffle
> (DataStream#partitionCustom) to cluster the data right before it gets to
> the Iceberg writer operator.
>
> We are not proposing to introduce a sink coordinator for the sink
> interface. The shuffle operator needs the CoordinatorContextBase to
> facilitate the communication between shuffle subtasks and the coordinator
> for traffic statistics aggregation. The communication part is already
> implemented by SourceCoordinatorContext.
>
> Here are some details about the communication needs:
> - subtasks periodically calculate local statistics and send them to the
> coordinator for global aggregation
> - the coordinator sends the globally aggregated statistics to the subtasks
> - subtasks use the globally aggregated statistics to guide the
> partition/shuffle decision
>
> Regards,
> Steven
>
> On Thu, Oct 27, 2022 at 5:38 PM Maximilian Michels <mx...@apache.org> wrote:
>
> > Hi Gang,
> >
> > Looks much better! I've actually gone through the OperatorCoordinator
> > code. It turns out any operator already has an OperatorCoordinator
> > assigned. Also, any operator can add custom coordinator code. So it looks
> > like you won't have to implement any additional runtime logic to add a
> > ShuffleCoordinator. However, I'm wondering why you specifically need to
> > refactor the SourceCoordinatorContext. You could simply add your own
> > coordinator code. I'm not sure the sink requirements map to the source
> > interface so closely that you can reuse the same logic.
> >
> > If you can refactor SourceCoordinatorContext in a way that makes it fit
> > your use case, I have no objection here. By the way, another example of
> > an existing OperatorCoordinator is CollectSinkOperatorCoordinator, which
> > is quite trivial. It might be worth evaluating whether you need the full
> > power of SourceCoordinatorContext, which is why I wanted to get more
> > context.
> >
> > -Max
> >
> > On Thu, Oct 27, 2022 at 4:15 PM gang ye <ye...@gmail.com> wrote:
> >
> > > Hi Max,
> > > I understand your concern. Since shuffling support for the Flink Iceberg
> > > sink is not the main body of the proposal, I have just added another
> > > appendix with more details about how to use CoordinatorContextBase and
> > > how to define ShufflingCoordinator.
> > >
> > > Let me know if that doesn't address your concern.
> > >
> > > Thanks
> > > Gang
> > >
> > > On Thu, Oct 27, 2022 at 1:31 PM Maximilian Michels <mx...@apache.org> wrote:
> > >
> > >> Hey Gang,
> > >>
> > >> What I'm looking for here is a complete picture of why the change is
> > >> necessary and what the next steps are. Ultimately, refactoring any code
> > >> serves a purpose. Here, we want to refactor the Coordinator code such
> > >> that we can add a SinkCoordinator, similar to the SourceCoordinator. The
> > >> FLIP should address the next steps, i.e. how you plan to add the
> > >> SinkCoordinator, its interfaces, and the runtime changes. It doesn't
> > >> have to be in great detail, but without this information, I don't think
> > >> the FLIP is complete.
> > >>
> > >> This feature should be generic enough to be usable by sinks other than
> > >> the Iceberg sink. Of course, Iceberg can still load its own
> > >> implementation, which may be outlined in a separate FLIP.
> > >>
> > >> Unless there is a good reason, normal operators should not support the
> > >> coordinator functionality. At least I'm not convinced it would play
> > >> well with Flink's execution model. But I see how it is required for
> > >> sources and sinks.
> > >>
> > >> -Max
> > >>
> > >> On Wed, Oct 26, 2022 at 3:05 PM gang ye <ye...@gmail.com> wrote:
> > >>
> > >>> Hi Max,
> > >>>
> > >>> Thanks for reviewing.
> > >>>
> > >>> For FLIP-264, yes, we will only focus on abstracting RPC calls
> > >>> between the task and the job manager for communication and won't
> > >>> touch watermarks or checkpointing.
> > >>> If the coordinator doesn't need RPC calls to talk with subtasks, then
> > >>> it can define its context without extending CoordinatorContextBase (or
> > >>> we can find another class name to limit the scope).
> > >>>
> > >>> Regarding the scope of code changes, for FLIP-264 we will only do
> > >>> the context extraction. The shuffling coordinator and operator
> > >>> <https://docs.google.com/document/d/13N8cMqPi-ZPSKbkXGOBMPOzbv2Fua59j8bIjjtxLWqo>
> > >>> which will use the context will come with a separate proposal, so we
> > >>> are trying to keep FLIP-264 simple and easy to understand. I can add a
> > >>> little more about how to use the coordinator context in FLIP-264 if
> > >>> you think that would be helpful.
> > >>>
> > >>> Thanks!
> > >>> Gang
> > >>>
> > >>>
> > >>>
> > >>> On Wed, Oct 26, 2022 at 7:25 AM Maximilian Michels <mx...@apache.org> wrote:
> > >>>
> > >>>> Thanks for the proposal, Gang! This is indeed a somewhat bigger
> > >>>> change. The coordinator for sources, as part of FLIP-27, was
> > >>>> specifically added to synchronize the global watermark and to assign
> > >>>> splits dynamically. However, it practically allows arbitrary RPC calls
> > >>>> between the task and the job manager. I understand that there is
> > >>>> concern that such a powerful mechanism should not be available to all
> > >>>> operators. Nevertheless, I see the practical use in the case of sinks
> > >>>> like Iceberg. So I'd suggest limiting this feature to sinks (and
> > >>>> sources) only.
> > >>>>
> > >>>> I'm wondering whether extracting the SourceCoordinatorContext is
> > >>>> enough to achieve what you want. Additional work will be necessary,
> > >>>> e.g. creating a SinkCoordinator, similar to SourceCoordinator, which
> > >>>> handles the RPC calls and the checkpointing. I think it would be good
> > >>>> to outline this in the FLIP.
> > >>>>
> > >>>> -Max
> > >>>>
> > >>>> On Sun, Oct 16, 2022 at 9:01 AM Steven Wu <st...@gmail.com> wrote:
> > >>>>
> > >>>>> Sorry, I sent the incomplete reply by mistake.
> > >>>>>
> > >>>>> If there are any concrete concerns, we can discuss them. In
> > >>>>> FLINK-27405 [1], Avid pointed out some implications regarding
> > >>>>> checkpointing. In this small FLIP, we are not exposing/changing any
> > >>>>> checkpointing logic; we mainly need the coordinator context
> > >>>>> functionality to facilitate the communication between the coordinator
> > >>>>> and subtasks.
> > >>>>>
> > >>>>> [1] https://issues.apache.org/jira/browse/FLINK-27405
> > >>>>>
> > >>>>> On Sun, Oct 16, 2022 at 8:56 AM Steven Wu <st...@gmail.com> wrote:
> > >>>>>
> > >>>>> > Hang, I appreciate your input. I agree that `CoordinatorContextBase`
> > >>>>> > is a better name considering Flink code conventions.
> > >>>>> >
> > >>>>> > If there are any concrete concerns, we can discuss. In the jira,
> > >>>>> >
> > >>>>> >
> > >>>>> >
> > >>>>> > On Sun, Oct 16, 2022 at 12:12 AM Hang Ruan <ruanhang1993@gmail.com>
> > >>>>> > wrote:
> > >>>>> >
> > >>>>> >> Hi,
> > >>>>> >>
> > >>>>> >> IMO, it makes sense to extract a base class for
> > >>>>> >> SourceCoordinatorContext. But I prefer the name
> > >>>>> >> `OperatorCoordinatorContextBase` or `CoordinatorContextBase`,
> > >>>>> >> following the naming pattern of `SourceReaderBase`.
> > >>>>> >> I also agree with what Piotr said. More problems may come up once
> > >>>>> >> connectors start to use it.
> > >>>>> >>
> > >>>>> >> Best,
> > >>>>> >> Hang
> > >>>>> >>
> > >>>>> >> On Fri, Oct 14, 2022 at 22:31 Steven Wu <st...@gmail.com> wrote:
> > >>>>> >>
> > >>>>> >> > Piotr,
> > >>>>> >> >
> > >>>>> >> > The proposal is to extract the listed methods from the @Internal
> > >>>>> >> > SourceCoordinatorContext into a @PublicEvolving
> > >>>>> >> > BaseCoordinatorContext.
> > >>>>> >> >
> > >>>>> >> > The motivation is that other operators can leverage the
> > >>>>> >> > communication mechanism between the operator coordinator and the
> > >>>>> >> > operator subtasks. For example, the shuffle operator (in the Flink
> > >>>>> >> > Iceberg sink) described in the linked Google doc can leverage it
> > >>>>> >> > for computing traffic distribution statistics:
> > >>>>> >> > * Subtasks calculate local statistics and periodically send them
> > >>>>> >> > to the coordinator for global aggregation.
> > >>>>> >> > * The coordinator can broadcast the globally aggregated
> > >>>>> >> > statistics to subtasks, which can use them to guide the shuffling
> > >>>>> >> > decision (selecting downstream channels).
> > >>>>> >> >
> > >>>>> >> > Thanks,
> > >>>>> >> > Steven
> > >>>>> >> >
> > >>>>> >> >
> > >>>>> >> > On Fri, Oct 14, 2022 at 2:16 AM Piotr Nowojski <pnowojski@apache.org>
> > >>>>> >> > wrote:
> > >>>>> >> >
> > >>>>> >> > > Hi,
> > >>>>> >> > >
> > >>>>> >> > > Could you clarify what proposal you have in mind? From the
> > >>>>> >> > > context, I understand that the newly extracted
> > >>>>> >> > > `BaseCoordinatorContext` would have to be marked as
> > >>>>> >> > > `@PublicEvolving` or `@Experimental`, since otherwise extracting
> > >>>>> >> > > it and keeping it `@Internal` wouldn't change much? Such an
> > >>>>> >> > > `@Internal` base class could be removed at any point in the
> > >>>>> >> > > future. Having said that, it sounds to me like your proposal is a
> > >>>>> >> > > bit bigger than it looks at first glance and you actually want to
> > >>>>> >> > > expose the operator coordinator concept in the public API?
> > >>>>> >> > >
> > >>>>> >> > > AFAIK there were some discussions about that, and it was a
> > >>>>> >> > > somewhat conscious decision NOT to do that. I don't know the
> > >>>>> >> > > reasons, however. I have only just heard that there are, for
> > >>>>> >> > > example, some problems with checkpointing of hypothetical
> > >>>>> >> > > non-source operator coordinators. Maybe someone else could shed
> > >>>>> >> > > some light on this?
> > >>>>> >> > >
> > >>>>> >> > > Conceptually, I would actually be in favour of exposing operator
> > >>>>> >> > > coordinators if there is a good reason behind that, but it is a
> > >>>>> >> > > more difficult topic and might be a larger effort than it seems
> > >>>>> >> > > at first glance.
> > >>>>> >> > >
> > >>>>> >> > > Best,
> > >>>>> >> > > Piotrek
> > >>>>> >> > >
> > >>>>> >> > > On Tue, Oct 4, 2022 at 19:41 Steven Wu <st...@gmail.com> wrote:
> > >>>>> >> > >
> > >>>>> >> > > > Jing, thanks a lot for your reply. The linked Google doc is
> > >>>>> >> > > > not for this FLIP, which is fully documented on the wiki page.
> > >>>>> >> > > > The linked Google doc is the design doc for introducing
> > >>>>> >> > > > shuffling in the Flink Iceberg sink, which motivated this FLIP
> > >>>>> >> > > > so that the shuffle coordinator can leverage the introduced
> > >>>>> >> > > > BaseCoordinatorContext to avoid code duplication.
> > >>>>> >> > > >
> > >>>>> >> > > > On Tue, Oct 4, 2022 at 1:04 AM Jing Ge <jing@ververica.com> wrote:
> > >>>>> >> > > >
> > >>>>> >> > > > > Thanks for bringing this up. It looks good overall! One
> > >>>>> >> > > > > small thing: you might want to write all the content on the
> > >>>>> >> > > > > wiki page instead of linking to a Google doc, because some
> > >>>>> >> > > > > people might not be able to access the Google doc.
> > >>>>> >> > > > >
> > >>>>> >> > > > > Best regards,
> > >>>>> >> > > > > Jing
> > >>>>> >> > > > >
> > >>>>> >> > > > > On Tue, Oct 4, 2022 at 3:57 AM gang ye <yegangapp@gmail.com> wrote:
> > >>>>> >> > > > >
> > >>>>> >> > > > >> Hi,
> > >>>>> >> > > > >>
> > >>>>> >> > > > >> We have submitted the FLIP proposal
> > >>>>> >> > > > >> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-264%3A+Extract+BaseCoordinatorContext>
> > >>>>> >> > > > >> on Confluence to extract BaseCoordinatorContext from
> > >>>>> >> > > > >> SourceCoordinatorContext so that it can be reused by other
> > >>>>> >> > > > >> coordinators, e.g. in the shuffling support of the Flink
> > >>>>> >> > > > >> Iceberg sink
> > >>>>> >> > > > >> <https://docs.google.com/document/d/13N8cMqPi-ZPSKbkXGOBMPOzbv2Fua59j8bIjjtxLWqo>.
> > >>>>> >> > > > >>
> > >>>>> >> > > > >> Could you help take a look?
> > >>>>> >> > > > >> Thanks
> > >>>>> >> > > > >>
> > >>>>> >> > > > >> Gang
> > >>>>> >> > > > >>
> > >>>>> >> > > > >
> > >>>>> >> > > >
> > >>>>> >> > >
> > >>>>> >> >
> > >>>>> >>
> > >>>>> >
> > >>>>>
> > >>>>
> >
>
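
On the subtask side, the globally aggregated statistics are meant to guide a custom partition (`DataStream#partitionCustom`, per Steven's Oct 28 message above). The following is a minimal sketch, assuming string keys and a greedy heaviest-key-first balancing strategy chosen only for illustration (the Iceberg design doc may use a different one). `KeyPartitioner` is a local stand-in mirroring the `int partition(K key, int numPartitions)` method of Flink's `Partitioner` interface, and `StatisticsGuidedPartitioner` is hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Local stand-in with the same method shape as Flink's
// org.apache.flink.api.common.functions.Partitioner, so the sketch has no Flink dependency.
interface KeyPartitioner<K> {
    int partition(K key, int numPartitions);
}

// Hypothetical subtask-side partitioner built from globally aggregated key counts.
class StatisticsGuidedPartitioner implements KeyPartitioner<String> {
    private final Map<String, Integer> assignment = new HashMap<>();

    // Greedy balancing: heaviest keys first, each to the currently least-loaded channel.
    StatisticsGuidedPartitioner(Map<String, Long> globalCounts, int numPartitions) {
        long[] load = new long[numPartitions];
        List<Map.Entry<String, Long>> entries = new ArrayList<>(globalCounts.entrySet());
        entries.sort(Map.Entry.<String, Long>comparingByValue().reversed());
        for (Map.Entry<String, Long> entry : entries) {
            int target = 0;
            for (int p = 1; p < numPartitions; p++) {
                if (load[p] < load[target]) {
                    target = p;
                }
            }
            assignment.put(entry.getKey(), target);
            load[target] += entry.getValue();
        }
    }

    @Override
    public int partition(String key, int numPartitions) {
        Integer assigned = assignment.get(key);
        if (assigned != null && assigned < numPartitions) {
            return assigned;
        }
        // Keys without statistics (or a changed channel count) fall back to hashing.
        return Math.floorMod(key.hashCode(), numPartitions);
    }
}
```

For counts a=10, b=6, c=5, d=1 over two channels, this assigns a and d to channel 0 and b and c to channel 1, giving each channel a load of 11. In a Flink job, the same logic inside an implementation of `Partitioner<String>` could be passed to `DataStream#partitionCustom` together with a `KeySelector`; how a running subtask receives updated statistics from the coordinator is outside this sketch.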

Re: [DISCUSS] FLIP-264 Extract BaseCoordinatorContext

Posted by Steven Wu <st...@gmail.com>.
Let me start an initial discussion thread on dev@flink. I'd like to gauge
interest from the community (including Hudi and Delta Lake) first before
spending time on writing up a big FLIP.

On Fri, Jan 27, 2023 at 10:45 PM Jark Wu <im...@gmail.com> wrote:

> Thanks, Steven, for the explanation.
>
> Implementing the shuffle operator in the Iceberg project first sounds good
> to me. We can contribute it to Flink DataStream in the future if other
> projects/connectors also need it.
>
> Best,
> Jark
>
>
> On Wed, 18 Jan 2023 at 02:11, Steven Wu <st...@gmail.com> wrote:
>
>> Jark,
>>
>> We were planning to discard the proposal due to some valid concerns
>> raised in the thread. Also, this proposal itself didn't really save much
>> duplicated code (maybe 100 lines or so).
>>
>> I also think that the shuffle operator for DataStream can be useful for
>> other connectors too. The shuffling part (based on traffic statistics) can
>> be generic for other connectors. There will be a small integration part
>> unique to Iceberg, which can stay in Iceberg. If we go in this new
>> direction, we would need a new FLIP.
>>
>> Thanks,
>> Steven
>>
>>
>>
>> On Mon, Jan 16, 2023 at 12:30 AM Jark Wu <im...@gmail.com> wrote:
>>
>>> What's the status and conclusion of this discussion?
>>>
>>> I see the value of exposing OperatorCoordinator because of its powerful
>>> RPC calls; some projects, such as Hudi [1], are already using it. But I
>>> agree this is a large topic and requires another FLIP.
>>>
>>> I am also concerned about extracting a public base class without
>>> implementations and clear usage; it is easy to break in the future.
>>> However, I think the shuffling operator can be a generic component used
>>> by other connectors and DataStream jobs.
>>>
>>> Have you considered contributing the ShuffleOperator to the Flink main
>>> repository as part of the DataStream API (e.g.,
>>> DataStream#dynamicShuffle)? Within a single repository, it's easy to
>>> extract the common part between SourceCoordinatorContext and
>>> ShuffleCoordinatorContext as an internal implementation.
>>>
>>>
>>> Best,
>>> Jark
>>>
>>> [1]:
>>> https://github.com/apache/hudi/blob/a80bb4f717ad8a89770176a1238c4b08874044e8/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/adapter/OperatorCoordinatorAdapter.java
>>>
>>> On Thu, 3 Nov 2022 at 22:36, Piotr Nowojski <pn...@apache.org> wrote:
>>>
>>>> Ohhh, I was confused. I thought the proposal was to make
>>>> `CoordinatorContextBase` part of the public API.
>>>>
>>>> However, I'm also against extracting `CoordinatorContextBase` as an
>>>> `@Internal` class.
>>>>
>>>> 1. Connectors shouldn't reuse internal classes. Using the `@Internal`
>>>> CoordinatedOperatorFactory would already be quite bad, but at least it
>>>> is a relatively stable internal API. Using an `@Internal`
>>>> `CoordinatorContextBase`, and refactoring out this base class just for
>>>> the sake of reusing it in a connector, is IMO even worse.
>>>> 2. Doubly so if they are in a separate repository (as the Iceberg
>>>> connector will be/is, right?). There would be no way to prevent
>>>> breaking changes between repositories.
>>>>
>>>> If that's only intended as a stop-gap solution until we properly expose
>>>> coordinators, the lesser evil would IMO be to copy/paste/modify
>>>> SourceCoordinatorContext into the flink-connector-iceberg repository.
>>>>
>>>> Best,
>>>> Piotrek
>>>>
>>>> On Thu, Nov 3, 2022 at 12:51 Maximilian Michels <mx...@apache.org> wrote:
>>>>
>>>> > +1 If we wanted to expose the OperatorCoordinator API, we should
>>>> > provide an adequate interface. The FLIP partially addresses this by
>>>> > trying to factor out RPC code which other coordinators might make use
>>>> > of, but additional design is necessary to realize a public operator API.
>>>> >
>>>> > Just to be clear, I'm not opposed to any of the changes in the FLIP. I
>>>> > think they make sense in the context of an Iceberg ShuffleCoordinator in
>>>> > Flink. If we were to add such a new coordinator, feel free to make the
>>>> > proposed code refactoring as part of a pull request. A FLIP isn't
>>>> > strictly necessary here because this is a purely internal change which
>>>> > alters neither public APIs nor the internal architecture, apart from
>>>> > reusing a bit of existing code. I'm sorry if we consumed some of your
>>>> > time revising the document, but I think we had a healthy discussion
>>>> > here. And we're definitely looking forward to seeing some of these code
>>>> > changes!
>>>> >
>>>> > -Max
>>>> >
>>>> > On Thu, Nov 3, 2022 at 11:56 AM Piotr Nowojski <pn...@apache.org> wrote:
>>>> >
>>>> >> Hi,
>>>> >>
>>>> >> Sorry for the delay, but I've given this more thought. First, I share
>>>> >> Maximilian's view that this FLIP is incomplete. As I understand it, you
>>>> >> are trying to hack existing code to expose small bits of internal
>>>> >> functionality as part of the public API without solving many of the
>>>> >> underlying issues.
>>>> >>
>>>> >> For example, what's the point of exposing `CoordinatorContextBase` as
>>>> >> a public API if users cannot use it? After all, the
>>>> >> `OperatorCoordinator` and `CoordinatedOperatorFactory` would remain
>>>> >> internal. At the same time, this FLIP would officially force us to
>>>> >> support and maintain this CoordinatorContextBase, while I have strong
>>>> >> feelings that we don't want to do that in the long term. I think we
>>>> >> would need to take a big step back and first discuss how we would like
>>>> >> to expose the coordinators and agree on how to deal with the issues.
>>>> >>
>>>> >> The first big issue I see is that I would be very worried about
>>>> >> exposing the coordinator API without at least designing/planning how to
>>>> >> deal with checkpointing their state. Without that, I'm afraid we might
>>>> >> end up in a situation where we need to break the API in order to
>>>> >> properly support stateful coordinators. And at the moment I don't see a
>>>> >> good and easy solution to this problem.
>>>> >>
>>>> >> The second issue is the shape of the exposed public API. Exposing
>>>> >> `OperatorCoordinator` or `CoordinatorContextBase` looks to me like a
>>>> >> bad design that would expose way too many things to the users, making
>>>> >> future development more complicated for us and making it unnecessarily
>>>> >> difficult for users to implement those interfaces. I see this as a
>>>> >> similar issue to the low-level `StreamOperator` API vs. the higher-level
>>>> >> `org.apache.flink.api.common.functions.Function` API (instead of
>>>> >> exposing `StreamOperator`, `AbstractStreamOperatorV2` etc., we should
>>>> >> beef up the `ProcessFunction` to expose all of the remaining
>>>> >> functionality in a user-friendly way). In the context of the
>>>> >> coordinators, I would say that we should expose as the public API not
>>>> >> the `OperatorCoordinator`, but, for example, some kind of
>>>> >> `EventProcessFunction` with a simple interface like:
>>>> >> ```
>>>> >> interface EventProcessFunction {
>>>> >>   void processEvent(int subtask, OperatorEvent event, EventDispatcher eventDispatcher);
>>>> >> }
>>>> >> ```
>>>> >> + maybe some features like processing-time timers/mailbox-style async
>>>> >> actions.
>>>> >> (Or maybe that could just be a regular `ProcessFunction`, but with
>>>> >> `OperatorEvent` with `int subtask` as input/output.)
>>>> >>
>>>> >> Best,
>>>> >> Piotrek
>>>> >>
>>>> >> On Wed, Nov 2, 2022 at 19:40 gang ye <ye...@gmail.com> wrote:
>>>> >>
>>>> >>> Hi Max and Qingsheng,
>>>> >>>
>>>> >>> Thanks for the feedback. The initial motivation for this proposal is
>>>> >>> to reduce duplicated code, since ShuffleCoordinator would need
>>>> >>> communication logic similar to SourceCoordinator's to talk with
>>>> >>> operators. I understand your concern that OperatorCoordinator is an
>>>> >>> internal class and that, apart from SourceCoordinator, nothing else
>>>> >>> uses it for now.
>>>> >>> How about we do what Qingsheng suggested? I can go ahead with the
>>>> >>> ShufflingCoordinator implementation without the extraction. Then we
>>>> >>> will have an intuitive sense of how much code is copied and could be
>>>> >>> reused. If we feel that there is still a need to extract, we can
>>>> >>> revisit the discussion.
>>>> >>>
>>>> >>> Thanks
>>>> >>> Gang
>>>> >>>
>>>> >>>
>>>> >>>
>>>> >>> On Wed, Nov 2, 2022 at 12:21 AM Qingsheng Ren <re...@apache.org> wrote:
>>>> >>>
>>>> >>>> Thanks Gang and Steven for the FLIP. I share the same concern as
>>>> >>>> Piotr and Maximilian.
>>>> >>>>
>>>> >>>> OperatorCoordinator is intentionally marked as @Internal, considering
>>>> >>>> some existing issues, such as checkpoint consistency between
>>>> >>>> non-source operators and their coordinators. I'm wondering whether it
>>>> >>>> is useful to expose a public context to developers while keeping
>>>> >>>> OperatorCoordinator as an internal API. If we eventually close all
>>>> >>>> the issues and decide to expose the operator coordinator API, that
>>>> >>>> would be a better opportunity to include the base context as part of
>>>> >>>> it.
>>>> >>>>
>>>> >>>> Best,
>>>> >>>> Qingsheng
>>>> >>>>
>>>> >>>> On Tue, Nov 1, 2022 at 8:29 PM Maximilian Michels <mx...@apache.org>
>>>> >>>> wrote:
>>>> >>>>
>>>> >>>>> Thanks Steven! My confusion stemmed from the lack of context in
>>>> the
>>>> >>>>> FLIP.
>>>> >>>>> The first version did not lay out how the refactoring would be
>>>> used
>>>> >>>>> down
>>>> >>>>> the line, e.g. by the ShuffleCoordinator. The OperatorCoordinator
>>>> API
>>>> >>>>> is a
>>>> >>>>> non-public API and before reading the code, I wasn't even aware
>>>> how
>>>> >>>>> exactly
>>>> >>>>> it worked and whether it would be available to regular operators
>>>> (it
>>>> >>>>> was
>>>> >>>>> originally intended for sources only).
>>>> >>>>>
>>>> >>>>> I might seem pedantic here but I believe the purpose of a FLIP
>>>> should
>>>> >>>>> be to
>>>> >>>>> describe the *why* behind the changes, not only the changes
>>>> itself. A
>>>> >>>>> FLIP
>>>> >>>>> is not a formality but is a tool to communicate and discuss
>>>> changes. I
>>>> >>>>> think we still haven't laid out the exact reasons why we are
>>>> factoring
>>>> >>>>> out
>>>> >>>>> the base. As far as I understand now, we need the base class to
>>>> deal
>>>> >>>>> with
>>>> >>>>> concurrent updates in the custom Coordinator from the runtime
>>>> >>>>> (sub)tasks.
>>>> >>>>> Effectively, we are enforcing an actor model for the processing
>>>> of the
>>>> >>>>> incoming messages such that the OperatorCoordinator can cleanly
>>>> update
>>>> >>>>> its
>>>> >>>>> state. However, if there are no actual implementations that make
>>>> use
>>>> >>>>> of the
>>>> >>>>> refactoring in Flink itself, I wonder if it would make sense to
>>>> copy
>>>> >>>>> this
>>>> >>>>> code to the downstream implementation, e.g. the
>>>> ShuffleCoordinator. As
>>>> >>>>> soon
>>>> >>>>> as it is part of Flink, we could of course try to consolidate this
>>>> >>>>> code.
>>>> >>>>>
>>>> >>>>> Considering the *how* of this, there appear to be both methods
>>>> from
>>>> >>>>> SourceCoordinator (e.g. runInEventLoop) as well as
>>>> >>>>> SourceCoordinatorContext
>>>> >>>>> listed in the FLIP, as well as methods which do not appear
>>>> anywhere in
>>>> >>>>> Flink code, e.g. subTaskReady / subTaskNotReady /
>>>> sendEventToOperator.
>>>> >>>>> It
>>>> >>>>> appears that some of this has been extracted from a downstream
>>>> >>>>> implementation. It would be great to adjust this, such that it
>>>> >>>>> reflects the
>>>> >>>>> status quo in Flink.
>>>> >>>>>
>>>> >>>>> -Max
>>>> >>>>>
>>>> >>>>> On Fri, Oct 28, 2022 at 5:53 AM Steven Wu <st...@gmail.com>
>>>> >>>>> wrote:
>>>> >>>>>
>>>> >>>>> > Max,
>>>> >>>>> >
>>>> >>>>> > Thanks a lot for the comments. We should clarify that the
>>>> shuffle
>>>> >>>>> > operator/coordinator is not really part of the Flink sink
>>>> >>>>> > function/operator. shuffle operator is a custom operator that
>>>> can be
>>>> >>>>> > inserted right before the Iceberg writer operator. Shuffle
>>>> operator
>>>> >>>>> > calculates the traffic statistics and performs a custom
>>>> >>>>> partition/shuffle
>>>> >>>>> > (DataStream#partitionCustom) to cluster the data right before
>>>> they
>>>> >>>>> get to
>>>> >>>>> > the Iceberg writer operator.
>>>> >>>>> >
>>>> >>>>> > We are not proposing to introduce a sink coordinator for the
>>>> sink
>>>> >>>>> > interface. Shuffle operator needs the CoordinatorContextBase to
>>>> >>>>> > facilitate the communication btw shuffle subtasks and
>>>> coordinator for
>>>> >>>>> > traffic statistics aggregation. The communication part is
>>>> already
>>>> >>>>> > implemented by SourceCoordinatorContext.
>>>> >>>>> >
>>>> >>>>> > Here are some details about the communication needs.
>>>> >>>>> > - subtasks periodically calculate local statistics and send to
>>>> the
>>>> >>>>> > coordinator for global aggregation
>>>> >>>>> > - the coordinator sends the globally aggregated statistics to
>>>> the
>>>> >>>>> subtasks
>>>> >>>>> > - subtasks use the globally aggregated statistics to guide the
>>>> >>>>> > partition/shuffle decision
>>>> >>>>> >
>>>> >>>>> > Regards,
>>>> >>>>> > Steven
>>>> >>>>> >
>>>> >>>>> > On Thu, Oct 27, 2022 at 5:38 PM Maximilian Michels <
>>>> mxm@apache.org>
>>>> >>>>> wrote:
>>>> >>>>> >
>>>> >>>>> > > Hi Gang,
>>>> >>>>> > >
>>>> >>>>> > > Looks much better! I've actually gone through the
>>>> >>>>> OperatorCoordinator
>>>> >>>>> > code.
>>>> >>>>> > > It turns out, any operator already has an OperatorCoordinator
>>>> >>>>> assigned.
>>>> >>>>> > > Also, any operator can add custom coordinator code. So it
>>>> looks
>>>> >>>>> like you
>>>> >>>>> > > won't have to implement any additional runtime logic to add a
>>>> >>>>> > > ShuffleCoordinator. However, I'm wondering, why do you
>>>> >>>>> specifically need
>>>> >>>>> > to
>>>> >>>>> > > refactor the SourceCoordinatorContext? You could simply add
>>>> your
>>>> >>>>> own
>>>> >>>>> > > coordinator code. I'm not sure the sink requirements map to
>>>> the
>>>> >>>>> source
>>>> >>>>> > > interface so closely that you can reuse the same logic.
>>>> >>>>> > >
>>>> >>>>> > > If you can refactor SourceCoordinatorContext in a way that
>>>> makes
>>>> >>>>> it fit
>>>> >>>>> > > your use case, I have nothing to object here. By the way,
>>>> another
>>>> >>>>> example
>>>> >>>>> > > of an existing OperatorCoordinator is
>>>> >>>>> CollectSinkOperatorCoordinator
>>>> >>>>> > which
>>>> >>>>> > > is quite trivial but it might be worth evaluating whether you
>>>> need
>>>> >>>>> the
>>>> >>>>> > full
>>>> >>>>> > > power of SourceCoordinatorContext which is why I wanted to
>>>> get more
>>>> >>>>> > > context.
>>>> >>>>> > >
>>>> >>>>> > > -Max
>>>> >>>>> > >
>>>> >>>>> > > On Thu, Oct 27, 2022 at 4:15 PM gang ye <ye...@gmail.com>
>>>> >>>>> wrote:
>>>> >>>>> > >
>>>> >>>>> > > > Hi Max,
>>>> >>>>> > > > I got your concern. Since shuffling support for Flink
>>>> Iceberg
>>>> >>>>> sink is
>>>> >>>>> > not
>>>> >>>>> > > > the main body of the proposal, I add another appendix part
>>>> just
>>>> >>>>> now
>>>> >>>>> > with
>>>> >>>>> > > > more details about how to use CoordinatorContextBase and
>>>> how to
>>>> >>>>> define
>>>> >>>>> > > > ShufflingCoordinator.
>>>> >>>>> > > >
>>>> >>>>> > > > Let me know if that cannot solve your concern.
>>>> >>>>> > > >
>>>> >>>>> > > > Thanks
>>>> >>>>> > > > Gang
>>>> >>>>> > > >
>>>> >>>>> > > > On Thu, Oct 27, 2022 at 1:31 PM Maximilian Michels <
>>>> >>>>> mxm@apache.org>
>>>> >>>>> > > wrote:
>>>> >>>>> > > >
>>>> >>>>> > > >> Hey Gang,
>>>> >>>>> > > >>
>>>> >>>>> > > >> What I'm looking for here is a complete picture of why the change is
>>>> >>>>> > > >> necessary and what the next steps are. Ultimately, refactoring any code
>>>> >>>>> > > >> serves a purpose. Here, we want to refactor the Coordinator code such
>>>> >>>>> > > >> that we can add a SinkCoordinator, similar to the SourceCoordinator.
>>>> >>>>> > > >> The FLIP should address the next steps, i.e. how you plan to add the
>>>> >>>>> > > >> SinkCoordinator, its interfaces, and the runtime changes. It doesn't
>>>> >>>>> > > >> have to be in great detail, but without this information, I don't
>>>> >>>>> > > >> think the FLIP is complete.
>>>> >>>>> > > >>
>>>> >>>>> > > >> This feature should be generic enough to be usable by sinks other
>>>> >>>>> > > >> than the Iceberg sink. Of course, Iceberg can still load its own
>>>> >>>>> > > >> implementation, which may be outlined in a separate FLIP.
>>>> >>>>> > > >>
>>>> >>>>> > > >> Unless there is a good reason, normal operators should not support
>>>> >>>>> > > >> the coordinator functionality. At least I'm not convinced it would
>>>> >>>>> > > >> play well with Flink's execution model. But I see how it is required
>>>> >>>>> > > >> for sources and sinks.
>>>> >>>>> > > >>
>>>> >>>>> > > >> -Max
>>>> >>>>> > > >>
>>>> >>>>> > > >> On Wed, Oct 26, 2022 at 3:05 PM gang ye <yegangapp@gmail.com> wrote:
>>>> >>>>> > > >>
>>>> >>>>> > > >>> Hi Max,
>>>> >>>>> > > >>>
>>>> >>>>> > > >>> Thanks for reviewing.
>>>> >>>>> > > >>>
>>>> >>>>> > > >>> For FLIP-264, yes, we will focus only on abstracting the RPC calls
>>>> >>>>> > > >>> between the task and the job manager for communication, and we won't
>>>> >>>>> > > >>> touch watermarks or checkpointing.
>>>> >>>>> > > >>> If the coordinator doesn't need RPC calls to talk with subtasks, then it
>>>> >>>>> > > >>> can define its context without extending CoordinatorContextBase (or we
>>>> >>>>> > > >>> can find another class name to limit the scope).
>>>> >>>>> > > >>>
>>>> >>>>> > > >>> Regarding the scope of the code changes, FLIP-264 will only do the
>>>> >>>>> > > >>> context extraction. The shuffling coordinator and operator
>>>> >>>>> > > >>> <https://docs.google.com/document/d/13N8cMqPi-ZPSKbkXGOBMPOzbv2Fua59j8bIjjtxLWqo>,
>>>> >>>>> > > >>> which will use the context, will come with a separate proposal, so we
>>>> >>>>> > > >>> are trying to keep FLIP-264 simple to understand. I can add a bit more
>>>> >>>>> > > >>> about how to use the coordinator context in FLIP-264 if you think that
>>>> >>>>> > > >>> would be helpful.
>>>> >>>>> > > >>>
>>>> >>>>> > > >>> Thanks!
>>>> >>>>> > > >>> Gang
>>>> >>>>> > > >>>
>>>> >>>>> > > >>>
>>>> >>>>> > > >>>
>>>> >>>>> > > >>> On Wed, Oct 26, 2022 at 7:25 AM Maximilian Michels <mxm@apache.org> wrote:
>>>> >>>>> > > >>>
>>>> >>>>> > > >>>> Thanks for the proposal, Gang! This is indeed somewhat of a bigger
>>>> >>>>> > > >>>> change. The coordinator for sources, as part of FLIP-27, was
>>>> >>>>> > > >>>> specifically added to synchronize the global watermark and to assign
>>>> >>>>> > > >>>> splits dynamically. However, it practically allows arbitrary RPC calls
>>>> >>>>> > > >>>> between the task and the job manager. I understand that there is
>>>> >>>>> > > >>>> concern that such a powerful mechanism should not be available to all
>>>> >>>>> > > >>>> operators. Nevertheless, I see the practical use in the case of sinks
>>>> >>>>> > > >>>> like Iceberg. So I'd suggest limiting this feature to sinks (and
>>>> >>>>> > > >>>> sources) only.
>>>> >>>>> > > >>>>
>>>> >>>>> > > >>>> I'm wondering whether extracting the SourceCoordinatorContext is
>>>> >>>>> > > >>>> enough to achieve what you want. There will be additional work
>>>> >>>>> > > >>>> necessary, e.g. creating a SinkCoordinator, similar to
>>>> >>>>> > > >>>> SourceCoordinator, which handles the RPC calls and the checkpointing.
>>>> >>>>> > > >>>> I think it would be good to outline this in the FLIP.
>>>> >>>>> > > >>>>
>>>> >>>>> > > >>>> -Max
>>>> >>>>> > > >>>>
>>>> >>>>> > > >>>> On Sun, Oct 16, 2022 at 9:01 AM Steven Wu <stevenz3wu@gmail.com> wrote:
>>>> >>>>> > > >>>>
>>>> >>>>> > > >>>>> Sorry, I sent the incomplete reply by mistake.
>>>> >>>>> > > >>>>>
>>>> >>>>> > > >>>>> If there are any concrete concerns, we can discuss them. In
>>>> >>>>> > > >>>>> FLINK-27405 [1], Avid pointed out some implications regarding
>>>> >>>>> > > >>>>> checkpointing. In this small FLIP, we are not exposing or changing any
>>>> >>>>> > > >>>>> checkpointing logic; we mainly need the coordinator context
>>>> >>>>> > > >>>>> functionality to facilitate the communication between the coordinator
>>>> >>>>> > > >>>>> and subtasks.
>>>> >>>>> > > >>>>>
>>>> >>>>> > > >>>>> [1] https://issues.apache.org/jira/browse/FLINK-27405
>>>> >>>>> > > >>>>>
>>>> >>>>> > > >>>>> On Sun, Oct 16, 2022 at 8:56 AM Steven Wu <stevenz3wu@gmail.com> wrote:
>>>> >>>>> > > >>>>>
>>>> >>>>> > > >>>>> > Hang, I appreciate your input. I agree that `CoordinatorContextBase`
>>>> >>>>> > > >>>>> > is a better name, considering Flink's code conventions.
>>>> >>>>> > > >>>>> >
>>>> >>>>> > > >>>>> > If there are any concrete concerns, we can discuss. In the jira,
>>>> >>>>> > > >>>>> >
>>>> >>>>> > > >>>>> >
>>>> >>>>> > > >>>>> >
>>>> >>>>> > > >>>>> > On Sun, Oct 16, 2022 at 12:12 AM Hang Ruan <ruanhang1993@gmail.com> wrote:
>>>> >>>>> > > >>>>> >
>>>> >>>>> > > >>>>> >> Hi,
>>>> >>>>> > > >>>>> >>
>>>> >>>>> > > >>>>> >> IMO, I agree with extracting a base class for SourceCoordinatorContext.
>>>> >>>>> > > >>>>> >> But I prefer the name `OperatorCoordinatorContextBase` or
>>>> >>>>> > > >>>>> >> `CoordinatorContextBase`, following the format of `SourceReaderBase`.
>>>> >>>>> > > >>>>> >> I also agree with what Piotr said. Maybe more problems will occur when
>>>> >>>>> > > >>>>> >> connectors start to use it.
>>>> >>>>> > > >>>>> >>
>>>> >>>>> > > >>>>> >> Best,
>>>> >>>>> > > >>>>> >> Hang
>>>> >>>>> > > >>>>> >>
>>>> >>>>> > > >>>>> >> On Fri, Oct 14, 2022 at 22:31, Steven Wu <st...@gmail.com> wrote:
>>>> >>>>> > > >>>>> >>
>>>> >>>>> > > >>>>> >> > Piotr,
>>>> >>>>> > > >>>>> >> >
>>>> >>>>> > > >>>>> >> > The proposal is to extract the listed methods from the @Internal
>>>> >>>>> > > >>>>> >> > SourceCoordinatorContext to a @PublicEvolving BaseCoordinatorContext.
>>>> >>>>> > > >>>>> >> >
>>>> >>>>> > > >>>>> >> > The motivation is that other operators can leverage the communication
>>>> >>>>> > > >>>>> >> > mechanism between the operator coordinator and operator subtasks. For
>>>> >>>>> > > >>>>> >> > example, the shuffle operator (in the Flink Iceberg sink) from the
>>>> >>>>> > > >>>>> >> > linked Google doc can leverage it for computing traffic distribution
>>>> >>>>> > > >>>>> >> > statistics:
>>>> >>>>> > > >>>>> >> > * Subtasks calculate local statistics and periodically send them to
>>>> >>>>> > > >>>>> >> >   the coordinator for global aggregation.
>>>> >>>>> > > >>>>> >> > * The coordinator can broadcast the globally aggregated statistics
>>>> >>>>> > > >>>>> >> >   to subtasks, which can use them to guide the shuffling decision
>>>> >>>>> > > >>>>> >> >   (selecting downstream channels).
>>>> >>>>> > > >>>>> >> >
>>>> >>>>> > > >>>>> >> > Thanks,
>>>> >>>>> > > >>>>> >> > Steven
>>>> >>>>> > > >>>>> >> >
>>>> >>>>> > > >>>>> >> >
>>>> >>>>> > > >>>>> >> > On Fri, Oct 14, 2022 at 2:16 AM Piotr Nowojski <pnowojski@apache.org> wrote:
>>>> >>>>> > > >>>>> >> >
>>>> >>>>> > > >>>>> >> > > Hi,
>>>> >>>>> > > >>>>> >> > >
>>>> >>>>> > > >>>>> >> > > Could you clarify what proposal you have in mind? From the context,
>>>> >>>>> > > >>>>> >> > > I understand that the newly extracted `BaseCoordinatorContext` would
>>>> >>>>> > > >>>>> >> > > have to be marked as `@PublicEvolving` or `@Experimental`, since
>>>> >>>>> > > >>>>> >> > > otherwise extracting it and keeping it `@Internal` wouldn't change
>>>> >>>>> > > >>>>> >> > > much? Such an `@Internal` base class could be removed at any point in
>>>> >>>>> > > >>>>> >> > > the future. Having said that, it sounds to me like your proposal is a
>>>> >>>>> > > >>>>> >> > > bit bigger than it looks at first glance and you actually want to
>>>> >>>>> > > >>>>> >> > > expose the operator coordinator concept in the public API?
>>>> >>>>> > > >>>>> >> > >
>>>> >>>>> > > >>>>> >> > > AFAIK there were some discussions about that, and it was a somewhat
>>>> >>>>> > > >>>>> >> > > conscious decision NOT to do that. I don't know the reasons, however.
>>>> >>>>> > > >>>>> >> > > Only now have I heard that there are, for example, some problems with
>>>> >>>>> > > >>>>> >> > > checkpointing hypothetical non-source operator coordinators. Maybe
>>>> >>>>> > > >>>>> >> > > someone else could shed some light on this?
>>>> >>>>> > > >>>>> >> > >
>>>> >>>>> > > >>>>> >> > > Conceptually, I would actually be in favour of exposing operator
>>>> >>>>> > > >>>>> >> > > coordinators if there is a good reason behind it, but it is a more
>>>> >>>>> > > >>>>> >> > > difficult topic and might be a larger effort than it seems at first
>>>> >>>>> > > >>>>> >> > > glance.
>>>> >>>>> > > >>>>> >> > >
>>>> >>>>> > > >>>>> >> > > Best,
>>>> >>>>> > > >>>>> >> > > Piotrek
>>>> >>>>> > > >>>>> >> > >
>>>> >>>>> > > >>>>> >> > > On Tue, Oct 4, 2022 at 19:41, Steven Wu <stevenz3wu@gmail.com> wrote:
>>>> >>>>> > > >>>>> >> > >
>>>> >>>>> > > >>>>> >> > > > Jing, thanks a lot for your reply. The linked Google doc is not for
>>>> >>>>> > > >>>>> >> > > > this FLIP, which is fully documented on the wiki page. The linked
>>>> >>>>> > > >>>>> >> > > > Google doc is the design doc for introducing shuffling in the Flink
>>>> >>>>> > > >>>>> >> > > > Iceberg sink, which motivated this FLIP so that the shuffle
>>>> >>>>> > > >>>>> >> > > > coordinator can leverage the introduced BaseCoordinatorContext to
>>>> >>>>> > > >>>>> >> > > > avoid code duplication.
>>>> >>>>> > > >>>>> >> > > >
>>>> >>>>> > > >>>>> >> > > > On Tue, Oct 4, 2022 at 1:04 AM Jing Ge <jing@ververica.com> wrote:
>>>> >>>>> > > >>>>> >> > > >
>>>> >>>>> > > >>>>> >> > > > > Thanks for bringing this up. It looks good overall! One small
>>>> >>>>> > > >>>>> >> > > > > thing: you might want to put all the content on the wiki page
>>>> >>>>> > > >>>>> >> > > > > instead of linking to a Google doc. The reason is that some people
>>>> >>>>> > > >>>>> >> > > > > might not be able to access the Google doc.
>>>> >>>>> > > >>>>> >> > > > >
>>>> >>>>> > > >>>>> >> > > > > Best regards,
>>>> >>>>> > > >>>>> >> > > > > Jing
>>>> >>>>> > > >>>>> >> > > > >
>>>> >>>>> > > >>>>> >> > > > > On Tue, Oct 4, 2022 at 3:57 AM gang ye <yegangapp@gmail.com> wrote:
>>>> >>>>> > > >>>>> >> > > > >
>>>> >>>>> > > >>>>> >> > > > >> Hi,
>>>> >>>>> > > >>>>> >> > > > >>
>>>> >>>>> > > >>>>> >> > > > >> We submitted the FLIP proposal
>>>> >>>>> > > >>>>> >> > > > >> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-264%3A+Extract+BaseCoordinatorContext>
>>>> >>>>> > > >>>>> >> > > > >> on Confluence to extract BaseCoordinatorContext from
>>>> >>>>> > > >>>>> >> > > > >> SourceCoordinatorContext so that it can be reused by other
>>>> >>>>> > > >>>>> >> > > > >> coordinators, e.g. in the shuffling support of the Flink Iceberg sink
>>>> >>>>> > > >>>>> >> > > > >> <https://docs.google.com/document/d/13N8cMqPi-ZPSKbkXGOBMPOzbv2Fua59j8bIjjtxLWqo>.
>>>> >>>>> > > >>>>> >> > > > >>
>>>> >>>>> > > >>>>> >> > > > >> Could you help take a look?
>>>> >>>>> > > >>>>> >> > > > >> Thanks
>>>> >>>>> > > >>>>> >> > > > >>
>>>> >>>>> > > >>>>> >> > > > >> Gang
>>>> >>>>> > > >>>>> >> > > > >>
>>>

Re: [DISCUSS] FLIP-264 Extract BaseCoordinatorContext

Posted by Jark Wu <im...@gmail.com>.
Thanks, Steven, for the explanation.

Implementing the shuffle operator in the Iceberg project first sounds good to me.
We can contribute it to the Flink DataStream API in the future if other
projects/connectors also need it.

Best,
Jark


On Wed, 18 Jan 2023 at 02:11, Steven Wu <st...@gmail.com> wrote:

> Jark,
>
> We were planning to discard the proposal due to some valid concerns raised
> in the thread. Also, this proposal itself didn't really save much duplicated
> code (maybe 100 lines or so).
>
> I also thought the shuffle operator for DataStream could be useful for
> other connectors. The shuffling part (based on traffic statistics) can be
> generic for other connectors. There will be a small integration part unique
> to Iceberg, which can stay in Iceberg. If we go in this new direction, we
> would need a new FLIP.
>
> Thanks,
> Steven
>
>
>
> On Mon, Jan 16, 2023 at 12:30 AM Jark Wu <im...@gmail.com> wrote:
>
>> What's the status and conclusion of this discussion?
>>
>> I see the value of exposing OperatorCoordinator because of its powerful
>> RPC calls; some projects, such as Hudi [1], are already using it. But I
>> agree this is a large topic that requires another FLIP.
>>
>> I am also concerned that a public base class extracted without
>> implementations and clear usage is easy to break in the future. However, I
>> think the shuffling operator can be a generic component used by other
>> connectors and DataStream jobs.
>>
>> Have you considered contributing the ShuffleOperator to the Flink main
>> repository as part of the DataStream API (e.g., DataStream#dynamicShuffle)?
>> Within a single repository, it would be easy to extract the common part of
>> SourceCoordinatorContext and ShuffleCoordinatorContext as an internal
>> implementation.
>>
>>
>> Best,
>> Jark
>>
>> [1]: https://github.com/apache/hudi/blob/a80bb4f717ad8a89770176a1238c4b08874044e8/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/adapter/OperatorCoordinatorAdapter.java
>>
>> On Thu, 3 Nov 2022 at 22:36, Piotr Nowojski <pn...@apache.org> wrote:
>>
>>> Ohhh, I was confused. I thought that the proposal was to make
>>> `CoordinatorContextBase` part of the public API.
>>>
>>> However, I'm also against extracting `CoordinatorContextBase` as an
>>> `@Internal` class.
>>>
>>> 1. Connectors shouldn't reuse internal classes. Using the `@Internal`
>>> `CoordinatedOperatorFactory` would already be quite bad, but at least it is
>>> a relatively stable internal API. Using an `@Internal`
>>> `CoordinatorContextBase`, and refactoring out this base class just for the
>>> sake of reusing it in a connector, is IMO even worse.
>>> 2. Doubly so if they are in a separate repository (as the Iceberg
>>> connector will be/is, right?). There would be no way to prevent breaking
>>> changes between repositories.
>>>
>>> If that's only intended as a stop-gap solution until we properly expose
>>> coordinators, the lesser evil, IMO, would be to copy/paste/modify
>>> SourceCoordinatorContext into the flink-connector-iceberg repository.
>>>
>>> Best,
>>> Piotrek
>>>
>>> On Thu, Nov 3, 2022 at 12:51, Maximilian Michels <mx...@apache.org> wrote:
>>>
>>> > +1. If we wanted to expose the OperatorCoordinator API, we should provide
>>> > an adequate interface. The FLIP partially addresses this by trying to
>>> > factor out RPC code that other coordinators might make use of, but
>>> > additional design is necessary to realize a public operator API.
>>> >
>>> > Just to be clear, I'm not opposed to any of the changes in the FLIP. I
>>> > think they make sense in the context of an Iceberg ShuffleCoordinator in
>>> > Flink. If we were to add such a new coordinator, feel free to make the
>>> > proposed code refactoring as part of a pull request. A FLIP isn't strictly
>>> > necessary here because this is a purely internal change that alters
>>> > neither public APIs nor the internal architecture, apart from reusing a
>>> > bit of existing code. I'm sorry if we consumed some of your time revising
>>> > the document, but I think we had a healthy discussion here. And we're
>>> > definitely looking forward to seeing some of these code changes!
>>> >
>>> > -Max
>>> >
>>> > On Thu, Nov 3, 2022 at 11:56 AM Piotr Nowojski <pn...@apache.org> wrote:
>>> >
>>> >> Hi,
>>> >>
>>> >> Sorry for the delay, but I've given this more thought. First, I share
>>> >> Maximilian's view that this FLIP is incomplete. As I understand it, you
>>> >> are trying to hack existing code to expose small bits of internal
>>> >> functionality as part of the public API without solving many of the
>>> >> underlying issues.
>>> >>
>>> >> For example, what's the point of exposing `CoordinatorContextBase` as a
>>> >> public API if users cannot use it? After all, `OperatorCoordinator` and
>>> >> `CoordinatedOperatorFactory` would remain internal. At the same time,
>>> >> this FLIP would officially force us to support and maintain this
>>> >> CoordinatorContextBase, while I strongly feel that we don't want to do
>>> >> that in the long term. I think we would need to take a big step back and
>>> >> first discuss how we would like to expose the coordinators and agree on
>>> >> how to deal with the issues.
>>> >>
>>> >> The first big issue I see is that I would be very worried about exposing
>>> >> the coordinator API without at least designing/planning how to deal with
>>> >> checkpointing their state. Without that, I'm afraid we might end up in a
>>> >> situation where we need to break the API in order to properly support
>>> >> stateful coordinators. And at the moment I don't see a good and easy
>>> >> solution to this problem.
>>> >>
>>> >> The second issue is the shape of the exposed public API. Exposing
>>> >> `OperatorCoordinator` or `CoordinatorContextBase` looks to me like bad
>>> >> design that would expose way too many things to users, making future
>>> >> development more complicated for us and making it unnecessarily difficult
>>> >> for users to implement those interfaces. I see this as an issue similar
>>> >> to the low-level `StreamOperator` API vs. the higher-level
>>> >> `org.apache.flink.api.common.functions.Function` API (instead of exposing
>>> >> `StreamOperator`, `AbstractStreamOperatorV2`, etc., we should beef up
>>> >> `ProcessFunction` to expose all of the remaining functionality in a
>>> >> user-friendly way). In the context of the coordinators, I would say that
>>> >> we should expose as the public API not the `OperatorCoordinator` but, for
>>> >> example, some kind of `EventProcessFunction` with a simple interface like:
>>> >> ```java
>>> >> interface EventProcessFunction {
>>> >>   void processEvent(int subtask, OperatorEvent event, EventDispatcher eventDispatcher);
>>> >> }
>>> >> ```
>>> >> + maybe some features like processing-time timers/mailbox-style async
>>> >> actions (or maybe that could just be a regular `ProcessFunction`, but
>>> >> with `OperatorEvent` plus `int subtask` as input/output).
>>> >>
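A minimal sketch of how the `EventProcessFunction` idea above could look to a user, in plain Java with no Flink dependency. Everything here is hypothetical and illustrative: `OperatorEvent` is a local stand-in for Flink's internal interface of the same name, and `EventDispatcher`, `CountEvent`, `RunningTotalFunction` and `RecordingDispatcher` are invented names, not Flink APIs. It only shows how small the user-facing surface could be compared to implementing `OperatorCoordinator` directly.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Flink's internal OperatorEvent marker interface.
interface OperatorEvent {}

// Hypothetical: lets the user function send events back to subtasks.
interface EventDispatcher {
    void sendToSubtask(int subtask, OperatorEvent event);
    void broadcast(OperatorEvent event);
}

// The user-facing surface sketched in the thread: one callback per incoming event.
interface EventProcessFunction {
    void processEvent(int subtask, OperatorEvent event, EventDispatcher eventDispatcher);
}

// Hypothetical event carrying a count reported by (or sent to) a subtask.
final class CountEvent implements OperatorEvent {
    final long count;

    CountEvent(long count) {
        this.count = count;
    }
}

// Example user function: keeps a running total and broadcasts it after each report.
final class RunningTotalFunction implements EventProcessFunction {
    private long total;

    @Override
    public void processEvent(int subtask, OperatorEvent event, EventDispatcher dispatcher) {
        if (event instanceof CountEvent) {
            total += ((CountEvent) event).count;
            dispatcher.broadcast(new CountEvent(total));
        }
    }
}

// Test double that records what would have been sent to the subtasks.
final class RecordingDispatcher implements EventDispatcher {
    final List<OperatorEvent> broadcasts = new ArrayList<>();

    @Override
    public void sendToSubtask(int subtask, OperatorEvent event) {}

    @Override
    public void broadcast(OperatorEvent event) {
        broadcasts.add(event);
    }
}
```

Because the function only sees `(subtask, event, dispatcher)`, threading and delivery stay with the runtime; a real design would still have to decide how the function's state (here, `total`) is checkpointed, which is exactly the open issue raised above.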
>>> >> Best,
>>> >> Piotrek
>>> >>
>>> >> On Wed, Nov 2, 2022 at 19:40, gang ye <ye...@gmail.com> wrote:
>>> >>
>>> >>> Hi Max and Qingsheng,
>>> >>>
>>> >>> Thanks for the feedback. The initial motivation for proposing this was to
>>> >>> reduce duplicated code, since ShuffleCoordinator would need communication
>>> >>> logic similar to SourceCoordinator's to talk with operators. I understand
>>> >>> your concern that OperatorCoordinator is an internal class and that, for
>>> >>> now, nothing other than SourceCoordinator uses it.
>>> >>> How about we do what Qingsheng suggested? I can go ahead with the
>>> >>> ShufflingCoordinator implementation without the extraction. Then we will
>>> >>> have an intuitive sense of how much code is copied and could be reused.
>>> >>> If we feel that there is still a need to extract, we can revisit the
>>> >>> discussion.
>>> >>>
>>> >>> Thanks
>>> >>> Gang
>>> >>>
>>> >>>
>>> >>>
>>> >>> On Wed, Nov 2, 2022 at 12:21 AM Qingsheng Ren <re...@apache.org> wrote:
>>> >>>
>>> >>>> Thanks, Gang and Steven, for the FLIP. Actually, I share the same concern
>>> >>>> as Piotr and Maximilian.
>>> >>>>
>>> >>>> OperatorCoordinator is intentionally marked as @Internal because of some
>>> >>>> existing issues, like checkpoint consistency between a non-source operator
>>> >>>> and its coordinator. I'm wondering whether it is useful to expose a public
>>> >>>> context to developers while keeping OperatorCoordinator an internal API.
>>> >>>> If we eventually close all the issues and decide to expose the operator
>>> >>>> coordinator API, that would be a better opportunity to include the base
>>> >>>> context as part of it.
>>> >>>>
>>> >>>> Best,
>>> >>>> Qingsheng
>>> >>>>
>>> >>>> On Tue, Nov 1, 2022 at 8:29 PM Maximilian Michels <mx...@apache.org> wrote:
>>> >>>>
>>> >>>>> Thanks Steven! My confusion stemmed from the lack of context in the FLIP.
>>> >>>>> The first version did not lay out how the refactoring would be used down
>>> >>>>> the line, e.g. by the ShuffleCoordinator. The OperatorCoordinator API is a
>>> >>>>> non-public API, and before reading the code, I wasn't even aware of how
>>> >>>>> exactly it worked and whether it would be available to regular operators
>>> >>>>> (it was originally intended for sources only).
>>> >>>>>
>>> >>>>> I might seem pedantic here, but I believe the purpose of a FLIP should be
>>> >>>>> to describe the *why* behind the changes, not only the changes themselves.
>>> >>>>> A FLIP is not a formality but a tool to communicate and discuss changes. I
>>> >>>>> think we still haven't laid out the exact reasons why we are factoring out
>>> >>>>> the base. As far as I understand now, we need the base class to deal with
>>> >>>>> concurrent updates in the custom Coordinator from the runtime (sub)tasks.
>>> >>>>> Effectively, we are enforcing an actor model for the processing of the
>>> >>>>> incoming messages such that the OperatorCoordinator can cleanly update its
>>> >>>>> state. However, if there are no actual implementations that make use of
>>> >>>>> the refactoring in Flink itself, I wonder if it would make sense to copy
>>> >>>>> this code to the downstream implementation, e.g. the ShuffleCoordinator.
>>> >>>>> As soon as it is part of Flink, we could of course try to consolidate
>>> >>>>> this code.
>>> >>>>>
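The "actor model" mentioned above can be illustrated with a small, hypothetical sketch in plain `java.util.concurrent`: every state change is submitted to one single-threaded executor, so events arriving concurrently from different subtasks never race on the coordinator's state. This is not Flink's implementation; `EventLoopCoordinator` is an invented name, and `runInEventLoop` merely mirrors the name of the `SourceCoordinator` method cited in the message.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical coordinator whose state is only touched from a single event-loop thread.
final class EventLoopCoordinator implements AutoCloseable {
    private final ExecutorService eventLoop = Executors.newSingleThreadExecutor();
    // Deliberately not thread-safe: confinement to eventLoop keeps it consistent.
    private final Map<Integer, Long> countsBySubtask = new HashMap<>();

    // May be called from any RPC thread; enqueues the mutation instead of applying it.
    Future<?> handleEventFromSubtask(int subtask, long count) {
        return runInEventLoop(() -> countsBySubtask.merge(subtask, count, Long::sum));
    }

    // Reads also go through the loop, so they see every previously enqueued update.
    long total() throws Exception {
        return eventLoop
                .submit(() -> countsBySubtask.values().stream().mapToLong(Long::longValue).sum())
                .get();
    }

    private Future<?> runInEventLoop(Runnable action) {
        return eventLoop.submit(action);
    }

    @Override
    public void close() throws InterruptedException {
        eventLoop.shutdown();
        eventLoop.awaitTermination(10, TimeUnit.SECONDS);
    }
}
```

The trade-off of this design is that handlers must stay short, since a slow one delays every other queued message.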
>>> >>>>> Considering the *how* of this, the FLIP appears to list methods from both
>>> >>>>> SourceCoordinator (e.g. runInEventLoop) and SourceCoordinatorContext, as
>>> >>>>> well as methods that do not appear anywhere in Flink code, e.g.
>>> >>>>> subTaskReady / subTaskNotReady / sendEventToOperator. It appears that some
>>> >>>>> of this has been extracted from a downstream implementation. It would be
>>> >>>>> great to adjust this so that it reflects the status quo in Flink.
>>> >>>>>
>>> >>>>> -Max
>>> >>>>>
>>> >>>>> On Fri, Oct 28, 2022 at 5:53 AM Steven Wu <st...@gmail.com> wrote:
>>> >>>>>
>>> >>>>> > Max,
>>> >>>>> >
>>> >>>>> > Thanks a lot for the comments. We should clarify that the shuffle
>>> >>>>> > operator/coordinator is not really part of the Flink sink
>>> >>>>> > function/operator. The shuffle operator is a custom operator that can be
>>> >>>>> > inserted right before the Iceberg writer operator. It calculates the
>>> >>>>> > traffic statistics and performs a custom partition/shuffle
>>> >>>>> > (DataStream#partitionCustom) to cluster the data right before it reaches
>>> >>>>> > the Iceberg writer operator.
>>> >>>>> >
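As an illustration of the custom partition step described above, here is a hypothetical sketch of a statistics-guided channel assignment, assuming the aggregated statistics are a key-to-record-count map. The greedy strategy (heaviest key first, onto the currently least-loaded channel) and the class name `StatsGuidedAssignment` are assumptions for illustration, not the design from the linked Iceberg document. In a Flink job, a lookup like `channelFor` is the kind of logic one could wrap in an `org.apache.flink.api.common.functions.Partitioner` (whose method is `int partition(K key, int numPartitions)`) and pass to `DataStream#partitionCustom`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical: turns global key -> count statistics into a key -> channel assignment.
final class StatsGuidedAssignment {
    private final Map<String, Integer> channelByKey = new HashMap<>();
    private final int numChannels;

    StatsGuidedAssignment(Map<String, Long> globalCounts, int numChannels) {
        this.numChannels = numChannels;
        long[] load = new long[numChannels];
        // Place the heaviest keys first so they end up spread across channels.
        List<Map.Entry<String, Long>> entries = new ArrayList<>(globalCounts.entrySet());
        entries.sort((a, b) -> Long.compare(b.getValue(), a.getValue()));
        for (Map.Entry<String, Long> e : entries) {
            // Pick the channel with the smallest load so far (lowest index on ties).
            int target = 0;
            for (int c = 1; c < numChannels; c++) {
                if (load[c] < load[target]) {
                    target = c;
                }
            }
            channelByKey.put(e.getKey(), target);
            load[target] += e.getValue();
        }
    }

    // Same shape as Partitioner#partition; keys without statistics fall back to hashing.
    int channelFor(String key) {
        Integer assigned = channelByKey.get(key);
        return assigned != null ? assigned : Math.floorMod(key.hashCode(), numChannels);
    }
}
```

A static assignment like this would have to be rebuilt whenever new global statistics arrive; the sketch does not address how to switch assignments safely.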
>>> >>>>> > We are not proposing to introduce a sink coordinator for the sink
>>> >>>>> > interface. The shuffle operator needs the CoordinatorContextBase to
>>> >>>>> > facilitate the communication between shuffle subtasks and the coordinator
>>> >>>>> > for traffic statistics aggregation. The communication part is already
>>> >>>>> > implemented by SourceCoordinatorContext.
>>> >>>>> >
>>> >>>>> > Here are some details about the communication needs:
>>> >>>>> > - subtasks periodically calculate local statistics and send them to the
>>> >>>>> >   coordinator for global aggregation
>>> >>>>> > - the coordinator sends the globally aggregated statistics to the subtasks
>>> >>>>> > - subtasks use the globally aggregated statistics to guide the
>>> >>>>> >   partition/shuffle decision
>>> >>>>> > Regards,
>>> >>>>> > Steven
>>> >>>>> >
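The three communication steps listed above can be modelled in plain Java as a rough sketch. The event class `LocalStats`, the aggregator `StatsAggregator`, and the idea of grouping reports by a numeric round are all hypothetical. The coordinator side waits until every subtask has reported for a round, merges the counts, and hands the result to a broadcast callback that, in a real coordinator, would send an event to each subtask.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical event a subtask sends: its local key -> count statistics for one round.
final class LocalStats {
    final int subtask;
    final long round;
    final Map<String, Long> counts;

    LocalStats(int subtask, long round, Map<String, Long> counts) {
        this.subtask = subtask;
        this.round = round;
        this.counts = counts;
    }
}

// Hypothetical coordinator-side logic: merge per-subtask reports, broadcast once complete.
final class StatsAggregator {
    private final int parallelism;
    private final Consumer<Map<String, Long>> broadcast;
    // round -> (subtask -> local counts) for rounds that are not complete yet
    private final Map<Long, Map<Integer, Map<String, Long>>> pending = new HashMap<>();

    StatsAggregator(int parallelism, Consumer<Map<String, Long>> broadcast) {
        this.parallelism = parallelism;
        this.broadcast = broadcast;
    }

    // Expected to be called from a single thread, e.g. the coordinator's event loop.
    void onLocalStats(LocalStats event) {
        Map<Integer, Map<String, Long>> reports =
                pending.computeIfAbsent(event.round, r -> new HashMap<>());
        reports.put(event.subtask, event.counts); // a repeated report replaces the older one
        if (reports.size() < parallelism) {
            return; // still waiting for other subtasks in this round
        }
        Map<String, Long> global = new HashMap<>();
        for (Map<String, Long> local : reports.values()) {
            local.forEach((key, count) -> global.merge(key, count, Long::sum));
        }
        pending.remove(event.round);
        broadcast.accept(global);
    }
}
```

Feeding every report to `onLocalStats` from a single thread keeps the `pending` map consistent without locks, which ties back to the event-loop discussion in this thread.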
>>> >>>>> > On Thu, Oct 27, 2022 at 5:38 PM Maximilian Michels <mxm@apache.org> wrote:
>>> >>>>> >
>>> >>>>> > > Hi Gang,
>>> >>>>> > >
>>> >>>>> > > Looks much better! I've actually gone through the OperatorCoordinator code.
>>> >>>>> > > It turns out that any operator already has an OperatorCoordinator assigned.
>>> >>>>> > > Also, any operator can add custom coordinator code. So it looks like you
>>> >>>>> > > won't have to implement any additional runtime logic to add a
>>> >>>>> > > ShuffleCoordinator. However, I'm wondering why you specifically need to
>>> >>>>> > > refactor the SourceCoordinatorContext. You could simply add your own
>>> >>>>> > > coordinator code. I'm not sure the sink requirements map to the source
>>> >>>>> > > interface so closely that you can reuse the same logic.
>>> >>>>> > >
>>> >>>>> > > If you can refactor SourceCoordinatorContext in a way that makes it fit
>>> >>>>> > > your use case, I have no objection here. By the way, another example
>>> >>>>> > > of an existing OperatorCoordinator is CollectSinkOperatorCoordinator,
>>> >>>>> > > which is quite trivial, but it might be worth evaluating whether you
>>> >>>>> > > need the full power of SourceCoordinatorContext, which is why I wanted
>>> >>>>> > > to get more context.
>>> >>>>> > >
>>> >>>>> > > -Max
>>> >>>>> > >
>>> >>>>> > > On Thu, Oct 27, 2022 at 4:15 PM gang ye <ye...@gmail.com> wrote:
>>> >>>>> > >
>>> >>>>> > > > Hi Max,
>>> >>>>> > > > I understand your concern. Since shuffling support for the Flink Iceberg
>>> >>>>> > > > sink is not the main body of the proposal, I just added another appendix
>>> >>>>> > > > with more details about how to use CoordinatorContextBase and how to
>>> >>>>> > > > define ShufflingCoordinator.
>>> >>>>> > > >
>>> >>>>> > > > Let me know if that does not address your concern.
>>> >>>>> > > >
>>> >>>>> > > > Thanks
>>> >>>>> > > > Gang
>>> >>>>> > > >
>>> >>>>> > > > On Thu, Oct 27, 2022 at 1:31 PM Maximilian Michels <mxm@apache.org> wrote:
>>> >>>>> > > >
>>> >>>>> > > >> Hey Gang,
>>> >>>>> > > >>
>>> >>>>> > > >> What I'm looking for here is a complete picture of why the change is
>>> >>>>> > > >> necessary and what the next steps are. Ultimately, refactoring any code
>>> >>>>> > > >> serves a purpose. Here, we want to refactor the Coordinator code such
>>> >>>>> > > >> that we can add a SinkCoordinator, similar to the SourceCoordinator.
>>> >>>>> > > >> The FLIP should address the next steps, i.e. how you plan to add the
>>> >>>>> > > >> SinkCoordinator, its interfaces, and the runtime changes. It doesn't
>>> >>>>> > > >> have to be in great detail, but without this information, I don't
>>> >>>>> > > >> think the FLIP is complete.
>>> >>>>> > > >>
>>> >>>>> > > >> This feature should be generic enough to be usable by sinks other
>>> >>>>> > > >> than the Iceberg sink. Of course, Iceberg can still load its own
>>> >>>>> > > >> implementation, which may be outlined in a separate FLIP.
>>> >>>>> > > >>
>>> >>>>> > > >> Unless there is a good reason, normal operators should not support
>>> >>>>> > > >> the coordinator functionality. At least I'm not convinced it would
>>> >>>>> > > >> play well with Flink's execution model. But I see how it is required
>>> >>>>> > > >> for sources and sinks.
>>> >>>>> > > >>
>>> >>>>> > > >> -Max
>>> >>>>> > > >>
>>> >>>>> > > >> On Wed, Oct 26, 2022 at 3:05 PM gang ye <yegangapp@gmail.com> wrote:
>>> >>>>> > > >>
>>> >>>>> > > >>> Hi Max,
>>> >>>>> > > >>>
>>> >>>>> > > >>> Thanks for reviewing.
>>> >>>>> > > >>>
>>> >>>>> > > >>> For FLIP-264, yes, we will focus only on abstracting the RPC calls
>>> >>>>> > > >>> between the task and the job manager and won't touch watermarks or
>>> >>>>> > > >>> checkpointing.
>>> >>>>> > > >>> If a coordinator doesn't need RPC calls to talk to its subtasks, it can
>>> >>>>> > > >>> define its context without extending CoordinatorContextBase (or we can
>>> >>>>> > > >>> find another class name to limit the scope).
>>> >>>>> > > >>>
>>> >>>>> > > >>> Regarding the scope of code changes, FLIP-264 will only do the context
>>> >>>>> > > >>> extraction. The shuffling coordinator and operator
>>> >>>>> > > >>> <https://docs.google.com/document/d/13N8cMqPi-ZPSKbkXGOBMPOzbv2Fua59j8bIjjtxLWqo>,
>>> >>>>> > > >>> which will use the context, will come with a separate proposal, so we
>>> >>>>> > > >>> are trying to keep FLIP-264 simple and easy to understand. I can add a
>>> >>>>> > > >>> bit more about how to use the coordinator context in FLIP-264 if you
>>> >>>>> > > >>> think that would be helpful.
>>> >>>>> > > >>>
>>> >>>>> > > >>> Thanks!
>>> >>>>> > > >>> Gang
>>> >>>>> > > >>>
>>> >>>>> > > >>>
>>> >>>>> > > >>>
>>> >>>>> > > >>> On Wed, Oct 26, 2022 at 7:25 AM Maximilian Michels <mxm@apache.org> wrote:
>>> >>>>> > > >>>
>>> >>>>> > > >>>> Thanks for the proposal, Gang! This is indeed a somewhat bigger
>>> >>>>> > > >>>> change. The coordinator for sources, as part of FLIP-27, was
>>> >>>>> > > >>>> specifically added to synchronize the global watermark and to assign
>>> >>>>> > > >>>> splits dynamically. However, it effectively allows arbitrary RPC calls
>>> >>>>> > > >>>> between the task and the job manager. I understand the concern that
>>> >>>>> > > >>>> such a powerful mechanism should not be available to all operators.
>>> >>>>> > > >>>> Nevertheless, I see the practical use in the case of sinks like
>>> >>>>> > > >>>> Iceberg. So I'd suggest limiting this feature to sinks (and sources)
>>> >>>>> > > >>>> only.
>>> >>>>> > > >>>>
>>> >>>>> > > >>>> I'm wondering whether extracting the SourceCoordinatorContext is enough
>>> >>>>> > > >>>> to achieve what you want. Additional work will be necessary, e.g.
>>> >>>>> > > >>>> creating a SinkCoordinator, similar to SourceCoordinator, which handles
>>> >>>>> > > >>>> the RPC calls and the checkpointing. I think it would be good to
>>> >>>>> > > >>>> outline this in the FLIP.
>>> >>>>> > > >>>>
>>> >>>>> > > >>>> -Max
>>> >>>>> > > >>>>
>>> >>>>> > > >>>> On Sun, Oct 16, 2022 at 9:01 AM Steven Wu <stevenz3wu@gmail.com> wrote:
>>> >>>>> > > >>>>
>>> >>>>> > > >>>>> Sorry, I sent an incomplete reply by mistake.
>>> >>>>> > > >>>>>
>>> >>>>> > > >>>>> If there are any concrete concerns, we can discuss them. In
>>> >>>>> > > >>>>> FLINK-27405 [1], Avid pointed out some implications regarding
>>> >>>>> > > >>>>> checkpointing. In this small FLIP, we are not exposing or changing any
>>> >>>>> > > >>>>> checkpointing logic; we mainly need the coordinator context
>>> >>>>> > > >>>>> functionality to facilitate communication between the coordinator and
>>> >>>>> > > >>>>> its subtasks.
>>> >>>>> > > >>>>>
>>> >>>>> > > >>>>> [1] https://issues.apache.org/jira/browse/FLINK-27405
>>> >>>>> > > >>>>>
>>> >>>>> > > >>>>> On Sun, Oct 16, 2022 at 8:56 AM Steven Wu <stevenz3wu@gmail.com> wrote:
>>> >>>>> > > >>>>>
>>> >>>>> > > >>>>> > Hang, I appreciate your input. I agree that `CoordinatorContextBase`
>>> >>>>> > > >>>>> > is a better name, given Flink's code conventions.
>>> >>>>> > > >>>>> >
>>> >>>>> > > >>>>> > If there are any concrete concerns, we can discuss. In the jira,
>>> >>>>> > > >>>>> >
>>> >>>>> > > >>>>> >
>>> >>>>> > > >>>>> >
>>> >>>>> > > >>>>> > On Sun, Oct 16, 2022 at 12:12 AM Hang Ruan <ruanhang1993@gmail.com> wrote:
>>> >>>>> > > >>>>> >
>>> >>>>> > > >>>>> >> Hi,
>>> >>>>> > > >>>>> >>
>>> >>>>> > > >>>>> >> IMO, extracting a base class from SourceCoordinatorContext makes sense.
>>> >>>>> > > >>>>> >> But I'd prefer the name `OperatorCoordinatorContextBase` or
>>> >>>>> > > >>>>> >> `CoordinatorContextBase`, following the naming pattern of
>>> >>>>> > > >>>>> >> `SourceReaderBase`.
>>> >>>>> > > >>>>> >> I also agree with what Piotr said: more problems may come up once
>>> >>>>> > > >>>>> >> connectors start to use it.
>>> >>>>> > > >>>>> >>
>>> >>>>> > > >>>>> >> Best,
>>> >>>>> > > >>>>> >> Hang
>>> >>>>> > > >>>>> >>
>>> >>>>> > > >>>>> >> On Fri, Oct 14, 2022 at 22:31, Steven Wu <st...@gmail.com> wrote:
>>> >>>>> > > >>>>> >>
>>> >>>>> > > >>>>> >> > Piotr,
>>> >>>>> > > >>>>> >> >
>>> >>>>> > > >>>>> >> > The proposal is to extract the listed methods from the @Internal
>>> >>>>> > > >>>>> >> > SourceCoordinatorContext into a @PublicEvolving BaseCoordinatorContext.
>>> >>>>> > > >>>>> >> >
>>> >>>>> > > >>>>> >> > The motivation is that other operators could leverage the
>>> >>>>> > > >>>>> >> > communication mechanism between the operator coordinator and the
>>> >>>>> > > >>>>> >> > operator subtasks. For example, the shuffle operator (in the Flink
>>> >>>>> > > >>>>> >> > Iceberg sink) from the linked Google doc can leverage it to compute
>>> >>>>> > > >>>>> >> > traffic distribution statistics.
>>> >>>>> > > >>>>> >> > * Subtasks calculate local statistics and periodically send them to
>>> >>>>> > > >>>>> >> >   the coordinator for global aggregation.
>>> >>>>> > > >>>>> >> > * The coordinator can broadcast the globally aggregated statistics
>>> >>>>> > > >>>>> >> >   to subtasks, which use them to guide the shuffling decision
>>> >>>>> > > >>>>> >> >   (selecting downstream channels).
>>> >>>>> > > >>>>> >> >
>>> >>>>> > > >>>>> >> > Thanks,
>>> >>>>> > > >>>>> >> > Steven
>>> >>>>> > > >>>>> >> >
>>> >>>>> > > >>>>> >> >
>>> >>>>> > > >>>>> >> > On Fri, Oct 14, 2022 at 2:16 AM Piotr Nowojski <pnowojski@apache.org> wrote:
>>> >>>>> > > >>>>> >> >
>>> >>>>> > > >>>>> >> > > Hi,
>>> >>>>> > > >>>>> >> > >
>>> >>>>> > > >>>>> >> > > Could you clarify what proposal you have in mind? From the
>>> >>>>> > > >>>>> >> > > context, I understand that the newly extracted
>>> >>>>> > > >>>>> >> > > `BaseCoordinatorContext` would have to be marked as
>>> >>>>> > > >>>>> >> > > `@PublicEvolving` or `@Experimental`, since otherwise extracting it
>>> >>>>> > > >>>>> >> > > and keeping it `@Internal` wouldn't change much: such an `@Internal`
>>> >>>>> > > >>>>> >> > > base class could be removed at any point in the future. Having said
>>> >>>>> > > >>>>> >> > > that, it sounds to me like your proposal is a bit bigger than it
>>> >>>>> > > >>>>> >> > > looks at first glance, and you actually want to expose the operator
>>> >>>>> > > >>>>> >> > > coordinator concept in the public API?
>>> >>>>> > > >>>>> >> > >
>>> >>>>> > > >>>>> >> > > AFAIK there were some discussions about that, and it was a fairly
>>> >>>>> > > >>>>> >> > > conscious decision NOT to do that. I don't know the reasons,
>>> >>>>> > > >>>>> >> > > however. Only now have I heard that there are, for example, some
>>> >>>>> > > >>>>> >> > > problems with checkpointing hypothetical non-source operator
>>> >>>>> > > >>>>> >> > > coordinators. Maybe someone else could shed some light on this?
>>> >>>>> > > >>>>> >> > >
>>> >>>>> > > >>>>> >> > > Conceptually, I would actually be in favour of exposing operator
>>> >>>>> > > >>>>> >> > > coordinators if there is a good reason for it, but it is a more
>>> >>>>> > > >>>>> >> > > difficult topic and might be a larger effort than it seems at
>>> >>>>> > > >>>>> >> > > first glance.
>>> >>>>> > > >>>>> >> > >
>>> >>>>> > > >>>>> >> > > Best,
>>> >>>>> > > >>>>> >> > > Piotrek
>>> >>>>> > > >>>>> >> > >
>>> >>>>> > > >>>>> >> > > On Tue, Oct 4, 2022 at 19:41, Steven Wu <stevenz3wu@gmail.com> wrote:
>>> >>>>> > > >>>>> >> > >
>>> >>>>> > > >>>>> >> > > > Jing, thanks a lot for your reply. The linked Google doc is not for
>>> >>>>> > > >>>>> >> > > > this FLIP, which is fully documented on the wiki page. The linked
>>> >>>>> > > >>>>> >> > > > Google doc is the design doc for introducing shuffling in the Flink
>>> >>>>> > > >>>>> >> > > > Iceberg sink, which motivated this FLIP so that the shuffle
>>> >>>>> > > >>>>> >> > > > coordinator can leverage the introduced BaseCoordinatorContext to
>>> >>>>> > > >>>>> >> > > > avoid code duplication.
>>> >>>>> > > >>>>> >> > > >
>>> >>>>> > > >>>>> >> > > > On Tue, Oct 4, 2022 at 1:04 AM Jing Ge <jing@ververica.com> wrote:
>>> >>>>> > > >>>>> >> > > >
>>> >>>>> > > >>>>> >> > > > > Thanks for bringing this up. It looks good overall! One small
>>> >>>>> > > >>>>> >> > > > > thing: you might want to put all the content on the wiki page
>>> >>>>> > > >>>>> >> > > > > instead of linking to a Google doc, because some people might not
>>> >>>>> > > >>>>> >> > > > > be able to access the Google doc.
>>> >>>>> > > >>>>> >> > > > >
>>> >>>>> > > >>>>> >> > > > > Best regards,
>>> >>>>> > > >>>>> >> > > > > Jing
>>> >>>>> > > >>>>> >> > > > >
>>> >>>>> > > >>>>> >> > > > > On Tue, Oct 4, 2022 at 3:57 AM gang ye <yegangapp@gmail.com> wrote:
>>> >>>>> > > >>>>> >> > > > >
>>> >>>>> > > >>>>> >> > > > >> Hi,
>>> >>>>> > > >>>>> >> > > > >>
>>> >>>>> > > >>>>> >> > > > >> We submitted the FLIP proposal
>>> >>>>> > > >>>>> >> > > > >> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-264%3A+Extract+BaseCoordinatorContext>
>>> >>>>> > > >>>>> >> > > > >> on Confluence to extract BaseCoordinatorContext from
>>> >>>>> > > >>>>> >> > > > >> SourceCoordinatorContext so that it can be reused by other
>>> >>>>> > > >>>>> >> > > > >> coordinators, e.g. in the shuffling support of the Flink Iceberg
>>> >>>>> > > >>>>> >> > > > >> sink
>>> >>>>> > > >>>>> >> > > > >> <https://docs.google.com/document/d/13N8cMqPi-ZPSKbkXGOBMPOzbv2Fua59j8bIjjtxLWqo>.
>>> >>>>> > > >>>>> >> > > > >>
>>> >>>>> > > >>>>> >> > > > >> Could you help to take a look?
>>> >>>>> > > >>>>> >> > > > >> Thanks
>>> >>>>> > > >>>>> >> > > > >>
>>> >>>>> > > >>>>> >> > > > >> Gang
>>> >>>>> > > >>>>> >> > > > >>
>>> >>>>> > > >>>>> >> > > > >
>>> >>>>> > > >>>>> >> > > >
>>> >>>>> > > >>>>> >> > >
>>> >>>>> > > >>>>> >> >
>>> >>>>> > > >>>>> >>
>>> >>>>> > > >>>>> >
>>> >>>>> > > >>>>>
>>> >>>>> > > >>>>
>>> >>>>> > >
>>> >>>>> >
>>> >>>>>
>>> >>>>
>>>
>>

Re: [DISCUSS] FLIP-264 Extract BaseCoordinatorContext

Posted by Steven Wu <st...@gmail.com>.
Jark,

We were planning to discard the proposal due to some valid concerns raised
in the thread. Also, the proposal by itself wouldn't have removed much
duplicated code (maybe 100 lines or so).

I had also thought that the shuffle operator for DataStream could be useful
for other connectors. The shuffling part (based on traffic statistics) can
be generic, while the small integration part that is unique to Iceberg can
stay in Iceberg. If we go in this new direction, we would need a new FLIP.

Thanks,
Steven



On Mon, Jan 16, 2023 at 12:30 AM Jark Wu <im...@gmail.com> wrote:

> What's the status and conclusion of this discussion?
>
> I see the value of exposing OperatorCoordinator because of its powerful
> RPC calls; some projects, such as Hudi [1], are already using it. But I
> agree this is a large topic and requires another FLIP.
>
> I am also concerned about extracting a public base class without
> implementations and clear usage, as such a class is easy to break in the
> future. However, I think the shuffling operator could be a generic
> component used by other connectors and DataStream jobs.
>
> Have you considered contributing the ShuffleOperator to the Flink main
> repository as part of the DataStream API (e.g., DataStream#dynamicShuffle)?
> Within a single repository, it's easy to extract the common part of
> SourceCoordinatorContext and ShuffleCoordinatorContext as an internal
> implementation.
>
>
> Best,
> Jark
>
> [1]:
> https://github.com/apache/hudi/blob/a80bb4f717ad8a89770176a1238c4b08874044e8/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/adapter/OperatorCoordinatorAdapter.java
>
> On Thu, 3 Nov 2022 at 22:36, Piotr Nowojski <pn...@apache.org> wrote:
>
>> Ohhh, I was confused. I thought that the proposal was to make
>> `CoordinatorContextBase` part of the public API.
>>
>> However, I'm also against extracting `CoordinatorContextBase` as an
>> `@Internal` class.
>>
>> 1. Connectors shouldn't reuse internal classes. Using the `@Internal`
>>    CoordinatedOperatorFactory would already be quite bad, but at least it
>>    is a relatively stable internal API. Using an `@Internal`
>>    `CoordinatorContextBase`, and refactoring out this base class just for
>>    the sake of reusing it in a connector, is IMO even worse.
>> 2. Doubly so if they are in a separate repository (as the Iceberg
>>    connector will be/is, right?). There would be no way to prevent
>>    breaking changes between repositories.
>>
>> If that's only intended as a stop-gap solution until we properly expose
>> coordinators, the lesser evil would IMO be to copy/paste/modify
>> SourceCoordinatorContext in the flink-connector-iceberg repository.
>>
>> Best,
>> Piotrek
>>
>> On Thu, Nov 3, 2022 at 12:51, Maximilian Michels <mx...@apache.org> wrote:
>>
>> > +1. If we wanted to expose the OperatorCoordinator API, we should provide
>> > an adequate interface. The FLIP partially addresses this by trying to
>> > factor out RPC code that other coordinators might make use of, but
>> > additional design work is necessary to realize a public operator API.
>> >
>> > Just to be clear, I'm not opposed to any of the changes in the FLIP. I
>> > think they make sense in the context of an Iceberg ShuffleCoordinator in
>> > Flink. If we were to add such a new coordinator, feel free to include the
>> > proposed refactoring in that pull request. A FLIP isn't strictly
>> > necessary here because this is a purely internal change that alters
>> > neither public APIs nor the internal architecture, apart from reusing a
>> > bit of existing code. I'm sorry if this cost you some time revising the
>> > document, but I think we had a healthy discussion here. And we're
>> > definitely looking forward to seeing some of these code changes!
>> >
>> > -Max
>> >
>> > On Thu, Nov 3, 2022 at 11:56 AM Piotr Nowojski <pn...@apache.org> wrote:
>> >
>> >> Hi,
>> >>
>> >> Sorry for the delay, but I've given this more thought. First, I share
>> >> Maximilian's view that this FLIP is incomplete. As I understand it, you
>> >> are trying to hack existing code to expose small bits of internal
>> >> functionality as part of the public API without solving many of the
>> >> underlying issues.
>> >>
>> >> For example, what's the point of exposing `CoordinatorContextBase` as a
>> >> public API if users cannot use it? After all, `OperatorCoordinator` and
>> >> `CoordinatedOperatorFactory` would remain internal. At the same time,
>> >> this FLIP would officially force us to support and maintain
>> >> `CoordinatorContextBase`, while I strongly feel that we don't want to do
>> >> that in the long term. I think we would need to take a big step back and
>> >> first discuss how we would like to expose the coordinators and agree on
>> >> how to deal with the issues.
>> >>
>> >> The first big issue I see is that I would be very worried about exposing
>> >> the coordinator API without at least designing/planning how to
>> >> checkpoint coordinator state. Without that, I'm afraid we might end up
>> >> in a situation where we need to break the API in order to properly
>> >> support stateful coordinators. And at the moment I don't see a good and
>> >> easy solution to this problem.
>> >>
>> >> The second issue is the shape of the exposed public API. Exposing
>> >> `OperatorCoordinator` or `CoordinatorContextBase` looks to me like bad
>> >> design: it would expose way too many things to users, making future
>> >> development more complicated for us and making it unnecessarily
>> >> difficult for users to implement those interfaces. I see this as an
>> >> issue similar to the low-level `StreamOperator` API vs. the higher-level
>> >> `org.apache.flink.api.common.functions.Function` API (instead of
>> >> exposing `StreamOperator`, `AbstractStreamOperatorV2`, etc., we should
>> >> beef up `ProcessFunction` to expose all of the remaining functionality
>> >> in a user-friendly way). In the context of the coordinators, I would say
>> >> that we should expose as the public API not the `OperatorCoordinator`,
>> >> but, for example, some kind of `EventProcessFunction` with a simple
>> >> interface like:
>> >> ```
>> >> interface EventProcessFunction {
>> >>   void processEvent(int subtask, OperatorEvent event, EventDispatcher eventDispatcher);
>> >> }
>> >> ```
>> >> Plus maybe some features like processing-time timers or mailbox-style
>> >> async actions.
>> >> (Or maybe that could just be a regular `ProcessFunction` with
>> >> `OperatorEvent` and `int subtask` as input/output.)
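To make the proposed shape concrete, here is a minimal Java sketch of what a user would implement against such an interface. `OperatorEvent` is declared locally as a stand-in for Flink's internal marker interface; `EventDispatcher`, its `sendToSubtask` method, and the `Ping`/`Pong` events are hypothetical names invented for illustration, not an existing Flink API.

```java
import java.io.Serializable;

// Local stand-in for Flink's internal OperatorEvent marker interface.
interface OperatorEvent extends Serializable {}

// Hypothetical dispatcher handed to the function; the method name is an assumption.
interface EventDispatcher {
    void sendToSubtask(int subtask, OperatorEvent event);
}

// The interface shape sketched in the message above.
interface EventProcessFunction {
    void processEvent(int subtask, OperatorEvent event, EventDispatcher eventDispatcher);
}

// Hypothetical request/response events.
final class Ping implements OperatorEvent {}

final class Pong implements OperatorEvent {
    final int subtask;

    Pong(int subtask) {
        this.subtask = subtask;
    }
}

// Example user function: answers every Ping with a Pong sent back to the same
// subtask and ignores all other events. It implements a single callback instead
// of the full coordinator lifecycle (start, close, checkpoint, failover hooks).
final class PingPongFunction implements EventProcessFunction {
    @Override
    public void processEvent(int subtask, OperatorEvent event, EventDispatcher eventDispatcher) {
        if (event instanceof Ping) {
            eventDispatcher.sendToSubtask(subtask, new Pong(subtask));
        }
    }
}
```

The narrower surface is the point of the suggestion: the framework would keep ownership of threading, lifecycle, and state handling, and the user only reacts to events.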
>> >>
>> >> Best,
>> >> Piotrek
>> >>
>> >> On Wed, Nov 2, 2022 at 19:40, gang ye <ye...@gmail.com> wrote:
>> >>
>> >>> Hi Max and Qingsheng,
>> >>>
>> >>> Thanks for the feedback. The initial motivation for this proposal was to
>> >>> reduce duplicated code, since the ShuffleCoordinator would need
>> >>> communication logic similar to SourceCoordinator's to talk to its
>> >>> operators. I understand your concern that OperatorCoordinator is an
>> >>> internal class and that, apart from SourceCoordinator, nothing else uses
>> >>> this for now.
>> >>> How about we do what Qingsheng suggested? I can go ahead with the
>> >>> ShufflingCoordinator implementation without the extraction. Then we will
>> >>> have an intuitive sense of how much code is copied and could be reused.
>> >>> If we feel that there is still a need to extract it, we can revisit the
>> >>> discussion.
>> >>>
>> >>> Thanks
>> >>> Gang
>> >>>
>> >>>
>> >>>
>> >>> On Wed, Nov 2, 2022 at 12:21 AM Qingsheng Ren <re...@apache.org> wrote:
>> >>>
>> >>>> Thanks Gang and Steven for the FLIP. Actually, I share the same concern
>> >>>> as Piotr and Maximilian.
>> >>>>
>> >>>> OperatorCoordinator is intentionally marked as @Internal because of some
>> >>>> existing issues, such as consistency between a non-source operator and
>> >>>> its coordinator on checkpoints. I'm wondering whether it is useful to
>> >>>> expose a public context to developers while keeping OperatorCoordinator
>> >>>> an internal API. If we eventually close all the issues and decide to
>> >>>> expose the operator coordinator API, that would be a better opportunity
>> >>>> to include the base context as part of it.
>> >>>>
>> >>>> Best,
>> >>>> Qingsheng
>> >>>>
>> >>>> On Tue, Nov 1, 2022 at 8:29 PM Maximilian Michels <mx...@apache.org> wrote:
>> >>>>
>> >>>>> Thanks Steven! My confusion stemmed from the lack of context in the
>> >>>>> FLIP. The first version did not lay out how the refactoring would be
>> >>>>> used down the line, e.g. by the ShuffleCoordinator. The
>> >>>>> OperatorCoordinator API is a non-public API and before reading the
>> >>>>> code, I wasn't even aware how exactly it worked and whether it would be
>> >>>>> available to regular operators (it was originally intended for sources
>> >>>>> only).
>> >>>>>
>> >>>>> I might seem pedantic here, but I believe the purpose of a FLIP should
>> >>>>> be to describe the *why* behind the changes, not only the changes
>> >>>>> themselves. A FLIP is not a formality but a tool to communicate and
>> >>>>> discuss changes. I think we still haven't laid out the exact reasons why
>> >>>>> we are factoring out the base. As far as I understand now, we need the
>> >>>>> base class to deal with concurrent updates in the custom Coordinator
>> >>>>> from the runtime (sub)tasks. Effectively, we are enforcing an actor
>> >>>>> model for the processing of incoming messages such that the
>> >>>>> OperatorCoordinator can cleanly update its state. However, if there are
>> >>>>> no actual implementations that make use of the refactoring in Flink
>> >>>>> itself, I wonder if it would make sense to copy this code to the
>> >>>>> downstream implementation, e.g. the ShuffleCoordinator. As soon as it
>> >>>>> is part of Flink, we could of course try to consolidate this code.
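The actor-style pattern described above can be sketched in a few lines of Java: every handler is enqueued on one single-threaded executor, so state updates triggered by concurrent subtask messages never interleave. This is only an illustration under that assumption; `EventLoopCoordinator` and its methods are hypothetical and are not the actual SourceCoordinator or its `runInEventLoop` implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical coordinator (not Flink's SourceCoordinator): all state access is
// funneled through a single-threaded executor that acts as the event loop.
final class EventLoopCoordinator implements AutoCloseable {
    private final ExecutorService eventLoop = Executors.newSingleThreadExecutor();

    // Only read or written on the event-loop thread, so no locking is needed.
    private final Map<Integer, Long> eventsPerSubtask = new HashMap<>();

    // May be called from any RPC thread; the update is queued, not applied in place.
    Future<Long> handleEventFromSubtask(int subtask, long count) {
        return eventLoop.submit(() -> eventsPerSubtask.merge(subtask, count, Long::sum));
    }

    // Reads also run on the loop, after every update that was enqueued before them.
    long totalEvents() throws Exception {
        return eventLoop
                .submit(() -> eventsPerSubtask.values().stream().mapToLong(Long::longValue).sum())
                .get();
    }

    @Override
    public void close() throws InterruptedException {
        eventLoop.shutdown();
        eventLoop.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

Because the executor runs tasks one at a time in submission order, a plain `HashMap` is safe here even though `handleEventFromSubtask` is called from many threads.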
>> >>>>>
>> >>>>> Considering the *how* of this, the FLIP appears to list both methods
>> >>>>> from SourceCoordinator (e.g. runInEventLoop) and from
>> >>>>> SourceCoordinatorContext, as well as methods which do not appear
>> >>>>> anywhere in Flink code, e.g. subTaskReady / subTaskNotReady /
>> >>>>> sendEventToOperator. It appears that some of this has been extracted
>> >>>>> from a downstream implementation. It would be great to adjust this such
>> >>>>> that it reflects the status quo in Flink.
>> >>>>>
>> >>>>> -Max
>> >>>>>
>> >>>>> On Fri, Oct 28, 2022 at 5:53 AM Steven Wu <st...@gmail.com> wrote:
>> >>>>>
>> >>>>> > Max,
>> >>>>> >
>> >>>>> > Thanks a lot for the comments. We should clarify that the shuffle
>> >>>>> > operator/coordinator is not really part of the Flink sink
>> >>>>> > function/operator. The shuffle operator is a custom operator that can
>> >>>>> > be inserted right before the Iceberg writer operator. It calculates
>> >>>>> > the traffic statistics and performs a custom partition/shuffle
>> >>>>> > (DataStream#partitionCustom) to cluster the data right before it
>> >>>>> > reaches the Iceberg writer operator.
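As a rough illustration of a statistics-guided custom partition, here is a Java sketch. `KeyPartitioner` is declared locally with the same `partition(K key, int numPartitions)` shape as Flink's `Partitioner`, the interface that `DataStream#partitionCustom` accepts together with a key selector, so the sketch compiles without Flink on the classpath. The greedy heaviest-key-first packing is an assumption for illustration; the thread does not say which assignment strategy the Iceberg shuffle operator uses.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Same shape as Flink's Partitioner<K>#partition(K key, int numPartitions),
// declared locally so the sketch is self-contained.
interface KeyPartitioner<K> {
    int partition(K key, int numPartitions);
}

// Hypothetical statistics-guided partitioner: keys with known traffic are assigned
// greedily (heaviest first) to the currently least-loaded channel; keys missing
// from the statistics fall back to hash partitioning.
final class StatsGuidedPartitioner implements KeyPartitioner<String> {
    private final Map<String, Integer> assignment = new HashMap<>();

    StatsGuidedPartitioner(Map<String, Long> globalKeyCounts, int numChannels) {
        long[] load = new long[numChannels];
        List<Map.Entry<String, Long>> byWeight = new ArrayList<>(globalKeyCounts.entrySet());
        byWeight.sort(Map.Entry.<String, Long>comparingByValue().reversed());
        for (Map.Entry<String, Long> entry : byWeight) {
            int target = 0;
            for (int channel = 1; channel < numChannels; channel++) {
                if (load[channel] < load[target]) {
                    target = channel;
                }
            }
            assignment.put(entry.getKey(), target);
            load[target] += entry.getValue();
        }
    }

    @Override
    public int partition(String key, int numPartitions) {
        Integer assigned = assignment.get(key);
        if (assigned != null && assigned < numPartitions) {
            return assigned;
        }
        return Math.floorMod(key.hashCode(), numPartitions);
    }
}
```

With counts a=100, b=60, c=50, d=10 and two channels, this assigns a and d to channel 0 and b and c to channel 1, i.e. 110 records each, whereas hashing alone gives no such balance guarantee. In a real job, an adapter implementing Flink's `Partitioner<String>` could delegate to it.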
>> >>>>> >
>> >>>>> > We are not proposing to introduce a sink coordinator for the sink
>> >>>>> > interface. The shuffle operator needs CoordinatorContextBase to
>> >>>>> > facilitate communication between the shuffle subtasks and the
>> >>>>> > coordinator for traffic statistics aggregation. The communication part
>> >>>>> > is already implemented by SourceCoordinatorContext.
>> >>>>> >
>> >>>>> > Here are some details about the communication needs:
>> >>>>> > - subtasks periodically calculate local statistics and send them to
>> >>>>> >   the coordinator for global aggregation
>> >>>>> > - the coordinator sends the globally aggregated statistics to the
>> >>>>> >   subtasks
>> >>>>> > - subtasks use the globally aggregated statistics to guide the
>> >>>>> >   partition/shuffle decision
>> >>>>> > Regards,
>> >>>>> > Steven
>> >>>>> >
>> >>>>> > On Thu, Oct 27, 2022 at 5:38 PM Maximilian Michels <mxm@apache.org> wrote:
>> >>>>> >
>> >>>>> > > Hi Gang,
>> >>>>> > >
>> >>>>> > > Looks much better! I've actually gone through the OperatorCoordinator
>> >>>>> > > code. It turns out any operator already has an OperatorCoordinator
>> >>>>> > > assigned. Also, any operator can add custom coordinator code. So it
>> >>>>> > > looks like you won't have to implement any additional runtime logic
>> >>>>> > > to add a ShuffleCoordinator. However, I'm wondering why you
>> >>>>> > > specifically need to refactor the SourceCoordinatorContext. You could
>> >>>>> > > simply add your own coordinator code. I'm not sure the sink
>> >>>>> > > requirements map so closely to the source interface that you can
>> >>>>> > > reuse the same logic.
>> >>>>> > >
>> >>>>> > > If you can refactor SourceCoordinatorContext in a way that makes it
>> >>>>> > > fit your use case, I have no objection. By the way, another example
>> >>>>> > > of an existing OperatorCoordinator is CollectSinkOperatorCoordinator,
>> >>>>> > > which is quite trivial. It might be worth evaluating whether you need
>> >>>>> > > the full power of SourceCoordinatorContext, which is why I wanted to
>> >>>>> > > get more context.
>> >>>>> > >
>> >>>>> > > -Max
>> >>>>> > >
>> >>>>> > > On Thu, Oct 27, 2022 at 4:15 PM gang ye <ye...@gmail.com> wrote:
>> >>>>> > >
>> >>>>> > > > Hi Max,
>> >>>>> > > > I understand your concern. Since shuffling support for the Flink
>> >>>>> > > > Iceberg sink is not the main body of the proposal, I just added an
>> >>>>> > > > appendix with more details about how to use CoordinatorContextBase
>> >>>>> > > > and how to define the ShufflingCoordinator.
>> >>>>> > > >
>> >>>>> > > > Let me know if that doesn't address your concern.
>> >>>>> > > >
>> >>>>> > > > Thanks
>> >>>>> > > > Gang
>> >>>>> > > >
>> >>>>> > > > On Thu, Oct 27, 2022 at 1:31 PM Maximilian Michels <mxm@apache.org> wrote:
>> >>>>> > > >
>> >>>>> > > >> Hey Gang,
>> >>>>> > > >>
>> >>>>> > > >> What I'm looking for here is a complete picture of why the change is
>> >>>>> > > >> necessary and what the next steps are. Ultimately, refactoring any code
>> >>>>> > > >> serves a purpose. Here, we want to refactor the Coordinator code such
>> >>>>> > > >> that we can add a SinkCoordinator, similar to the SourceCoordinator. The
>> >>>>> > > >> FLIP should address the next steps, i.e. how you plan to add the
>> >>>>> > > >> SinkCoordinator, its interfaces, and the runtime changes. It doesn't have
>> >>>>> > > >> to be in great detail, but without this information I don't think the
>> >>>>> > > >> FLIP is complete.
>> >>>>> > > >>
>> >>>>> > > >> This feature should be generic enough to be usable by sinks other than
>> >>>>> > > >> the Iceberg sink. Of course, Iceberg can still load its own
>> >>>>> > > >> implementation, which may be outlined in a separate FLIP.
>> >>>>> > > >>
>> >>>>> > > >> Unless there is a good reason, normal operators should not support the
>> >>>>> > > >> coordinator functionality. At least I'm not convinced it would play well
>> >>>>> > > >> with Flink's execution model. But I see how it is required for sources
>> >>>>> > > >> and sinks.
>> >>>>> > > >>
>> >>>>> > > >> -Max
>> >>>>> > > >>
>> >>>>> > > >> On Wed, Oct 26, 2022 at 3:05 PM gang ye <yegangapp@gmail.com> wrote:
>> >>>>> > > >>
>> >>>>> > > >>> Hi Max,
>> >>>>> > > >>>
>> >>>>> > > >>> Thanks for reviewing.
>> >>>>> > > >>>
>> >>>>> > > >>> For FLIP-264, yes, we will focus only on abstracting the RPC calls
>> >>>>> > > >>> between the task and the job manager and won't touch watermarks or
>> >>>>> > > >>> checkpointing.
>> >>>>> > > >>> If a coordinator doesn't need RPC calls to talk to its subtasks, it can
>> >>>>> > > >>> define its context without extending CoordinatorContextBase (or we can
>> >>>>> > > >>> find another class name to limit the scope).
>> >>>>> > > >>>
>> >>>>> > > >>> Regarding the scope of code changes, FLIP-264 will only do the context
>> >>>>> > > >>> extraction. The shuffling coordinator and operator
>> >>>>> > > >>> <https://docs.google.com/document/d/13N8cMqPi-ZPSKbkXGOBMPOzbv2Fua59j8bIjjtxLWqo>,
>> >>>>> > > >>> which will use the context, will come with a separate proposal, so we
>> >>>>> > > >>> are trying to keep FLIP-264 simple and easy to understand. I can add a
>> >>>>> > > >>> bit more about how to use the coordinator context in FLIP-264 if you
>> >>>>> > > >>> think that would be helpful.
>> >>>>> > > >>>
>> >>>>> > > >>> Thanks!
>> >>>>> > > >>> Gang
>> >>>>> > > >>>
>> >>>>> > > >>>
>> >>>>> > > >>>
>> >>>>> > > >>> On Wed, Oct 26, 2022 at 7:25 AM Maximilian Michels <mxm@apache.org> wrote:
>> >>>>> > > >>>
>> >>>>> > > >>>> Thanks for the proposal, Gang! This is indeed a somewhat bigger
>> >>>>> > > >>>> change. The coordinator for sources, as part of FLIP-27, was
>> >>>>> > > >>>> specifically added to synchronize the global watermark and to assign
>> >>>>> > > >>>> splits dynamically. However, it effectively allows arbitrary RPC calls
>> >>>>> > > >>>> between the task and the job manager. I understand the concern that
>> >>>>> > > >>>> such a powerful mechanism should not be available to all operators.
>> >>>>> > > >>>> Nevertheless, I see the practical use in the case of sinks like
>> >>>>> > > >>>> Iceberg. So I'd suggest limiting this feature to sinks (and sources)
>> >>>>> > > >>>> only.
>> >>>>> > > >>>>
>> >>>>> > > >>>> I'm wondering whether extracting the SourceCoordinatorContext is enough
>> >>>>> > > >>>> to achieve what you want. Additional work will be necessary, e.g.
>> >>>>> > > >>>> creating a SinkCoordinator, similar to SourceCoordinator, which handles
>> >>>>> > > >>>> the RPC calls and the checkpointing. I think it would be good to
>> >>>>> > > >>>> outline this in the FLIP.
>> >>>>> > > >>>>
>> >>>>> > > >>>> -Max
>> >>>>> > > >>>>
>> >>>>> > > >>>> On Sun, Oct 16, 2022 at 9:01 AM Steven Wu <stevenz3wu@gmail.com> wrote:
>> >>>>> > > >>>>
>> >>>>> > > >>>>> sorry. sent the incomplete reply by mistake.
>> >>>>> > > >>>>>
>> >>>>> > > >>>>> If there are any concrete concerns, we can discuss. In the
>> >>>>> > > FLINK-27405
>> >>>>> > > >>>>> [1],
>> >>>>> > > >>>>> Avid pointed out some implications regarding
>> checkpointing.
>> >>>>> In this
>> >>>>> > > >>>>> small
>> >>>>> > > >>>>> FLIP, we are not exposing/changing any checkpointing
>> logic,
>> >>>>> we
>> >>>>> > mainly
>> >>>>> > > >>>>> need
>> >>>>> > > >>>>> the coordinator context functionality to facilitate the
>> >>>>> > communication
>> >>>>> > > >>>>> between coordinator and subtasks.
>> >>>>> > > >>>>>
>> >>>>> > > >>>>> [1] https://issues.apache.org/jira/browse/FLINK-27405
>> >>>>> > > >>>>>
>> >>>>> > > >>>>> On Sun, Oct 16, 2022 at 8:56 AM Steven Wu <
>> >>>>> stevenz3wu@gmail.com>
>> >>>>> > > >>>>> wrote:
>> >>>>> > > >>>>>
>> >>>>> > > >>>>> > Hang, appreciate your input. Agree that
>> >>>>> `CoordinatorContextBase`
>> >>>>> > > is a
>> >>>>> > > >>>>> > better name considering Flink code convention.
>> >>>>> > > >>>>> >
>> >>>>> > > >>>>> > If there are any concrete concerns, we can discuss. In
>> the
>> >>>>> jira,
>> >>>>> > > >>>>> >
>> >>>>> > > >>>>> >
>> >>>>> > > >>>>> >
>> >>>>> > > >>>>> > On Sun, Oct 16, 2022 at 12:12 AM Hang Ruan <
>> >>>>> > ruanhang1993@gmail.com
>> >>>>> > > >
>> >>>>> > > >>>>> wrote:
>> >>>>> > > >>>>> >
>> >>>>> > > >>>>> >> Hi,
>> >>>>> > > >>>>> >>
>> >>>>> > > >>>>> >> IMP, I agree to extract a base class for
>> >>>>> > SourceCoordinatorContext.
>> >>>>> > > >>>>> >> But I prefer to use the name
>> >>>>> `OperatorCoordinatorContextBase` or
>> >>>>> > > >>>>> >> `CoordinatorContextBase` as the format like
>> >>>>> `SourceReaderBase`.
>> >>>>> > > >>>>> >> I also agree to what Piotr said. Maybe more problems
>> will
>> >>>>> occur
>> >>>>> > > when
>> >>>>> > > >>>>> >> connectors start to use it.
>> >>>>> > > >>>>> >>
>> >>>>> > > >>>>> >> Best,
>> >>>>> > > >>>>> >> Hang
>> >>>>> > > >>>>> >>
>> >>>>> > > >>>>> >> Steven Wu <st...@gmail.com> 于2022年10月14日周五
>> 22:31写道:
>> >>>>> > > >>>>> >>
>> >>>>> > > >>>>> >> > Piotr,
>> >>>>> > > >>>>> >> >
>> >>>>> > > >>>>> >> > The proposal is to extract the listed methods from
>> >>>>> @Iinternal
>> >>>>> > > >>>>> >> > SourceCoordinatorContext to a @PublicEvolving
>> >>>>> > > >>>>> BaseCoordinatorContext.
>> >>>>> > > >>>>> >> >
>> >>>>> > > >>>>> >> > The motivation is that other operators can leverage
>> the
>> >>>>> > > >>>>> communication
>> >>>>> > > >>>>> >> > mechanism btw operator coordinator and operator
>> >>>>> subtasks. For
>> >>>>> > > >>>>> example,
>> >>>>> > > >>>>> >> in
>> >>>>> > > >>>>> >> > the linked google doc shuffle operator (in Flink
>> >>>>> Iceberg sink)
>> >>>>> > > can
>> >>>>> > > >>>>> >> leverage
>> >>>>> > > >>>>> >> > it for computing traffic distribution statistics.
>> >>>>> > > >>>>> >> > * subtasks calculate local statistics and
>> periodically
>> >>>>> send
>> >>>>> > them
>> >>>>> > > >>>>> to the
>> >>>>> > > >>>>> >> > coordinator for global aggregation.
>> >>>>> > > >>>>> >> > * The coordinator can broadcast the globally
>> aggregated
>> >>>>> > > >>>>> statistics to
>> >>>>> > > >>>>> >> > subtasks, which can be used to guide the shuffling
>> >>>>> decision
>> >>>>> > > >>>>> (selecting
>> >>>>> > > >>>>> >> > downstream channels).
>> >>>>> > > >>>>> >> >
>> >>>>> > > >>>>> >> > Thanks,
>> >>>>> > > >>>>> >> > Steven
>> >>>>> > > >>>>> >> >
>> >>>>> > > >>>>> >> >
>> >>>>> > > >>>>> >> > On Fri, Oct 14, 2022 at 2:16 AM Piotr Nowojski <
>> >>>>> > > >>>>> pnowojski@apache.org>
>> >>>>> > > >>>>> >> > wrote:
>> >>>>> > > >>>>> >> >
>> >>>>> > > >>>>> >> > > Hi,
>> >>>>> > > >>>>> >> > >
>> >>>>> > > >>>>> >> > > Could you clarify what's the proposal that you have
>> >>>>> in mind?
>> >>>>> > > >>>>> From the
>> >>>>> > > >>>>> >> > > context I would understand that the newly extracted
>> >>>>> > > >>>>> >> > > `BaseCoordinatorContext` would have to be marked as
>> >>>>> > > >>>>> `@PublicEvolving`
>> >>>>> > > >>>>> >> or
>> >>>>> > > >>>>> >> > > `@Experimental`, since otherwise extracting it and
>> >>>>> keeping
>> >>>>> > > >>>>> `@Internal`
>> >>>>> > > >>>>> >> > > wouldn't change much? Such `@Internal` base class
>> >>>>> could have
>> >>>>> > > >>>>> been
>> >>>>> > > >>>>> >> removed
>> >>>>> > > >>>>> >> > > at any point of time in the future. Having said
>> that,
>> >>>>> it
>> >>>>> > > sounds
>> >>>>> > > >>>>> to me
>> >>>>> > > >>>>> >> > like
>> >>>>> > > >>>>> >> > > your proposal is a bit bigger than it looks at the
>> >>>>> first
>> >>>>> > > glance
>> >>>>> > > >>>>> and
>> >>>>> > > >>>>> >> you
>> >>>>> > > >>>>> >> > > actually want to expose the operator coordinator
>> >>>>> concept to
>> >>>>> > > the
>> >>>>> > > >>>>> public
>> >>>>> > > >>>>> >> > API?
>> >>>>> > > >>>>> >> > >
>> >>>>> > > >>>>> >> > > AFAIK there were some discussions about that, and
>> it
>> >>>>> was a
>> >>>>> > bit
>> >>>>> > > >>>>> of a
>> >>>>> > > >>>>> >> > > conscious decision to NOT do that. I don't know
>> those
>> >>>>> > reasons
>> >>>>> > > >>>>> however.
>> >>>>> > > >>>>> >> > Only
>> >>>>> > > >>>>> >> > > now, I've just heard that there are for example
>> some
>> >>>>> > problems
>> >>>>> > > >>>>> with
>> >>>>> > > >>>>> >> > > checkpointing of hypothetical non source operator
>> >>>>> > > coordinators.
>> >>>>> > > >>>>> Maybe
>> >>>>> > > >>>>> >> > > someone else could shed some light on this?
>> >>>>> > > >>>>> >> > >
>> >>>>> > > >>>>> >> > > Conceptually I would be actually in favour of
>> exposing
>> >>>>> > > operator
>> >>>>> > > >>>>> >> > > coordinators if there is a good reason behind that,
>> >>>>> but it
>> >>>>> > is
>> >>>>> > > a
>> >>>>> > > >>>>> more
>> >>>>> > > >>>>> >> > > difficult topic and might be a larger effort than
>> it
>> >>>>> seems
>> >>>>> > at
>> >>>>> > > >>>>> the
>> >>>>> > > >>>>> >> first
>> >>>>> > > >>>>> >> > > glance.
>> >>>>> > > >>>>> >> > >
>> >>>>> > > >>>>> >> > > Best,
>> >>>>> > > >>>>> >> > > Piotrek
>> >>>>> > > >>>>> >> > >
>> >>>>> > > >>>>> >> > > wt., 4 paź 2022 o 19:41 Steven Wu <
>> >>>>> stevenz3wu@gmail.com>
>> >>>>> > > >>>>> napisał(a):
>> >>>>> > > >>>>> >> > >
>> >>>>> > > >>>>> >> > > > Jing, thanks a lot for your reply. The linked
>> >>>>> google doc
>> >>>>> > is
>> >>>>> > > >>>>> not for
>> >>>>> > > >>>>> >> > this
>> >>>>> > > >>>>> >> > > > FLIP, which is fully documented in the wiki page.
>> >>>>> The
>> >>>>> > linked
>> >>>>> > > >>>>> google
>> >>>>> > > >>>>> >> doc
>> >>>>> > > >>>>> >> > > is
>> >>>>> > > >>>>> >> > > > the design doc to introduce shuffling in Flink
>> >>>>> Iceberg
>> >>>>> > sink,
>> >>>>> > > >>>>> which
>> >>>>> > > >>>>> >> > > > motivated this FLIP proposal so that the shuffle
>> >>>>> > coordinator
>> >>>>> > > >>>>> can
>> >>>>> > > >>>>> >> > leverage
>> >>>>> > > >>>>> >> > > > the introduced BaseCoordinatorContext to avoid
>> code
>> >>>>> > > >>>>> duplication.
>> >>>>> > > >>>>> >> > > >
>> >>>>> > > >>>>> >> > > > On Tue, Oct 4, 2022 at 1:04 AM Jing Ge <
>> >>>>> > jing@ververica.com>
>> >>>>> > > >>>>> wrote:
>> >>>>> > > >>>>> >> > > >
>> >>>>> > > >>>>> >> > > > > Thanks for bringing this up. It looks overall
>> >>>>> good! One
>> >>>>> > > >>>>> small
>> >>>>> > > >>>>> >> thing,
>> >>>>> > > >>>>> >> > > you
>> >>>>> > > >>>>> >> > > > > might want to write all content on the wiki
>> page
>> >>>>> instead
>> >>>>> > > of
>> >>>>> > > >>>>> >> linking
>> >>>>> > > >>>>> >> > to
>> >>>>> > > >>>>> >> > > a
>> >>>>> > > >>>>> >> > > > > google doc. The reason is that some people
>> might
>> >>>>> not be
>> >>>>> > > >>>>> able to
>> >>>>> > > >>>>> >> > access
>> >>>>> > > >>>>> >> > > > the
>> >>>>> > > >>>>> >> > > > > google doc.
>> >>>>> > > >>>>> >> > > > >
>> >>>>> > > >>>>> >> > > > > Best regards,
>> >>>>> > > >>>>> >> > > > > Jing
>> >>>>> > > >>>>> >> > > > >
>> >>>>> > > >>>>> >> > > > > On Tue, Oct 4, 2022 at 3:57 AM gang ye <
>> >>>>> > > yegangapp@gmail.com
>> >>>>> > > >>>>> >
>> >>>>> > > >>>>> >> wrote:
>> >>>>> > > >>>>> >> > > > >
>> >>>>> > > >>>>> >> > > > >> Hi,
>> >>>>> > > >>>>> >> > > > >>
>> >>>>> > > >>>>> >> > > > >> We submit the Flip proposal
>> >>>>> > > >>>>> >> > > > >> <
>> >>>>> > > >>>>> >> > > > >>
>> >>>>> > > >>>>> >> > > >
>> >>>>> > > >>>>> >> > >
>> >>>>> > > >>>>> >> >
>> >>>>> > > >>>>> >>
>> >>>>> > > >>>>>
>> >>>>> > >
>> >>>>> >
>> >>>>>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-264%3A+Extract+BaseCoordinatorContext
>> >>>>> > > >>>>> >> > > > >> >
>> >>>>> > > >>>>> >> > > > >> at Confluent to extract BaseCoordinatorContext
>> >>>>> from
>> >>>>> > > >>>>> >> > > > >> SourceCoordinatorContext to reuse it for other
>> >>>>> > > >>>>> coordinators E.g.
>> >>>>> > > >>>>> >> in
>> >>>>> > > >>>>> >> > > the
>> >>>>> > > >>>>> >> > > > >> shuffling support of Flink Iceberg sink
>> >>>>> > > >>>>> >> > > > >> <
>> >>>>> > > >>>>> >> > > > >>
>> >>>>> > > >>>>> >> > > >
>> >>>>> > > >>>>> >> > >
>> >>>>> > > >>>>> >> >
>> >>>>> > > >>>>> >>
>> >>>>> > > >>>>>
>> >>>>> > >
>> >>>>> >
>> >>>>>
>> https://docs.google.com/document/d/13N8cMqPi-ZPSKbkXGOBMPOzbv2Fua59j8bIjjtxLWqo
>> >>>>> > > >>>>> >> > > > >> >
>> >>>>> > > >>>>> >> > > > >>
>> >>>>> > > >>>>> >> > > > >> Could you help to take a look?
>> >>>>> > > >>>>> >> > > > >> Thanks
>> >>>>> > > >>>>> >> > > > >>
>> >>>>> > > >>>>> >> > > > >> Gang
>> >>>>> > > >>>>> >> > > > >>
>> >>>>> > > >>>>> >> > > > >
>> >>>>> > > >>>>> >> > > >
>> >>>>> > > >>>>> >> > >
>> >>>>> > > >>>>> >> >
>> >>>>> > > >>>>> >>
>> >>>>> > > >>>>> >
>> >>>>> > > >>>>>
>> >>>>> > > >>>>
>> >>>>> > >
>> >>>>> >
>> >>>>>
>> >>>>
>>
>

Re: [DISCUSS] FLIP-264 Extract BaseCoordinatorContext

Posted by Jark Wu <im...@gmail.com>.
What's the status and conclusion of this discussion?

I have seen the value of exposing OperatorCoordinator because of its powerful
RPC calls, and some projects, such as Hudi [1], are already using it. But I
agree this is a large topic and requires another FLIP.

I am also concerned about extracting a public base class without
implementations and clear usage, since it is easy to break in the future.
However, I think the shuffling operator could be a generic component used by
other connectors and DataStream jobs.

Have you considered contributing the ShuffleOperator to the Flink main
repository as part of the DataStream API (e.g., DataStream#dynamicShuffle)?
Within a single repository, it would be easy to extract the common part of
SourceCoordinatorContext and ShuffleCoordinatorContext as an internal
implementation.


Best,
Jark

[1]:
https://github.com/apache/hudi/blob/a80bb4f717ad8a89770176a1238c4b08874044e8/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/adapter/OperatorCoordinatorAdapter.java

On Thu, 3 Nov 2022 at 22:36, Piotr Nowojski <pn...@apache.org> wrote:

> Ohhh, I was confused. I thought that the proposal was to make
> `CoordinatorContextBase` part of the public API.
>
> However, I'm also against extracting `CoordinatorContextBase` as an
> `@Internal` class.
>
> 1. Connectors shouldn't reuse internal classes. Using the `@Internal`
> CoordinatedOperatorFactory would already be quite bad, but at least it is
> a relatively stable internal API. Using an `@Internal`
> `CoordinatorContextBase`, and refactoring out this base class just for the
> sake of reusing it in a connector, is IMO even worse.
> 2. Doubly so if they are in separate repositories (as the Iceberg connector
> will be/is, right?). There would be no way to prevent breaking changes
> between repositories.
>
> If that's only intended as a stop-gap solution until we properly expose
> coordinators, the lesser evil would IMO be to copy/paste/modify
> SourceCoordinatorContext into the flink-connector-iceberg repository.
>
> Best,
> Piotrek
>
> On Thu, Nov 3, 2022 at 12:51 Maximilian Michels <mx...@apache.org> wrote:
>
> > +1 If we wanted to expose the OperatorCoordinator API, we should provide
> > an adequate interface. The FLIP partially addresses this by trying to
> > factor out RPC code which other coordinators might make use of, but there
> > is additional design necessary to realize a public operator API.
> >
> > Just to be clear, I'm not opposed to any of the changes in the FLIP. I
> > think they make sense in the context of an Iceberg ShuffleCoordinator in
> > Flink. If we were to add such a new coordinator, feel free to make the
> > proposed code refactoring as part of a pull request. A FLIP isn't strictly
> > necessary here because this is a purely internal change which does not
> > alter public APIs, nor does it alter the internal architecture, apart from
> > reusing a bit of existing code. I'm sorry if we consumed some of your time
> > revising the document, but I think we had a healthy discussion here. And
> > we're definitely looking forward to seeing some of these code changes!
> >
> > -Max
> >
> > On Thu, Nov 3, 2022 at 11:56 AM Piotr Nowojski <pn...@apache.org>
> > wrote:
> >
> >> Hi,
> >>
> >> Sorry for the delay, but I've given this more thought. First, I
> >> share Maximilian's view that this FLIP is incomplete. As I
> >> understand it, you are trying to hack existing code to expose small bits of
> >> internal functionality as part of the public API without solving many of
> >> the underlying issues.
> >>
> >> For example, what's the point of exposing `CoordinatorContextBase` as a
> >> public API if users cannot use it? After all, the `OperatorCoordinator`
> >> and `CoordinatedOperatorFactory` would remain internal. At the same time,
> >> this FLIP would officially force us to support and maintain this
> >> CoordinatorContextBase, while I strongly feel that we don't want to
> >> do that in the long term. I think we would need to take a big step back and
> >> first discuss how we would like to expose the coordinators and agree how to
> >> deal with the issues.
> >>
> >> The first big issue that I see is that I would be very worried about exposing
> >> the coordinator API without at least designing/planning how to deal with
> >> checkpointing their state. Without that, I'm afraid we might end up in a
> >> situation where we need to break the API in order to properly support
> >> stateful coordinators. And at the moment I don't see a good and easy
> >> solution to this problem.
> >>
> >> The second issue is the shape of the exposed public API. Exposing
> >> `OperatorCoordinator` or `CoordinatorContextBase` looks to me like a bad
> >> design that would expose way too many things to the users, making future
> >> development more complicated for us and making implementation of those
> >> interfaces by the user unnecessarily difficult. I see this as a similar issue
> >> to the low-level `StreamOperator` API vs the higher-level
> >> `org.apache.flink.api.common.functions.Function` API (instead of exposing
> >> `StreamOperator`, `AbstractStreamOperatorV2`, etc., we should beef up
> >> `ProcessFunction` to expose all of the remaining functionality in a
> >> user-friendly way). In the context of the coordinators, I would say that we
> >> should expose as the public API not the `OperatorCoordinator`, but, for
> >> example, some kind of `EventProcessFunction` with a simple
> >> interface like:
> >> ```
> >> interface EventProcessFunction {
> >>   void processEvent(int subtask, OperatorEvent event, EventDispatcher eventDispatcher);
> >> }
> >> ```
> >> + maybe some features like processing time timers/mailbox style async
> >> actions.
> >> (or maybe that could have been just a regular `ProcessFunction` but with
> >> `OperatorEvent` with `int subtask` as input/output).
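A minimal Java sketch of the `EventProcessFunction` idea above, under stated assumptions: `OperatorEvent` here is a local stand-in, not Flink's class of the same name, and `EventDispatcher` (with `sendToSubtask`/`broadcast`), `CountEvent`, `TotalEvent` and `SummingFunction` are hypothetical names invented for illustration, not Flink APIs. It only shows the shape of the contract: user code sees one event at a time, tagged with the sending subtask, and replies through the dispatcher without touching coordinator internals.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class EventProcessFunctionSketch {

    // Hypothetical stand-in for a coordinator event; not Flink's internal class.
    interface OperatorEvent extends Serializable {}

    // Hypothetical dispatcher through which the function answers subtasks.
    interface EventDispatcher {
        void sendToSubtask(int subtask, OperatorEvent event);

        void broadcast(OperatorEvent event);
    }

    // The user-facing contract sketched in the email.
    interface EventProcessFunction {
        void processEvent(int subtask, OperatorEvent event, EventDispatcher eventDispatcher);
    }

    static final class CountEvent implements OperatorEvent {
        final long count;

        CountEvent(long count) {
            this.count = count;
        }
    }

    static final class TotalEvent implements OperatorEvent {
        final long total;

        TotalEvent(long total) {
            this.total = total;
        }

        @Override
        public String toString() {
            return "TotalEvent(" + total + ")";
        }
    }

    // Example user function: sums the counts reported by subtasks and
    // broadcasts the running total to every subtask.
    static final class SummingFunction implements EventProcessFunction {
        private long total;

        @Override
        public void processEvent(int subtask, OperatorEvent event, EventDispatcher eventDispatcher) {
            if (event instanceof CountEvent) {
                total += ((CountEvent) event).count;
                eventDispatcher.broadcast(new TotalEvent(total));
            }
        }
    }

    public static void main(String[] args) {
        List<String> sent = new ArrayList<>();
        EventDispatcher dispatcher = new EventDispatcher() {
            @Override
            public void sendToSubtask(int subtask, OperatorEvent event) {
                sent.add(subtask + " <- " + event);
            }

            @Override
            public void broadcast(OperatorEvent event) {
                sent.add("all <- " + event);
            }
        };
        EventProcessFunction function = new SummingFunction();
        function.processEvent(0, new CountEvent(2), dispatcher);
        function.processEvent(1, new CountEvent(5), dispatcher);
        System.out.println(sent); // [all <- TotalEvent(2), all <- TotalEvent(7)]
    }
}
```

Passing the dispatcher as a parameter, rather than handing over a full context object, keeps checkpointing and scheduling details out of reach of user code, which is the concern raised above.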
> >>
> >> Best,
> >> Piotrek
> >>
> >> On Wed, Nov 2, 2022 at 19:40 gang ye <ye...@gmail.com> wrote:
> >>
> >>> Hi Max and Qingsheng,
> >>>
> >>> Thanks for the feedback. The initial motivation for this proposal is to
> >>> reduce duplicated code, since ShuffleCoordinator would need communication
> >>> logic similar to SourceCoordinator's to talk with operators. I
> >>> understand your concern that OperatorCoordinator is an internal class and
> >>> that, apart from SourceCoordinator, nothing else uses it for now.
> >>> How about we do what Qingsheng said? I can go ahead with the
> >>> ShufflingCoordinator implementation without the extraction. Then we will have
> >>> an intuitive sense of how much code is copied and can be reused. If we feel
> >>> that there is still a need to extract, we can revisit the discussion.
> >>>
> >>> Thanks
> >>> Gang
> >>>
> >>>
> >>>
> >>> On Wed, Nov 2, 2022 at 12:21 AM Qingsheng Ren <re...@apache.org>
> >>> wrote:
> >>>
> >>>> Thanks Gang and Steven for the FLIP. Actually, I share the same concern
> >>>> as Piotr and Maximilian.
> >>>>
> >>>> OperatorCoordinator is intentionally marked as @Internal, considering
> >>>> some existing issues, like consistency between non-source operators and
> >>>> their coordinators on checkpoint. I'm wondering whether it is useful to expose a public
> >>>> context to developers while keeping the OperatorCoordinator as an internal API.
> >>>> If we eventually close all these issues and decide to expose the operator
> >>>> coordinator API, that would be a better opportunity to include the base context as
> >>>> a part of it.
> >>>>
> >>>> Best,
> >>>> Qingsheng
> >>>>
> >>>> On Tue, Nov 1, 2022 at 8:29 PM Maximilian Michels <mx...@apache.org>
> >>>> wrote:
> >>>>
> >>>>> Thanks Steven! My confusion stemmed from the lack of context in the FLIP.
> >>>>> The first version did not lay out how the refactoring would be used down
> >>>>> the line, e.g. by the ShuffleCoordinator. The OperatorCoordinator API is a
> >>>>> non-public API and before reading the code, I wasn't even aware how exactly
> >>>>> it worked and whether it would be available to regular operators (it was
> >>>>> originally intended for sources only).
> >>>>>
> >>>>> I might seem pedantic here but I believe the purpose of a FLIP should be to
> >>>>> describe the *why* behind the changes, not only the changes themselves. A FLIP
> >>>>> is not a formality but a tool to communicate and discuss changes. I
> >>>>> think we still haven't laid out the exact reasons why we are factoring out
> >>>>> the base. As far as I understand now, we need the base class to deal with
> >>>>> concurrent updates in the custom Coordinator from the runtime (sub)tasks.
> >>>>> Effectively, we are enforcing an actor model for the processing of the
> >>>>> incoming messages such that the OperatorCoordinator can cleanly update its
> >>>>> state. However, if there are no actual implementations that make use of the
> >>>>> refactoring in Flink itself, I wonder if it would make sense to copy this
> >>>>> code to the downstream implementation, e.g. the ShuffleCoordinator. As soon
> >>>>> as it is part of Flink, we could of course try to consolidate this code.
> >>>>>
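The actor-model point above can be illustrated with a small Java sketch. It is an illustration under stated assumptions, not SourceCoordinator's actual code: the class and method names are hypothetical, and a single-threaded `ExecutorService` stands in for the coordinator's event loop. Events from subtasks may arrive on many threads, but each one is only enqueued; all reads and writes of the coordinator state (a plain `HashMap`) happen on the one loop thread, so no locking is needed.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch, not Flink code: a coordinator that confines its state to a
// single "event loop" thread. There is no checkpointing or failure handling here.
public class EventLoopCoordinatorSketch {

    private final ExecutorService eventLoop = Executors.newSingleThreadExecutor();

    // Only ever read or written on the eventLoop thread, so it needs no locking.
    private final Map<Integer, Long> reportedCounts = new HashMap<>();

    // May be called from many RPC threads at once; it only enqueues a task.
    public void handleEventFromSubtask(int subtask, long count) {
        eventLoop.execute(() -> reportedCounts.merge(subtask, count, Long::sum));
    }

    // Reads go through the same FIFO queue, so they see every event enqueued before them.
    public long total() throws Exception {
        return eventLoop.submit(
                () -> reportedCounts.values().stream().mapToLong(Long::longValue).sum())
            .get();
    }

    public void close() throws InterruptedException {
        eventLoop.shutdown();
        eventLoop.awaitTermination(10, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        EventLoopCoordinatorSketch coordinator = new EventLoopCoordinatorSketch();
        Thread[] subtasks = new Thread[4];
        for (int i = 0; i < subtasks.length; i++) {
            final int subtask = i;
            subtasks[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    coordinator.handleEventFromSubtask(subtask, 1);
                }
            });
            subtasks[i].start();
        }
        for (Thread t : subtasks) {
            t.join();
        }
        System.out.println(coordinator.total()); // 4000
        coordinator.close();
    }
}
```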
> >>>>> Considering the *how* of this, there appear to be both methods from
> >>>>> SourceCoordinator (e.g. runInEventLoop) as well as SourceCoordinatorContext
> >>>>> listed in the FLIP, as well as methods which do not appear anywhere in
> >>>>> Flink code, e.g. subTaskReady / subTaskNotReady / sendEventToOperator. It
> >>>>> appears that some of this has been extracted from a downstream
> >>>>> implementation. It would be great to adjust this, such that it reflects the
> >>>>> status quo in Flink.
> >>>>>
> >>>>> -Max
> >>>>>
> >>>>> On Fri, Oct 28, 2022 at 5:53 AM Steven Wu <st...@gmail.com>
> >>>>> wrote:
> >>>>>
> >>>>> > Max,
> >>>>> >
> >>>>> > Thanks a lot for the comments. We should clarify that the shuffle
> >>>>> > operator/coordinator is not really part of the Flink sink
> >>>>> > function/operator. The shuffle operator is a custom operator that can be
> >>>>> > inserted right before the Iceberg writer operator. It
> >>>>> > calculates the traffic statistics and performs a custom partition/shuffle
> >>>>> > (DataStream#partitionCustom) to cluster the data right before it gets to
> >>>>> > the Iceberg writer operator.
> >>>>> >
> >>>>> > We are not proposing to introduce a sink coordinator for the sink
> >>>>> > interface. The shuffle operator needs the CoordinatorContextBase to
> >>>>> > facilitate the communication between shuffle subtasks and the coordinator for
> >>>>> > traffic statistics aggregation. The communication part is already
> >>>>> > implemented by SourceCoordinatorContext.
> >>>>> >
> >>>>> > Here are some details about the communication needs:
> >>>>> > - subtasks periodically calculate local statistics and send them to the
> >>>>> > coordinator for global aggregation
> >>>>> > - the coordinator sends the globally aggregated statistics to the subtasks
> >>>>> > - subtasks use the globally aggregated statistics to guide the
> >>>>> > partition/shuffle decision
> >>>>> >
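The three steps above can be sketched in plain Java, leaving out the transport between subtasks and the coordinator. All names are hypothetical, and the greedy key-to-channel balancing in step 3 is one possible policy assumed for illustration; the thread does not specify how the shuffle decision is computed. A real operator would presumably apply the resulting assignment through a custom partitioner such as the `DataStream#partitionCustom` route mentioned above.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the three communication steps, with the event transport
// left out: in the real design the maps would travel as coordinator events.
public class ShuffleStatsSketch {

    // Step 1 (subtask): count records per key, e.g. per target partition value.
    static Map<String, Long> localStats(List<String> keys) {
        Map<String, Long> counts = new HashMap<>();
        for (String key : keys) {
            counts.merge(key, 1L, Long::sum);
        }
        return counts;
    }

    // Step 2 (coordinator): merge the statistics reported by all subtasks.
    static Map<String, Long> aggregate(List<Map<String, Long>> reports) {
        Map<String, Long> global = new HashMap<>();
        for (Map<String, Long> report : reports) {
            report.forEach((key, count) -> global.merge(key, count, Long::sum));
        }
        return global;
    }

    // Step 3 (subtask): turn the broadcast global statistics into a key -> channel
    // map. Greedy balancing: heaviest keys first, each to the least-loaded channel.
    static Map<String, Integer> assignChannels(Map<String, Long> global, int numChannels) {
        long[] load = new long[numChannels];
        Map<String, Integer> assignment = new HashMap<>();
        global.entrySet().stream()
                .sorted(Map.Entry.<String, Long>comparingByValue().reversed()
                        .thenComparing(Map.Entry.<String, Long>comparingByKey()))
                .forEachOrdered(entry -> {
                    int target = 0;
                    for (int channel = 1; channel < numChannels; channel++) {
                        if (load[channel] < load[target]) {
                            target = channel;
                        }
                    }
                    load[target] += entry.getValue();
                    assignment.put(entry.getKey(), target);
                });
        return assignment;
    }

    public static void main(String[] args) {
        Map<String, Long> subtask0 = localStats(List.of("a", "a", "b"));
        Map<String, Long> subtask1 = localStats(List.of("a", "c", "c", "c"));
        Map<String, Long> global = aggregate(List.of(subtask0, subtask1));
        System.out.println(global);                    // {a=3, b=1, c=3}
        System.out.println(assignChannels(global, 2)); // {a=0, b=0, c=1}
    }
}
```

With these inputs, the two heavy keys `a` and `c` (three records each) land on different channels, and the light key `b` joins channel 0.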
> >>>>> > Regards,
> >>>>> > Steven
> >>>>> >
> >>>>> > On Thu, Oct 27, 2022 at 5:38 PM Maximilian Michels <mxm@apache.org>
> >>>>> > wrote:
> >>>>> >
> >>>>> > > Hi Gang,
> >>>>> > >
> >>>>> > > Looks much better! I've actually gone through the OperatorCoordinator code.
> >>>>> > > It turns out any operator already has an OperatorCoordinator assigned.
> >>>>> > > Also, any operator can add custom coordinator code. So it looks like you
> >>>>> > > won't have to implement any additional runtime logic to add a
> >>>>> > > ShuffleCoordinator. However, I'm wondering: why do you specifically need to
> >>>>> > > refactor the SourceCoordinatorContext? You could simply add your own
> >>>>> > > coordinator code. I'm not sure the sink requirements map to the source
> >>>>> > > interface so closely that you can reuse the same logic.
> >>>>> > >
> >>>>> > > If you can refactor SourceCoordinatorContext in a way that makes it fit
> >>>>> > > your use case, I have no objection here. By the way, another example
> >>>>> > > of an existing OperatorCoordinator is CollectSinkOperatorCoordinator, which
> >>>>> > > is quite trivial, but it might be worth evaluating whether you need the full
> >>>>> > > power of SourceCoordinatorContext, which is why I wanted to get more
> >>>>> > > context.
> >>>>> > >
> >>>>> > > -Max
> >>>>> > >
> >>>>> > > On Thu, Oct 27, 2022 at 4:15 PM gang ye <ye...@gmail.com>
> >>>>> > > wrote:
> >>>>> > >
> >>>>> > > > Hi Max,
> >>>>> > > > I understand your concern. Since shuffling support for the Flink Iceberg sink is not
> >>>>> > > > the main body of the proposal, I just added another appendix
> >>>>> > > > with more details about how to use CoordinatorContextBase and how to define
> >>>>> > > > ShufflingCoordinator.
> >>>>> > > >
> >>>>> > > > Let me know if that doesn't address your concern.
> >>>>> > > >
> >>>>> > > > Thanks
> >>>>> > > > Gang
> >>>>> > > >
> >>>>> > > > On Thu, Oct 27, 2022 at 1:31 PM Maximilian Michels <mxm@apache.org>
> >>>>> > > > wrote:
> >>>>> > > >
> >>>>> > > >> Hey Gang,
> >>>>> > > >>
> >>>>> > > >> What I'm looking for here is a complete picture of why the change is
> >>>>> > > >> necessary and what the next steps are. Ultimately, refactoring any code
> >>>>> > > >> serves a purpose. Here, we want to refactor the Coordinator code such that
> >>>>> > > >> we can add a SinkCoordinator, similar to the SourceCoordinator. The FLIP
> >>>>> > > >> should address the next steps, i.e. how you plan to add the
> >>>>> > > >> SinkCoordinator, its interfaces, and the runtime changes. It doesn't have to be in
> >>>>> > > >> great detail, but without this information, I don't think the FLIP is
> >>>>> > > >> complete.
> >>>>> > > >>
> >>>>> > > >> This feature should be generic enough to be usable by sinks other than
> >>>>> > > >> the Iceberg sink. Of course, Iceberg can still load its own implementation,
> >>>>> > > >> which may be outlined in a separate FLIP.
> >>>>> > > >>
> >>>>> > > >> Unless there is a good reason, normal operators should not support the
> >>>>> > > >> coordinator functionality. At least, I'm not convinced it would play well
> >>>>> > > >> with Flink's execution model. But I see how it is required for sources and
> >>>>> > > >> sinks.
> >>>>> > > >>
> >>>>> > > >> -Max
> >>>>> > > >>
> >>>>> > > >> On Wed, Oct 26, 2022 at 3:05 PM gang ye <ye...@gmail.com>
> >>>>> > > >> wrote:
> >>>>> > > >>
> >>>>> > > >>> Hi Max,
> >>>>> > > >>>
> >>>>> > > >>> Thanks for reviewing.
> >>>>> > > >>>
> >>>>> > > >>> For FLIP-264, yes, we will only focus on abstracting the RPC calls
> >>>>> > > >>> between the task and the job manager for communication and won't touch
> >>>>> > > >>> watermarks or checkpointing.
> >>>>> > > >>> If the coordinator doesn't need RPC calls to talk with subtasks, then it
> >>>>> > > >>> can define its context without extending CoordinatorContextBase (or
> >>>>> > > >>> we can find another class name to limit the scope).
> >>>>> > > >>>
> >>>>> > > >>> Regarding the scope of code changes, for FLIP-264 we will only do the
> >>>>> > > >>> context extraction. The shuffling coordinator and operator
> >>>>> > > >>> <https://docs.google.com/document/d/13N8cMqPi-ZPSKbkXGOBMPOzbv2Fua59j8bIjjtxLWqo>
> >>>>> > > >>> which will use the context will come with a separate proposal, so we are trying
> >>>>> > > >>> to keep FLIP-264 simple to understand. I can add a little more
> >>>>> > > >>> about how to use the coordinator context in FLIP-264 if you think that would
> >>>>> > > >>> be helpful.
> >>>>> > > >>>
> >>>>> > > >>> Thanks!
> >>>>> > > >>> Gang
> >>>>> > > >>>
> >>>>> > > >>>
> >>>>> > > >>>
> >>>>> > > >>> On Wed, Oct 26, 2022 at 7:25 AM Maximilian Michels <mxm@apache.org>
> >>>>> > > >>> wrote:
> >>>>> > > >>>
> >>>>> > > >>>> Thanks for the proposal, Gang! This is indeed somewhat of a bigger
> >>>>> > > >>>> change. The coordinator for sources, as part of FLIP-27, was specifically
> >>>>> > > >>>> added to synchronize the global watermark and to assign splits dynamically.
> >>>>> > > >>>> However, it practically allows arbitrary RPC calls between the task and the
> >>>>> > > >>>> job manager. I understand that there is concern that such a powerful
> >>>>> > > >>>> mechanism should not be available to all operators. Nevertheless, I see the
> >>>>> > > >>>> practical use in the case of sinks like Iceberg. So I'd suggest limiting this
> >>>>> > > >>>> feature to sinks (and sources) only.
> >>>>> > > >>>>
> >>>>> > > >>>> I'm wondering whether extracting the SourceCoordinatorContext is
> >>>>> > > >>>> enough to achieve what you want. There will be additional work necessary,
> >>>>> > > >>>> e.g. creating a SinkCoordinator, similar to SourceCoordinator, which handles
> >>>>> > > >>>> the RPC calls and the checkpointing. I think it would be good to outline
> >>>>> > > >>>> this in the FLIP.
> >>>>> > > >>>>
> >>>>> > > >>>> -Max
> >>>>> > > >>>>
> >>>>> > > >>>> On Sun, Oct 16, 2022 at 9:01 AM Steven Wu <stevenz3wu@gmail.com>
> >>>>> > > >>>> wrote:
> >>>>> > > >>>>
> >>>>> > > >>>>> Sorry, I sent the incomplete reply by mistake.
> >>>>> > > >>>>>
> >>>>> > > >>>>> If there are any concrete concerns, we can discuss them. In FLINK-27405 [1],
> >>>>> > > >>>>> Avid pointed out some implications regarding checkpointing. In this small
> >>>>> > > >>>>> FLIP, we are not exposing/changing any checkpointing logic; we mainly need
> >>>>> > > >>>>> the coordinator context functionality to facilitate the communication
> >>>>> > > >>>>> between the coordinator and subtasks.
> >>>>> > > >>>>>
> >>>>> > > >>>>> [1] https://issues.apache.org/jira/browse/FLINK-27405
> >>>>> > > >>>>>
> >>>>> > > >>>>> On Sun, Oct 16, 2022 at 8:56 AM Steven Wu <stevenz3wu@gmail.com>
> >>>>> > > >>>>> wrote:
> >>>>> > > >>>>>
> >>>>> > > >>>>> > Hang, I appreciate your input. I agree that `CoordinatorContextBase` is a
> >>>>> > > >>>>> > better name considering the Flink code conventions.
> >>>>> > > >>>>> >
> >>>>> > > >>>>> > If there are any concrete concerns, we can discuss them. In the jira,
> >>>>> > > >>>>> >
> >>>>> > > >>>>> > On Sun, Oct 16, 2022 at 12:12 AM Hang Ruan <ruanhang1993@gmail.com>
> >>>>> > > >>>>> > wrote:
> >>>>> > > >>>>> >
> >>>>> > > >>>>> >> Hi,
> >>>>> > > >>>>> >>
> >>>>> > > >>>>> >> IMO, I agree with extracting a base class for SourceCoordinatorContext.
> >>>>> > > >>>>> >> But I prefer the name `OperatorCoordinatorContextBase` or
> >>>>> > > >>>>> >> `CoordinatorContextBase`, following the format of `SourceReaderBase`.
> >>>>> > > >>>>> >> I also agree with what Piotr said. Maybe more problems will occur when
> >>>>> > > >>>>> >> connectors start to use it.
> >>>>> > > >>>>> >>
> >>>>> > > >>>>> >> Best,
> >>>>> > > >>>>> >> Hang
> >>>>> > > >>>>> >>
> >>>>> > > >>>>> >> On Fri, Oct 14, 2022 at 22:31 Steven Wu <st...@gmail.com> wrote:
> >>>>> > > >>>>> >>
> >>>>> > > >>>>> >> > Piotr,
> >>>>> > > >>>>> >> >
> >>>>> > > >>>>> >> > The proposal is to extract the listed methods from the @Internal
> >>>>> > > >>>>> >> > SourceCoordinatorContext into a @PublicEvolving BaseCoordinatorContext.
> >>>>> > > >>>>> >> >
> >>>>> > > >>>>> >> > The motivation is that other operators can leverage the communication
> >>>>> > > >>>>> >> > mechanism between the operator coordinator and operator subtasks. For example,
> >>>>> > > >>>>> >> > in the linked Google doc, the shuffle operator (in the Flink Iceberg sink) can
> >>>>> > > >>>>> >> > leverage it for computing traffic distribution statistics:
> >>>>> > > >>>>> >> > * subtasks calculate local statistics and periodically send them to the
> >>>>> > > >>>>> >> > coordinator for global aggregation.
> >>>>> > > >>>>> >> > * the coordinator can broadcast the globally aggregated statistics to
> >>>>> > > >>>>> >> > subtasks, which can use them to guide the shuffling decision (selecting
> >>>>> > > >>>>> >> > downstream channels).
> >>>>> > > >>>>> >> >
> >>>>> > > >>>>> >> > Thanks,
> >>>>> > > >>>>> >> > Steven
> >>>>> > > >>>>> >> >
> >>>>> > > >>>>> >> >
> >>>>> > > >>>>> >> > On Fri, Oct 14, 2022 at 2:16 AM Piotr Nowojski <pnowojski@apache.org>
> >>>>> > > >>>>> >> > wrote:
> >>>>> > > >>>>> >> >
> >>>>> > > >>>>> >> > > Hi,
> >>>>> > > >>>>> >> > >
> >>>>> > > >>>>> >> > > Could you clarify what proposal you have in mind? From the
> >>>>> > > >>>>> >> > > context, I would understand that the newly extracted
> >>>>> > > >>>>> >> > > `BaseCoordinatorContext` would have to be marked as `@PublicEvolving` or
> >>>>> > > >>>>> >> > > `@Experimental`, since otherwise extracting it and keeping it `@Internal`
> >>>>> > > >>>>> >> > > wouldn't change much? Such an `@Internal` base class could be removed
> >>>>> > > >>>>> >> > > at any point in the future. Having said that, it sounds to me like
> >>>>> > > >>>>> >> > > your proposal is a bit bigger than it looks at first glance, and you
> >>>>> > > >>>>> >> > > actually want to expose the operator coordinator concept in the public API?
> >>>>> > > >>>>> >> > >
> >>>>> > > >>>>> >> > > AFAIK there were some discussions about that, and it was a bit of a
> >>>>> > > >>>>> >> > > conscious decision NOT to do that. I don't know those reasons, however. Only
> >>>>> > > >>>>> >> > > now have I heard that there are, for example, some problems with
> >>>>> > > >>>>> >> > > checkpointing of hypothetical non-source operator coordinators. Maybe
> >>>>> > > >>>>> >> > > someone else could shed some light on this?
> >>>>> > > >>>>> >> > >
> >>>>> > > >>>>> >> > > Conceptually, I would actually be in favour of exposing operator
> >>>>> > > >>>>> >> > > coordinators if there is a good reason behind that, but it is a more
> >>>>> > > >>>>> >> > > difficult topic and might be a larger effort than it seems at first
> >>>>> > > >>>>> >> > > glance.
> >>>>> > > >>>>> >> > >
> >>>>> > > >>>>> >> > > Best,
> >>>>> > > >>>>> >> > > Piotrek
> >>>>> > > >>>>> >> > >
> >>>>> > > >>>>> >> > > On Tue, Oct 4, 2022 at 7:41 PM Steven Wu <stevenz3wu@gmail.com> wrote:
> >>>>> > > >>>>> >> > >
> >>>>> > > >>>>> >> > > > Jing, thanks a lot for your reply. The linked Google doc is
> >>>>> > > >>>>> >> > > > not for this FLIP, which is fully documented on the wiki
> >>>>> > > >>>>> >> > > > page. The linked Google doc is the design doc for
> >>>>> > > >>>>> >> > > > introducing shuffling in the Flink Iceberg sink, which
> >>>>> > > >>>>> >> > > > motivated this FLIP proposal so that the shuffle
> >>>>> > > >>>>> >> > > > coordinator can leverage the introduced
> >>>>> > > >>>>> >> > > > BaseCoordinatorContext to avoid code duplication.
> >>>>> > > >>>>> >> > > >
> >>>>> > > >>>>> >> > > > On Tue, Oct 4, 2022 at 1:04 AM Jing Ge <jing@ververica.com> wrote:
> >>>>> > > >>>>> >> > > >
> >>>>> > > >>>>> >> > > > > Thanks for bringing this up. It looks good overall! One
> >>>>> > > >>>>> >> > > > > small thing: you might want to write all content on the
> >>>>> > > >>>>> >> > > > > wiki page instead of linking to a Google doc. The reason
> >>>>> > > >>>>> >> > > > > is that some people might not be able to access the
> >>>>> > > >>>>> >> > > > > Google doc.
> >>>>> > > >>>>> >> > > > >
> >>>>> > > >>>>> >> > > > > Best regards,
> >>>>> > > >>>>> >> > > > > Jing
> >>>>> > > >>>>> >> > > > >
> >>>>> > > >>>>> >> > > > > On Tue, Oct 4, 2022 at 3:57 AM gang ye <yegangapp@gmail.com> wrote:
> >>>>> > > >>>>> >> > > > >
> >>>>> > > >>>>> >> > > > >> Hi,
> >>>>> > > >>>>> >> > > > >>
> >>>>> > > >>>>> >> > > > >> We submitted the FLIP proposal
> >>>>> > > >>>>> >> > > > >> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-264%3A+Extract+BaseCoordinatorContext>
> >>>>> > > >>>>> >> > > > >> on Confluence to extract BaseCoordinatorContext from
> >>>>> > > >>>>> >> > > > >> SourceCoordinatorContext so that it can be reused by other
> >>>>> > > >>>>> >> > > > >> coordinators, e.g. in the shuffling support of the Flink
> >>>>> > > >>>>> >> > > > >> Iceberg sink
> >>>>> > > >>>>> >> > > > >> <https://docs.google.com/document/d/13N8cMqPi-ZPSKbkXGOBMPOzbv2Fua59j8bIjjtxLWqo>.
> >>>>> > > >>>>> >> > > > >>
> >>>>> > > >>>>> >> > > > >> Could you help take a look?
> >>>>> > > >>>>> >> > > > >> Thanks
> >>>>> > > >>>>> >> > > > >>
> >>>>> > > >>>>> >> > > > >> Gang
>
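
The statistics round trip described in this thread (subtasks send local statistics, the coordinator aggregates and broadcasts them, and subtasks use the result to select downstream channels) can be sketched in plain Java, independent of Flink's internal coordinator classes. This is a minimal sketch under stated assumptions: the message types LocalStatistics and GlobalStatistics, the round numbering, and the greedy key-to-channel balancing are hypothetical illustrations, not the FLIP's or the Iceberg design doc's actual classes. The partition method only mirrors the shape of Flink's Partitioner#partition(key, numPartitions), which is the kind of function DataStream#partitionCustom accepts. It needs Java 16+ for records.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/** Hypothetical subtask-to-coordinator message: key frequencies seen locally in one round. */
record LocalStatistics(int subtask, long round, Map<String, Long> keyCounts) {}

/** Hypothetical coordinator-to-subtask broadcast: key frequencies summed over all subtasks. */
record GlobalStatistics(long round, Map<String, Long> keyCounts) {}

/** Coordinator side. Not thread-safe: assumes calls are serialized, e.g. by an event loop. */
final class StatisticsAggregator {
    private final int parallelism;
    private final Set<Integer> reported = new HashSet<>();
    private final Map<String, Long> counts = new HashMap<>();
    private long round = 0;

    StatisticsAggregator(int parallelism) {
        this.parallelism = parallelism;
    }

    /** Returns the global statistics once every subtask has reported for the current round. */
    Optional<GlobalStatistics> onLocalStatistics(LocalStatistics event) {
        // Drop reports for another round and duplicate reports from the same subtask.
        if (event.round() != round || !reported.add(event.subtask())) {
            return Optional.empty();
        }
        event.keyCounts().forEach((key, count) -> counts.merge(key, count, Long::sum));
        if (reported.size() < parallelism) {
            return Optional.empty();
        }
        GlobalStatistics global = new GlobalStatistics(round, Map.copyOf(counts));
        round++;
        reported.clear();
        counts.clear();
        return Optional.of(global);
    }
}

/** Subtask side: turns the broadcast statistics into a key-to-channel assignment. */
final class StatisticsGuidedPartitioner {
    private final Map<String, Integer> assignment = new HashMap<>();

    StatisticsGuidedPartitioner(GlobalStatistics stats, int numChannels) {
        long[] load = new long[numChannels];
        // Greedy balancing: heaviest keys first, each onto the currently least-loaded channel.
        stats.keyCounts().entrySet().stream()
                .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
                .forEach(entry -> {
                    int target = 0;
                    for (int c = 1; c < numChannels; c++) {
                        if (load[c] < load[target]) {
                            target = c;
                        }
                    }
                    load[target] += entry.getValue();
                    assignment.put(entry.getKey(), target);
                });
    }

    /** Same shape as Flink's Partitioner#partition; keys without statistics fall back to hashing. */
    int partition(String key, int numPartitions) {
        Integer assigned = assignment.get(key);
        return assigned != null ? assigned % numPartitions : Math.floorMod(key.hashCode(), numPartitions);
    }
}

final class StatisticsDemo {
    public static void main(String[] args) {
        StatisticsAggregator coordinator = new StatisticsAggregator(2);
        coordinator.onLocalStatistics(new LocalStatistics(0, 0, Map.of("a", 50L, "b", 10L)));
        GlobalStatistics global = coordinator
                .onLocalStatistics(new LocalStatistics(1, 0, Map.of("a", 30L, "c", 20L)))
                .orElseThrow();
        StatisticsGuidedPartitioner partitioner = new StatisticsGuidedPartitioner(global, 2);
        System.out.println("a -> " + partitioner.partition("a", 2)); // prints "a -> 0"
    }
}
```

The aggregator deliberately has no locking; it relies on messages being handled one at a time, which is the guarantee a coordinator event loop would provide.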

Re: [DISCUSS] FLIP-264 Extract BaseCoordinatorContext

Posted by Piotr Nowojski <pn...@apache.org>.
Ohhh, I was confused. I thought that the proposal was to make
`CoordinatorContextBase` part of the public API.

However, I'm also against extracting `CoordinatorContextBase` as an
`@Internal` class.

1. Connectors shouldn't reuse internal classes. Using the `@Internal`
CoordinatedOperatorFactory would already be quite bad, but at least that is
a relatively stable internal API. Using an `@Internal`
`CoordinatorContextBase`, and refactoring out this base class just for the
sake of reusing it in a connector, is IMO even worse.
2. Doubly so if they are in a separate repository (as the Iceberg connector
will be/is, right?). There would be no way to prevent breaking changes
between repositories.

If that's only intended as a stop-gap solution until we properly expose
coordinators, the lesser evil would IMO be to copy/paste/modify
SourceCoordinatorContext into the flink-connector-iceberg repository.

Best,
Piotrek

On Thu, Nov 3, 2022 at 12:51 PM Maximilian Michels <mx...@apache.org> wrote:

> +1 If we wanted to expose the OperatorCoordinator API, we should provide
> an adequate interface. The FLIP partially addresses this by trying to
> factor out RPC code which other coordinators might make use of, but there
> is additional design necessary to realize a public operator API.
>
> Just to be clear, I'm not opposed to any of the changes in the FLIP. I
> think they make sense in the context of an Iceberg ShuffleCoordinator in
> Flink. If we were to add such a new coordinator, feel free to make the
> proposed code refactoring as part of a pull request. A FLIP isn't strictly
> necessary here because this is a purely internal change which does not
> alter public APIs, nor does it alter the internal architecture, apart from
> reusing a bit of existing code. I'm sorry if we consumed some of your time
> revising the document but I think we had a healthy discussion here. And
> we're definitely looking forward to seeing some of these code changes!
>
> -Max
>
> On Thu, Nov 3, 2022 at 11:56 AM Piotr Nowojski <pn...@apache.org> wrote:
>
>> Hi,
>>
>> Sorry for the delay, but I've given this more thought. First, I share
>> Maximilian's view that this FLIP is incomplete. As I understand it, you
>> are trying to hack existing code to expose small bits of internal
>> functionality as part of the public API without solving many of the
>> underlying issues.
>>
>> For example, what's the point of exposing `CoordinatorContextBase` as a
>> public API if users cannot use it? After all, the `OperatorCoordinator`
>> and `CoordinatedOperatorFactory` would remain internal. At the same time,
>> this FLIP would officially force us to support and maintain this
>> CoordinatorContextBase, while I have strong feelings that we don't want to
>> do that in the long term. I think we would need to take a big step back and
>> first discuss how we would like to expose the coordinators and agree how to
>> deal with the issues.
>>
>> The first big issue I see is that I would be very worried about exposing
>> the coordinator API without at least designing/planning how to deal with
>> checkpointing coordinator state. Without that, I'm afraid we might end up
>> in a situation where we need to break the API in order to properly
>> support stateful coordinators. And at the moment I don't see a good and
>> easy solution to this problem.
>>
>> The second issue is the shape of the exposed public API. Exposing
>> `OperatorCoordinator` or `CoordinatorContextBase` looks to me like a bad
>> design that would expose way too many things to the users, making future
>> development more complicated for us and making implementation of those
>> interfaces by the user unnecessarily difficult. I see this as a similar
>> issue to the low-level `StreamOperator` API vs the higher-level
>> `org.apache.flink.api.common.functions.Function` API (instead of exposing
>> `StreamOperator`, `AbstractStreamOperatorV2` etc., we should beef up
>> `ProcessFunction` to expose all of the remaining functionality in a
>> user-friendly way). In the context of the coordinators, I would say that
>> we should expose as the public API not the `OperatorCoordinator`, but, for
>> example, some kind of `EventProcessFunction` that would have a simple
>> interface like:
>> ```
>> interface EventProcessFunction {
>>   void processEvent(int subtask, OperatorEvent event, EventDispatcher eventDispatcher);
>> }
>> ```
>> + maybe some features like processing time timers/mailbox style async
>> actions.
>> (or maybe that could have been just a regular `ProcessFunction` but with
>> `OperatorEvent` with `int subtask` as input/output).
>>
>> Best,
>> Piotrek
>>
>> On Wed, Nov 2, 2022 at 7:40 PM gang ye <ye...@gmail.com> wrote:
>>
>>> Hi Max and Qingsheng,
>>>
>>> Thanks for the feedback. The initial motivation for this proposal is to
>>> reduce duplicated code, since the ShuffleCoordinator would need
>>> communication logic similar to the SourceCoordinator's to talk with
>>> operators. I understand your concern that OperatorCoordinator is an
>>> internal class and that, apart from SourceCoordinator, no other
>>> coordinator uses this for now.
>>> How about we do what Qingsheng suggested? I can go ahead with the
>>> ShufflingCoordinator implementation without the extraction. Then we will
>>> have an intuitive sense of how much code is copied and could be reused. If
>>> we feel that there is still a need to extract it, we can revisit the
>>> discussion.
>>>
>>> Thanks
>>> Gang
>>>
>>>
>>>
>>> On Wed, Nov 2, 2022 at 12:21 AM Qingsheng Ren <re...@apache.org> wrote:
>>>
>>>> Thanks Gang and Steven for the FLIP. Actually, I share the same concern
>>>> as Piotr and Maximilian.
>>>>
>>>> OperatorCoordinator is intentionally marked as @Internal because of some
>>>> existing issues, like the consistency between non-source operators and
>>>> their coordinators on checkpoints. I'm wondering whether it is useful to
>>>> expose a public context to developers while keeping OperatorCoordinator
>>>> an internal API. If we eventually close all the issues and decide to
>>>> expose the operator coordinator API, that would be a better opportunity
>>>> to include the base context as part of it.
>>>>
>>>> Best,
>>>> Qingsheng
>>>>
>>>> On Tue, Nov 1, 2022 at 8:29 PM Maximilian Michels <mx...@apache.org> wrote:
>>>>
>>>>> Thanks Steven! My confusion stemmed from the lack of context in the
>>>>> FLIP. The first version did not lay out how the refactoring would be
>>>>> used down the line, e.g. by the ShuffleCoordinator. The
>>>>> OperatorCoordinator API is a non-public API and before reading the
>>>>> code, I wasn't even aware how exactly it worked and whether it would be
>>>>> available to regular operators (it was originally intended for sources
>>>>> only).
>>>>>
>>>>> I might seem pedantic here but I believe the purpose of a FLIP should
>>>>> be to describe the *why* behind the changes, not only the changes
>>>>> themselves. A FLIP is not a formality but a tool to communicate and
>>>>> discuss changes. I think we still haven't laid out the exact reasons
>>>>> why we are factoring out the base. As far as I understand now, we need
>>>>> the base class to deal with concurrent updates in the custom
>>>>> Coordinator from the runtime (sub)tasks. Effectively, we are enforcing
>>>>> an actor model for the processing of the incoming messages such that
>>>>> the OperatorCoordinator can cleanly update its state. However, if there
>>>>> are no actual implementations that make use of the refactoring in
>>>>> Flink itself, I wonder if it would make sense to copy this code to the
>>>>> downstream implementation, e.g. the ShuffleCoordinator. As soon as it
>>>>> is part of Flink, we could of course try to consolidate this code.
>>>>>
>>>>> Considering the *how* of this, there appear to be both methods from
>>>>> SourceCoordinator (e.g. runInEventLoop) and SourceCoordinatorContext
>>>>> listed in the FLIP, as well as methods which do not appear anywhere in
>>>>> Flink code, e.g. subTaskReady / subTaskNotReady / sendEventToOperator.
>>>>> It appears that some of this has been extracted from a downstream
>>>>> implementation. It would be great to adjust this, such that it
>>>>> reflects the status quo in Flink.
>>>>>
>>>>> -Max
>>>>>
>>>>> On Fri, Oct 28, 2022 at 5:53 AM Steven Wu <st...@gmail.com> wrote:
>>>>>
>>>>> > Max,
>>>>> >
>>>>> > Thanks a lot for the comments. We should clarify that the shuffle
>>>>> > operator/coordinator is not really part of the Flink sink
>>>>> > function/operator. The shuffle operator is a custom operator that can
>>>>> > be inserted right before the Iceberg writer operator. It calculates
>>>>> > the traffic statistics and performs a custom partition/shuffle
>>>>> > (DataStream#partitionCustom) to cluster the data right before it
>>>>> > reaches the Iceberg writer operator.
>>>>> >
>>>>> > We are not proposing to introduce a sink coordinator for the sink
>>>>> > interface. The shuffle operator needs the CoordinatorContextBase to
>>>>> > facilitate the communication between shuffle subtasks and the
>>>>> > coordinator for traffic statistics aggregation. The communication
>>>>> > part is already implemented by SourceCoordinatorContext.
>>>>> >
>>>>> > Here are some details about the communication needs:
>>>>> > - subtasks periodically calculate local statistics and send them to
>>>>> >   the coordinator for global aggregation
>>>>> > - the coordinator sends the globally aggregated statistics to the
>>>>> >   subtasks
>>>>> > - subtasks use the globally aggregated statistics to guide the
>>>>> >   partition/shuffle decision
>>>>> >
>>>>> > Regards,
>>>>> > Steven
>>>>> >
>>>>> > On Thu, Oct 27, 2022 at 5:38 PM Maximilian Michels <mx...@apache.org> wrote:
>>>>> >
>>>>> > > Hi Gang,
>>>>> > >
>>>>> > > Looks much better! I've actually gone through the OperatorCoordinator
>>>>> > > code. It turns out any operator already has an OperatorCoordinator
>>>>> > > assigned. Also, any operator can add custom coordinator code. So it
>>>>> > > looks like you won't have to implement any additional runtime logic
>>>>> > > to add a ShuffleCoordinator. However, I'm wondering why you
>>>>> > > specifically need to refactor the SourceCoordinatorContext. You
>>>>> > > could simply add your own coordinator code. I'm not sure the sink
>>>>> > > requirements map to the source interface so closely that you can
>>>>> > > reuse the same logic.
>>>>> > >
>>>>> > > If you can refactor SourceCoordinatorContext in a way that makes it
>>>>> > > fit your use case, I have no objection. By the way, another example
>>>>> > > of an existing OperatorCoordinator is CollectSinkOperatorCoordinator,
>>>>> > > which is quite trivial, but it might be worth evaluating whether you
>>>>> > > need the full power of SourceCoordinatorContext, which is why I
>>>>> > > wanted to get more context.
>>>>> > >
>>>>> > > -Max
>>>>> > >
>>>>> > > On Thu, Oct 27, 2022 at 4:15 PM gang ye <ye...@gmail.com> wrote:
>>>>> > >
>>>>> > > > Hi Max,
>>>>> > > > I understand your concern. Since shuffling support for the Flink
>>>>> > > > Iceberg sink is not the main body of the proposal, I just added
>>>>> > > > another appendix with more details about how to use
>>>>> > > > CoordinatorContextBase and how to define the ShufflingCoordinator.
>>>>> > > >
>>>>> > > > Let me know if that doesn't address your concern.
>>>>> > > >
>>>>> > > > Thanks
>>>>> > > > Gang
>>>>> > > >
>>>>> > > > On Thu, Oct 27, 2022 at 1:31 PM Maximilian Michels <mxm@apache.org> wrote:
>>>>> > > >
>>>>> > > >> Hey Gang,
>>>>> > > >>
>>>>> > > >> What I'm looking for here is a complete picture of why the change
>>>>> > > >> is necessary and what the next steps are. Ultimately, refactoring
>>>>> > > >> any code serves a purpose. Here, we want to refactor the
>>>>> > > >> Coordinator code such that we can add a SinkCoordinator, similar to
>>>>> > > >> the SourceCoordinator. The FLIP should address the next steps, i.e.
>>>>> > > >> how you plan to add the SinkCoordinator, its interfaces, and the
>>>>> > > >> runtime changes. It doesn't have to be in great detail, but without
>>>>> > > >> this information, I don't think the FLIP is complete.
>>>>> > > >>
>>>>> > > >> This feature should be generic enough to be usable by sinks other
>>>>> > > >> than the Iceberg sink. Of course, Iceberg can still load its own
>>>>> > > >> implementation, which may be outlined in a separate FLIP.
>>>>> > > >>
>>>>> > > >> Unless there is a good reason, normal operators should not support
>>>>> > > >> the coordinator functionality. At least I'm not convinced it would
>>>>> > > >> play well with Flink's execution model. But I see how it is
>>>>> > > >> required for sources and sinks.
>>>>> > > >>
>>>>> > > >> -Max
>>>>> > > >>
>>>>> > > >> On Wed, Oct 26, 2022 at 3:05 PM gang ye <ye...@gmail.com> wrote:
>>>>> > > >>
>>>>> > > >>> Hi Max,
>>>>> > > >>>
>>>>> > > >>> Thanks for reviewing.
>>>>> > > >>>
>>>>> > > >>> For FLIP-264, yes, we will only focus on abstracting the RPC calls
>>>>> > > >>> between the task and the job manager for communication, and won't
>>>>> > > >>> touch watermarks or checkpointing.
>>>>> > > >>> If a coordinator doesn't need RPC calls to talk with subtasks, it
>>>>> > > >>> can define its context without extending CoordinatorContextBase
>>>>> > > >>> (or we could find another class name to limit the scope).
>>>>> > > >>>
>>>>> > > >>> Regarding the scope of code changes, FLIP-264 will only do the
>>>>> > > >>> context extraction. The shuffling coordinator and operator
>>>>> > > >>> <https://docs.google.com/document/d/13N8cMqPi-ZPSKbkXGOBMPOzbv2Fua59j8bIjjtxLWqo>
>>>>> > > >>> that will use the context will come with a separate proposal, so
>>>>> > > >>> we are trying to keep FLIP-264 simple to understand. I can add a
>>>>> > > >>> little more about how to use the coordinator context in FLIP-264
>>>>> > > >>> if you think that will be helpful.
>>>>> > > >>>
>>>>> > > >>> Thanks!
>>>>> > > >>> Gang
>>>>> > > >>>
>>>>> > > >>> On Wed, Oct 26, 2022 at 7:25 AM Maximilian Michels <mxm@apache.org> wrote:
>>>>> > > >>>
>>>>> > > >>>> Thanks for the proposal, Gang! This is indeed somewhat of a bigger
>>>>> > > >>>> change. The coordinator for sources, as part of FLIP-27, was
>>>>> > > >>>> specifically added to synchronize the global watermark and to
>>>>> > > >>>> assign splits dynamically. However, it practically allows
>>>>> > > >>>> arbitrary RPC calls between the task and the job manager. I
>>>>> > > >>>> understand that there is concern that such a powerful mechanism
>>>>> > > >>>> should not be available to all operators. Nevertheless, I see the
>>>>> > > >>>> practical use in the case of sinks like Iceberg. So I'd suggest
>>>>> > > >>>> limiting this feature to sinks (and sources) only.
>>>>> > > >>>>
>>>>> > > >>>> I'm wondering whether extracting the SourceCoordinatorContext is
>>>>> > > >>>> enough to achieve what you want. There will be additional work
>>>>> > > >>>> necessary, e.g. creating a SinkCoordinator similar to the
>>>>> > > >>>> SourceCoordinator, which handles the RPC calls and the
>>>>> > > >>>> checkpointing. I think it would be good to outline this in the
>>>>> > > >>>> FLIP.
>>>>> > > >>>>
>>>>> > > >>>> -Max
>>>>> > > >>>>
>>>>> > > >>>> On Sun, Oct 16, 2022 at 9:01 AM Steven Wu <stevenz3wu@gmail.com> wrote:
>>>>> > > >>>>
>>>>> > > >>>>> Sorry, I sent the incomplete reply by mistake.
>>>>> > > >>>>>
>>>>> > > >>>>> If there are any concrete concerns, we can discuss them. In
>>>>> > > >>>>> FLINK-27405 [1], Avid pointed out some implications regarding
>>>>> > > >>>>> checkpointing. In this small FLIP, we are not exposing/changing
>>>>> > > >>>>> any checkpointing logic; we mainly need the coordinator context
>>>>> > > >>>>> functionality to facilitate the communication between the
>>>>> > > >>>>> coordinator and subtasks.
>>>>> > > >>>>>
>>>>> > > >>>>> [1] https://issues.apache.org/jira/browse/FLINK-27405
>>>>> > > >>>>>
>>>>> > > >>>>> On Sun, Oct 16, 2022 at 8:56 AM Steven Wu <stevenz3wu@gmail.com> wrote:
>>>>> > > >>>>>
>>>>> > > >>>>> > Hang, I appreciate your input. Agreed that
>>>>> > > >>>>> > `CoordinatorContextBase` is a better name considering the
>>>>> > > >>>>> > Flink code conventions.
>>>>> > > >>>>> >
>>>>> > > >>>>> > If there are any concrete concerns, we can discuss. In the jira,
>>>>> > > >>>>> >
>>>>> > > >>>>> > On Sun, Oct 16, 2022 at 12:12 AM Hang Ruan <ruanhang1993@gmail.com> wrote:
>>>>> > > >>>>> >
>>>>> > > >>>>> >> Hi,
>>>>> > > >>>>> >>
>>>>> > > >>>>> >> IMO, extracting a base class for SourceCoordinatorContext makes
>>>>> > > >>>>> >> sense. But I prefer the name `OperatorCoordinatorContextBase`
>>>>> > > >>>>> >> or `CoordinatorContextBase`, following the format of
>>>>> > > >>>>> >> `SourceReaderBase`.
>>>>> > > >>>>> >> I also agree with what Piotr said. Maybe more problems will
>>>>> > > >>>>> >> occur when connectors start to use it.
>>>>> > > >>>>> >>
>>>>> > > >>>>> >> Best,
>>>>> > > >>>>> >> Hang
>>>>> > > >>>>> >>
>>>>> > > >>>>> >> On Fri, Oct 14, 2022 at 10:31 PM Steven Wu <st...@gmail.com> wrote:
>>>>> > > >>>>> >>
>>>>> > > >>>>> >> > Piotr,
>>>>> > > >>>>> >> >
>>>>> > > >>>>> >> > The proposal is to extract the listed methods from the @Internal
>>>>> > > >>>>> >> > SourceCoordinatorContext into a @PublicEvolving
>>>>> > > >>>>> >> > BaseCoordinatorContext.
>>>>> > > >>>>> >> >
>>>>> > > >>>>> >> > The motivation is that other operators can leverage the
>>>>> > > >>>>> >> > communication mechanism between the operator coordinator and
>>>>> > > >>>>> >> > operator subtasks. For example, in the linked Google doc, the
>>>>> > > >>>>> >> > shuffle operator (in the Flink Iceberg sink) can leverage it
>>>>> > > >>>>> >> > for computing traffic distribution statistics.
>>>>> > > >>>>> >> > * Subtasks calculate local statistics and periodically send
>>>>> > > >>>>> >> >   them to the coordinator for global aggregation.
>>>>> > > >>>>> >> > * The coordinator can broadcast the globally aggregated
>>>>> > > >>>>> >> >   statistics to subtasks, which can be used to guide the
>>>>> > > >>>>> >> >   shuffling decision (selecting downstream channels).
>>>>> > > >>>>> >> >
>>>>> > > >>>>> >> > Thanks,
>>>>> > > >>>>> >> > Steven
>>>>> > > >>>>> >> >
>>>>
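
Max's observation in the quoted Nov 1 message, that the base class effectively enforces an actor model so that the coordinator can cleanly update its state despite concurrent messages from the runtime subtasks, can be illustrated with a single-threaded executor: messages may arrive on any thread, but every state update runs on one event-loop thread, so handlers never race. This is a hypothetical sketch using only java.util.concurrent; EventLoopCoordinator, handleEventFromSubtask, and the other names are illustrative and are not the API of SourceCoordinator or SourceCoordinatorContext (SourceCoordinator's runInEventLoop is only the inspiration).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical coordinator whose state is owned by a single event-loop thread. */
final class EventLoopCoordinator implements AutoCloseable {
    private final ExecutorService eventLoop = Executors.newSingleThreadExecutor(runnable -> {
        Thread thread = new Thread(runnable, "coordinator-event-loop");
        thread.setDaemon(true);
        return thread;
    });

    // Only ever read or written on the event-loop thread, so no locks are needed.
    private final long[] eventsPerSubtask;
    private long valueSum;

    EventLoopCoordinator(int parallelism) {
        this.eventsPerSubtask = new long[parallelism];
    }

    /** Safe to call from any thread, e.g. threads delivering subtask messages. */
    void handleEventFromSubtask(int subtask, long value) {
        eventLoop.execute(() -> {
            eventsPerSubtask[subtask]++;
            valueSum += value;
        });
    }

    long totalValue() {
        return callOnEventLoop(() -> valueSum);
    }

    long eventsFrom(int subtask) {
        return callOnEventLoop(() -> eventsPerSubtask[subtask]);
    }

    /** Reads also go through the loop, so they observe every update enqueued before them. */
    private <T> T callOnEventLoop(Callable<T> action) {
        try {
            return eventLoop.submit(action).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    @Override
    public void close() {
        eventLoop.shutdown();
        try {
            eventLoop.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

final class EventLoopDemo {
    /** Starts one producer thread per subtask, each sending eventsEach events, then waits for all. */
    static void sendFromAllSubtasks(EventLoopCoordinator coordinator, int parallelism, int eventsEach) {
        List<Thread> producers = new ArrayList<>();
        for (int s = 0; s < parallelism; s++) {
            final int subtask = s;
            Thread producer = new Thread(() -> {
                for (int i = 0; i < eventsEach; i++) {
                    coordinator.handleEventFromSubtask(subtask, 1);
                }
            });
            producers.add(producer);
            producer.start();
        }
        for (Thread producer : producers) {
            try {
                producer.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
    }

    public static void main(String[] args) {
        try (EventLoopCoordinator coordinator = new EventLoopCoordinator(4)) {
            sendFromAllSubtasks(coordinator, 4, 1000);
            System.out.println("total = " + coordinator.totalValue()); // prints "total = 4000"
        }
    }
}
```

The trade-off of this design is that one slow handler delays every later message, which is one reason a real coordinator would keep its handlers short and push blocking work elsewhere.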

Re: [DISCUSS] FLIP-264 Extract BaseCoordinatorContext

Posted by Maximilian Michels <mx...@apache.org>.
+1 If we wanted to expose the OperatorCoordinator API, we should provide an
adequate interface. The FLIP partially addresses this by trying to factor
out RPC code which other coordinators might make use of, but there is
additional design necessary to realize a public operator API.

Just to be clear, I'm not opposed to any of the changes in the FLIP. I
think they make sense in the context of an Iceberg ShuffleCoordinator in
Flink. If we were to add such a new coordinator, feel free to make the
proposed code refactoring as part of a pull request. A FLIP isn't strictly
necessary here because this is a purely internal change which does not
alter public APIs, nor does it alter the internal architecture, apart from
reusing a bit of existing code. I'm sorry if we consumed some of your time
revising the document but I think we had a healthy discussion here. And
we're definitely looking forward to seeing some of these code changes!

-Max
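
For comparison, the narrower public surface that Piotr suggests in the quoted message below, an EventProcessFunction with a single processEvent callback, might look like this once made compilable. Everything here is a hypothetical sketch: CoordinatorEvent stands in for Flink's OperatorEvent so the example builds without Flink, and EventDispatcher.sendToSubtask, CountEvent, and RunningTotalFunction are illustrative names, not existing Flink types. It needs Java 16+ for records and pattern matching.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/** Stand-in for Flink's OperatorEvent; hypothetical, so the sketch compiles without Flink. */
interface CoordinatorEvent extends Serializable {}

/** Hypothetical event carrying a count. */
record CountEvent(long count) implements CoordinatorEvent {}

/** Hypothetical narrow output handle that the framework, not the user, would implement. */
interface EventDispatcher {
    void sendToSubtask(int subtask, CoordinatorEvent event);
}

/** The single-callback surface from the thread; no lifecycle, RPC, or threading details. */
@FunctionalInterface
interface EventProcessFunction {
    void processEvent(int subtask, CoordinatorEvent event, EventDispatcher eventDispatcher);
}

/** Example user function: keeps a running total and broadcasts it to every subtask. */
final class RunningTotalFunction implements EventProcessFunction {
    private final int parallelism;
    private long total;

    RunningTotalFunction(int parallelism) {
        this.parallelism = parallelism;
    }

    @Override
    public void processEvent(int subtask, CoordinatorEvent event, EventDispatcher eventDispatcher) {
        if (event instanceof CountEvent countEvent) {
            total += countEvent.count();
            for (int target = 0; target < parallelism; target++) {
                eventDispatcher.sendToSubtask(target, new CountEvent(total));
            }
        }
    }
}

final class EventProcessDemo {
    public static void main(String[] args) {
        List<String> sent = new ArrayList<>();
        // A recording dispatcher stands in for the runtime's RPC layer.
        EventDispatcher recorder =
                (subtask, event) -> sent.add(subtask + ":" + ((CountEvent) event).count());
        EventProcessFunction function = new RunningTotalFunction(2);
        function.processEvent(0, new CountEvent(3), recorder);
        function.processEvent(1, new CountEvent(4), recorder);
        System.out.println(sent); // prints "[0:3, 1:3, 0:7, 1:7]"
    }
}
```

The user code never sees coordinator lifecycle, threading, or checkpoint hooks; whether state such as the running total could be checkpointed safely is exactly the open question raised in the thread.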

On Thu, Nov 3, 2022 at 11:56 AM Piotr Nowojski <pn...@apache.org> wrote:

> Hi,
>
> Sorry for the delay, but I've given this more thought. First, I share
> Maximilian's view that this FLIP is incomplete. As I understand it, you
> are trying to hack existing code to expose small bits of internal
> functionality as part of the public API without solving many of the
> underlying issues.
>
> For example, what's the point of exposing `CoordinatorContextBase` as a
> public API if users cannot use it? After all, the `OperatorCoordinator`
> and `CoordinatedOperatorFactory` would remain internal. At the same time,
> this FLIP would officially force us to support and maintain this
> CoordinatorContextBase, while I have strong feelings that we don't want to
> do that in the long term. I think we would need to take a big step back and
> first discuss how we would like to expose the coordinators and agree how to
> deal with the issues.
>
> The first big issue I see is that I would be very worried about exposing the
> coordinator API without at least designing/planning how to deal with
> checkpointing their state. Without that, I'm afraid we might end up in a
> situation where we need to break the API in order to properly support
> stateful coordinators. And at the moment I don't see a good and easy
> solution to this problem.
>
> The second issue is the shape of the exposed public API. Exposing
> `OperatorCoordinator` or `CoordinatorContextBase` looks to me like a bad
> design that would expose way too many things to the users, making future
> development more complicated for us and making implementation of those
> interfaces by the user unnecessarily difficult. I see this as a similar issue
> to the low-level `StreamOperator` API vs. the higher-level
> `org.apache.flink.api.common.functions.Function` API (instead of exposing
> `StreamOperator`, `AbstractStreamOperatorV2` etc., we should beef up the
> `ProcessFunction` to expose all of the remaining functionality in a
> user-friendly way). In the context of the coordinators, I would say that we
> should expose as the public API not the `OperatorCoordinator`, but, for
> example, some kind of an `EventProcessFunction` that would have a simple
> interface like:
> ```
> interface EventProcessFunction {
>   void processEvent(int subtask, OperatorEvent event, EventDispatcher eventDispatcher);
> }
> ```
> + maybe some features like processing time timers/mailbox style async
> actions.
> (or maybe that could have been just a regular `ProcessFunction` but with
> `OperatorEvent` with `int subtask` as input/output).
>
> Best,
> Piotrek
>
> On Wed, Nov 2, 2022 at 7:40 PM gang ye <ye...@gmail.com> wrote:
>
>> Hi Max and Qingsheng,
>>
>> Thanks for the feedback. The initial motivation for this proposal was to
>> reduce duplicated code, since the ShuffleCoordinator would need
>> communication logic similar to the SourceCoordinator's to talk with
>> operators. I understand your concern that OperatorCoordinator is an
>> internal class and that, apart from SourceCoordinator, nothing else uses
>> this for now.
>> How about we do what Qingsheng suggested? I can go ahead with the
>> ShufflingCoordinator implementation without the extraction. Then we will
>> have an intuitive sense of how much code is copied and could be reused. If
>> we feel that there is still a need to extract, we can revisit the
>> discussion.
>>
>> Thanks
>> Gang
>>
>>
>>
>> On Wed, Nov 2, 2022 at 12:21 AM Qingsheng Ren <re...@apache.org> wrote:
>>
>>> Thanks Gang and Steven for the FLIP. I share the same concern as Piotr
>>> and Maximilian.
>>>
>>> OperatorCoordinator is intentionally marked as @Internal because of some
>>> existing issues, like consistency between non-source operators and their
>>> coordinator on checkpoint. I'm wondering if it is useful to expose a public
>>> context to developers while keeping OperatorCoordinator an internal API.
>>> If we eventually close all issues and decide to expose the operator
>>> coordinator API, that would be a better opportunity to include the base
>>> context as part of it.
>>>
>>> Best,
>>> Qingsheng
>>>
>>> On Tue, Nov 1, 2022 at 8:29 PM Maximilian Michels <mx...@apache.org> wrote:
>>>
>>>> Thanks Steven! My confusion stemmed from the lack of context in the FLIP.
>>>> The first version did not lay out how the refactoring would be used down
>>>> the line, e.g. by the ShuffleCoordinator. The OperatorCoordinator API is a
>>>> non-public API and before reading the code, I wasn't even aware how exactly
>>>> it worked and whether it would be available to regular operators (it was
>>>> originally intended for sources only).
>>>>
>>>> I might seem pedantic here but I believe the purpose of a FLIP should be to
>>>> describe the *why* behind the changes, not only the changes themselves. A
>>>> FLIP is not a formality but a tool to communicate and discuss changes. I
>>>> think we still haven't laid out the exact reasons why we are factoring out
>>>> the base. As far as I understand now, we need the base class to deal with
>>>> concurrent updates in the custom Coordinator from the runtime (sub)tasks.
>>>> Effectively, we are enforcing an actor model for the processing of the
>>>> incoming messages such that the OperatorCoordinator can cleanly update its
>>>> state. However, if there are no actual implementations that make use of the
>>>> refactoring in Flink itself, I wonder if it would make sense to copy this
>>>> code to the downstream implementation, e.g. the ShuffleCoordinator. As soon
>>>> as it is part of Flink, we could of course try to consolidate this code.
>>>>
>>>> Considering the *how* of this, the FLIP lists methods from both
>>>> SourceCoordinator (e.g. runInEventLoop) and SourceCoordinatorContext, as
>>>> well as methods which do not appear anywhere in Flink code, e.g.
>>>> subTaskReady / subTaskNotReady / sendEventToOperator. It appears that some
>>>> of this has been extracted from a downstream implementation. It would be
>>>> great to adjust this, such that it reflects the status quo in Flink.
>>>>
>>>> -Max
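The actor-model point above can be made concrete with a minimal Java sketch. It assumes a hypothetical `EventLoopCoordinator` class and is not Flink's `SourceCoordinator`; it only illustrates the general pattern a method like `runInEventLoop` relies on: events arriving on arbitrary RPC threads are handed to a single-threaded executor, so coordinator state is only ever touched by one thread and needs no locks.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical coordinator that confines all state changes to a single thread,
// so messages from subtasks are processed one at a time (actor-style mailbox).
final class EventLoopCoordinator implements AutoCloseable {

    // The single-threaded executor is the mailbox: tasks run sequentially in FIFO order.
    private final ExecutorService eventLoop = Executors.newSingleThreadExecutor(runnable -> {
        Thread thread = new Thread(runnable, "coordinator-event-loop");
        thread.setDaemon(true); // do not keep the JVM alive if close() is never called
        return thread;
    });

    // Only read and written on the event-loop thread, so a plain HashMap is safe.
    private final Map<Integer, Long> countsBySubtask = new HashMap<>();

    // May be called from any thread; the update itself runs on the event loop.
    CompletableFuture<Void> handleEventFromSubtask(int subtask, long count) {
        return CompletableFuture.runAsync(
                () -> countsBySubtask.merge(subtask, count, Long::sum), eventLoop);
    }

    // Reads go through the same loop, so they see every update submitted before them.
    CompletableFuture<Long> total() {
        return CompletableFuture.supplyAsync(
                () -> countsBySubtask.values().stream().mapToLong(Long::longValue).sum(),
                eventLoop);
    }

    @Override
    public void close() {
        eventLoop.shutdownNow();
    }
}
```

Because the executor runs tasks one at a time in submission order, a read submitted after a batch of updates observes all of them; the trade-off is that one slow handler delays every event queued behind it.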
>>>>
>>>> On Fri, Oct 28, 2022 at 5:53 AM Steven Wu <st...@gmail.com> wrote:
>>>>
>>>> > Max,
>>>> >
>>>> > Thanks a lot for the comments. We should clarify that the shuffle
>>>> > operator/coordinator is not really part of the Flink sink
>>>> > function/operator. The shuffle operator is a custom operator that can be
>>>> > inserted right before the Iceberg writer operator. The shuffle operator
>>>> > calculates the traffic statistics and performs a custom partition/shuffle
>>>> > (DataStream#partitionCustom) to cluster the data right before it gets to
>>>> > the Iceberg writer operator.
>>>> >
>>>> > We are not proposing to introduce a sink coordinator for the sink
>>>> > interface. The shuffle operator needs the CoordinatorContextBase to
>>>> > facilitate the communication between shuffle subtasks and the
>>>> > coordinator for traffic statistics aggregation. The communication part
>>>> > is already implemented by SourceCoordinatorContext.
>>>> >
>>>> > Here are some details about the communication needs.
>>>> > - subtasks periodically calculate local statistics and send them to the
>>>> >   coordinator for global aggregation
>>>> > - the coordinator sends the globally aggregated statistics to the
>>>> >   subtasks
>>>> > - subtasks use the globally aggregated statistics to guide the
>>>> >   partition/shuffle decision
>>>> >
>>>> > Regards,
>>>> > Steven
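The three communication steps Steven lists can be sketched in plain Java, leaving out the RPC layer entirely. The class `TrafficStatistics`, the `String` key type and the greedy balancing strategy are hypothetical choices for illustration only; the actual Iceberg shuffle design in the linked Google doc may compute and apply its statistics differently.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, RPC-free model of the statistics exchange described above.
final class TrafficStatistics {

    // Coordinator side: merge the local key counts reported by each subtask.
    static Map<String, Long> aggregate(List<Map<String, Long>> localStats) {
        Map<String, Long> global = new HashMap<>();
        for (Map<String, Long> local : localStats) {
            local.forEach((key, count) -> global.merge(key, count, Long::sum));
        }
        return global;
    }

    // Coordinator side: turn global counts into a key -> channel assignment.
    // Heaviest keys go first, each onto the currently least-loaded channel.
    static Map<String, Integer> assignChannels(Map<String, Long> global, int numChannels) {
        List<Map.Entry<String, Long>> entries = new ArrayList<>(global.entrySet());
        entries.sort(Map.Entry.<String, Long>comparingByValue().reversed());
        long[] load = new long[numChannels];
        Map<String, Integer> assignment = new HashMap<>();
        for (Map.Entry<String, Long> entry : entries) {
            int target = 0;
            for (int c = 1; c < numChannels; c++) {
                if (load[c] < load[target]) {
                    target = c;
                }
            }
            load[target] += entry.getValue();
            assignment.put(entry.getKey(), target);
        }
        return assignment;
    }

    // Subtask side: use the broadcast assignment to pick a downstream channel,
    // falling back to hashing for keys the statistics have not seen yet.
    static int partition(String key, Map<String, Integer> assignment, int numChannels) {
        Integer channel = assignment.get(key);
        return channel != null ? channel : Math.floorMod(key.hashCode(), numChannels);
    }
}
```

In a Flink job, a method like `partition` could back the `Partitioner` passed to `DataStream#partitionCustom`, with the assignment refreshed whenever the coordinator broadcasts new statistics.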
>>>> >
>>>> > On Thu, Oct 27, 2022 at 5:38 PM Maximilian Michels <mx...@apache.org> wrote:
>>>> >
>>>> > > Hi Gang,
>>>> > >
>>>> > > Looks much better! I've actually gone through the OperatorCoordinator
>>>> > > code. It turns out, any operator already has an OperatorCoordinator
>>>> > > assigned. Also, any operator can add custom coordinator code. So it
>>>> > > looks like you won't have to implement any additional runtime logic to
>>>> > > add a ShuffleCoordinator. However, I'm wondering why you specifically
>>>> > > need to refactor the SourceCoordinatorContext. You could simply add
>>>> > > your own coordinator code. I'm not sure the sink requirements map to
>>>> > > the source interface so closely that you can reuse the same logic.
>>>> > >
>>>> > > If you can refactor SourceCoordinatorContext in a way that makes it fit
>>>> > > your use case, I have no objection here. By the way, another example
>>>> > > of an existing OperatorCoordinator is CollectSinkOperatorCoordinator,
>>>> > > which is quite trivial, but it might be worth evaluating whether you
>>>> > > need the full power of SourceCoordinatorContext, which is why I wanted
>>>> > > to get more context.
>>>> > >
>>>> > > -Max
>>>> > >
>>>> > > On Thu, Oct 27, 2022 at 4:15 PM gang ye <ye...@gmail.com> wrote:
>>>> > >
>>>> > > > Hi Max,
>>>> > > > I understand your concern. Since shuffling support for the Flink
>>>> > > > Iceberg sink is not the main body of the proposal, I have just added
>>>> > > > an appendix with more details about how to use
>>>> > > > CoordinatorContextBase and how to define the ShufflingCoordinator.
>>>> > > >
>>>> > > > Let me know if that doesn't address your concern.
>>>> > > >
>>>> > > > Thanks
>>>> > > > Gang
>>>> > > >
>>>> > > > On Thu, Oct 27, 2022 at 1:31 PM Maximilian Michels <mxm@apache.org> wrote:
>>>> > > >
>>>> > > >> Hey Gang,
>>>> > > >>
>>>> > > >> What I'm looking for here is a complete picture of why the change is
>>>> > > >> necessary and what the next steps are. Ultimately, refactoring any
>>>> > > >> code serves a purpose. Here, we want to refactor the Coordinator
>>>> > > >> code such that we can add a SinkCoordinator, similar to the
>>>> > > >> SourceCoordinator. The FLIP should address the next steps, i.e. how
>>>> > > >> you plan to add the SinkCoordinator, its interfaces, and the runtime
>>>> > > >> changes. It doesn't have to be in great detail, but without this
>>>> > > >> information, I don't think the FLIP is complete.
>>>> > > >>
>>>> > > >> This feature should be generic enough to be usable by sinks other
>>>> > > >> than the Iceberg sink. Of course, Iceberg can still load its own
>>>> > > >> implementation, which may be outlined in a separate FLIP.
>>>> > > >>
>>>> > > >> Unless there is a good reason, normal operators should not support
>>>> > > >> the coordinator functionality. At least I'm not convinced it would
>>>> > > >> play well with Flink's execution model. But I see how it is required
>>>> > > >> for sources and sinks.
>>>> > > >>
>>>> > > >> -Max
>>>> > > >>
>>>> > > >> On Wed, Oct 26, 2022 at 3:05 PM gang ye <ye...@gmail.com> wrote:
>>>> > > >>
>>>> > > >>> Hi Max,
>>>> > > >>>
>>>> > > >>> Thanks for reviewing.
>>>> > > >>>
>>>> > > >>> For FLIP-264, yes, we will only focus on abstracting the RPC calls
>>>> > > >>> between the task and the job manager for communication and won't
>>>> > > >>> touch watermarks or checkpointing.
>>>> > > >>> If the coordinator doesn't need RPC calls to talk with subtasks,
>>>> > > >>> then it can define its context without extending
>>>> > > >>> CoordinatorContextBase (or we can find another class name to limit
>>>> > > >>> the scope).
>>>> > > >>>
>>>> > > >>> Regarding the scope of the code changes, FLIP-264 will only do the
>>>> > > >>> context extraction. The shuffling coordinator and operator
>>>> > > >>> <https://docs.google.com/document/d/13N8cMqPi-ZPSKbkXGOBMPOzbv2Fua59j8bIjjtxLWqo>
>>>> > > >>> which will use the context will come with a separate proposal, so
>>>> > > >>> we are trying to keep FLIP-264 simple to understand. I can add a
>>>> > > >>> little more about how to use the coordinator context in FLIP-264 if
>>>> > > >>> you think that will be helpful.
>>>> > > >>>
>>>> > > >>> Thanks!
>>>> > > >>> Gang
>>>> > > >>>
>>>> > > >>>
>>>> > > >>>
>>>> > > >>> On Wed, Oct 26, 2022 at 7:25 AM Maximilian Michels <mxm@apache.org> wrote:
>>>> > > >>>
>>>> > > >>>> Thanks for the proposal, Gang! This is indeed somewhat of a bigger
>>>> > > >>>> change. The coordinator for sources, as part of FLIP-27, was
>>>> > > >>>> specifically added to synchronize the global watermark and to
>>>> > > >>>> assign splits dynamically. However, it practically allows
>>>> > > >>>> arbitrary RPC calls between the task and the job manager. I
>>>> > > >>>> understand that there is concern that such a powerful mechanism
>>>> > > >>>> should not be available to all operators. Nevertheless, I see the
>>>> > > >>>> practical use in the case of sinks like Iceberg. So I'd suggest
>>>> > > >>>> limiting this feature to sinks (and sources) only.
>>>> > > >>>>
>>>> > > >>>> I'm wondering whether extracting the SourceCoordinatorContext is
>>>> > > >>>> enough to achieve what you want. There will be additional work
>>>> > > >>>> necessary, e.g. creating a SinkCoordinator similar to the
>>>> > > >>>> SourceCoordinator, which handles the RPC calls and the
>>>> > > >>>> checkpointing. I think it would be good to outline this in the
>>>> > > >>>> FLIP.
>>>> > > >>>>
>>>> > > >>>> -Max
>>>> > > >>>>
>>>> > > >>>> On Sun, Oct 16, 2022 at 9:01 AM Steven Wu <stevenz3wu@gmail.com> wrote:
>>>> > > >>>>
>>>> > > >>>>> Sorry, I sent the incomplete reply by mistake.
>>>> > > >>>>>
>>>> > > >>>>> If there are any concrete concerns, we can discuss them. In
>>>> > > >>>>> FLINK-27405 [1], Avid pointed out some implications regarding
>>>> > > >>>>> checkpointing. In this small FLIP, we are not exposing/changing
>>>> > > >>>>> any checkpointing logic; we mainly need the coordinator context
>>>> > > >>>>> functionality to facilitate the communication between the
>>>> > > >>>>> coordinator and subtasks.
>>>> > > >>>>>
>>>> > > >>>>> [1] https://issues.apache.org/jira/browse/FLINK-27405
>>>> > > >>>>>
>>>> > > >>>>> On Sun, Oct 16, 2022 at 8:56 AM Steven Wu <stevenz3wu@gmail.com> wrote:
>>>> > > >>>>>
>>>> > > >>>>> > Hang, I appreciate your input. I agree that
>>>> > > >>>>> > `CoordinatorContextBase` is a better name considering the Flink
>>>> > > >>>>> > code conventions.
>>>> > > >>>>> >
>>>> > > >>>>> > If there are any concrete concerns, we can discuss. In the jira,
>>>> > > >>>>> >
>>>> > > >>>>> > On Sun, Oct 16, 2022 at 12:12 AM Hang Ruan <ruanhang1993@gmail.com> wrote:
>>>> > > >>>>> >
>>>> > > >>>>> >> Hi,
>>>> > > >>>>> >>
>>>> > > >>>>> >> IMO, it makes sense to extract a base class for
>>>> > > >>>>> >> SourceCoordinatorContext. But I prefer the name
>>>> > > >>>>> >> `OperatorCoordinatorContextBase` or `CoordinatorContextBase`,
>>>> > > >>>>> >> following the format of `SourceReaderBase`.
>>>> > > >>>>> >> I also agree with what Piotr said. Maybe more problems will
>>>> > > >>>>> >> occur when connectors start to use it.
>>>> > > >>>>> >>
>>>> > > >>>>> >> Best,
>>>> > > >>>>> >> Hang
>>>> > > >>>>> >>
>>>> > > >>>>> >> On Fri, Oct 14, 2022 at 10:31 PM Steven Wu <st...@gmail.com> wrote:
>>>> > > >>>>> >>
>>>> > > >>>>> >> > Piotr,
>>>> > > >>>>> >> >
>>>> > > >>>>> >> > The proposal is to extract the listed methods from the
>>>> > > >>>>> >> > @Internal SourceCoordinatorContext to a @PublicEvolving
>>>> > > >>>>> >> > BaseCoordinatorContext.
>>>> > > >>>>> >> >
>>>> > > >>>>> >> > The motivation is that other operators can leverage the
>>>> > > >>>>> >> > communication mechanism between the operator coordinator and
>>>> > > >>>>> >> > operator subtasks. For example, the shuffle operator (in the
>>>> > > >>>>> >> > Flink Iceberg sink) from the linked Google doc can leverage it
>>>> > > >>>>> >> > for computing traffic distribution statistics.
>>>> > > >>>>> >> > * Subtasks calculate local statistics and periodically send
>>>> > > >>>>> >> >   them to the coordinator for global aggregation.
>>>> > > >>>>> >> > * The coordinator can broadcast the globally aggregated
>>>> > > >>>>> >> >   statistics to subtasks, which can be used to guide the
>>>> > > >>>>> >> >   shuffling decision (selecting downstream channels).
>>>> > > >>>>> >> >
>>>> > > >>>>> >> > Thanks,
>>>> > > >>>>> >> > Steven
>>>> > > >>>>> >> >
>>>> > > >>>>> >> >
>>>> > > >>>>> >> > On Fri, Oct 14, 2022 at 2:16 AM Piotr Nowojski <pnowojski@apache.org> wrote:
>>>> > > >>>>> >> >
>>>> > > >>>>> >> > > Hi,
>>>> > > >>>>> >> > >
>>>> > > >>>>> >> > > Could you clarify what proposal you have in mind? From the
>>>> > > >>>>> >> > > context, I would understand that the newly extracted
>>>> > > >>>>> >> > > `BaseCoordinatorContext` would have to be marked as
>>>> > > >>>>> >> > > `@PublicEvolving` or `@Experimental`, since otherwise
>>>> > > >>>>> >> > > extracting it and keeping it `@Internal` wouldn't change
>>>> > > >>>>> >> > > much? Such an `@Internal` base class could be removed at
>>>> > > >>>>> >> > > any point in the future. Having said that, it sounds to me
>>>> > > >>>>> >> > > like your proposal is a bit bigger than it looks at first
>>>> > > >>>>> >> > > glance and you actually want to expose the operator
>>>> > > >>>>> >> > > coordinator concept to the public API?
>>>> > > >>>>> >> > >
>>>> > > >>>>> >> > > AFAIK there were some discussions about that, and it was a
>>>> > > >>>>> >> > > bit of a conscious decision NOT to do that. I don't know
>>>> > > >>>>> >> > > the reasons, however. Only now have I heard that there are,
>>>> > > >>>>> >> > > for example, some problems with checkpointing of
>>>> > > >>>>> >> > > hypothetical non-source operator coordinators. Maybe
>>>> > > >>>>> >> > > someone else could shed some light on this?
>>>> > > >>>>> >> > >
>>>> > > >>>>> >> > > Conceptually I would actually be in favour of exposing
>>>> > > >>>>> >> > > operator coordinators if there is a good reason behind
>>>> > > >>>>> >> > > that, but it is a more difficult topic and might be a
>>>> > > >>>>> >> > > larger effort than it seems at first glance.
>>>> > > >>>>> >> > >
>>>> > > >>>>> >> > > Best,
>>>> > > >>>>> >> > > Piotrek
>>>> > > >>>>> >> > >
>>>> > > >>>>> >> > > On Tue, Oct 4, 2022 at 7:41 PM Steven Wu <stevenz3wu@gmail.com> wrote:
>>>> > > >>>>> >> > >
>>>> > > >>>>> >> > > > Jing, thanks a lot for your reply. The linked Google doc
>>>> > > >>>>> >> > > > is not for this FLIP, which is fully documented on the
>>>> > > >>>>> >> > > > wiki page. The linked Google doc is the design doc for
>>>> > > >>>>> >> > > > introducing shuffling in the Flink Iceberg sink, which
>>>> > > >>>>> >> > > > motivated this FLIP proposal so that the shuffle
>>>> > > >>>>> >> > > > coordinator can leverage the introduced
>>>> > > >>>>> >> > > > BaseCoordinatorContext to avoid code duplication.
>>>> > > >>>>> >> > > >
>>>> > > >>>>> >> > > > On Tue, Oct 4, 2022 at 1:04 AM Jing Ge <jing@ververica.com> wrote:
>>>> > > >>>>> >> > > >
>>>> > > >>>>> >> > > > > Thanks for bringing this up. It looks good overall! One
>>>> > > >>>>> >> > > > > small thing: you might want to write all the content on
>>>> > > >>>>> >> > > > > the wiki page instead of linking to a Google doc. The
>>>> > > >>>>> >> > > > > reason is that some people might not be able to access
>>>> > > >>>>> >> > > > > the Google doc.
>>>> > > >>>>> >> > > > >
>>>> > > >>>>> >> > > > > Best regards,
>>>> > > >>>>> >> > > > > Jing
>>>> > > >>>>> >> > > > >
>>>> > > >>>>> >> > > > > On Tue, Oct 4, 2022 at 3:57 AM gang ye <yegangapp@gmail.com> wrote:
>>>> > > >>>>> >> > > > >
>>>> > > >>>>> >> > > > >> Hi,
>>>> > > >>>>> >> > > > >>
>>>> > > >>>>> >> > > > >> We submitted the FLIP proposal
>>>> > > >>>>> >> > > > >> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-264%3A+Extract+BaseCoordinatorContext>
>>>> > > >>>>> >> > > > >> on Confluence to extract BaseCoordinatorContext from
>>>> > > >>>>> >> > > > >> SourceCoordinatorContext so it can be reused by other
>>>> > > >>>>> >> > > > >> coordinators, e.g. in the shuffling support of the Flink
>>>> > > >>>>> >> > > > >> Iceberg sink
>>>> > > >>>>> >> > > > >> <https://docs.google.com/document/d/13N8cMqPi-ZPSKbkXGOBMPOzbv2Fua59j8bIjjtxLWqo>.
>>>> > > >>>>> >> > > > >>
>>>> > > >>>>> >> > > > >> Could you help take a look?
>>>> > > >>>>> >> > > > >> Thanks
>>>> > > >>>>> >> > > > >>
>>>> > > >>>>> >> > > > >> Gang
>>>> > > >>>>> >> > > > >>

Re: [DISCUSS] FLIP-264 Extract BaseCoordinatorContext

Posted by Piotr Nowojski <pn...@apache.org>.
Hi,

Sorry for the delay, but I've given this more thought. First, I share
Maximilian's view that this FLIP is incomplete. As I understand it, you are
trying to hack existing code to expose small bits of internal functionality
as part of the public API without solving many of the underlying issues.

For example, what's the point of exposing `CoordinatorContextBase` as a
public API if users cannot use it? After all, the `OperatorCoordinator`
and `CoordinatedOperatorFactory` would remain internal. At the same time,
this FLIP would officially force us to support and maintain this
CoordinatorContextBase, while I have strong feelings that we don't want to
do that in the long term. I think we would need to take a big step back and
first discuss how we would like to expose the coordinators and agree how to
deal with the issues.

The first big issue I see is that I would be very worried about exposing the
coordinator API without at least designing/planning how to deal with
checkpointing their state. Without that, I'm afraid we might end up in a
situation where we need to break the API in order to properly support
stateful coordinators. And at the moment I don't see a good and easy
solution to this problem.

The second issue is the shape of the exposed public API. Exposing
`OperatorCoordinator` or `CoordinatorContextBase` looks to me like a bad
design that would expose way too many things to the users, making future
development more complicated for us and making implementation of those
interfaces by the user unnecessarily difficult. I see this as a similar issue
to the low-level `StreamOperator` API vs. the higher-level
`org.apache.flink.api.common.functions.Function` API (instead of exposing
`StreamOperator`, `AbstractStreamOperatorV2` etc., we should beef up the
`ProcessFunction` to expose all of the remaining functionality in a
user-friendly way). In the context of the coordinators, I would say that we
should expose as the public API not the `OperatorCoordinator`, but, for
example, some kind of an `EventProcessFunction` that would have a simple
interface like:
```
interface EventProcessFunction {
  void processEvent(int subtask, OperatorEvent event, EventDispatcher eventDispatcher);
}
```
+ maybe some features like processing time timers/mailbox style async
actions.
(or maybe that could have been just a regular `ProcessFunction` but with
`OperatorEvent` with `int subtask` as input/output).

Best,
Piotrek
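To illustrate how small the user-facing surface of the `EventProcessFunction` idea could be, here is a minimal, self-contained Java sketch. `OperatorEvent` below is a local stand-in for Flink's internal interface of the same name, and `EventDispatcher`, `CountEvent`, `SumFunction` and `RecordingDispatcher` are hypothetical names; none of them is an existing Flink API.

```java
import java.util.ArrayList;
import java.util.List;

// Local stand-in for Flink's internal OperatorEvent marker interface (assumption).
interface OperatorEvent {}

// Hypothetical callback the runtime would hand to the user function.
interface EventDispatcher {
    void sendToSubtask(int subtask, OperatorEvent event);

    void broadcast(OperatorEvent event);
}

// The user-facing interface from the proposal above.
interface EventProcessFunction {
    void processEvent(int subtask, OperatorEvent event, EventDispatcher eventDispatcher);
}

// Example event carrying a count reported by a subtask.
final class CountEvent implements OperatorEvent {
    final long count;

    CountEvent(long count) {
        this.count = count;
    }
}

// Example user function: keeps a running total and broadcasts it to all subtasks.
final class SumFunction implements EventProcessFunction {
    private long total;

    @Override
    public void processEvent(int subtask, OperatorEvent event, EventDispatcher eventDispatcher) {
        if (event instanceof CountEvent) {
            total += ((CountEvent) event).count;
            eventDispatcher.broadcast(new CountEvent(total));
        }
    }
}

// Test double that records broadcasts instead of performing RPC.
final class RecordingDispatcher implements EventDispatcher {
    final List<OperatorEvent> broadcasts = new ArrayList<>();

    @Override
    public void sendToSubtask(int subtask, OperatorEvent event) {
        throw new UnsupportedOperationException("not used in this sketch");
    }

    @Override
    public void broadcast(OperatorEvent event) {
        broadcasts.add(event);
    }
}
```

Since the function only sees events and a dispatcher, it can be unit-tested with a recording dispatcher like the one above, without any of the `OperatorCoordinator` lifecycle or checkpointing machinery.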

On Wed, Nov 2, 2022 at 7:40 PM gang ye <ye...@gmail.com> wrote:

> Hi Max and Qingsheng,
>
> Thanks for the feedback. The initial motivation for this proposal was to
> reduce duplicated code, since the ShuffleCoordinator would need
> communication logic similar to the SourceCoordinator's to talk with
> operators. I understand your concern that OperatorCoordinator is an
> internal class and that, apart from SourceCoordinator, nothing else uses
> this for now.
> How about we do what Qingsheng suggested? I can go ahead with the
> ShufflingCoordinator implementation without the extraction. Then we will
> have an intuitive sense of how much code is copied and could be reused. If
> we feel that there is still a need to extract, we can revisit the
> discussion.
>
> Thanks
> Gang
>
>
>
> On Wed, Nov 2, 2022 at 12:21 AM Qingsheng Ren <re...@apache.org> wrote:
>
>> Thanks Gang and Steven for the FLIP. I share the same concern as Piotr
>> and Maximilian.
>>
>> OperatorCoordinator is intentionally marked as @Internal because of some
>> existing issues, like consistency between non-source operators and their
>> coordinator on checkpoint. I'm wondering if it is useful to expose a public
>> context to developers while keeping OperatorCoordinator an internal API.
>> If we eventually close all issues and decide to expose the operator
>> coordinator API, that would be a better opportunity to include the base
>> context as part of it.
>>
>> Best,
>> Qingsheng
>>
>> On Tue, Nov 1, 2022 at 8:29 PM Maximilian Michels <mx...@apache.org> wrote:
>>
>>> Thanks Steven! My confusion stemmed from the lack of context in the FLIP.
>>> The first version did not lay out how the refactoring would be used down
>>> the line, e.g. by the ShuffleCoordinator. The OperatorCoordinator API is a
>>> non-public API and before reading the code, I wasn't even aware how exactly
>>> it worked and whether it would be available to regular operators (it was
>>> originally intended for sources only).
>>>
>>> I might seem pedantic here but I believe the purpose of a FLIP should be to
>>> describe the *why* behind the changes, not only the changes themselves. A
>>> FLIP is not a formality but a tool to communicate and discuss changes. I
>>> think we still haven't laid out the exact reasons why we are factoring out
>>> the base. As far as I understand now, we need the base class to deal with
>>> concurrent updates in the custom Coordinator from the runtime (sub)tasks.
>>> Effectively, we are enforcing an actor model for the processing of the
>>> incoming messages such that the OperatorCoordinator can cleanly update its
>>> state. However, if there are no actual implementations that make use of the
>>> refactoring in Flink itself, I wonder if it would make sense to copy this
>>> code to the downstream implementation, e.g. the ShuffleCoordinator. As soon
>>> as it is part of Flink, we could of course try to consolidate this code.
>>>
>>> Considering the *how* of this, the FLIP lists methods from both
>>> SourceCoordinator (e.g. runInEventLoop) and SourceCoordinatorContext, as
>>> well as methods which do not appear anywhere in Flink code, e.g.
>>> subTaskReady / subTaskNotReady / sendEventToOperator. It appears that some
>>> of this has been extracted from a downstream implementation. It would be
>>> great to adjust this, such that it reflects the status quo in Flink.
>>>
>>> -Max
>>>
>>> On Fri, Oct 28, 2022 at 5:53 AM Steven Wu <st...@gmail.com> wrote:
>>>
>>> > Max,
>>> >
>>> > Thanks a lot for the comments. We should clarify that the shuffle
>>> > operator/coordinator is not really part of the Flink sink
>>> > function/operator. The shuffle operator is a custom operator that can be
>>> > inserted right before the Iceberg writer operator. The shuffle operator
>>> > calculates the traffic statistics and performs a custom partition/shuffle
>>> > (DataStream#partitionCustom) to cluster the data right before it gets to
>>> > the Iceberg writer operator.
>>> >
>>> > We are not proposing to introduce a sink coordinator for the sink
>>> > interface. The shuffle operator needs the CoordinatorContextBase to
>>> > facilitate the communication between shuffle subtasks and the
>>> > coordinator for traffic statistics aggregation. The communication part
>>> > is already implemented by SourceCoordinatorContext.
>>> >
>>> > Here are some details about the communication needs.
>>> > - subtasks periodically calculate local statistics and send them to the
>>> >   coordinator for global aggregation
>>> > - the coordinator sends the globally aggregated statistics to the
>>> >   subtasks
>>> > - subtasks use the globally aggregated statistics to guide the
>>> >   partition/shuffle decision
>>> >
>>> > Regards,
>>> > Steven
>>> >
>>> > On Thu, Oct 27, 2022 at 5:38 PM Maximilian Michels <mx...@apache.org> wrote:
>>> >
>>> > > Hi Gang,
>>> > >
>>> > > Looks much better! I've actually gone through the OperatorCoordinator
>>> > > code. It turns out, any operator already has an OperatorCoordinator
>>> > > assigned. Also, any operator can add custom coordinator code. So it
>>> > > looks like you won't have to implement any additional runtime logic to
>>> > > add a ShuffleCoordinator. However, I'm wondering why you specifically
>>> > > need to refactor the SourceCoordinatorContext. You could simply add
>>> > > your own coordinator code. I'm not sure the sink requirements map to
>>> > > the source interface so closely that you can reuse the same logic.
>>> > >
>>> > > If you can refactor SourceCoordinatorContext in a way that makes it fit
>>> > > your use case, I have no objection here. By the way, another example
>>> > > of an existing OperatorCoordinator is CollectSinkOperatorCoordinator,
>>> > > which is quite trivial, but it might be worth evaluating whether you
>>> > > need the full power of SourceCoordinatorContext, which is why I wanted
>>> > > to get more context.
>>> > >
>>> > > -Max
>>> > >
>>> > > On Thu, Oct 27, 2022 at 4:15 PM gang ye <ye...@gmail.com> wrote:
>>> > >
>>> > > > Hi Max,
>>> > > > I understand your concern. Since shuffling support for the Flink
>>> > > > Iceberg sink is not the main body of the proposal, I have just added
>>> > > > an appendix with more details about how to use
>>> > > > CoordinatorContextBase and how to define the ShufflingCoordinator.
>>> > > >
>>> > > > Let me know if that doesn't address your concern.
>>> > > >
>>> > > > Thanks
>>> > > > Gang
>>> > > >
>>> > > > On Thu, Oct 27, 2022 at 1:31 PM Maximilian Michels <mxm@apache.org> wrote:
>>> > > >
>>> > > >> Hey Gang,
>>> > > >>
>>> > > >> What I'm looking for here is a complete picture of why the change is
>>> > > >> necessary and what the next steps are. Ultimately, refactoring any
>>> > > >> code serves a purpose. Here, we want to refactor the Coordinator
>>> > > >> code such that we can add a SinkCoordinator, similar to the
>>> > > >> SourceCoordinator. The FLIP should address the next steps, i.e. how
>>> > > >> you plan to add the SinkCoordinator, its interfaces, and the runtime
>>> > > >> changes. It doesn't have to be in great detail, but without this
>>> > > >> information, I don't think the FLIP is complete.
>>> > > >>
>>> > > >> This feature should be generic enough to be usable by sinks other
>>> > > >> than the Iceberg sink. Of course, Iceberg can still load its own
>>> > > >> implementation, which may be outlined in a separate FLIP.
>>> > > >>
>>> > > >> Unless there is a good reason, normal operators should not support
>>> > > >> the coordinator functionality. At least I'm not convinced it would
>>> > > >> play well with Flink's execution model. But I see how it is required
>>> > > >> for sources and sinks.
>>> > > >>
>>> > > >> -Max
>>> > > >>
>>> > > >> On Wed, Oct 26, 2022 at 3:05 PM gang ye <ye...@gmail.com> wrote:
>>> > > >>
>>> > > >>> Hi Max,
>>> > > >>>
>>> > > >>> Thanks for reviewing.
>>> > > >>>
>>> > > >>> For FLIP-264, yes, we will only focus on abstracting the RPC calls
>>> > > >>> between the task and the job manager for communication and won't
>>> > > >>> touch watermarks or checkpointing.
>>> > > >>> If the coordinator doesn't need RPC calls to talk with subtasks,
>>> > > >>> then it can define its context without extending
>>> > > >>> CoordinatorContextBase (or we can find another class name to limit
>>> > > >>> the scope).
>>> > > >>>
>>> > > >>> Regarding the scope of the code changes, FLIP-264 will only do the
>>> > > >>> context extraction. The shuffling coordinator and operator
>>> > > >>> <https://docs.google.com/document/d/13N8cMqPi-ZPSKbkXGOBMPOzbv2Fua59j8bIjjtxLWqo>
>>> > > >>> which will use the context will come with a separate proposal, so
>>> > > >>> we are trying to keep FLIP-264 simple to understand. I can add a
>>> > > >>> little more about how to use the coordinator context in FLIP-264 if
>>> > > >>> you think that will be helpful.
>>> > > >>>
>>> > > >>> Thanks!
>>> > > >>> Gang
>>> > > >>>
>>> > > >>>
>>> > > >>>
>>> > > >>> On Wed, Oct 26, 2022 at 7:25 AM Maximilian Michels <
>>> mxm@apache.org>
>>> > > >>> wrote:
>>> > > >>>
>>> > > >>>> Thanks for the proposal, Gang! This is indeed somewhat of a bigger
>>> > > >>>> change. The coordinator for sources, as part of FLIP-27, was specifically
>>> > > >>>> added to synchronize the global watermark and to assign splits dynamically.
>>> > > >>>> However, it practically allows arbitrary RPC calls between the task and the
>>> > > >>>> job manager. I understand that there is concern that such a powerful
>>> > > >>>> mechanism should not be available to all operators. Nevertheless, I see the
>>> > > >>>> practical use in the case of sinks like Iceberg. So I'd suggest limiting
>>> > > >>>> this feature to sinks (and sources) only.
>>> > > >>>>
>>> > > >>>> I'm wondering whether extracting the SourceCoordinatorContext is
>>> > > >>>> enough to achieve what you want. There will be additional work necessary,
>>> > > >>>> e.g. creating a SinkCoordinator, similar to SourceCoordinator, which
>>> > > >>>> handles the RPC calls and the checkpointing. I think it would be good to
>>> > > >>>> outline this in the FLIP.
>>> > > >>>>
>>> > > >>>> -Max
>>> > > >>>>
>>> > > >>>> On Sun, Oct 16, 2022 at 9:01 AM Steven Wu <stevenz3wu@gmail.com> wrote:
>>> > > >>>>
>>> > > >>>>> Sorry, I sent the incomplete reply by mistake.
>>> > > >>>>>
>>> > > >>>>> If there are any concrete concerns, we can discuss them. In FLINK-27405
>>> > > >>>>> [1], Avid pointed out some implications regarding checkpointing. In this
>>> > > >>>>> small FLIP, we are not exposing or changing any checkpointing logic; we
>>> > > >>>>> mainly need the coordinator context functionality to facilitate the
>>> > > >>>>> communication between the coordinator and subtasks.
>>> > > >>>>>
>>> > > >>>>> [1] https://issues.apache.org/jira/browse/FLINK-27405
>>> > > >>>>>
>>> > > >>>>> On Sun, Oct 16, 2022 at 8:56 AM Steven Wu <stevenz3wu@gmail.com> wrote:
>>> > > >>>>>
>>> > > >>>>> > Hang, I appreciate your input. I agree that `CoordinatorContextBase`
>>> > > >>>>> > is a better name, considering Flink's code conventions.
>>> > > >>>>> >
>>> > > >>>>> > If there are any concrete concerns, we can discuss them. In the jira,
>>> > > >>>>> >
>>> > > >>>>> >
>>> > > >>>>> >
>>> > > >>>>> > On Sun, Oct 16, 2022 at 12:12 AM Hang Ruan <ruanhang1993@gmail.com> wrote:
>>> > > >>>>> >
>>> > > >>>>> >> Hi,
>>> > > >>>>> >>
>>> > > >>>>> >> IMO, I agree with extracting a base class for SourceCoordinatorContext.
>>> > > >>>>> >> But I prefer the name `OperatorCoordinatorContextBase` or
>>> > > >>>>> >> `CoordinatorContextBase`, following the format of `SourceReaderBase`.
>>> > > >>>>> >> I also agree with what Piotr said. Maybe more problems will occur when
>>> > > >>>>> >> connectors start to use it.
>>> > > >>>>> >>
>>> > > >>>>> >> Best,
>>> > > >>>>> >> Hang
>>> > > >>>>> >>
>>> > > >>>>> >> On Fri, Oct 14, 2022 at 22:31, Steven Wu <st...@gmail.com> wrote:
>>> > > >>>>> >>
>>> > > >>>>> >> > Piotr,
>>> > > >>>>> >> >
>>> > > >>>>> >> > The proposal is to extract the listed methods from the @Internal
>>> > > >>>>> >> > SourceCoordinatorContext into a @PublicEvolving BaseCoordinatorContext.
>>> > > >>>>> >> >
>>> > > >>>>> >> > The motivation is that other operators can leverage the communication
>>> > > >>>>> >> > mechanism between the operator coordinator and the operator subtasks.
>>> > > >>>>> >> > For example, the shuffle operator (in the Flink Iceberg sink) described
>>> > > >>>>> >> > in the linked Google doc can leverage it to compute traffic distribution
>>> > > >>>>> >> > statistics:
>>> > > >>>>> >> > * Subtasks calculate local statistics and periodically send them to the
>>> > > >>>>> >> > coordinator for global aggregation.
>>> > > >>>>> >> > * The coordinator can broadcast the globally aggregated statistics to
>>> > > >>>>> >> > subtasks, which can use them to guide the shuffling decision (selecting
>>> > > >>>>> >> > downstream channels).
>>> > > >>>>> >> >
>>> > > >>>>> >> > Thanks,
>>> > > >>>>> >> > Steven
>>> > > >>>>> >> >
>>> > > >>>>> >> >
>>> > > >>>>> >> > On Fri, Oct 14, 2022 at 2:16 AM Piotr Nowojski <pnowojski@apache.org> wrote:
>>> > > >>>>> >> >
>>> > > >>>>> >> > > Hi,
>>> > > >>>>> >> > >
>>> > > >>>>> >> > > Could you clarify what proposal you have in mind? From the context,
>>> > > >>>>> >> > > I would understand that the newly extracted `BaseCoordinatorContext`
>>> > > >>>>> >> > > would have to be marked as `@PublicEvolving` or `@Experimental`, since
>>> > > >>>>> >> > > otherwise extracting it and keeping it `@Internal` wouldn't change
>>> > > >>>>> >> > > much? Such an `@Internal` base class could be removed at any point in
>>> > > >>>>> >> > > the future. Having said that, it sounds to me like your proposal is a
>>> > > >>>>> >> > > bit bigger than it looks at first glance and you actually want to
>>> > > >>>>> >> > > expose the operator coordinator concept in the public API?
>>> > > >>>>> >> > >
>>> > > >>>>> >> > > AFAIK there were some discussions about that, and it was a bit of a
>>> > > >>>>> >> > > conscious decision NOT to do that. I don't know the reasons, however.
>>> > > >>>>> >> > > Only now have I heard that there are, for example, some problems with
>>> > > >>>>> >> > > checkpointing of hypothetical non-source operator coordinators. Maybe
>>> > > >>>>> >> > > someone else could shed some light on this?
>>> > > >>>>> >> > >
>>> > > >>>>> >> > > Conceptually, I would actually be in favour of exposing operator
>>> > > >>>>> >> > > coordinators if there is a good reason behind that, but it is a more
>>> > > >>>>> >> > > difficult topic and might be a larger effort than it seems at first
>>> > > >>>>> >> > > glance.
>>> > > >>>>> >> > >
>>> > > >>>>> >> > > Best,
>>> > > >>>>> >> > > Piotrek
>>> > > >>>>> >> > >
>>> > > >>>>> >> > > On Tue, Oct 4, 2022 at 19:41, Steven Wu <stevenz3wu@gmail.com> wrote:
>>> > > >>>>> >> > >
>>> > > >>>>> >> > > > Jing, thanks a lot for your reply. The linked Google doc is not for
>>> > > >>>>> >> > > > this FLIP, which is fully documented on the wiki page. The linked
>>> > > >>>>> >> > > > Google doc is the design doc for introducing shuffling in the Flink
>>> > > >>>>> >> > > > Iceberg sink, which motivated this FLIP proposal so that the shuffle
>>> > > >>>>> >> > > > coordinator can leverage the introduced BaseCoordinatorContext to
>>> > > >>>>> >> > > > avoid code duplication.
>>> > > >>>>> >> > > >
>>> > > >>>>> >> > > > On Tue, Oct 4, 2022 at 1:04 AM Jing Ge <jing@ververica.com> wrote:
>>> > > >>>>> >> > > >
>>> > > >>>>> >> > > > > Thanks for bringing this up. It looks good overall! One small
>>> > > >>>>> >> > > > > thing: you might want to write all the content on the wiki page
>>> > > >>>>> >> > > > > instead of linking to a Google doc. The reason is that some people
>>> > > >>>>> >> > > > > might not be able to access the Google doc.
>>> > > >>>>> >> > > > >
>>> > > >>>>> >> > > > > Best regards,
>>> > > >>>>> >> > > > > Jing
>>> > > >>>>> >> > > > >
>>> > > >>>>> >> > > > > On Tue, Oct 4, 2022 at 3:57 AM gang ye <yegangapp@gmail.com> wrote:
>>> > > >>>>> >> > > > >
>>> > > >>>>> >> > > > >> Hi,
>>> > > >>>>> >> > > > >>
>>> > > >>>>> >> > > > >> We have submitted the FLIP proposal
>>> > > >>>>> >> > > > >> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-264%3A+Extract+BaseCoordinatorContext>
>>> > > >>>>> >> > > > >> on Confluence to extract BaseCoordinatorContext from
>>> > > >>>>> >> > > > >> SourceCoordinatorContext so that it can be reused by other
>>> > > >>>>> >> > > > >> coordinators, e.g. in the shuffling support of the Flink Iceberg sink
>>> > > >>>>> >> > > > >> <https://docs.google.com/document/d/13N8cMqPi-ZPSKbkXGOBMPOzbv2Fua59j8bIjjtxLWqo>.
>>> > > >>>>> >> > > > >>
>>> > > >>>>> >> > > > >> Could you help take a look?
>>> > > >>>>> >> > > > >> Thanks
>>> > > >>>>> >> > > > >>
>>> > > >>>>> >> > > > >> Gang
>>> > > >>>>> >> > > > >>
>>> > > >>>>> >> > > > >
>>> > > >>>>> >> > > >
>>> > > >>>>> >> > >
>>> > > >>>>> >> >
>>> > > >>>>> >>
>>> > > >>>>> >
>>> > > >>>>>
>>> > > >>>>
>>> > >
>>> >
>>>
>>

Re: [DISCUSS] FLIP-264 Extract BaseCoordinatorContext

Posted by gang ye <ye...@gmail.com>.
Hi Max and Qingsheng,

Thanks for the feedback. The initial motivation for this proposal was to
reduce duplicated code, since the ShuffleCoordinator would need
communication logic similar to the SourceCoordinator's to talk with its
operator subtasks. I understand your concern that OperatorCoordinator is an
internal class and that, apart from SourceCoordinator, nothing uses this for
now.
How about we do what Qingsheng suggested? I can go ahead with the
ShufflingCoordinator implementation without the extraction. Then we will
have an intuitive sense of how much code is copied and could be reused. If
we feel that there is still a need to extract, we can revisit the
discussion.

Thanks
Gang
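For illustration only, here is a minimal, framework-free Java sketch of the coordinator/subtask statistics round trip that a ShuffleCoordinator would need, following Steven's description in this thread: each subtask reports local key counts, the coordinator merges them once every subtask has reported for a round, and it then sends the merged result to all subtasks. All names (`LocalStatsEvent`, `GlobalStatsEvent`, `StatsAggregator`, the `sendToSubtask` callback, and the idea of a numbered round) are hypothetical assumptions. The callback only stands in for whatever event-sending method a coordinator context provides; it is not Flink's API.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

/** Hypothetical event: key counts observed by one subtask during one reporting round. */
final class LocalStatsEvent {
    final int subtask;
    final long round;
    final Map<String, Long> counts;

    LocalStatsEvent(int subtask, long round, Map<String, Long> counts) {
        this.subtask = subtask;
        this.round = round;
        this.counts = counts;
    }
}

/** Hypothetical event: globally aggregated counts sent back to every subtask. */
final class GlobalStatsEvent {
    final long round;
    final Map<String, Long> counts;

    GlobalStatsEvent(long round, Map<String, Long> counts) {
        this.round = round;
        this.counts = counts;
    }
}

/** Coordinator-side aggregation of per-subtask statistics (illustrative, not a Flink class). */
final class StatsAggregator {
    private final int parallelism;
    // Stand-in for the coordinator context's "send event to subtask" capability (assumption).
    private final BiConsumer<Integer, GlobalStatsEvent> sendToSubtask;
    // round -> (subtask index -> that subtask's local counts)
    private final Map<Long, Map<Integer, Map<String, Long>>> pending = new HashMap<>();

    StatsAggregator(int parallelism, BiConsumer<Integer, GlobalStatsEvent> sendToSubtask) {
        this.parallelism = parallelism;
        this.sendToSubtask = sendToSubtask;
    }

    /** Records one subtask's report; broadcasts once all subtasks have reported for the round. */
    void onLocalStats(LocalStatsEvent event) {
        Map<Integer, Map<String, Long>> reports =
                pending.computeIfAbsent(event.round, r -> new HashMap<>());
        reports.put(event.subtask, event.counts); // a duplicate report replaces the earlier one
        if (reports.size() < parallelism) {
            return; // still waiting for other subtasks
        }
        Map<String, Long> global = new HashMap<>();
        for (Map<String, Long> local : reports.values()) {
            local.forEach((key, count) -> global.merge(key, count, Long::sum));
        }
        pending.remove(event.round);
        GlobalStatsEvent result =
                new GlobalStatsEvent(event.round, Collections.unmodifiableMap(global));
        for (int subtask = 0; subtask < parallelism; subtask++) {
            sendToSubtask.accept(subtask, result);
        }
    }
}
```

Because the aggregator uses plain `HashMap`s, it assumes its methods are only ever called from one thread, such as a coordinator's event loop.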




Re: [DISCUSS] FLIP-264 Extract BaseCoordinatorContext

Posted by Qingsheng Ren <re...@apache.org>.
Thanks Gang and Steven for the FLIP. Actually, I share the same concern
as Piotr and Maximilian.

OperatorCoordinator is intentionally marked as @Internal because of some
existing issues, such as consistency between a non-source operator and its
coordinator on checkpoints. I'm not sure it is useful to expose a public
context to developers while keeping OperatorCoordinator an internal API.
If we eventually resolve all of these issues and decide to expose the
operator coordinator API, that would be a better opportunity to include the
base context as part of it.

Best,
Qingsheng

On Tue, Nov 1, 2022 at 8:29 PM Maximilian Michels <mx...@apache.org> wrote:

> Thanks Steven! My confusion stemmed from the lack of context in the FLIP.
> The first version did not lay out how the refactoring would be used down
> the line, e.g. by the ShuffleCoordinator. The OperatorCoordinator API is a
> non-public API and before reading the code, I wasn't even aware how exactly
> it worked and whether it would be available to regular operators (it was
> originally intended for sources only).
>
> I might seem pedantic here but I believe the purpose of a FLIP should be to
> describe the *why* behind the changes, not only the changes themselves. A FLIP
> is not a formality but is a tool to communicate and discuss changes. I
> think we still haven't laid out the exact reasons why we are factoring out
> the base. As far as I understand now, we need the base class to deal with
> concurrent updates in the custom Coordinator from the runtime (sub)tasks.
> Effectively, we are enforcing an actor model for the processing of the
> incoming messages such that the OperatorCoordinator can cleanly update its
> state. However, if there are no actual implementations that make use of the
> refactoring in Flink itself, I wonder if it would make sense to copy this
> code to the downstream implementation, e.g. the ShuffleCoordinator. As soon
> as it is part of Flink, we could of course try to consolidate this code.
>
> Considering the *how* of this, there appear to be both methods from
> SourceCoordinator (e.g. runInEventLoop) as well as SourceCoordinatorContext
> listed in the FLIP, as well as methods which do not appear anywhere in
> Flink code, e.g. subTaskReady / subTaskNotReady / sendEventToOperator. It
> appears that some of this has been extracted from a downstream
> implementation. It would be great to adjust this, such that it reflects the
> status quo in Flink.
>
> -Max
>
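The "actor model" Max describes above can be illustrated without Flink: events that arrive on arbitrary RPC threads are only enqueued, and a single-threaded executor applies them one at a time, so coordinator state needs no locks. This is a hedged sketch, not SourceCoordinator's implementation. `EventLoopCoordinatorSketch`, `handleEventFromSubtask`, and `eventsFrom` are hypothetical, and `runInEventLoop` merely borrows the method name Max mentions without claiming its real signature or behavior.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/** Hypothetical coordinator that confines all state changes to one event-loop thread. */
final class EventLoopCoordinatorSketch implements AutoCloseable {
    private final ExecutorService eventLoop = Executors.newSingleThreadExecutor(runnable -> {
        Thread thread = new Thread(runnable, "coordinator-event-loop");
        thread.setDaemon(true); // do not keep the JVM alive if close() is never called
        return thread;
    });

    // Plain HashMap: safe because only the event-loop thread ever reads or writes it.
    private final Map<Integer, Integer> eventsPerSubtask = new HashMap<>();

    /** Queues an action; actions run one at a time, in submission order. */
    Future<?> runInEventLoop(Runnable action) {
        return eventLoop.submit(action);
    }

    /** May be called concurrently from many RPC threads; it only enqueues work. */
    Future<?> handleEventFromSubtask(int subtask) {
        return runInEventLoop(() -> eventsPerSubtask.merge(subtask, 1, Integer::sum));
    }

    /** Reads state through the loop as well, so the read is ordered after earlier events. */
    int eventsFrom(int subtask) throws Exception {
        return eventLoop.submit(() -> eventsPerSubtask.getOrDefault(subtask, 0)).get();
    }

    @Override
    public void close() throws InterruptedException {
        eventLoop.shutdown();
        eventLoop.awaitTermination(10, TimeUnit.SECONDS);
    }
}
```

The trade-off is the usual one for actors: a handler that blocks delays every event queued behind it, so handlers should stay short.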
> On Fri, Oct 28, 2022 at 5:53 AM Steven Wu <st...@gmail.com> wrote:
>
> > Max,
> >
> > Thanks a lot for the comments. We should clarify that the shuffle
> > operator/coordinator is not really part of the Flink sink
> > function/operator. The shuffle operator is a custom operator that can be
> > inserted right before the Iceberg writer operator. The shuffle operator
> > calculates traffic statistics and performs a custom partition/shuffle
> > (DataStream#partitionCustom) to cluster the data right before it reaches
> > the Iceberg writer operator.
> >
> > We are not proposing to introduce a sink coordinator for the sink
> > interface. The shuffle operator needs the CoordinatorContextBase to
> > facilitate the communication between shuffle subtasks and the coordinator
> > for traffic statistics aggregation. The communication part is already
> > implemented by SourceCoordinatorContext.
> >
> > Here are some details about the communication needs.
> > - subtasks periodically calculate local statistics and send them to the
> > coordinator for global aggregation
> > - the coordinator sends the globally aggregated statistics to the subtasks
> > - subtasks use the globally aggregated statistics to guide the
> > partition/shuffle decision
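The three steps above can be sketched in plain Java as a statistics round trip. This is a hypothetical illustration, not the Iceberg shuffle design: the class and method names are invented, the transport between subtasks and coordinator is omitted, and the greedy key-to-channel assignment is only one possible way to "guide the partition/shuffle decision".

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of the subtask -> coordinator -> subtask statistics round trip. */
public class ShuffleStatsSketch {

    /** Step 1 (subtask): count records per key, e.g. per target table partition. */
    public static Map<String, Long> localStatistics(List<String> keys) {
        Map<String, Long> counts = new HashMap<>();
        for (String key : keys) {
            counts.merge(key, 1L, Long::sum);
        }
        return counts;
    }

    /** Step 2 (coordinator): merge the reports of all subtasks into global statistics. */
    public static Map<String, Long> aggregate(List<Map<String, Long>> reports) {
        Map<String, Long> global = new HashMap<>();
        for (Map<String, Long> report : reports) {
            report.forEach((key, count) -> global.merge(key, count, Long::sum));
        }
        return global;
    }

    /**
     * Step 3 (subtask): derive a key -> channel plan from the broadcast global statistics.
     * Greedy heuristic: heaviest key first, each onto the currently least-loaded channel.
     */
    public static Map<String, Integer> assignChannels(Map<String, Long> global, int numChannels) {
        List<Map.Entry<String, Long>> entries = new ArrayList<>(global.entrySet());
        // Descending by count, ties broken by key, so every subtask computes the same plan.
        entries.sort(Map.Entry.<String, Long>comparingByValue().reversed()
                .thenComparing(Map.Entry.<String, Long>comparingByKey()));
        long[] load = new long[numChannels];
        Map<String, Integer> plan = new HashMap<>();
        for (Map.Entry<String, Long> entry : entries) {
            int target = 0;
            for (int channel = 1; channel < numChannels; channel++) {
                if (load[channel] < load[target]) {
                    target = channel;
                }
            }
            plan.put(entry.getKey(), target);
            load[target] += entry.getValue();
        }
        return plan;
    }

    public static void main(String[] args) {
        Map<String, Long> subtask0 = localStatistics(List.of("a", "a", "a", "b"));
        Map<String, Long> subtask1 = localStatistics(List.of("a", "c", "c", "d"));
        Map<String, Long> global = aggregate(List.of(subtask0, subtask1)); // a=4, b=1, c=2, d=1
        Map<String, Integer> plan = assignChannels(global, 2);
        System.out.println(plan.get("a") + " " + plan.get("c")); // prints "0 1"
    }
}
```

Because ties are broken by key, every subtask that receives the same broadcast statistics derives the same plan; a per-record lookup in that plan could then back a custom partitioner such as one passed to `DataStream#partitionCustom`.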
> >
> > Regards,
> > Steven
> >
> > On Thu, Oct 27, 2022 at 5:38 PM Maximilian Michels <mx...@apache.org> wrote:
> >
> > > Hi Gang,
> > >
> > > Looks much better! I've actually gone through the OperatorCoordinator code.
> > > It turns out, any operator already has an OperatorCoordinator assigned.
> > > Also, any operator can add custom coordinator code. So it looks like you
> > > won't have to implement any additional runtime logic to add a
> > > ShuffleCoordinator. However, I'm wondering, why do you specifically need to
> > > refactor the SourceCoordinatorContext? You could simply add your own
> > > coordinator code. I'm not sure the sink requirements map to the source
> > > interface so closely that you can reuse the same logic.
> > >
> > > If you can refactor SourceCoordinatorContext in a way that makes it fit
> > > your use case, I have no objection. By the way, another example of an
> > > existing OperatorCoordinator is CollectSinkOperatorCoordinator, which is
> > > quite trivial, but it might be worth evaluating whether you need the full
> > > power of SourceCoordinatorContext, which is why I wanted to get more
> > > context.
> > >
> > > -Max
> > >
> > > On Thu, Oct 27, 2022 at 4:15 PM gang ye <ye...@gmail.com> wrote:
> > >
> > > > Hi Max,
> > > > I understand your concern. Since shuffling support for the Flink Iceberg
> > > > sink is not the main body of the proposal, I have just added another
> > > > appendix with more details about how to use CoordinatorContextBase and
> > > > how to define ShufflingCoordinator.
> > > >
> > > > Let me know if that does not address your concern.
> > > >
> > > > Thanks
> > > > Gang
> > > >
> > > > On Thu, Oct 27, 2022 at 1:31 PM Maximilian Michels <mx...@apache.org> wrote:
> > > >
> > > >> Hey Gang,
> > > >>
> > > >> What I'm looking for here is a complete picture of why the change is
> > > >> necessary and what the next steps are. Ultimately, refactoring any code
> > > >> serves a purpose. Here, we want to refactor the Coordinator code such that
> > > >> we can add a SinkCoordinator, similar to the SourceCoordinator. The FLIP
> > > >> should address the next steps, i.e. how you plan to add the
> > > >> SinkCoordinator, its interfaces, and the runtime changes. It doesn't have
> > > >> to be in great detail, but without this information, I don't think the
> > > >> FLIP is complete.
> > > >>
> > > >> This feature should be generic enough to be usable by sinks other than
> > > >> the Iceberg sink. Of course, Iceberg can still load its own
> > > >> implementation, which may be outlined in a separate FLIP.
> > > >>
> > > >> Unless there is a good reason, normal operators should not support the
> > > >> coordinator functionality. At least I'm not convinced it would play well
> > > >> with Flink's execution model. But I see how it is required for sources
> > > >> and sinks.
> > > >>
> > > >> -Max
> > > >>
> > > >> On Wed, Oct 26, 2022 at 3:05 PM gang ye <ye...@gmail.com> wrote:
> > > >>
> > > >>> Hi Max,
> > > >>>
> > > >>> Thanks for reviewing.
> > > >>>
> > > >>> For FLIP-264, yes, we will only focus on abstracting the RPC calls
> > > >>> between the task and the job manager for communication and won't touch
> > > >>> watermarks or checkpointing.
> > > >>> If the coordinator doesn't need RPC calls to talk with subtasks, then it
> > > >>> can define its context without extending CoordinatorContextBase (or we
> > > >>> can find another class name to limit the scope).
> > > >>>
> > > >>> Regarding the scope of code changes, FLIP-264 will only do the context
> > > >>> extraction. The shuffling coordinator and operator
> > > >>> <https://docs.google.com/document/d/13N8cMqPi-ZPSKbkXGOBMPOzbv2Fua59j8bIjjtxLWqo>
> > > >>> which will use the context will come with a separate proposal, so we
> > > >>> try to keep FLIP-264 simple to understand. I can add a little more
> > > >>> about how to use the coordinator context in FLIP-264 if you think that
> > > >>> would be helpful.
> > > >>>
> > > >>> Thanks!
> > > >>> Gang
> > > >>>
> > > >>>
> > > >>>
> > > >>> On Wed, Oct 26, 2022 at 7:25 AM Maximilian Michels <mxm@apache.org> wrote:
> > > >>>
> > > >>>> Thanks for the proposal, Gang! This is indeed somewhat of a bigger
> > > >>>> change. The coordinator for sources, as part of FLIP-27, was
> > > >>>> specifically added to synchronize the global watermark and to assign
> > > >>>> splits dynamically. However, it practically allows arbitrary RPC calls
> > > >>>> between the task and the job manager. I understand that there is concern
> > > >>>> that such a powerful mechanism should not be available to all operators.
> > > >>>> Nevertheless, I see the practical use in the case of sinks like Iceberg.
> > > >>>> So I'd suggest limiting this feature to sinks (and sources) only.
> > > >>>>
> > > >>>> I'm wondering whether extracting the SourceCoordinatorContext is
> > > >>>> enough to achieve what you want. Additional work will be necessary,
> > > >>>> e.g. creating a SinkCoordinator similar to the SourceCoordinator, which
> > > >>>> handles the RPC calls and the checkpointing. I think it would be good
> > > >>>> to outline this in the FLIP.
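Checkpointing a coordinator, as mentioned above, essentially means turning its in-memory state into bytes and back. A hypothetical sketch for aggregated statistics follows. The method shapes are loosely modelled on Flink's `OperatorCoordinator#checkpointCoordinator(long, CompletableFuture<byte[]>)` and `resetToCheckpoint(long, byte[])`, but this class does not implement any Flink interface, the names `StatsCheckpointSketch`, `checkpoint` and `restore` are invented, and in a real coordinator both calls would also need to run in the coordinator's event loop.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Hypothetical coordinator state holder; does not implement any Flink interface. */
public class StatsCheckpointSketch {
    private final Map<String, Long> globalStats = new HashMap<>();

    public void add(String key, long count) {
        globalStats.merge(key, count, Long::sum);
    }

    public Map<String, Long> stats() {
        return globalStats;
    }

    /** Serializes the aggregated statistics and completes the future with the snapshot bytes. */
    public void checkpoint(long checkpointId, CompletableFuture<byte[]> result) {
        // checkpointId is unused in this sketch; a real coordinator might track it.
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(globalStats.size());
            for (Map.Entry<String, Long> entry : globalStats.entrySet()) {
                out.writeUTF(entry.getKey());
                out.writeLong(entry.getValue());
            }
            out.flush();
            result.complete(bytes.toByteArray());
        } catch (IOException e) {
            result.completeExceptionally(e);
        }
    }

    /** Replaces the current state with a snapshot; null means "no checkpoint, start empty". */
    public void restore(long checkpointId, byte[] snapshot) {
        globalStats.clear();
        if (snapshot == null) {
            return;
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(snapshot))) {
            int size = in.readInt();
            for (int i = 0; i < size; i++) {
                globalStats.put(in.readUTF(), in.readLong());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        StatsCheckpointSketch before = new StatsCheckpointSketch();
        before.add("a", 4L);
        before.add("c", 2L);
        CompletableFuture<byte[]> snapshot = new CompletableFuture<>();
        before.checkpoint(1L, snapshot);

        StatsCheckpointSketch after = new StatsCheckpointSketch();
        after.restore(1L, snapshot.join());
        System.out.println(after.stats().equals(before.stats())); // prints true
    }
}
```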
> > > >>>>
> > > >>>> -Max
> > > >>>>
> > > >>>> On Sun, Oct 16, 2022 at 9:01 AM Steven Wu <st...@gmail.com> wrote:
> > > >>>>
> > > >>>>> Sorry, I sent an incomplete reply by mistake.
> > > >>>>>
> > > >>>>> If there are any concrete concerns, we can discuss them. In FLINK-27405
> > > >>>>> [1], Avid pointed out some implications regarding checkpointing. In this
> > > >>>>> small FLIP, we are not exposing/changing any checkpointing logic; we
> > > >>>>> mainly need the coordinator context functionality to facilitate the
> > > >>>>> communication between the coordinator and subtasks.
> > > >>>>>
> > > >>>>> [1] https://issues.apache.org/jira/browse/FLINK-27405
> > > >>>>>
> > > >>>>> On Sun, Oct 16, 2022 at 8:56 AM Steven Wu <st...@gmail.com> wrote:
> > > >>>>>
> > > >>>>> > Hang, I appreciate your input. I agree that `CoordinatorContextBase` is
> > > >>>>> > a better name considering the Flink code conventions.
> > > >>>>> >
> > > >>>>> > If there are any concrete concerns, we can discuss. In the jira,
> > > >>>>> >
> > > >>>>> >
> > > >>>>> >
> > > >>>>> > On Sun, Oct 16, 2022 at 12:12 AM Hang Ruan <ruanhang1993@gmail.com> wrote:
> > > >>>>> >
> > > >>>>> >> Hi,
> > > >>>>> >>
> > > >>>>> >> IMO, I agree with extracting a base class for SourceCoordinatorContext.
> > > >>>>> >> But I prefer the name `OperatorCoordinatorContextBase` or
> > > >>>>> >> `CoordinatorContextBase`, following the naming pattern of `SourceReaderBase`.
> > > >>>>> >> I also agree with what Piotr said. Maybe more problems will occur when
> > > >>>>> >> connectors start to use it.
> > > >>>>> >>
> > > >>>>> >> Best,
> > > >>>>> >> Hang
> > > >>>>> >>
> > > >>>>> >> On Fri, Oct 14, 2022 at 10:31 PM Steven Wu <st...@gmail.com> wrote:
> > > >>>>> >>
> > > >>>>> >> > Piotr,
> > > >>>>> >> >
> > > >>>>> >> > The proposal is to extract the listed methods from the @Internal
> > > >>>>> >> > SourceCoordinatorContext to a @PublicEvolving BaseCoordinatorContext.
> > > >>>>> >> >
> > > >>>>> >> > The motivation is that other operators can leverage the communication
> > > >>>>> >> > mechanism between the operator coordinator and the operator subtasks.
> > > >>>>> >> > For example, the shuffle operator (in the Flink Iceberg sink) described
> > > >>>>> >> > in the linked Google doc can leverage it for computing traffic
> > > >>>>> >> > distribution statistics.
> > > >>>>> >> > * Subtasks calculate local statistics and periodically send them to
> > > >>>>> >> > the coordinator for global aggregation.
> > > >>>>> >> > * The coordinator can broadcast the globally aggregated statistics to
> > > >>>>> >> > subtasks, which can use them to guide the shuffling decision
> > > >>>>> >> > (selecting downstream channels).
> > > >>>>> >> >
> > > >>>>> >> > Thanks,
> > > >>>>> >> > Steven
> > > >>>>> >> >
> > > >>>>> >> >
> > > >>>>> >> > On Fri, Oct 14, 2022 at 2:16 AM Piotr Nowojski <pnowojski@apache.org> wrote:
> > > >>>>> >> >
> > > >>>>> >> > > Hi,
> > > >>>>> >> > >
> > > >>>>> >> > > Could you clarify what proposal you have in mind? From the
> > > >>>>> >> > > context, I understand that the newly extracted
> > > >>>>> >> > > `BaseCoordinatorContext` would have to be marked as `@PublicEvolving`
> > > >>>>> >> > > or `@Experimental`, since otherwise extracting it and keeping it
> > > >>>>> >> > > `@Internal` wouldn't change much? Such an `@Internal` base class could
> > > >>>>> >> > > be removed at any point in the future. Having said that, it sounds to
> > > >>>>> >> > > me like your proposal is a bit bigger than it looks at first glance
> > > >>>>> >> > > and you actually want to expose the operator coordinator concept to
> > > >>>>> >> > > the public API?
> > > >>>>> >> > >
> > > >>>>> >> > > AFAIK there were some discussions about that, and it was a bit of a
> > > >>>>> >> > > conscious decision NOT to do that. I don't know the reasons, however.
> > > >>>>> >> > > I've only just heard that there are, for example, some problems with
> > > >>>>> >> > > checkpointing of hypothetical non-source operator coordinators. Maybe
> > > >>>>> >> > > someone else could shed some light on this?
> > > >>>>> >> > >
> > > >>>>> >> > > Conceptually, I would actually be in favour of exposing operator
> > > >>>>> >> > > coordinators if there is a good reason behind that, but it is a more
> > > >>>>> >> > > difficult topic and might be a larger effort than it seems at first
> > > >>>>> >> > > glance.
> > > >>>>> >> > >
> > > >>>>> >> > > Best,
> > > >>>>> >> > > Piotrek
> > > >>>>> >> > >
> > > >>>>> >> > > On Tue, Oct 4, 2022 at 7:41 PM Steven Wu <st...@gmail.com> wrote:
> > > >>>>> >> > >
> > > >>>>> >> > > > Jing, thanks a lot for your reply. The linked Google doc is not for
> > > >>>>> >> > > > this FLIP, which is fully documented on the wiki page. The linked
> > > >>>>> >> > > > Google doc is the design doc for introducing shuffling in the Flink
> > > >>>>> >> > > > Iceberg sink, which motivated this FLIP proposal so that the shuffle
> > > >>>>> >> > > > coordinator can leverage the introduced BaseCoordinatorContext to
> > > >>>>> >> > > > avoid code duplication.
> > > >>>>> >> > > >
> > > >>>>> >> > > > On Tue, Oct 4, 2022 at 1:04 AM Jing Ge <jing@ververica.com> wrote:
> > > >>>>> >> > > >
> > > >>>>> >> > > > > Thanks for bringing this up. It looks good overall! One small
> > > >>>>> >> > > > > thing: you might want to write all the content on the wiki page
> > > >>>>> >> > > > > instead of linking to a Google doc. The reason is that some people
> > > >>>>> >> > > > > might not be able to access the Google doc.
> > > >>>>> >> > > > >
> > > >>>>> >> > > > > Best regards,
> > > >>>>> >> > > > > Jing
> > > >>>>> >> > > > >
> > > >>>>> >> > > > > On Tue, Oct 4, 2022 at 3:57 AM gang ye <yegangapp@gmail.com> wrote:
> > > >>>>> >> > > > >
> > > >>>>> >> > > > >> Hi,
> > > >>>>> >> > > > >>
> > > >>>>> >> > > > >> We submitted the FLIP proposal
> > > >>>>> >> > > > >> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-264%3A+Extract+BaseCoordinatorContext>
> > > >>>>> >> > > > >> on Confluence to extract BaseCoordinatorContext from
> > > >>>>> >> > > > >> SourceCoordinatorContext and reuse it for other coordinators, e.g.
> > > >>>>> >> > > > >> in the shuffling support of the Flink Iceberg sink
> > > >>>>> >> > > > >> <https://docs.google.com/document/d/13N8cMqPi-ZPSKbkXGOBMPOzbv2Fua59j8bIjjtxLWqo>
> > > >>>>> >> > > > >>
> > > >>>>> >> > > > >> Could you please take a look?
> > > >>>>> >> > > > >> Thanks
> > > >>>>> >> > > > >>
> > > >>>>> >> > > > >> Gang