You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by Till Rohrmann <tr...@apache.org> on 2021/01/22 15:03:30 UTC

[DISCUSS] FLIP-160: Declarative scheduler

Hi everyone,

I would like to start a discussion about adding a new type of scheduler to
Flink. The declarative scheduler will first declare the required resources
and wait for them before deciding on the actual parallelism of a job.
Thereby it can better handle situations where resources cannot be fully
fulfilled. Moreover, it will act as a building block for the reactive mode
where Flink should scale to the maximum of the currently available
resources.

Please find more details in the FLIP wiki document [1]. Looking forward to
your feedback.

[1] https://cwiki.apache.org/confluence/x/mwtRCg

Cheers,
Till

Re: [DISCUSS] FLIP-160: Declarative scheduler

Posted by Till Rohrmann <tr...@apache.org>.
Thanks a lot for all your input. I have update the FLIP-160 with your
suggestions:

1) Add job configuration as a follow up
2) Pull out IO operations out of the ExecutionGraph if the failover becomes
too slow
3) Introduce a configuration parameter for the timeout in the "Waiting for
resources" state (coming from the FLIP-159 discussion)

Next, I will create the vote thread for this FLIP.

Cheers,
Till

On Fri, Jan 29, 2021 at 10:06 AM Chesnay Schepler <ch...@apache.org>
wrote:

> Yes, since we're only operating within the scheduler, which exists
> separately for each job, we don't have to worry about collisions with
> other jobs.
>
> On 1/27/2021 11:08 AM, Yangze Guo wrote:
> > Thanks for preparing the FLIP and driving the discussion, Till. All of
> > my questions have already been answered in the previous discussion.
> >
> > I just have one minor reminder regarding using ExecutionVertexID as
> > the identificator. The JobVertexID is generated according to the
> > topology instead of generated randomly. Thus, it's not guaranteed to
> > be unique across different jobs and different execution. This
> > characteristic is also inherited by the ExecutionVertexID. UUIC, the
> > ParallelismAndResourceAssignments is a job-level concept so it will
> > work well.
> >
> > Best,
> > Yangze Guo
> >
> > On Wed, Jan 27, 2021 at 10:37 AM Xintong Song <to...@gmail.com>
> wrote:
> >> Thanks for the explanations, Till.
> >> Keeping the initial design as simple as possible sounds good to me.
> >> There's no further concern from my side.
> >>
> >> Thank you~
> >>
> >> Xintong Song
> >>
> >>
> >>
> >> On Tue, Jan 26, 2021 at 9:56 PM Zhu Zhu <re...@gmail.com> wrote:
> >>
> >>> Thanks Till for the explanation and the follow up actions.
> >>> That sounds good to me.
> >>>
> >>> Thanks,
> >>> Zhu
> >>>
> >>> Till Rohrmann <tr...@apache.org> 于2021年1月26日周二 下午7:28写道:
> >>>
> >>>> Thanks a lot for all your feedback. Let me try to answer it.
> >>>>
> >>>> # ScaleUpController vs. RescaleController
> >>>>
> >>>> At the moment, the idea of the declarative scheduler is to run a job
> with
> >>>> a parallelism which is as close to the target value as possible but to
> >>>> never exceed it. Since the target value is currently fixed (this will
> >>>> change once auto-scaling is supported), we never have to scale down.
> In
> >>>> fact, scale down events only happen if the system loses slots and then
> >>> you
> >>>> cannot choose whether to scale down or not. This and the idea to keep
> the
> >>>> initial design as simple as possible motivated the naming of
> >>>> ScaleUpController. Once we support auto-scaling, we might have to
> rename
> >>>> this interface. However, since I don't fully know how things will look
> >>> like
> >>>> with auto-scaling, I would like to refrain from this change now.
> >>>>
> >>>> # Relationship between declarative scheduler and existing
> implementations
> >>>>
> >>>> The declarative scheduler will implement the SchedulerNG interface. At
> >>> the
> >>>> moment I am not aware of any required changes to this interface. Also
> the
> >>>> way the JobMaster interacts with the scheduler won't change to the
> best
> >>> of
> >>>> my knowledge.
> >>>>
> >>>> # Awareness of SlotAllocator of the ExecutionGraph
> >>>>
> >>>> The idea is to make the SlotAllocator not aware of the ExecutionGraph
> >>>> because it can only be created after the SlotAllocator has decided on
> the
> >>>> parallelism the job can be run with. Moreover, the idea is that the
> >>>> JobInformation instance contains all the required information of a
> job to
> >>>> make this decision.
> >>>>
> >>>> The SlotAllocator also reserves the slots for a job before the
> >>>> ExecutionGraph is created. Consequently, we need a way to associate
> the
> >>>> slots with the Executions of the ExecutionGraph. Here we have decided
> to
> >>>> use the ExecutionVertexID as the identificator. We could also
> introduce a
> >>>> new identificator as long as we can map this one to the
> >>> ExecutionVertexID.
> >>>> We could for example use JobVertexID and subtask id as the
> identificator
> >>>> but this is exactly what the ExecutionVertexID is. That's why we
> decided
> >>> to
> >>>> reuse it for the time being.
> >>>>
> >>>> # ScaleUpController
> >>>>
> >>>> ## Cumulative parallelism
> >>>>
> >>>> Yes, the cumulative parallelism is the sum of all tasks.
> >>>>
> >>>> ## When to call the ScaleUpController
> >>>>
> >>>> Yes, the idea is that the scheduler will check whether one can run the
> >>> job
> >>>> with an increased parallelism if there are new slots available. If
> this
> >>> is
> >>>> the case, then we will ask the ScaleUpController, whether we actually
> >>>> should scale up (e.g. whether the increase is significant enough or
> >>> whether
> >>>> enough time has passed between the last scale up operation).
> >>>>
> >>>> ## Do we provide enough information to the ScaleUpController
> >>>>
> >>>> I am pretty sure that we don't provide the ScaleUpController enough
> >>>> information to make the best possible decision. We wanted to keep it
> >>> simple
> >>>> in the first version and iterate on the interface with the help of
> user
> >>>> feedback. I think that this interface won't fundamentally change the
> >>>> scheduler and, hence, it shouldn't block future extensions by starting
> >>> with
> >>>> a simple interface.
> >>>>
> >>>> # Cluster vs. job configuration
> >>>>
> >>>> I think you are right that it would be most flexible if one could
> select
> >>> a
> >>>> scheduler on a per job basis with falling back to the cluster
> >>> configuration
> >>>> if nothing has been specified. For the sake of simplicity and
> narrowing
> >>>> down the scope I would consider this as a follow up.
> >>>>
> >>>> # Performance on failover
> >>>>
> >>>> I agree that the performance won't be optimal in the failover case
> >>> because
> >>>> of 1) having to recreate the EG and 2) redundant IO operations. For
> 1) I
> >>>> agree that FLINK-21110 will help a lot. For 2) moving the IO
> operations
> >>> out
> >>>> of the EG could be a good improvement. With the same argument as
> above, I
> >>>> would consider this a follow up because I would like to keep the
> initial
> >>>> design as simple as possible and I think that these performance
> >>>> optimizations are not required to make this feature work.
> >>>>
> >>>> # Lost execution history
> >>>>
> >>>> You are right Zhu Zhu that recreating the ExecutionGraph causes the
> loss
> >>>> of information. We are currently investigating which information is
> >>>> strictly required and needs to be maintained across ExecutionGraph
> >>>> creations (mainly for tests to succeed). One idea is to either store
> this
> >>>> information outside of the ExecutionGraph or to initialize the
> >>>> ExecutionGraph with the information from the previous instance. Most
> >>>> likely, we won't give a guarantee that all counters, timestamps and
> >>> metrics
> >>>> are correctly maintained across failovers in the first version,
> though.
> >>> It
> >>>> is a bit the same argument as above that this is not strictly
> required to
> >>>> make this feature work. Maybe this means that this feature is not
> >>>> production ready for some users, but I think this is ok.
> >>>>
> >>>> In general, to fully integrate the declarative scheduler with the web
> ui
> >>>> we have to be able to display changing ExecutionGraphs. Ideally, we
> would
> >>>> have something like a timeline where one can select the time for
> which to
> >>>> display the state of the job execution. If one goes all the way to the
> >>>> right, the system shows the live state and all other positions are the
> >>>> present time minus some offset.
> >>>>
> >>>> I will add the follow ups to the FLIP as potential improvements in
> order
> >>>> to keep track of them.
> >>>>
> >>>> Cheers,
> >>>> Till
> >>>>
> >>>> On Tue, Jan 26, 2021 at 9:53 AM Zhu Zhu <re...@gmail.com> wrote:
> >>>>
> >>>>> Thanks for the proposal! @Till Rohrmann <tr...@apache.org>
> >>>>>
> >>>>> The design looks generally good to me. I have 2 concerns though:
> >>>>>
> >>>>> # performance on failover
> >>>>> I can see is that an ExecutionGraph will be built and initialized on
> >>> each
> >>>>> task failure. The process is possibly to be slow:
> >>>>> 1. the topology building can be very slow (much slower than the
> >>>>> scheduling
> >>>>> or restarting process) when the job scale becomes large (see
> >>>>> FLINK-20612).
> >>>>> Luckily FLINK-21110 will improve it.
> >>>>> 2. the input/output format can be slow due to possible IO work to
> >>>>> external services.
> >>>>> Maybe we should extract it out from ExecutionGraph building?
> >>>>>
> >>>>> # execution history lost
> >>>>> After building and using a new ExecutionGraph, the execution history
> in
> >>>>> the
> >>>>> old graph will be lost, including status timestamps of the job, prior
> >>>>> execution
> >>>>> attempts and their failures. Should we let the new EG inherit these
> >>>>> information
> >>>>> from prior EG? Maybe as a future plan with further discussions
> regarding
> >>>>> the
> >>>>> varying parallelism.
> >>>>>
> >>>>> Thanks,
> >>>>> Zhu
> >>>>>
> >>>>> Xintong Song <to...@gmail.com> 于2021年1月24日周日 上午11:55写道:
> >>>>>
> >>>>>> Thanks for preparing the FLIP and starting the discussion, Till.
> >>>>>>
> >>>>>> I have a few questions trying to understand the design.
> >>>>>>
> >>>>>> ## What is the relationship between the current and new schedulers?
> >>>>>> IIUC, the new declarative scheduler will coexist with the current
> >>>>>> scheduler, as an alternative that the user needs to explicitly
> switch
> >>> to.
> >>>>>> Then does it require any changes to the scheduler interfaces and how
> >>> the
> >>>>>> JobMaster interacts with it?
> >>>>>>
> >>>>>> ## Is `SlotAllocator` aware of `ExecutionGraph`?
> >>>>>> Looking at the interfaces, it seems to me that `SlotAllocator` only
> >>> takes
> >>>>>> `JobInformation` and `VertexInformation` as topology inputs.
> However,
> >>> it
> >>>>>> generates `ParallelismAndResourceAssignments` which maps slots to
> >>>>>> `ExecutionVertexID`s. It's a bit confusing that `ExecutionGraph` is
> >>>>>> generated outside `SlotAllocator` while `ExecutionVertexID`s are
> >>>>>> generated
> >>>>>> inside `SlotAllocator`.
> >>>>>>
> >>>>>> ## About `ScaleUpController#canScaleUp`
> >>>>>>
> >>>>>> ### What is cumulative parallelism?
> >>>>>> The interface shows that the cumulative parallelism is a single
> number
> >>>>>> per
> >>>>>> job graph. I assume this is the sum of parallelism of all vertices?
> >>>>>>
> >>>>>> ### Is this interface expected to be called before or after
> >>>>>> `SlotAllocator#determineParallelism`?
> >>>>>> IIUC, on new resources appear, the scheduler always calls
> >>>>>> `SlotAllocator#determineParallelism` to generate a new plan based on
> >>> the
> >>>>>> available resources, and `ScaleUpController#canScaleUp` is then
> called
> >>> to
> >>>>>> decide whether to apply the new plan, according to whether the
> >>>>>> increasement
> >>>>>> is significant, how long the job has been running since last
> restart,
> >>>>>> etc.
> >>>>>> Is that correct?
> >>>>>>
> >>>>>> ### The cumulative parallelism may not be enough for deciding
> whether
> >>> the
> >>>>>> job should be scaled up.
> >>>>>> I'm assuming my understanding for the above questions are correct.
> >>> Please
> >>>>>> correct me if not.
> >>>>>>
> >>>>>> I noticed Chesnay's comment on the wiki page about making the
> decision
> >>>>>> based on when the job entered the execution state last time.
> >>>>>>
> >>>>>> In addition to that, there could also be situations where jobs may
> not
> >>>>>> benefit much an increment in cumulative parallelism.
> >>>>>> E.g., for a topology A -> B, where A and B are in different slot
> >>> sharing
> >>>>>> groups, and the current parallelism for both A and B are 2. When 1
> new
> >>>>>> slot
> >>>>>> appears, `SlotAllocator` may suggest increasing parallelism of A to
> 3.
> >>>>>> But
> >>>>>> this may not be a significant beneficial change for the job because
> the
> >>>>>> overall performance is still bottlenecked by the parallelism of B.
> >>>>>>
> >>>>>> ## Cluster vs. Job configuration
> >>>>>> I'm not entirely sure about specifying which scheduler to be used
> via a
> >>>>>> cluster level configuration option. Each job in a Flink cluster has
> its
> >>>>>> own
> >>>>>> JobMaster, and which scheduler to use is internal to that
> JobMaster. I
> >>>>>> understand that the declarative scheduler requires the cluster to
> use
> >>>>>> declarative resource management. However, other jobs should still be
> >>>>>> allowed to use other scheduler implementations that also support
> >>>>>> declarative resource management (e.g. the current
> `DefaultScheduler`).
> >>>>>> Maybe we should consider the cluster level configuration option as a
> >>>>>> default scheduler, and allow the job to specify a different
> scheduler
> >>> in
> >>>>>> its execution config. This is similar to how we specify which state
> >>>>>> backend
> >>>>>> to be used.
> >>>>>>
> >>>>>> ## Minor: It might be better to also show in the state machine
> figure
> >>>>>> that
> >>>>>> it can go from `Executing` to `Restarting` when new resources
> appear.
> >>>>>>
> >>>>>> Thank you~
> >>>>>>
> >>>>>> Xintong Song
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On Sat, Jan 23, 2021 at 6:04 AM Steven Wu <st...@gmail.com>
> >>> wrote:
> >>>>>>> Till, thanks a lot for the proposal.
> >>>>>>>
> >>>>>>> Even if the initial phase is only to support scale-up, maybe the
> >>>>>>> "ScaleUpController" interface should be called "RescaleController"
> so
> >>>>>> that
> >>>>>>> in the future scale-down can be added.
> >>>>>>>
> >>>>>>> On Fri, Jan 22, 2021 at 7:03 AM Till Rohrmann <
> trohrmann@apache.org>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Hi everyone,
> >>>>>>>>
> >>>>>>>> I would like to start a discussion about adding a new type of
> >>>>>> scheduler
> >>>>>>> to
> >>>>>>>> Flink. The declarative scheduler will first declare the required
> >>>>>>> resources
> >>>>>>>> and wait for them before deciding on the actual parallelism of a
> >>> job.
> >>>>>>>> Thereby it can better handle situations where resources cannot be
> >>>>>> fully
> >>>>>>>> fulfilled. Moreover, it will act as a building block for the
> >>> reactive
> >>>>>>> mode
> >>>>>>>> where Flink should scale to the maximum of the currently available
> >>>>>>>> resources.
> >>>>>>>>
> >>>>>>>> Please find more details in the FLIP wiki document [1]. Looking
> >>>>>> forward
> >>>>>>> to
> >>>>>>>> your feedback.
> >>>>>>>>
> >>>>>>>> [1] https://cwiki.apache.org/confluence/x/mwtRCg
> >>>>>>>>
> >>>>>>>> Cheers,
> >>>>>>>> Till
> >>>>>>>>
>
>

Re: [DISCUSS] FLIP-160: Declarative scheduler

Posted by Chesnay Schepler <ch...@apache.org>.
Yes, since we're only operating within the scheduler, which exists 
separately for each job, we don't have to worry about collisions with 
other jobs.

On 1/27/2021 11:08 AM, Yangze Guo wrote:
> Thanks for preparing the FLIP and driving the discussion, Till. All of
> my questions have already been answered in the previous discussion.
>
> I just have one minor reminder regarding using ExecutionVertexID as
> the identificator. The JobVertexID is generated according to the
> topology instead of generated randomly. Thus, it's not guaranteed to
> be unique across different jobs and different execution. This
> characteristic is also inherited by the ExecutionVertexID. UUIC, the
> ParallelismAndResourceAssignments is a job-level concept so it will
> work well.
>
> Best,
> Yangze Guo
>
> On Wed, Jan 27, 2021 at 10:37 AM Xintong Song <to...@gmail.com> wrote:
>> Thanks for the explanations, Till.
>> Keeping the initial design as simple as possible sounds good to me.
>> There's no further concern from my side.
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Tue, Jan 26, 2021 at 9:56 PM Zhu Zhu <re...@gmail.com> wrote:
>>
>>> Thanks Till for the explanation and the follow up actions.
>>> That sounds good to me.
>>>
>>> Thanks,
>>> Zhu
>>>
>>> Till Rohrmann <tr...@apache.org> 于2021年1月26日周二 下午7:28写道:
>>>
>>>> Thanks a lot for all your feedback. Let me try to answer it.
>>>>
>>>> # ScaleUpController vs. RescaleController
>>>>
>>>> At the moment, the idea of the declarative scheduler is to run a job with
>>>> a parallelism which is as close to the target value as possible but to
>>>> never exceed it. Since the target value is currently fixed (this will
>>>> change once auto-scaling is supported), we never have to scale down. In
>>>> fact, scale down events only happen if the system loses slots and then
>>> you
>>>> cannot choose whether to scale down or not. This and the idea to keep the
>>>> initial design as simple as possible motivated the naming of
>>>> ScaleUpController. Once we support auto-scaling, we might have to rename
>>>> this interface. However, since I don't fully know how things will look
>>> like
>>>> with auto-scaling, I would like to refrain from this change now.
>>>>
>>>> # Relationship between declarative scheduler and existing implementations
>>>>
>>>> The declarative scheduler will implement the SchedulerNG interface. At
>>> the
>>>> moment I am not aware of any required changes to this interface. Also the
>>>> way the JobMaster interacts with the scheduler won't change to the best
>>> of
>>>> my knowledge.
>>>>
>>>> # Awareness of SlotAllocator of the ExecutionGraph
>>>>
>>>> The idea is to make the SlotAllocator not aware of the ExecutionGraph
>>>> because it can only be created after the SlotAllocator has decided on the
>>>> parallelism the job can be run with. Moreover, the idea is that the
>>>> JobInformation instance contains all the required information of a job to
>>>> make this decision.
>>>>
>>>> The SlotAllocator also reserves the slots for a job before the
>>>> ExecutionGraph is created. Consequently, we need a way to associate the
>>>> slots with the Executions of the ExecutionGraph. Here we have decided to
>>>> use the ExecutionVertexID as the identificator. We could also introduce a
>>>> new identificator as long as we can map this one to the
>>> ExecutionVertexID.
>>>> We could for example use JobVertexID and subtask id as the identificator
>>>> but this is exactly what the ExecutionVertexID is. That's why we decided
>>> to
>>>> reuse it for the time being.
>>>>
>>>> # ScaleUpController
>>>>
>>>> ## Cumulative parallelism
>>>>
>>>> Yes, the cumulative parallelism is the sum of all tasks.
>>>>
>>>> ## When to call the ScaleUpController
>>>>
>>>> Yes, the idea is that the scheduler will check whether one can run the
>>> job
>>>> with an increased parallelism if there are new slots available. If this
>>> is
>>>> the case, then we will ask the ScaleUpController, whether we actually
>>>> should scale up (e.g. whether the increase is significant enough or
>>> whether
>>>> enough time has passed between the last scale up operation).
>>>>
>>>> ## Do we provide enough information to the ScaleUpController
>>>>
>>>> I am pretty sure that we don't provide the ScaleUpController enough
>>>> information to make the best possible decision. We wanted to keep it
>>> simple
>>>> in the first version and iterate on the interface with the help of user
>>>> feedback. I think that this interface won't fundamentally change the
>>>> scheduler and, hence, it shouldn't block future extensions by starting
>>> with
>>>> a simple interface.
>>>>
>>>> # Cluster vs. job configuration
>>>>
>>>> I think you are right that it would be most flexible if one could select
>>> a
>>>> scheduler on a per job basis with falling back to the cluster
>>> configuration
>>>> if nothing has been specified. For the sake of simplicity and narrowing
>>>> down the scope I would consider this as a follow up.
>>>>
>>>> # Performance on failover
>>>>
>>>> I agree that the performance won't be optimal in the failover case
>>> because
>>>> of 1) having to recreate the EG and 2) redundant IO operations. For 1) I
>>>> agree that FLINK-21110 will help a lot. For 2) moving the IO operations
>>> out
>>>> of the EG could be a good improvement. With the same argument as above, I
>>>> would consider this a follow up because I would like to keep the initial
>>>> design as simple as possible and I think that these performance
>>>> optimizations are not required to make this feature work.
>>>>
>>>> # Lost execution history
>>>>
>>>> You are right Zhu Zhu that recreating the ExecutionGraph causes the loss
>>>> of information. We are currently investigating which information is
>>>> strictly required and needs to be maintained across ExecutionGraph
>>>> creations (mainly for tests to succeed). One idea is to either store this
>>>> information outside of the ExecutionGraph or to initialize the
>>>> ExecutionGraph with the information from the previous instance. Most
>>>> likely, we won't give a guarantee that all counters, timestamps and
>>> metrics
>>>> are correctly maintained across failovers in the first version, though.
>>> It
>>>> is a bit the same argument as above that this is not strictly required to
>>>> make this feature work. Maybe this means that this feature is not
>>>> production ready for some users, but I think this is ok.
>>>>
>>>> In general, to fully integrate the declarative scheduler with the web ui
>>>> we have to be able to display changing ExecutionGraphs. Ideally, we would
>>>> have something like a timeline where one can select the time for which to
>>>> display the state of the job execution. If one goes all the way to the
>>>> right, the system shows the live state and all other positions are the
>>>> present time minus some offset.
>>>>
>>>> I will add the follow ups to the FLIP as potential improvements in order
>>>> to keep track of them.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Tue, Jan 26, 2021 at 9:53 AM Zhu Zhu <re...@gmail.com> wrote:
>>>>
>>>>> Thanks for the proposal! @Till Rohrmann <tr...@apache.org>
>>>>>
>>>>> The design looks generally good to me. I have 2 concerns though:
>>>>>
>>>>> # performance on failover
>>>>> I can see is that an ExecutionGraph will be built and initialized on
>>> each
>>>>> task failure. The process is possibly to be slow:
>>>>> 1. the topology building can be very slow (much slower than the
>>>>> scheduling
>>>>> or restarting process) when the job scale becomes large (see
>>>>> FLINK-20612).
>>>>> Luckily FLINK-21110 will improve it.
>>>>> 2. the input/output format can be slow due to possible IO work to
>>>>> external services.
>>>>> Maybe we should extract it out from ExecutionGraph building?
>>>>>
>>>>> # execution history lost
>>>>> After building and using a new ExecutionGraph, the execution history in
>>>>> the
>>>>> old graph will be lost, including status timestamps of the job, prior
>>>>> execution
>>>>> attempts and their failures. Should we let the new EG inherit these
>>>>> information
>>>>> from prior EG? Maybe as a future plan with further discussions regarding
>>>>> the
>>>>> varying parallelism.
>>>>>
>>>>> Thanks,
>>>>> Zhu
>>>>>
>>>>> Xintong Song <to...@gmail.com> 于2021年1月24日周日 上午11:55写道:
>>>>>
>>>>>> Thanks for preparing the FLIP and starting the discussion, Till.
>>>>>>
>>>>>> I have a few questions trying to understand the design.
>>>>>>
>>>>>> ## What is the relationship between the current and new schedulers?
>>>>>> IIUC, the new declarative scheduler will coexist with the current
>>>>>> scheduler, as an alternative that the user needs to explicitly switch
>>> to.
>>>>>> Then does it require any changes to the scheduler interfaces and how
>>> the
>>>>>> JobMaster interacts with it?
>>>>>>
>>>>>> ## Is `SlotAllocator` aware of `ExecutionGraph`?
>>>>>> Looking at the interfaces, it seems to me that `SlotAllocator` only
>>> takes
>>>>>> `JobInformation` and `VertexInformation` as topology inputs. However,
>>> it
>>>>>> generates `ParallelismAndResourceAssignments` which maps slots to
>>>>>> `ExecutionVertexID`s. It's a bit confusing that `ExecutionGraph` is
>>>>>> generated outside `SlotAllocator` while `ExecutionVertexID`s are
>>>>>> generated
>>>>>> inside `SlotAllocator`.
>>>>>>
>>>>>> ## About `ScaleUpController#canScaleUp`
>>>>>>
>>>>>> ### What is cumulative parallelism?
>>>>>> The interface shows that the cumulative parallelism is a single number
>>>>>> per
>>>>>> job graph. I assume this is the sum of parallelism of all vertices?
>>>>>>
>>>>>> ### Is this interface expected to be called before or after
>>>>>> `SlotAllocator#determineParallelism`?
>>>>>> IIUC, on new resources appear, the scheduler always calls
>>>>>> `SlotAllocator#determineParallelism` to generate a new plan based on
>>> the
>>>>>> available resources, and `ScaleUpController#canScaleUp` is then called
>>> to
>>>>>> decide whether to apply the new plan, according to whether the
>>>>>> increasement
>>>>>> is significant, how long the job has been running since last restart,
>>>>>> etc.
>>>>>> Is that correct?
>>>>>>
>>>>>> ### The cumulative parallelism may not be enough for deciding whether
>>> the
>>>>>> job should be scaled up.
>>>>>> I'm assuming my understanding for the above questions are correct.
>>> Please
>>>>>> correct me if not.
>>>>>>
>>>>>> I noticed Chesnay's comment on the wiki page about making the decision
>>>>>> based on when the job entered the execution state last time.
>>>>>>
>>>>>> In addition to that, there could also be situations where jobs may not
>>>>>> benefit much an increment in cumulative parallelism.
>>>>>> E.g., for a topology A -> B, where A and B are in different slot
>>> sharing
>>>>>> groups, and the current parallelism for both A and B are 2. When 1 new
>>>>>> slot
>>>>>> appears, `SlotAllocator` may suggest increasing parallelism of A to 3.
>>>>>> But
>>>>>> this may not be a significant beneficial change for the job because the
>>>>>> overall performance is still bottlenecked by the parallelism of B.
>>>>>>
>>>>>> ## Cluster vs. Job configuration
>>>>>> I'm not entirely sure about specifying which scheduler to be used via a
>>>>>> cluster level configuration option. Each job in a Flink cluster has its
>>>>>> own
>>>>>> JobMaster, and which scheduler to use is internal to that JobMaster. I
>>>>>> understand that the declarative scheduler requires the cluster to use
>>>>>> declarative resource management. However, other jobs should still be
>>>>>> allowed to use other scheduler implementations that also support
>>>>>> declarative resource management (e.g. the current `DefaultScheduler`).
>>>>>> Maybe we should consider the cluster level configuration option as a
>>>>>> default scheduler, and allow the job to specify a different scheduler
>>> in
>>>>>> its execution config. This is similar to how we specify which state
>>>>>> backend
>>>>>> to be used.
>>>>>>
>>>>>> ## Minor: It might be better to also show in the state machine figure
>>>>>> that
>>>>>> it can go from `Executing` to `Restarting` when new resources appear.
>>>>>>
>>>>>> Thank you~
>>>>>>
>>>>>> Xintong Song
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Sat, Jan 23, 2021 at 6:04 AM Steven Wu <st...@gmail.com>
>>> wrote:
>>>>>>> Till, thanks a lot for the proposal.
>>>>>>>
>>>>>>> Even if the initial phase is only to support scale-up, maybe the
>>>>>>> "ScaleUpController" interface should be called "RescaleController" so
>>>>>> that
>>>>>>> in the future scale-down can be added.
>>>>>>>
>>>>>>> On Fri, Jan 22, 2021 at 7:03 AM Till Rohrmann <tr...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi everyone,
>>>>>>>>
>>>>>>>> I would like to start a discussion about adding a new type of
>>>>>> scheduler
>>>>>>> to
>>>>>>>> Flink. The declarative scheduler will first declare the required
>>>>>>> resources
>>>>>>>> and wait for them before deciding on the actual parallelism of a
>>> job.
>>>>>>>> Thereby it can better handle situations where resources cannot be
>>>>>> fully
>>>>>>>> fulfilled. Moreover, it will act as a building block for the
>>> reactive
>>>>>>> mode
>>>>>>>> where Flink should scale to the maximum of the currently available
>>>>>>>> resources.
>>>>>>>>
>>>>>>>> Please find more details in the FLIP wiki document [1]. Looking
>>>>>> forward
>>>>>>> to
>>>>>>>> your feedback.
>>>>>>>>
>>>>>>>> [1] https://cwiki.apache.org/confluence/x/mwtRCg
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Till
>>>>>>>>


Re: [DISCUSS] FLIP-160: Declarative scheduler

Posted by Yangze Guo <ka...@gmail.com>.
Thanks for preparing the FLIP and driving the discussion, Till. All of
my questions have already been answered in the previous discussion.

I just have one minor reminder regarding using ExecutionVertexID as
the identificator. The JobVertexID is generated according to the
topology instead of generated randomly. Thus, it's not guaranteed to
be unique across different jobs and different execution. This
characteristic is also inherited by the ExecutionVertexID. UUIC, the
ParallelismAndResourceAssignments is a job-level concept so it will
work well.

Best,
Yangze Guo

On Wed, Jan 27, 2021 at 10:37 AM Xintong Song <to...@gmail.com> wrote:
>
> Thanks for the explanations, Till.
> Keeping the initial design as simple as possible sounds good to me.
> There's no further concern from my side.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Tue, Jan 26, 2021 at 9:56 PM Zhu Zhu <re...@gmail.com> wrote:
>
> > Thanks Till for the explanation and the follow up actions.
> > That sounds good to me.
> >
> > Thanks,
> > Zhu
> >
> > Till Rohrmann <tr...@apache.org> 于2021年1月26日周二 下午7:28写道:
> >
> > > Thanks a lot for all your feedback. Let me try to answer it.
> > >
> > > # ScaleUpController vs. RescaleController
> > >
> > > At the moment, the idea of the declarative scheduler is to run a job with
> > > a parallelism which is as close to the target value as possible but to
> > > never exceed it. Since the target value is currently fixed (this will
> > > change once auto-scaling is supported), we never have to scale down. In
> > > fact, scale down events only happen if the system loses slots and then
> > you
> > > cannot choose whether to scale down or not. This and the idea to keep the
> > > initial design as simple as possible motivated the naming of
> > > ScaleUpController. Once we support auto-scaling, we might have to rename
> > > this interface. However, since I don't fully know how things will look
> > like
> > > with auto-scaling, I would like to refrain from this change now.
> > >
> > > # Relationship between declarative scheduler and existing implementations
> > >
> > > The declarative scheduler will implement the SchedulerNG interface. At
> > the
> > > moment I am not aware of any required changes to this interface. Also the
> > > way the JobMaster interacts with the scheduler won't change to the best
> > of
> > > my knowledge.
> > >
> > > # Awareness of SlotAllocator of the ExecutionGraph
> > >
> > > The idea is to make the SlotAllocator not aware of the ExecutionGraph
> > > because it can only be created after the SlotAllocator has decided on the
> > > parallelism the job can be run with. Moreover, the idea is that the
> > > JobInformation instance contains all the required information of a job to
> > > make this decision.
> > >
> > > The SlotAllocator also reserves the slots for a job before the
> > > ExecutionGraph is created. Consequently, we need a way to associate the
> > > slots with the Executions of the ExecutionGraph. Here we have decided to
> > > use the ExecutionVertexID as the identificator. We could also introduce a
> > > new identificator as long as we can map this one to the
> > ExecutionVertexID.
> > > We could for example use JobVertexID and subtask id as the identificator
> > > but this is exactly what the ExecutionVertexID is. That's why we decided
> > to
> > > reuse it for the time being.
> > >
> > > # ScaleUpController
> > >
> > > ## Cumulative parallelism
> > >
> > > Yes, the cumulative parallelism is the sum of all tasks.
> > >
> > > ## When to call the ScaleUpController
> > >
> > > Yes, the idea is that the scheduler will check whether one can run the
> > job
> > > with an increased parallelism if there are new slots available. If this
> > is
> > > the case, then we will ask the ScaleUpController, whether we actually
> > > should scale up (e.g. whether the increase is significant enough or
> > whether
> > > enough time has passed between the last scale up operation).
> > >
> > > ## Do we provide enough information to the ScaleUpController
> > >
> > > I am pretty sure that we don't provide the ScaleUpController enough
> > > information to make the best possible decision. We wanted to keep it
> > simple
> > > in the first version and iterate on the interface with the help of user
> > > feedback. I think that this interface won't fundamentally change the
> > > scheduler and, hence, it shouldn't block future extensions by starting
> > with
> > > a simple interface.
> > >
> > > # Cluster vs. job configuration
> > >
> > > I think you are right that it would be most flexible if one could select
> > a
> > > scheduler on a per job basis with falling back to the cluster
> > configuration
> > > if nothing has been specified. For the sake of simplicity and narrowing
> > > down the scope I would consider this as a follow up.
> > >
> > > # Performance on failover
> > >
> > > I agree that the performance won't be optimal in the failover case
> > because
> > > of 1) having to recreate the EG and 2) redundant IO operations. For 1) I
> > > agree that FLINK-21110 will help a lot. For 2) moving the IO operations
> > out
> > > of the EG could be a good improvement. With the same argument as above, I
> > > would consider this a follow up because I would like to keep the initial
> > > design as simple as possible and I think that these performance
> > > optimizations are not required to make this feature work.
> > >
> > > # Lost execution history
> > >
> > > You are right Zhu Zhu that recreating the ExecutionGraph causes the loss
> > > of information. We are currently investigating which information is
> > > strictly required and needs to be maintained across ExecutionGraph
> > > creations (mainly for tests to succeed). One idea is to either store this
> > > information outside of the ExecutionGraph or to initialize the
> > > ExecutionGraph with the information from the previous instance. Most
> > > likely, we won't give a guarantee that all counters, timestamps and
> > metrics
> > > are correctly maintained across failovers in the first version, though.
> > It
> > > is a bit the same argument as above that this is not strictly required to
> > > make this feature work. Maybe this means that this feature is not
> > > production ready for some users, but I think this is ok.
> > >
> > > In general, to fully integrate the declarative scheduler with the web ui
> > > we have to be able to display changing ExecutionGraphs. Ideally, we would
> > > have something like a timeline where one can select the time for which to
> > > display the state of the job execution. If one goes all the way to the
> > > right, the system shows the live state and all other positions are the
> > > present time minus some offset.
> > >
> > > I will add the follow ups to the FLIP as potential improvements in order
> > > to keep track of them.
> > >
> > > Cheers,
> > > Till
> > >
> > > On Tue, Jan 26, 2021 at 9:53 AM Zhu Zhu <re...@gmail.com> wrote:
> > >
> > >> Thanks for the proposal! @Till Rohrmann <tr...@apache.org>
> > >>
> > >> The design looks generally good to me. I have 2 concerns though:
> > >>
> > >> # performance on failover
> > >> I can see is that an ExecutionGraph will be built and initialized on
> > each
> > >> task failure. The process is possibly to be slow:
> > >> 1. the topology building can be very slow (much slower than the
> > >> scheduling
> > >> or restarting process) when the job scale becomes large (see
> > >> FLINK-20612).
> > >> Luckily FLINK-21110 will improve it.
> > >> 2. the input/output format can be slow due to possible IO work to
> > >> external services.
> > >> Maybe we should extract it out from ExecutionGraph building?
> > >>
> > >> # execution history lost
> > >> After building and using a new ExecutionGraph, the execution history in
> > >> the
> > >> old graph will be lost, including status timestamps of the job, prior
> > >> execution
> > >> attempts and their failures. Should we let the new EG inherit these
> > >> information
> > >> from prior EG? Maybe as a future plan with further discussions regarding
> > >> the
> > >> varying parallelism.
> > >>
> > >> Thanks,
> > >> Zhu
> > >>
> > >> Xintong Song <to...@gmail.com> 于2021年1月24日周日 上午11:55写道:
> > >>
> > >>> Thanks for preparing the FLIP and starting the discussion, Till.
> > >>>
> > >>> I have a few questions trying to understand the design.
> > >>>
> > >>> ## What is the relationship between the current and new schedulers?
> > >>> IIUC, the new declarative scheduler will coexist with the current
> > >>> scheduler, as an alternative that the user needs to explicitly switch
> > to.
> > >>> Then does it require any changes to the scheduler interfaces and how
> > the
> > >>> JobMaster interacts with it?
> > >>>
> > >>> ## Is `SlotAllocator` aware of `ExecutionGraph`?
> > >>> Looking at the interfaces, it seems to me that `SlotAllocator` only
> > takes
> > >>> `JobInformation` and `VertexInformation` as topology inputs. However,
> > it
> > >>> generates `ParallelismAndResourceAssignments` which maps slots to
> > >>> `ExecutionVertexID`s. It's a bit confusing that `ExecutionGraph` is
> > >>> generated outside `SlotAllocator` while `ExecutionVertexID`s are
> > >>> generated
> > >>> inside `SlotAllocator`.
> > >>>
> > >>> ## About `ScaleUpController#canScaleUp`
> > >>>
> > >>> ### What is cumulative parallelism?
> > >>> The interface shows that the cumulative parallelism is a single number
> > >>> per
> > >>> job graph. I assume this is the sum of parallelism of all vertices?
> > >>>
> > >>> ### Is this interface expected to be called before or after
> > >>> `SlotAllocator#determineParallelism`?
> > >>> IIUC, on new resources appear, the scheduler always calls
> > >>> `SlotAllocator#determineParallelism` to generate a new plan based on
> > the
> > >>> available resources, and `ScaleUpController#canScaleUp` is then called
> > to
> > >>> decide whether to apply the new plan, according to whether the
> > >>> increasement
> > >>> is significant, how long the job has been running since last restart,
> > >>> etc.
> > >>> Is that correct?
> > >>>
> > >>> ### The cumulative parallelism may not be enough for deciding whether
> > the
> > >>> job should be scaled up.
> > >>> I'm assuming my understanding for the above questions are correct.
> > Please
> > >>> correct me if not.
> > >>>
> > >>> I noticed Chesnay's comment on the wiki page about making the decision
> > >>> based on when the job entered the execution state last time.
> > >>>
> > >>> In addition to that, there could also be situations where jobs may not
> > >>> benefit much an increment in cumulative parallelism.
> > >>> E.g., for a topology A -> B, where A and B are in different slot
> > sharing
> > >>> groups, and the current parallelism for both A and B are 2. When 1 new
> > >>> slot
> > >>> appears, `SlotAllocator` may suggest increasing parallelism of A to 3.
> > >>> But
> > >>> this may not be a significant beneficial change for the job because the
> > >>> overall performance is still bottlenecked by the parallelism of B.
> > >>>
> > >>> ## Cluster vs. Job configuration
> > >>> I'm not entirely sure about specifying which scheduler to be used via a
> > >>> cluster level configuration option. Each job in a Flink cluster has its
> > >>> own
> > >>> JobMaster, and which scheduler to use is internal to that JobMaster. I
> > >>> understand that the declarative scheduler requires the cluster to use
> > >>> declarative resource management. However, other jobs should still be
> > >>> allowed to use other scheduler implementations that also support
> > >>> declarative resource management (e.g. the current `DefaultScheduler`).
> > >>> Maybe we should consider the cluster level configuration option as a
> > >>> default scheduler, and allow the job to specify a different scheduler
> > in
> > >>> its execution config. This is similar to how we specify which state
> > >>> backend
> > >>> to be used.
> > >>>
> > >>> ## Minor: It might be better to also show in the state machine figure
> > >>> that
> > >>> it can go from `Executing` to `Restarting` when new resources appear.
> > >>>
> > >>> Thank you~
> > >>>
> > >>> Xintong Song
> > >>>
> > >>>
> > >>>
> > >>> On Sat, Jan 23, 2021 at 6:04 AM Steven Wu <st...@gmail.com>
> > wrote:
> > >>>
> > >>> > Till, thanks a lot for the proposal.
> > >>> >
> > >>> > Even if the initial phase is only to support scale-up, maybe the
> > >>> > "ScaleUpController" interface should be called "RescaleController" so
> > >>> that
> > >>> > in the future scale-down can be added.
> > >>> >
> > >>> > On Fri, Jan 22, 2021 at 7:03 AM Till Rohrmann <tr...@apache.org>
> > >>> > wrote:
> > >>> >
> > >>> > > Hi everyone,
> > >>> > >
> > >>> > > I would like to start a discussion about adding a new type of
> > >>> scheduler
> > >>> > to
> > >>> > > Flink. The declarative scheduler will first declare the required
> > >>> > resources
> > >>> > > and wait for them before deciding on the actual parallelism of a
> > job.
> > >>> > > Thereby it can better handle situations where resources cannot be
> > >>> fully
> > >>> > > fulfilled. Moreover, it will act as a building block for the
> > reactive
> > >>> > mode
> > >>> > > where Flink should scale to the maximum of the currently available
> > >>> > > resources.
> > >>> > >
> > >>> > > Please find more details in the FLIP wiki document [1]. Looking
> > >>> forward
> > >>> > to
> > >>> > > your feedback.
> > >>> > >
> > >>> > > [1] https://cwiki.apache.org/confluence/x/mwtRCg
> > >>> > >
> > >>> > > Cheers,
> > >>> > > Till
> > >>> > >
> > >>> >
> > >>>
> > >>
> >

Re: [DISCUSS] FLIP-160: Declarative scheduler

Posted by Xintong Song <to...@gmail.com>.
Thanks for the explanations, Till.
Keeping the initial design as simple as possible sounds good to me.
There's no further concern from my side.

Thank you~

Xintong Song



On Tue, Jan 26, 2021 at 9:56 PM Zhu Zhu <re...@gmail.com> wrote:

> Thanks Till for the explanation and the follow up actions.
> That sounds good to me.
>
> Thanks,
> Zhu
>
> Till Rohrmann <tr...@apache.org> 于2021年1月26日周二 下午7:28写道:
>
> > Thanks a lot for all your feedback. Let me try to answer it.
> >
> > # ScaleUpController vs. RescaleController
> >
> > At the moment, the idea of the declarative scheduler is to run a job with
> > a parallelism which is as close to the target value as possible but to
> > never exceed it. Since the target value is currently fixed (this will
> > change once auto-scaling is supported), we never have to scale down. In
> > fact, scale down events only happen if the system loses slots and then
> you
> > cannot choose whether to scale down or not. This and the idea to keep the
> > initial design as simple as possible motivated the naming of
> > ScaleUpController. Once we support auto-scaling, we might have to rename
> > this interface. However, since I don't fully know how things will look
> like
> > with auto-scaling, I would like to refrain from this change now.
> >
> > # Relationship between declarative scheduler and existing implementations
> >
> > The declarative scheduler will implement the SchedulerNG interface. At
> the
> > moment I am not aware of any required changes to this interface. Also the
> > way the JobMaster interacts with the scheduler won't change to the best
> of
> > my knowledge.
> >
> > # Awareness of SlotAllocator of the ExecutionGraph
> >
> > The idea is to make the SlotAllocator not aware of the ExecutionGraph
> > because it can only be created after the SlotAllocator has decided on the
> > parallelism the job can be run with. Moreover, the idea is that the
> > JobInformation instance contains all the required information of a job to
> > make this decision.
> >
> > The SlotAllocator also reserves the slots for a job before the
> > ExecutionGraph is created. Consequently, we need a way to associate the
> > slots with the Executions of the ExecutionGraph. Here we have decided to
> > use the ExecutionVertexID as the identificator. We could also introduce a
> > new identificator as long as we can map this one to the
> ExecutionVertexID.
> > We could for example use JobVertexID and subtask id as the identificator
> > but this is exactly what the ExecutionVertexID is. That's why we decided
> to
> > reuse it for the time being.
> >
> > # ScaleUpController
> >
> > ## Cumulative parallelism
> >
> > Yes, the cumulative parallelism is the sum of all tasks.
> >
> > ## When to call the ScaleUpController
> >
> > Yes, the idea is that the scheduler will check whether one can run the
> job
> > with an increased parallelism if there are new slots available. If this
> is
> > the case, then we will ask the ScaleUpController, whether we actually
> > should scale up (e.g. whether the increase is significant enough or
> whether
> > enough time has passed between the last scale up operation).
> >
> > ## Do we provide enough information to the ScaleUpController
> >
> > I am pretty sure that we don't provide the ScaleUpController enough
> > information to make the best possible decision. We wanted to keep it
> simple
> > in the first version and iterate on the interface with the help of user
> > feedback. I think that this interface won't fundamentally change the
> > scheduler and, hence, it shouldn't block future extensions by starting
> with
> > a simple interface.
> >
> > # Cluster vs. job configuration
> >
> > I think you are right that it would be most flexible if one could select
> a
> > scheduler on a per job basis with falling back to the cluster
> configuration
> > if nothing has been specified. For the sake of simplicity and narrowing
> > down the scope I would consider this as a follow up.
> >
> > # Performance on failover
> >
> > I agree that the performance won't be optimal in the failover case
> because
> > of 1) having to recreate the EG and 2) redundant IO operations. For 1) I
> > agree that FLINK-21110 will help a lot. For 2) moving the IO operations
> out
> > of the EG could be a good improvement. With the same argument as above, I
> > would consider this a follow up because I would like to keep the initial
> > design as simple as possible and I think that these performance
> > optimizations are not required to make this feature work.
> >
> > # Lost execution history
> >
> > You are right Zhu Zhu that recreating the ExecutionGraph causes the loss
> > of information. We are currently investigating which information is
> > strictly required and needs to be maintained across ExecutionGraph
> > creations (mainly for tests to succeed). One idea is to either store this
> > information outside of the ExecutionGraph or to initialize the
> > ExecutionGraph with the information from the previous instance. Most
> > likely, we won't give a guarantee that all counters, timestamps and
> metrics
> > are correctly maintained across failovers in the first version, though.
> It
> > is a bit the same argument as above that this is not strictly required to
> > make this feature work. Maybe this means that this feature is not
> > production ready for some users, but I think this is ok.
> >
> > In general, to fully integrate the declarative scheduler with the web ui
> > we have to be able to display changing ExecutionGraphs. Ideally, we would
> > have something like a timeline where one can select the time for which to
> > display the state of the job execution. If one goes all the way to the
> > right, the system shows the live state and all other positions are the
> > present time minus some offset.
> >
> > I will add the follow ups to the FLIP as potential improvements in order
> > to keep track of them.
> >
> > Cheers,
> > Till
> >
> > On Tue, Jan 26, 2021 at 9:53 AM Zhu Zhu <re...@gmail.com> wrote:
> >
> >> Thanks for the proposal! @Till Rohrmann <tr...@apache.org>
> >>
> >> The design looks generally good to me. I have 2 concerns though:
> >>
> >> # performance on failover
> >> I can see is that an ExecutionGraph will be built and initialized on
> each
> >> task failure. The process is possibly to be slow:
> >> 1. the topology building can be very slow (much slower than the
> >> scheduling
> >> or restarting process) when the job scale becomes large (see
> >> FLINK-20612).
> >> Luckily FLINK-21110 will improve it.
> >> 2. the input/output format can be slow due to possible IO work to
> >> external services.
> >> Maybe we should extract it out from ExecutionGraph building?
> >>
> >> # execution history lost
> >> After building and using a new ExecutionGraph, the execution history in
> >> the
> >> old graph will be lost, including status timestamps of the job, prior
> >> execution
> >> attempts and their failures. Should we let the new EG inherit these
> >> information
> >> from prior EG? Maybe as a future plan with further discussions regarding
> >> the
> >> varying parallelism.
> >>
> >> Thanks,
> >> Zhu
> >>
> >> Xintong Song <to...@gmail.com> 于2021年1月24日周日 上午11:55写道:
> >>
> >>> Thanks for preparing the FLIP and starting the discussion, Till.
> >>>
> >>> I have a few questions trying to understand the design.
> >>>
> >>> ## What is the relationship between the current and new schedulers?
> >>> IIUC, the new declarative scheduler will coexist with the current
> >>> scheduler, as an alternative that the user needs to explicitly switch
> to.
> >>> Then does it require any changes to the scheduler interfaces and how
> the
> >>> JobMaster interacts with it?
> >>>
> >>> ## Is `SlotAllocator` aware of `ExecutionGraph`?
> >>> Looking at the interfaces, it seems to me that `SlotAllocator` only
> takes
> >>> `JobInformation` and `VertexInformation` as topology inputs. However,
> it
> >>> generates `ParallelismAndResourceAssignments` which maps slots to
> >>> `ExecutionVertexID`s. It's a bit confusing that `ExecutionGraph` is
> >>> generated outside `SlotAllocator` while `ExecutionVertexID`s are
> >>> generated
> >>> inside `SlotAllocator`.
> >>>
> >>> ## About `ScaleUpController#canScaleUp`
> >>>
> >>> ### What is cumulative parallelism?
> >>> The interface shows that the cumulative parallelism is a single number
> >>> per
> >>> job graph. I assume this is the sum of parallelism of all vertices?
> >>>
> >>> ### Is this interface expected to be called before or after
> >>> `SlotAllocator#determineParallelism`?
> >>> IIUC, on new resources appear, the scheduler always calls
> >>> `SlotAllocator#determineParallelism` to generate a new plan based on
> the
> >>> available resources, and `ScaleUpController#canScaleUp` is then called
> to
> >>> decide whether to apply the new plan, according to whether the
> >>> increasement
> >>> is significant, how long the job has been running since last restart,
> >>> etc.
> >>> Is that correct?
> >>>
> >>> ### The cumulative parallelism may not be enough for deciding whether
> the
> >>> job should be scaled up.
> >>> I'm assuming my understanding for the above questions are correct.
> Please
> >>> correct me if not.
> >>>
> >>> I noticed Chesnay's comment on the wiki page about making the decision
> >>> based on when the job entered the execution state last time.
> >>>
> >>> In addition to that, there could also be situations where jobs may not
> >>> benefit much an increment in cumulative parallelism.
> >>> E.g., for a topology A -> B, where A and B are in different slot
> sharing
> >>> groups, and the current parallelism for both A and B are 2. When 1 new
> >>> slot
> >>> appears, `SlotAllocator` may suggest increasing parallelism of A to 3.
> >>> But
> >>> this may not be a significant beneficial change for the job because the
> >>> overall performance is still bottlenecked by the parallelism of B.
> >>>
> >>> ## Cluster vs. Job configuration
> >>> I'm not entirely sure about specifying which scheduler to be used via a
> >>> cluster level configuration option. Each job in a Flink cluster has its
> >>> own
> >>> JobMaster, and which scheduler to use is internal to that JobMaster. I
> >>> understand that the declarative scheduler requires the cluster to use
> >>> declarative resource management. However, other jobs should still be
> >>> allowed to use other scheduler implementations that also support
> >>> declarative resource management (e.g. the current `DefaultScheduler`).
> >>> Maybe we should consider the cluster level configuration option as a
> >>> default scheduler, and allow the job to specify a different scheduler
> in
> >>> its execution config. This is similar to how we specify which state
> >>> backend
> >>> to be used.
> >>>
> >>> ## Minor: It might be better to also show in the state machine figure
> >>> that
> >>> it can go from `Executing` to `Restarting` when new resources appear.
> >>>
> >>> Thank you~
> >>>
> >>> Xintong Song
> >>>
> >>>
> >>>
> >>> On Sat, Jan 23, 2021 at 6:04 AM Steven Wu <st...@gmail.com>
> wrote:
> >>>
> >>> > Till, thanks a lot for the proposal.
> >>> >
> >>> > Even if the initial phase is only to support scale-up, maybe the
> >>> > "ScaleUpController" interface should be called "RescaleController" so
> >>> that
> >>> > in the future scale-down can be added.
> >>> >
> >>> > On Fri, Jan 22, 2021 at 7:03 AM Till Rohrmann <tr...@apache.org>
> >>> > wrote:
> >>> >
> >>> > > Hi everyone,
> >>> > >
> >>> > > I would like to start a discussion about adding a new type of
> >>> scheduler
> >>> > to
> >>> > > Flink. The declarative scheduler will first declare the required
> >>> > resources
> >>> > > and wait for them before deciding on the actual parallelism of a
> job.
> >>> > > Thereby it can better handle situations where resources cannot be
> >>> fully
> >>> > > fulfilled. Moreover, it will act as a building block for the
> reactive
> >>> > mode
> >>> > > where Flink should scale to the maximum of the currently available
> >>> > > resources.
> >>> > >
> >>> > > Please find more details in the FLIP wiki document [1]. Looking
> >>> forward
> >>> > to
> >>> > > your feedback.
> >>> > >
> >>> > > [1] https://cwiki.apache.org/confluence/x/mwtRCg
> >>> > >
> >>> > > Cheers,
> >>> > > Till
> >>> > >
> >>> >
> >>>
> >>
>

Re: [DISCUSS] FLIP-160: Declarative scheduler

Posted by Zhu Zhu <re...@gmail.com>.
Thanks Till for the explanation and the follow up actions.
That sounds good to me.

Thanks,
Zhu

Till Rohrmann <tr...@apache.org> 于2021年1月26日周二 下午7:28写道:

> Thanks a lot for all your feedback. Let me try to answer it.
>
> # ScaleUpController vs. RescaleController
>
> At the moment, the idea of the declarative scheduler is to run a job with
> a parallelism which is as close to the target value as possible but to
> never exceed it. Since the target value is currently fixed (this will
> change once auto-scaling is supported), we never have to scale down. In
> fact, scale down events only happen if the system loses slots and then you
> cannot choose whether to scale down or not. This and the idea to keep the
> initial design as simple as possible motivated the naming of
> ScaleUpController. Once we support auto-scaling, we might have to rename
> this interface. However, since I don't fully know how things will look like
> with auto-scaling, I would like to refrain from this change now.
>
> # Relationship between declarative scheduler and existing implementations
>
> The declarative scheduler will implement the SchedulerNG interface. At the
> moment I am not aware of any required changes to this interface. Also the
> way the JobMaster interacts with the scheduler won't change to the best of
> my knowledge.
>
> # Awareness of SlotAllocator of the ExecutionGraph
>
> The idea is to make the SlotAllocator not aware of the ExecutionGraph
> because it can only be created after the SlotAllocator has decided on the
> parallelism the job can be run with. Moreover, the idea is that the
> JobInformation instance contains all the required information of a job to
> make this decision.
>
> The SlotAllocator also reserves the slots for a job before the
> ExecutionGraph is created. Consequently, we need a way to associate the
> slots with the Executions of the ExecutionGraph. Here we have decided to
> use the ExecutionVertexID as the identificator. We could also introduce a
> new identificator as long as we can map this one to the ExecutionVertexID.
> We could for example use JobVertexID and subtask id as the identificator
> but this is exactly what the ExecutionVertexID is. That's why we decided to
> reuse it for the time being.
>
> # ScaleUpController
>
> ## Cumulative parallelism
>
> Yes, the cumulative parallelism is the sum of all tasks.
>
> ## When to call the ScaleUpController
>
> Yes, the idea is that the scheduler will check whether one can run the job
> with an increased parallelism if there are new slots available. If this is
> the case, then we will ask the ScaleUpController, whether we actually
> should scale up (e.g. whether the increase is significant enough or whether
> enough time has passed between the last scale up operation).
>
> ## Do we provide enough information to the ScaleUpController
>
> I am pretty sure that we don't provide the ScaleUpController enough
> information to make the best possible decision. We wanted to keep it simple
> in the first version and iterate on the interface with the help of user
> feedback. I think that this interface won't fundamentally change the
> scheduler and, hence, it shouldn't block future extensions by starting with
> a simple interface.
>
> # Cluster vs. job configuration
>
> I think you are right that it would be most flexible if one could select a
> scheduler on a per job basis with falling back to the cluster configuration
> if nothing has been specified. For the sake of simplicity and narrowing
> down the scope I would consider this as a follow up.
>
> # Performance on failover
>
> I agree that the performance won't be optimal in the failover case because
> of 1) having to recreate the EG and 2) redundant IO operations. For 1) I
> agree that FLINK-21110 will help a lot. For 2) moving the IO operations out
> of the EG could be a good improvement. With the same argument as above, I
> would consider this a follow up because I would like to keep the initial
> design as simple as possible and I think that these performance
> optimizations are not required to make this feature work.
>
> # Lost execution history
>
> You are right Zhu Zhu that recreating the ExecutionGraph causes the loss
> of information. We are currently investigating which information is
> strictly required and needs to be maintained across ExecutionGraph
> creations (mainly for tests to succeed). One idea is to either store this
> information outside of the ExecutionGraph or to initialize the
> ExecutionGraph with the information from the previous instance. Most
> likely, we won't give a guarantee that all counters, timestamps and metrics
> are correctly maintained across failovers in the first version, though. It
> is a bit the same argument as above that this is not strictly required to
> make this feature work. Maybe this means that this feature is not
> production ready for some users, but I think this is ok.
>
> In general, to fully integrate the declarative scheduler with the web ui
> we have to be able to display changing ExecutionGraphs. Ideally, we would
> have something like a timeline where one can select the time for which to
> display the state of the job execution. If one goes all the way to the
> right, the system shows the live state and all other positions are the
> present time minus some offset.
>
> I will add the follow ups to the FLIP as potential improvements in order
> to keep track of them.
>
> Cheers,
> Till
>
> On Tue, Jan 26, 2021 at 9:53 AM Zhu Zhu <re...@gmail.com> wrote:
>
>> Thanks for the proposal! @Till Rohrmann <tr...@apache.org>
>>
>> The design looks generally good to me. I have 2 concerns though:
>>
>> # performance on failover
>> I can see is that an ExecutionGraph will be built and initialized on each
>> task failure. The process is possibly to be slow:
>> 1. the topology building can be very slow (much slower than the
>> scheduling
>> or restarting process) when the job scale becomes large (see
>> FLINK-20612).
>> Luckily FLINK-21110 will improve it.
>> 2. the input/output format can be slow due to possible IO work to
>> external services.
>> Maybe we should extract it out from ExecutionGraph building?
>>
>> # execution history lost
>> After building and using a new ExecutionGraph, the execution history in
>> the
>> old graph will be lost, including status timestamps of the job, prior
>> execution
>> attempts and their failures. Should we let the new EG inherit these
>> information
>> from prior EG? Maybe as a future plan with further discussions regarding
>> the
>> varying parallelism.
>>
>> Thanks,
>> Zhu
>>
>> Xintong Song <to...@gmail.com> 于2021年1月24日周日 上午11:55写道:
>>
>>> Thanks for preparing the FLIP and starting the discussion, Till.
>>>
>>> I have a few questions trying to understand the design.
>>>
>>> ## What is the relationship between the current and new schedulers?
>>> IIUC, the new declarative scheduler will coexist with the current
>>> scheduler, as an alternative that the user needs to explicitly switch to.
>>> Then does it require any changes to the scheduler interfaces and how the
>>> JobMaster interacts with it?
>>>
>>> ## Is `SlotAllocator` aware of `ExecutionGraph`?
>>> Looking at the interfaces, it seems to me that `SlotAllocator` only takes
>>> `JobInformation` and `VertexInformation` as topology inputs. However, it
>>> generates `ParallelismAndResourceAssignments` which maps slots to
>>> `ExecutionVertexID`s. It's a bit confusing that `ExecutionGraph` is
>>> generated outside `SlotAllocator` while `ExecutionVertexID`s are
>>> generated
>>> inside `SlotAllocator`.
>>>
>>> ## About `ScaleUpController#canScaleUp`
>>>
>>> ### What is cumulative parallelism?
>>> The interface shows that the cumulative parallelism is a single number
>>> per
>>> job graph. I assume this is the sum of parallelism of all vertices?
>>>
>>> ### Is this interface expected to be called before or after
>>> `SlotAllocator#determineParallelism`?
>>> IIUC, on new resources appear, the scheduler always calls
>>> `SlotAllocator#determineParallelism` to generate a new plan based on the
>>> available resources, and `ScaleUpController#canScaleUp` is then called to
>>> decide whether to apply the new plan, according to whether the
>>> increasement
>>> is significant, how long the job has been running since last restart,
>>> etc.
>>> Is that correct?
>>>
>>> ### The cumulative parallelism may not be enough for deciding whether the
>>> job should be scaled up.
>>> I'm assuming my understanding for the above questions are correct. Please
>>> correct me if not.
>>>
>>> I noticed Chesnay's comment on the wiki page about making the decision
>>> based on when the job entered the execution state last time.
>>>
>>> In addition to that, there could also be situations where jobs may not
>>> benefit much an increment in cumulative parallelism.
>>> E.g., for a topology A -> B, where A and B are in different slot sharing
>>> groups, and the current parallelism for both A and B are 2. When 1 new
>>> slot
>>> appears, `SlotAllocator` may suggest increasing parallelism of A to 3.
>>> But
>>> this may not be a significant beneficial change for the job because the
>>> overall performance is still bottlenecked by the parallelism of B.
>>>
>>> ## Cluster vs. Job configuration
>>> I'm not entirely sure about specifying which scheduler to be used via a
>>> cluster level configuration option. Each job in a Flink cluster has its
>>> own
>>> JobMaster, and which scheduler to use is internal to that JobMaster. I
>>> understand that the declarative scheduler requires the cluster to use
>>> declarative resource management. However, other jobs should still be
>>> allowed to use other scheduler implementations that also support
>>> declarative resource management (e.g. the current `DefaultScheduler`).
>>> Maybe we should consider the cluster level configuration option as a
>>> default scheduler, and allow the job to specify a different scheduler in
>>> its execution config. This is similar to how we specify which state
>>> backend
>>> to be used.
>>>
>>> ## Minor: It might be better to also show in the state machine figure
>>> that
>>> it can go from `Executing` to `Restarting` when new resources appear.
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>>
>>> On Sat, Jan 23, 2021 at 6:04 AM Steven Wu <st...@gmail.com> wrote:
>>>
>>> > Till, thanks a lot for the proposal.
>>> >
>>> > Even if the initial phase is only to support scale-up, maybe the
>>> > "ScaleUpController" interface should be called "RescaleController" so
>>> that
>>> > in the future scale-down can be added.
>>> >
>>> > On Fri, Jan 22, 2021 at 7:03 AM Till Rohrmann <tr...@apache.org>
>>> > wrote:
>>> >
>>> > > Hi everyone,
>>> > >
>>> > > I would like to start a discussion about adding a new type of
>>> scheduler
>>> > to
>>> > > Flink. The declarative scheduler will first declare the required
>>> > resources
>>> > > and wait for them before deciding on the actual parallelism of a job.
>>> > > Thereby it can better handle situations where resources cannot be
>>> fully
>>> > > fulfilled. Moreover, it will act as a building block for the reactive
>>> > mode
>>> > > where Flink should scale to the maximum of the currently available
>>> > > resources.
>>> > >
>>> > > Please find more details in the FLIP wiki document [1]. Looking
>>> forward
>>> > to
>>> > > your feedback.
>>> > >
>>> > > [1] https://cwiki.apache.org/confluence/x/mwtRCg
>>> > >
>>> > > Cheers,
>>> > > Till
>>> > >
>>> >
>>>
>>

Re: [DISCUSS] FLIP-160: Declarative scheduler

Posted by Till Rohrmann <tr...@apache.org>.
Thanks a lot for all your feedback. Let me try to answer it.

# ScaleUpController vs. RescaleController

At the moment, the idea of the declarative scheduler is to run a job with a
parallelism which is as close to the target value as possible but to never
exceed it. Since the target value is currently fixed (this will change once
auto-scaling is supported), we never have to scale down. In fact, scale
down events only happen if the system loses slots and then you cannot
choose whether to scale down or not. This and the idea to keep the initial
design as simple as possible motivated the naming of ScaleUpController.
Once we support auto-scaling, we might have to rename this interface.
However, since I don't fully know how things will look like with
auto-scaling, I would like to refrain from this change now.

# Relationship between declarative scheduler and existing implementations

The declarative scheduler will implement the SchedulerNG interface. At the
moment I am not aware of any required changes to this interface. Also the
way the JobMaster interacts with the scheduler won't change to the best of
my knowledge.

# Awareness of SlotAllocator of the ExecutionGraph

The idea is to make the SlotAllocator not aware of the ExecutionGraph
because it can only be created after the SlotAllocator has decided on the
parallelism the job can be run with. Moreover, the idea is that the
JobInformation instance contains all the required information of a job to
make this decision.

The SlotAllocator also reserves the slots for a job before the
ExecutionGraph is created. Consequently, we need a way to associate the
slots with the Executions of the ExecutionGraph. Here we have decided to
use the ExecutionVertexID as the identificator. We could also introduce a
new identificator as long as we can map this one to the ExecutionVertexID.
We could for example use JobVertexID and subtask id as the identificator
but this is exactly what the ExecutionVertexID is. That's why we decided to
reuse it for the time being.

# ScaleUpController

## Cumulative parallelism

Yes, the cumulative parallelism is the sum of all tasks.

## When to call the ScaleUpController

Yes, the idea is that the scheduler will check whether one can run the job
with an increased parallelism if there are new slots available. If this is
the case, then we will ask the ScaleUpController, whether we actually
should scale up (e.g. whether the increase is significant enough or whether
enough time has passed between the last scale up operation).

## Do we provide enough information to the ScaleUpController

I am pretty sure that we don't provide the ScaleUpController enough
information to make the best possible decision. We wanted to keep it simple
in the first version and iterate on the interface with the help of user
feedback. I think that this interface won't fundamentally change the
scheduler and, hence, it shouldn't block future extensions by starting with
a simple interface.

# Cluster vs. job configuration

I think you are right that it would be most flexible if one could select a
scheduler on a per job basis with falling back to the cluster configuration
if nothing has been specified. For the sake of simplicity and narrowing
down the scope I would consider this as a follow up.

# Performance on failover

I agree that the performance won't be optimal in the failover case because
of 1) having to recreate the EG and 2) redundant IO operations. For 1) I
agree that FLINK-21110 will help a lot. For 2) moving the IO operations out
of the EG could be a good improvement. With the same argument as above, I
would consider this a follow up because I would like to keep the initial
design as simple as possible and I think that these performance
optimizations are not required to make this feature work.

# Lost execution history

You are right Zhu Zhu that recreating the ExecutionGraph causes the loss of
information. We are currently investigating which information is strictly
required and needs to be maintained across ExecutionGraph creations (mainly
for tests to succeed). One idea is to either store this information outside
of the ExecutionGraph or to initialize the ExecutionGraph with the
information from the previous instance. Most likely, we won't give a
guarantee that all counters, timestamps and metrics are correctly
maintained across failovers in the first version, though. It is a bit the
same argument as above that this is not strictly required to make this
feature work. Maybe this means that this feature is not production ready
for some users, but I think this is ok.

In general, to fully integrate the declarative scheduler with the web ui we
have to be able to display changing ExecutionGraphs. Ideally, we would have
something like a timeline where one can select the time for which to
display the state of the job execution. If one goes all the way to the
right, the system shows the live state and all other positions are the
present time minus some offset.

I will add the follow ups to the FLIP as potential improvements in order to
keep track of them.

Cheers,
Till

On Tue, Jan 26, 2021 at 9:53 AM Zhu Zhu <re...@gmail.com> wrote:

> Thanks for the proposal! @Till Rohrmann <tr...@apache.org>
>
> The design looks generally good to me. I have 2 concerns though:
>
> # performance on failover
> I can see is that an ExecutionGraph will be built and initialized on each
> task failure. The process is possibly to be slow:
> 1. the topology building can be very slow (much slower than the scheduling
> or restarting process) when the job scale becomes large (see FLINK-20612).
> Luckily FLINK-21110 will improve it.
> 2. the input/output format can be slow due to possible IO work to external
> services.
> Maybe we should extract it out from ExecutionGraph building?
>
> # execution history lost
> After building and using a new ExecutionGraph, the execution history in the
> old graph will be lost, including status timestamps of the job, prior
> execution
> attempts and their failures. Should we let the new EG inherit these
> information
> from prior EG? Maybe as a future plan with further discussions regarding
> the
> varying parallelism.
>
> Thanks,
> Zhu
>
> Xintong Song <to...@gmail.com> 于2021年1月24日周日 上午11:55写道:
>
>> Thanks for preparing the FLIP and starting the discussion, Till.
>>
>> I have a few questions trying to understand the design.
>>
>> ## What is the relationship between the current and new schedulers?
>> IIUC, the new declarative scheduler will coexist with the current
>> scheduler, as an alternative that the user needs to explicitly switch to.
>> Then does it require any changes to the scheduler interfaces and how the
>> JobMaster interacts with it?
>>
>> ## Is `SlotAllocator` aware of `ExecutionGraph`?
>> Looking at the interfaces, it seems to me that `SlotAllocator` only takes
>> `JobInformation` and `VertexInformation` as topology inputs. However, it
>> generates `ParallelismAndResourceAssignments` which maps slots to
>> `ExecutionVertexID`s. It's a bit confusing that `ExecutionGraph` is
>> generated outside `SlotAllocator` while `ExecutionVertexID`s are generated
>> inside `SlotAllocator`.
>>
>> ## About `ScaleUpController#canScaleUp`
>>
>> ### What is cumulative parallelism?
>> The interface shows that the cumulative parallelism is a single number per
>> job graph. I assume this is the sum of parallelism of all vertices?
>>
>> ### Is this interface expected to be called before or after
>> `SlotAllocator#determineParallelism`?
>> IIUC, on new resources appear, the scheduler always calls
>> `SlotAllocator#determineParallelism` to generate a new plan based on the
>> available resources, and `ScaleUpController#canScaleUp` is then called to
>> decide whether to apply the new plan, according to whether the
>> increasement
>> is significant, how long the job has been running since last restart, etc.
>> Is that correct?
>>
>> ### The cumulative parallelism may not be enough for deciding whether the
>> job should be scaled up.
>> I'm assuming my understanding for the above questions are correct. Please
>> correct me if not.
>>
>> I noticed Chesnay's comment on the wiki page about making the decision
>> based on when the job entered the execution state last time.
>>
>> In addition to that, there could also be situations where jobs may not
>> benefit much an increment in cumulative parallelism.
>> E.g., for a topology A -> B, where A and B are in different slot sharing
>> groups, and the current parallelism for both A and B are 2. When 1 new
>> slot
>> appears, `SlotAllocator` may suggest increasing parallelism of A to 3. But
>> this may not be a significant beneficial change for the job because the
>> overall performance is still bottlenecked by the parallelism of B.
>>
>> ## Cluster vs. Job configuration
>> I'm not entirely sure about specifying which scheduler to be used via a
>> cluster level configuration option. Each job in a Flink cluster has its
>> own
>> JobMaster, and which scheduler to use is internal to that JobMaster. I
>> understand that the declarative scheduler requires the cluster to use
>> declarative resource management. However, other jobs should still be
>> allowed to use other scheduler implementations that also support
>> declarative resource management (e.g. the current `DefaultScheduler`).
>> Maybe we should consider the cluster level configuration option as a
>> default scheduler, and allow the job to specify a different scheduler in
>> its execution config. This is similar to how we specify which state
>> backend
>> to be used.
>>
>> ## Minor: It might be better to also show in the state machine figure that
>> it can go from `Executing` to `Restarting` when new resources appear.
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Sat, Jan 23, 2021 at 6:04 AM Steven Wu <st...@gmail.com> wrote:
>>
>> > Till, thanks a lot for the proposal.
>> >
>> > Even if the initial phase is only to support scale-up, maybe the
>> > "ScaleUpController" interface should be called "RescaleController" so
>> that
>> > in the future scale-down can be added.
>> >
>> > On Fri, Jan 22, 2021 at 7:03 AM Till Rohrmann <tr...@apache.org>
>> > wrote:
>> >
>> > > Hi everyone,
>> > >
>> > > I would like to start a discussion about adding a new type of
>> scheduler
>> > to
>> > > Flink. The declarative scheduler will first declare the required
>> > resources
>> > > and wait for them before deciding on the actual parallelism of a job.
>> > > Thereby it can better handle situations where resources cannot be
>> fully
>> > > fulfilled. Moreover, it will act as a building block for the reactive
>> > mode
>> > > where Flink should scale to the maximum of the currently available
>> > > resources.
>> > >
>> > > Please find more details in the FLIP wiki document [1]. Looking
>> forward
>> > to
>> > > your feedback.
>> > >
>> > > [1] https://cwiki.apache.org/confluence/x/mwtRCg
>> > >
>> > > Cheers,
>> > > Till
>> > >
>> >
>>
>

Re: [DISCUSS] FLIP-160: Declarative scheduler

Posted by Zhu Zhu <re...@gmail.com>.
Thanks for the proposal! @Till Rohrmann <tr...@apache.org>

The design looks generally good to me. I have 2 concerns though:

# performance on failover
I can see is that an ExecutionGraph will be built and initialized on each
task failure. The process is possibly to be slow:
1. the topology building can be very slow (much slower than the scheduling
or restarting process) when the job scale becomes large (see FLINK-20612).
Luckily FLINK-21110 will improve it.
2. the input/output format can be slow due to possible IO work to external
services.
Maybe we should extract it out from ExecutionGraph building?

# execution history lost
After building and using a new ExecutionGraph, the execution history in the
old graph will be lost, including status timestamps of the job, prior
execution
attempts and their failures. Should we let the new EG inherit these
information
from prior EG? Maybe as a future plan with further discussions regarding
the
varying parallelism.

Thanks,
Zhu

Xintong Song <to...@gmail.com> 于2021年1月24日周日 上午11:55写道:

> Thanks for preparing the FLIP and starting the discussion, Till.
>
> I have a few questions trying to understand the design.
>
> ## What is the relationship between the current and new schedulers?
> IIUC, the new declarative scheduler will coexist with the current
> scheduler, as an alternative that the user needs to explicitly switch to.
> Then does it require any changes to the scheduler interfaces and how the
> JobMaster interacts with it?
>
> ## Is `SlotAllocator` aware of `ExecutionGraph`?
> Looking at the interfaces, it seems to me that `SlotAllocator` only takes
> `JobInformation` and `VertexInformation` as topology inputs. However, it
> generates `ParallelismAndResourceAssignments` which maps slots to
> `ExecutionVertexID`s. It's a bit confusing that `ExecutionGraph` is
> generated outside `SlotAllocator` while `ExecutionVertexID`s are generated
> inside `SlotAllocator`.
>
> ## About `ScaleUpController#canScaleUp`
>
> ### What is cumulative parallelism?
> The interface shows that the cumulative parallelism is a single number per
> job graph. I assume this is the sum of parallelism of all vertices?
>
> ### Is this interface expected to be called before or after
> `SlotAllocator#determineParallelism`?
> IIUC, on new resources appear, the scheduler always calls
> `SlotAllocator#determineParallelism` to generate a new plan based on the
> available resources, and `ScaleUpController#canScaleUp` is then called to
> decide whether to apply the new plan, according to whether the increasement
> is significant, how long the job has been running since last restart, etc.
> Is that correct?
>
> ### The cumulative parallelism may not be enough for deciding whether the
> job should be scaled up.
> I'm assuming my understanding for the above questions are correct. Please
> correct me if not.
>
> I noticed Chesnay's comment on the wiki page about making the decision
> based on when the job entered the execution state last time.
>
> In addition to that, there could also be situations where jobs may not
> benefit much an increment in cumulative parallelism.
> E.g., for a topology A -> B, where A and B are in different slot sharing
> groups, and the current parallelism for both A and B are 2. When 1 new slot
> appears, `SlotAllocator` may suggest increasing parallelism of A to 3. But
> this may not be a significant beneficial change for the job because the
> overall performance is still bottlenecked by the parallelism of B.
>
> ## Cluster vs. Job configuration
> I'm not entirely sure about specifying which scheduler to be used via a
> cluster level configuration option. Each job in a Flink cluster has its own
> JobMaster, and which scheduler to use is internal to that JobMaster. I
> understand that the declarative scheduler requires the cluster to use
> declarative resource management. However, other jobs should still be
> allowed to use other scheduler implementations that also support
> declarative resource management (e.g. the current `DefaultScheduler`).
> Maybe we should consider the cluster level configuration option as a
> default scheduler, and allow the job to specify a different scheduler in
> its execution config. This is similar to how we specify which state backend
> to be used.
>
> ## Minor: It might be better to also show in the state machine figure that
> it can go from `Executing` to `Restarting` when new resources appear.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Sat, Jan 23, 2021 at 6:04 AM Steven Wu <st...@gmail.com> wrote:
>
> > Till, thanks a lot for the proposal.
> >
> > Even if the initial phase is only to support scale-up, maybe the
> > "ScaleUpController" interface should be called "RescaleController" so
> that
> > in the future scale-down can be added.
> >
> > On Fri, Jan 22, 2021 at 7:03 AM Till Rohrmann <tr...@apache.org>
> > wrote:
> >
> > > Hi everyone,
> > >
> > > I would like to start a discussion about adding a new type of scheduler
> > to
> > > Flink. The declarative scheduler will first declare the required
> > resources
> > > and wait for them before deciding on the actual parallelism of a job.
> > > Thereby it can better handle situations where resources cannot be fully
> > > fulfilled. Moreover, it will act as a building block for the reactive
> > mode
> > > where Flink should scale to the maximum of the currently available
> > > resources.
> > >
> > > Please find more details in the FLIP wiki document [1]. Looking forward
> > to
> > > your feedback.
> > >
> > > [1] https://cwiki.apache.org/confluence/x/mwtRCg
> > >
> > > Cheers,
> > > Till
> > >
> >
>

Re: [DISCUSS] FLIP-160: Declarative scheduler

Posted by Xintong Song <to...@gmail.com>.
Thanks for preparing the FLIP and starting the discussion, Till.

I have a few questions trying to understand the design.

## What is the relationship between the current and new schedulers?
IIUC, the new declarative scheduler will coexist with the current
scheduler, as an alternative that the user needs to explicitly switch to.
Then does it require any changes to the scheduler interfaces and how the
JobMaster interacts with it?

## Is `SlotAllocator` aware of `ExecutionGraph`?
Looking at the interfaces, it seems to me that `SlotAllocator` only takes
`JobInformation` and `VertexInformation` as topology inputs. However, it
generates `ParallelismAndResourceAssignments` which maps slots to
`ExecutionVertexID`s. It's a bit confusing that `ExecutionGraph` is
generated outside `SlotAllocator` while `ExecutionVertexID`s are generated
inside `SlotAllocator`.

## About `ScaleUpController#canScaleUp`

### What is cumulative parallelism?
The interface shows that the cumulative parallelism is a single number per
job graph. I assume this is the sum of parallelism of all vertices?

### Is this interface expected to be called before or after
`SlotAllocator#determineParallelism`?
IIUC, on new resources appear, the scheduler always calls
`SlotAllocator#determineParallelism` to generate a new plan based on the
available resources, and `ScaleUpController#canScaleUp` is then called to
decide whether to apply the new plan, according to whether the increasement
is significant, how long the job has been running since last restart, etc.
Is that correct?

### The cumulative parallelism may not be enough for deciding whether the
job should be scaled up.
I'm assuming my understanding for the above questions are correct. Please
correct me if not.

I noticed Chesnay's comment on the wiki page about making the decision
based on when the job entered the execution state last time.

In addition to that, there could also be situations where jobs may not
benefit much an increment in cumulative parallelism.
E.g., for a topology A -> B, where A and B are in different slot sharing
groups, and the current parallelism for both A and B are 2. When 1 new slot
appears, `SlotAllocator` may suggest increasing parallelism of A to 3. But
this may not be a significant beneficial change for the job because the
overall performance is still bottlenecked by the parallelism of B.

## Cluster vs. Job configuration
I'm not entirely sure about specifying which scheduler to be used via a
cluster level configuration option. Each job in a Flink cluster has its own
JobMaster, and which scheduler to use is internal to that JobMaster. I
understand that the declarative scheduler requires the cluster to use
declarative resource management. However, other jobs should still be
allowed to use other scheduler implementations that also support
declarative resource management (e.g. the current `DefaultScheduler`).
Maybe we should consider the cluster level configuration option as a
default scheduler, and allow the job to specify a different scheduler in
its execution config. This is similar to how we specify which state backend
to be used.

## Minor: It might be better to also show in the state machine figure that
it can go from `Executing` to `Restarting` when new resources appear.

Thank you~

Xintong Song



On Sat, Jan 23, 2021 at 6:04 AM Steven Wu <st...@gmail.com> wrote:

> Till, thanks a lot for the proposal.
>
> Even if the initial phase is only to support scale-up, maybe the
> "ScaleUpController" interface should be called "RescaleController" so that
> in the future scale-down can be added.
>
> On Fri, Jan 22, 2021 at 7:03 AM Till Rohrmann <tr...@apache.org>
> wrote:
>
> > Hi everyone,
> >
> > I would like to start a discussion about adding a new type of scheduler
> to
> > Flink. The declarative scheduler will first declare the required
> resources
> > and wait for them before deciding on the actual parallelism of a job.
> > Thereby it can better handle situations where resources cannot be fully
> > fulfilled. Moreover, it will act as a building block for the reactive
> mode
> > where Flink should scale to the maximum of the currently available
> > resources.
> >
> > Please find more details in the FLIP wiki document [1]. Looking forward
> to
> > your feedback.
> >
> > [1] https://cwiki.apache.org/confluence/x/mwtRCg
> >
> > Cheers,
> > Till
> >
>

Re: [DISCUSS] FLIP-160: Declarative scheduler

Posted by Steven Wu <st...@gmail.com>.
Till, thanks a lot for the proposal.

Even if the initial phase is only to support scale-up, maybe the
"ScaleUpController" interface should be called "RescaleController" so that
in the future scale-down can be added.

On Fri, Jan 22, 2021 at 7:03 AM Till Rohrmann <tr...@apache.org> wrote:

> Hi everyone,
>
> I would like to start a discussion about adding a new type of scheduler to
> Flink. The declarative scheduler will first declare the required resources
> and wait for them before deciding on the actual parallelism of a job.
> Thereby it can better handle situations where resources cannot be fully
> fulfilled. Moreover, it will act as a building block for the reactive mode
> where Flink should scale to the maximum of the currently available
> resources.
>
> Please find more details in the FLIP wiki document [1]. Looking forward to
> your feedback.
>
> [1] https://cwiki.apache.org/confluence/x/mwtRCg
>
> Cheers,
> Till
>