Posted to dev@flink.apache.org by Etienne Chauchot <ec...@apache.org> on 2023/06/12 15:34:25 UTC

[DISCUSS] FLIP-322 Cooldown period for adaptive scheduler

Hi,

I’d like to start a discussion about FLIP-322 [1] which introduces a 
cooldown period for the adaptive scheduler.

I'd like to get your feedback especially @Robert as you opened the 
related ticket and worked on the reactive mode a lot.

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-322+Cooldown+period+for+adaptive+scheduler

Best

Etienne


Re: [DISCUSS] FLIP-322 Cooldown period for adaptive scheduler

Posted by Chesnay Schepler <ch...@apache.org>.
I think the cooldown still makes sense with FLIP-291 APIs.

If you want to fully control the parallelism and rescale timings then 
you can set the cooldown to zero.
If you don't want complete control but just the target parallelism from 
time to time, then the cooldown within Flink still makes sense imo 
because it can account for all scale up operations, which an external 
scaler would struggle with (because it doesn't actually know when a 
scale up happened).
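
As a rough illustration of the cooldown discussed in this thread (a sketch only, not Flink code): a resource change that arrives during the cooldown is not dropped, the rescale check is deferred until the cooldown has elapsed, and a cooldown of zero never defers anything. The function name and the use of plain seconds are assumptions made for the example; the rule itself is the one quoted further down ("schedule a rescale at last-rescale + scaling-interval.min").

```python
def next_rescale_check(now: float, last_rescale: float, cooldown: float) -> float:
    """Return the time (in seconds) at which the next rescale check may run.

    Hypothetical helper for illustration; it is not part of the
    AdaptiveScheduler and only mirrors the rule quoted in this thread.
    """
    earliest_allowed = last_rescale + cooldown
    # Outside the cooldown the check runs right away; inside it, it is deferred.
    return max(now, earliest_allowed)


# Cooldown of 30s, last rescale at t=100s, new slots arrive at t=110s.
print(next_rescale_check(now=110.0, last_rescale=100.0, cooldown=30.0))  # 130.0
# Cooldown set to zero: an external scaler keeps full control of the timing.
print(next_rescale_check(now=110.0, last_rescale=100.0, cooldown=0.0))  # 110.0
```

With a zero cooldown the result is always `now`, which is the "fully control the rescale timings" case described above.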

 > Wouldn't a simple case where you add a new TM and remove it before 
the max interval is reached (so there is nothing to do) result in an 
unnecessary job restart?

Depends on how you implement it. If you ignore all of shouldRescale, 
yes, but you shouldn't do that in the first place.

Within shouldRescale() the SlotAllocator wouldn't provide us with a new 
parallelism alternative and we wouldn't ask the RescaleController, which 
is the bit we actually want to override.
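
The decision described above could look roughly like the following sketch. It is an assumption-laden illustration, not the actual AdaptiveScheduler logic: the function and parameter names are hypothetical, `proposed_parallelism=None` stands in for "the slot allocator offers no new parallelism alternative", and only scale-ups are covered.

```python
from typing import Optional


def should_rescale(
    current_parallelism: int,
    proposed_parallelism: Optional[int],
    min_parallelism_increase: int,
    seconds_since_last_rescale: float,
    scaling_interval_max: Optional[float],
) -> bool:
    """Decide whether a scale-up should trigger a restart (illustrative only)."""
    # No alternative offered (e.g. a TM was added and removed again):
    # nothing to do, whatever the timers say.
    if proposed_parallelism is None:
        return False
    increase = proposed_parallelism - current_parallelism
    if increase <= 0:
        return False
    # Normal path: the increase is large enough to justify a restart.
    if increase >= min_parallelism_increase:
        return True
    # Forced path: a smaller increase is accepted once the max interval elapsed.
    return (
        scaling_interval_max is not None
        and seconds_since_last_rescale > scaling_interval_max
    )
```

In this sketch the max interval only relaxes the minimum-increase check; it never forces a restart when there is no better parallelism to move to, which is the case raised in the question above.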

On 04/07/2023 09:16, David Morávek wrote:
> > They will struggle if they add new resources and nothing happens for 
> 5 minutes.
>
> The same applies if they start playing with FLIP-291 APIs. I'm 
> wondering if the cooldown makes sense there since it was the user's 
> deliberate choice to push new requirements. 🤔
>
> Best,
> D.
>
> On Tue, Jul 4, 2023 at 9:11 AM David Morávek <dm...@apache.org> wrote:
>
>     The FLIP reads sane to me. I'm unsure about the default values,
>     though; 5 minutes of wait time between rescales feels rather
>     strict, and we should rethink it to provide a better
>     out-of-the-box experience.
>
>     I'd focus on newcomers trying AS / Reactive Mode out. They will
>     struggle if they add new resources and nothing happens for 5
>     minutes. I'd suggest defaulting to
>     /jobmanager.adaptive-scheduler.resource-stabilization-timeout/ (which
>     defaults to 10s).
>
>     I'm still struggling to grasp max interval (force rescale).
>     Ignoring `AdaptiveScheduler#shouldRescale()` condition seems
>     rather dangerous. Wouldn't a simple case where you add a new TM
>     and remove it before the max interval is reached (so there is
>     nothing to do) result in an unnecessary job restart?
>
>     Best,
>     D.
>
>     On Thu, Jun 29, 2023 at 3:43 PM Etienne Chauchot
>     <ec...@apache.org> wrote:
>
>         Thanks Chesnay for your feedback. I have updated the FLIP.
>         I'll start a
>         vote thread.
>
>         Best
>
>         Etienne
>
>         Le 28/06/2023 à 11:49, Chesnay Schepler a écrit :
>         > > we should schedule a check that will rescale if
>         > min-parallelism-increase is met. Then, what is the use of
>         > scaling-interval.max timeout in that context ?
>         >
>         > To force a rescale if min-parallelism-increase is not met
>         (but we
>         > could still run above the current parallelism).
>         >
>         > min-parallelism-increase is a trade-off between the cost of
>         rescaling
>         > vs the performance benefit of the parallelism increase. Over
>         time the
>         > balance tips more and more in favor of the parallelism
>         increase, hence
>         > we should eventually rescale anyway even if the minimum
>         isn't met, or
>         > at least give users the option to do so.
>         >
>         > > I meant the opposite: not having only the cooldown but
>         having only
>         > the stabilization time. I must have missed something because
>         what I
>         > wonder is: if every rescale entails a restart of the
>         pipeline and
>         > every restart entails passing in waiting for resources
>         state, then why
>         > introduce a cooldown when there is already at each rescale a
>         stable
>         > resource timeout ?
>         >
>         > It is technically correct that the stable resource timeout
>         can be used
>         > to limit the number of rescale operations per interval,
>         however during
>         > that time the job isn't running, in contrast to the cooldown.
>         >
>         > Having both just gives you a lot more flexibility.
>         > "I want at most 1 rescale operation per hour, and wait at
>         most 1
>         > minute for resource to stabilize when a rescale happens".
>         > You can't express this with only one of the options.
>         >
>         > On 20/06/2023 14:41, Etienne Chauchot wrote:
>         >> Hi Chesnay,
>         >>
>         >> Thanks for your feedback. Comments inline
>         >>
>         >> Le 16/06/2023 à 17:24, Chesnay Schepler a écrit :
>         >>> 1) Options specific to the adaptive scheduler should start
>         with
>         >>> "jobmanager.adaptive-scheduler".
>         >>
>         >>
>         >> ok
>         >>
>         >>
>         >>> 2)
>         >>> There isn't /really /a notion of a "scaling event". The
>         scheduler is
>         >>> informed about new/lost slots and job failures, and reacts
>         >>> accordingly by maybe rescaling the job.
>         >>> (sure, you can think of these as events, but you can think of
>         >>> practically everything as events)
>         >>>
>         >>> There shouldn't be a queue for events. All the scheduler
>         should have
>         >>> to know is that the next rescale check is scheduled for
>         time T,
>         >>> which in practice boils down to a flag and a scheduled
>         action that
>         >>> runs Executing#maybeRescale.
>         >>
>         >>
>         >> Makes total sense, it's very simple like this. Thanks for the
>         >> clarification and pointer. After the related FLIPs, I'll look
>         at the code
>         >> now.
>         >>
>         >>
>         >>> With that in mind, we also have to look at how we keep
>         this state
>         >>> around. Presumably it is scoped to the current state, such
>         that the
>         >>> cooldown is reset if a job fails.
>         >>> Maybe we should add a separate ExecutingWithCooldown
>         state; not sure
>         >>> yet.
>         >>
>         >>
>         >> Yes, losing cooldown state and cooldown reset upon failure
>         is what I
>         >> suggested in point 3 in previous email. Not sure either for
>         a new
>         >> state, I'll figure it out after experimenting with the
>         code. I'll
>         >> update the FLIP then.
>         >>
>         >>
>         >>>
>         >>> It would be good to clarify whether this FLIP only
>         attempts to cover
>         >>> scale up operations, or also scale downs in case of slot
>         losses.
>         >>
>         >>
>         >> When slots are lost, most of the time it is due to a
>         TM loss so
>         >> there should be several slots lost at the same time but
>         (hopefully)
>         >> only once. There should not be many scale downs in a row
>         (but still
>         >> cascading failures can happen). I think, we should just
>         protect
>         >> against having scale ups immediately following. For that, I
>         think we
>         >> could just keep the current behavior of transitioning to
>         Restarting
>         >> state and then back to Waiting for Resources state. This
>         state will
>         >> protect us against scale ups immediately following
>         failure/restart.
>         >>
>         >>
>         >>>
>         >>> We should also think about how it relates to the externalized
>         >>> declarative resource management. Should we always rescale
>         >>> immediately? Should we wait until the cooldown is over?
>         >>
>         >>
>         >> It relates to point 2, no ? we should rescale immediately
>         only if
>         >> last rescale was done more than scaling-interval.min ago
>         otherwise
>         >> schedule a rescale at last-rescale + scaling-interval.min time.
>         >>
>         >>
>         >>> Related to this, there's the min-parallelism-increase
>         option, that
>         >>> if for example set to "2" restricts rescale operations to
>         only occur
>         >>> if the parallelism increases by at least 2.
>         >>
>         >>
>         >> yes I saw that in the code
>         >>
>         >>
>         >>> Ideally however there would be a max timeout for this.
>         >>>
>         >>> As such we could maybe think about this a bit differently:
>         >>> Add 2 new options instead of 1:
>         >>> jobmanager.adaptive-scheduler.scaling-interval.min: The
>         minimum time
>         >>> the scheduler will wait for the next effective rescale
>         operations.
>         >>> jobmanager.adaptive-scheduler.scaling-interval.max: The
>         maximum time
>         >>> the scheduler will wait for the next effective rescale
>         operations.
>         >>
>         >>
>         >> At point 2, we said that when slots change (requirements
>         change or
>         >> new slots available), if last rescale check (call to
>         maybeRescale)
>         >> was done less than scaling-interval.min ago, we should
>         schedule a
>         >> check that will rescale if min-parallelism-increase is met.
>         Then,
>         >> what is the use of the scaling-interval.max timeout in that
>         context ?
>         >>
>         >>
>         >>>
>         >>> 3) It sounds fine that we lose the cooldown state, because
>         imo we
>         >>> want to reset the cooldown anyway on job failures (because
>         a job
>         >>> failure inherently implies a potential rescaling).
>         >>
>         >>
>         >> exactly.
>         >>
>         >>
>         >>>
>         >>> 4) The stabilization time isn't really redundant and serves a
>         >>> different use-case. The idea behind it is that if a user
>         adds multiple
>         >>> TMs at once then we don't want to rescale immediately at
>         the first
>         >>> received slot. Without the stabilization time the cooldown
>         would
>         >>> actually cause bad behavior here, because not only would
>         we rescale
>         >>> immediately upon receiving the minimum required slots to
>         scale up,
>         >>> but we also wouldn't use the remaining slots just because the
>         >>> cooldown says so.
>         >>
>         >>
>         >> I meant the opposite: not having only the cooldown but
>         having only
>         >> the stabilization time. I must have missed something
>         because what I
>         >> wonder is: if every rescale entails a restart of the
>         pipeline and
>         >> every restart entails passing in waiting for resources
>         state, then
>         >> why introduce a cooldown when there is already at each
>         rescale a
>         >> stable resource timeout ?
>         >>
>         >>
>         >> Best
>         >>
>         >> Etienne
>         >>
>         >>
>         >>
>         >>>
>         >>> On 16/06/2023 15:47, Etienne Chauchot wrote:
>         >>>> Hi Robert,
>         >>>>
>         >>>> Thanks for your feedback. I don't know the scheduler part
>         well
>         >>>> enough yet and I'm taking this ticket as a learning workshop.
>         >>>>
>         >>>> Regarding your comments:
>         >>>>
>         >>>> 1. Taking a look at the AdaptiveScheduler class which
>         takes all its
>         >>>> configuration from the JobManagerOptions, and also to be
>         consistent
>         >>>> with other parameters name, I'd suggest
>         >>>> /jobmanager.scheduler-scaling-cooldown-period/
>         >>>>
>         >>>> 2. I thought scaling events existed already and the
>         scheduler
>         >>>> received them as mentioned in FLIP-160 (cf "Whenever the
>         scheduler
>         >>>> is in the Executing state and receives new slots") or in
>         FLIP-138
>         >>>> (cf "Whenever new slots are available the SlotPool
>         notifies the
>         >>>> Scheduler"). If it is not the case (it is the scheduler
>         who asks
>         >>>> for slots), then there is no need for storing scaling
>         requests indeed.
>         >>>>
>         >>>> => I need a confirmation here
>         >>>>
>         >>>> 3. If we lose the JobManager, we lose both the
>         AdaptiveScheduler
>         >>>> state and the CoolDownTimer state. So, upon recovery, it
>         would be
>         >>>> as if there was no ongoing coolDown period. So, a first
>         re-scale
>         >>>> could happen right away and it will start a coolDown
>         period. A
>         >>>> second re-scale would have to wait for the end of this
>         period.
>         >>>>
>         >>>> 4. When a pipeline is re-scaled, it is restarted. Upon
>         restart, the
>         >>>> AdaptiveScheduler passes again in the "waiting for
>         resources" state
>         >>>> as FLIP-160 suggests. If so, then it seems that the
>         coolDown period
>         >>>> is kind of redundant with the
>         resource-stabilization-timeout. I
>         >>>> guess it is not the case otherwise the FLINK-21883 ticket
>         would not
>         >>>> have been created.
>         >>>>
>         >>>> => I need a confirmation here also.
>         >>>>
>         >>>>
>         >>>> Thanks for your views on point 2 and 4.
>         >>>>
>         >>>>
>         >>>> Best
>         >>>>
>         >>>> Etienne
>         >>>>
>         >>>> Le 15/06/2023 à 13:35, Robert Metzger a écrit :
>         >>>>> Thanks for the FLIP.
>         >>>>>
>         >>>>> Some comments:
>         >>>>> 1. Can you specify the full proposed configuration name? "
>         >>>>> scaling-cooldown-period" is probably not the full config
>         name?
>         >>>>> 2. Why is the concept of scaling events and a scaling queue
>         >>>>> needed? If I
>         >>>>> remember correctly, the adaptive scheduler will just
>         check how many
>         >>>>> TaskManagers are available and then adjust the execution
>         graph
>         >>>>> accordingly.
>         >>>>> There's no need to store a number of scaling events. We
>         just need to
>         >>>>> determine the time to trigger an adjustment of the
>         execution graph.
>         >>>>> 3. What's the behavior wrt JobManager failures (e.g.
>         we lose
>         >>>>> the state
>         >>>>> of the Adaptive Scheduler?). My proposal would be to
>         just reset the
>         >>>>> cooldown period, so after recovery of a JobManager, we
>         have to
>         >>>>> wait at
>         >>>>> least for the cooldown period until further scaling
>         operations are
>         >>>>> done.
>         >>>>> 4. What's the relationship to the
>         >>>>>
>         "jobmanager.adaptive-scheduler.resource-stabilization-timeout"
>         >>>>> configuration?
>         >>>>>
>         >>>>> Thanks a lot for working on this!
>         >>>>>
>         >>>>> Best,
>         >>>>> Robert
>         >>>>>
>         >>>>> On Wed, Jun 14, 2023 at 3:38 PM Etienne
>         >>>>> Chauchot<ec...@apache.org>
>         >>>>> wrote:
>         >>>>>
>         >>>>>> Hi all,
>         >>>>>>
>         >>>>>> @Yuxia, I updated the FLIP to include the aggregation of
>         the stacked
>         >>>>>> operations that we discussed below. PTAL.
>         >>>>>>
>         >>>>>> Best
>         >>>>>>
>         >>>>>> Etienne
>         >>>>>>
>         >>>>>>
>         >>>>>> Le 13/06/2023 à 16:31, Etienne Chauchot a écrit :
>         >>>>>>> Hi Yuxia,
>         >>>>>>>
>         >>>>>>> Thanks for your feedback. The number of potentially
>         stacked
>         >>>>>>> operations
>         >>>>>>> depends on the configured length of the cooldown period.
>         >>>>>>>
>         >>>>>>>
>         >>>>>>>
>         >>>>>>> The proposition in the FLIP is to add a minimum delay
>         between 2
>         >>>>>>> scaling
>         >>>>>>> operations. But, indeed, an optimization could be to
>         still stack
>         >>>>>>> the
>         >>>>>>> operations (that arrive during a cooldown period) but
>         maybe not
>         >>>>>>> take
>         >>>>>>> only the last operation but rather aggregate them in
>         order to
>         >>>>>>> end up
>         >>>>>>> with a single aggregated operation when the cooldown
>         period
>         >>>>>>> ends. For
>         >>>>>>> example, let's say 3 taskManagers come up and 1 comes
>         down
>         >>>>>>> during the
>         >>>>>>> cooldown period, we could generate a single operation
>         of scale
>         >>>>>>> up +2
>         >>>>>>> when the period ends.
>         >>>>>>>
>         >>>>>>> As a side note regarding your comment on "it'll take a
>         long time to
>         >>>>>>> finish all", please keep in mind that the reactive
>         mode (at
>         >>>>>>> least for
>         >>>>>>> now) is only available for streaming pipeline which
>         are in essence
>         >>>>>>> infinite processing.
>         >>>>>>>
>         >>>>>>> Another side note: when you mention "every taskManagers
>         >>>>>>> connecting",
>         >>>>>>> if you are referring to the start of the pipeline,
>         please keep
>         >>>>>>> in mind
>         >>>>>>> that the adaptive scheduler has a "waiting for
>         resources" timeout
>         >>>>>>> period before starting the pipeline in which all
>         taskmanagers
>         >>>>>>> connect
>         >>>>>>> and the parallelism is decided.
>         >>>>>>>
>         >>>>>>> Best
>         >>>>>>>
>         >>>>>>> Etienne
>         >>>>>>>
>         >>>>>>> Le 13/06/2023 à 03:58, yuxia a écrit :
>         >>>>>>>> Hi, Etienne. Thanks for driving it. I have one
>         question about the
>         >>>>>>>> mechanism of the cooldown timeout.
>         >>>>>>>>
>         >>>>>>>>  From the Proposed Changes part, if a scaling event is
>         >>>>>>>> received and
>         >>>>>>>> it falls during the cooldown period, it'll be stacked
>         to be
>         >>>>>>>> executed
>         >>>>>>>> after the period ends. Also, from the description of
>         >>>>>>>> FLINK-21883[1],
>         >>>>>>>> cooldown timeout is to avoid rescaling the job very
>         frequently,
>         >>>>>>>> because TaskManagers are not all connecting at the
>         same time.
>         >>>>>>>>
>         >>>>>>>> So, is it possible that every taskmanager connecting
>         will
>         >>>>>>>> produce a
>         >>>>>>>> scaling event, and these will be stacked with many scale
>         up events, so that
>         >>>>>>>> it'll take a long time to finish them all? Can we
>         just take the
>         >>>>>>>> last event?
>         >>>>>>>>
>         >>>>>>>> [1]:https://issues.apache.org/jira/browse/FLINK-21883
>         >>>>>>>>
>         >>>>>>>> Best regards, Yuxia
>         >>>>>>>>
>         >>>>>>>> ----- Original Message -----
>         >>>>>>>> From: "Etienne Chauchot"<ec...@apache.org>
>         >>>>>>>> To: "dev"<de...@flink.apache.org>, "Robert Metzger"<me...@gmail.com>
>         >>>>>>>> Sent: Monday, June 12, 2023 11:34:25 PM
>         >>>>>>>> Subject: [DISCUSS] FLIP-322 Cooldown period for adaptive scheduler
>         >>>>>>>>
>         >>>>>>>> Hi,
>         >>>>>>>>
>         >>>>>>>> I’d like to start a discussion about FLIP-322 [1] which
>         >>>>>>>> introduces a
>         >>>>>>>> cooldown period for the adaptive scheduler.
>         >>>>>>>>
>         >>>>>>>> I'd like to get your feedback especially @Robert as
>         you opened the
>         >>>>>>>> related ticket and worked on the reactive mode a lot.
>         >>>>>>>>
>         >>>>>>>> [1]
>         >>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-322+Cooldown+period+for+adaptive+scheduler
>         >>>>>>>>
>         >>>>>>>> Best
>         >>>>>>>>
>         >>>>>>>> Etienne
>         >>>
>         >>>
>         >
>

Re: [DISCUSS] FLIP-322 Cooldown period for adaptive scheduler

Posted by Etienne Chauchot <ec...@apache.org>.
Hi,

I think we have reached a consensus here. I have updated the FLIP to 
reflect recent suggestions. I will start a new vote.

Best

Etienne

Le 05/07/2023 à 14:42, Etienne Chauchot a écrit :
>
> Hi all,
>
> Thanks David for your suggestions. Comments inline.
>
> Le 04/07/2023 à 13:35, David Morávek a écrit :
>>> waiting 2 min between 2 requirements push seems ok to me
>> This depends on the workload. Would you care if the cost of rescaling were
>> close to zero (which is for most out-of-the-box workloads)? In that case,
>> it would be desirable to rescale more frequently, for example, if TMs join
>> incrementally.
>>
>> Creating a value that covers everything is impossible unless it's
>> self-tuning, so I'd prefer having a smooth experience for people trying
>> things out (just imagine doing a demo at the conference) and having them
>> opt-in for longer cooldowns.
>>
> The users still have the ability to lower the cooldown period for high 
> workloads but we could definitely set a default value to a lower 
> number. I agree to favour 
> lower numbers (for smooth rescale experience) and consider higher 
> numbers (for high workloads) as exceptions. But we still need to agree 
> on a suitable default for most cases: 30s ?
>> One idea to keep the timeouts lower while getting more balance would be
>> restarting the cooldown period when new resources or requirements are
>> received. This would also bring the cooldown's behavior closer to the
>> resource-stabilization timeout. Would that make sense?
>
>
> you mean, if slots are received during the cooldown period instead of 
> proposed behavior (A),  do behavior (B) ?
>
> A. schedule a rescale at lastRescale + cooldown point in time
>
> B. schedule a rescale at ** now ** + cooldown point in time
>
> It looks fine to me. It is even better because it avoids having 2 
> rescales scheduled at the same time if 2 slot changes arrive during 
> the same cooldown period.
>
>
> Etienne
>
>
>>> Depends on how you implement it. If you ignore all of shouldRescale, yes,
>> but you shouldn't do that in the first place.
>
>
> I agree, this is not what I planned to implement.
>
>
>> That sounds great; let's go ahead and outline this in the FLIP.
>>
>> Best,
>> D.
>>
>>
>> On Tue, Jul 4, 2023 at 12:30 PM Etienne Chauchot<ec...@apache.org>
>> wrote:
>>
>>> Hi all,
>>>
>>> Thanks David for your feedback. My comments are inline
>>>
>>> Le 04/07/2023 à 09:16, David Morávek a écrit :
>>>>> They will struggle if they add new resources and nothing happens for 5
>>>> minutes.
>>>>
>>>> The same applies if they start playing with FLIP-291 APIs. I'm wondering
>>> if
>>>> the cooldown makes sense there since it was the user's deliberate choice
>>> to
>>>> push new requirements. 🤔
>>> Sure, but remember that the initial rescale is always done immediately.
>>> Only the time between 2 rescales is controlled by the cooldown period. I
>>> don't see a user adding resources every 10s (your proposed default
>>> value) or even with, let's say 2 min, waiting 2 min between 2
>>> requirements push seems ok to me.
>>>
>>>
>>>> Best,
>>>> D.
>>>>
>>>> On Tue, Jul 4, 2023 at 9:11 AM David Morávek<dm...@apache.org>   wrote:
>>>>
>>>>> The FLIP reads sane to me. I'm unsure about the default values, though;
>>> 5
>>>>> minutes of wait time between rescales feels rather strict, and we should
>>>>> rethink it to provide a better out-of-the-box experience.
>>>>>
>>>>> I'd focus on newcomers trying AS / Reactive Mode out. They will struggle
>>>>> if they add new resources and nothing happens for 5 minutes. I'd suggest
>>>>> defaulting to
>>>>> *jobmanager.adaptive-scheduler.resource-stabilization-timeout* (which
>>>>> defaults to 10s).
>>> If users add resources, the re-scale will happen right away. It is only
>>> for next additions that they will have to wait for the coolDown period
>>> to end.
>>>
>>> But anyway, we could lower the default value, I just took what Robert
>>> suggested in the ticket.
>>>
>>>
>>>>> I'm still struggling to grasp max interval (force rescale). Ignoring
>>> `AdaptiveScheduler#shouldRescale()`
>>>>> condition seems rather dangerous. Wouldn't a simple case where you add a
>>>>> new TM and remove it before the max interval is reached (so there is
>>>>> nothing to do) result in an unnecessary job restart?
>>> With current behavior (on master) : adding the TM will result in
>>> restarting if the number of slots added leads to job parallelism
>>> increase of more than 2. Then removing it can have 2 consequences:
>>> either it is removed before the resource-stabilization timeout and there
>>> will be no restart. Or it is removed after this timeout (the job is in
>>> Running state) and it will entail another restart and parallelism decrease.
>>>
>>> With the proposed behavior: what the scaling-interval.max will change is
>>> only on the resource addition part: when the TM is added, if the time
>>> since last rescale > scaling-interval.max, then a restart and
>>> parallelism increase will be done even if it leads to a parallelism
>>> increase < 2. The rest regarding TM removal does not change.
>>>
>>> => So, the real difference with the current behavior is ** if the slot
>>> addition was too small ** : in the current behavior nothing happens. In
>>> the new behavior nothing happens unless the addition arrives after
>>> scaling-interval.max.
>>>
>>>
>>> Best
>>>
>>> Etienne
>>>
>>>>> Best,
>>>>> D.
>>>>>
>>>>> On Thu, Jun 29, 2023 at 3:43 PM Etienne Chauchot<ec...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Thanks Chesnay for your feedback. I have updated the FLIP. I'll start a
>>>>>> vote thread.
>>>>>>
>>>>>> Best
>>>>>>
>>>>>> Etienne
>>>>>>
>>>>>> Le 28/06/2023 à 11:49, Chesnay Schepler a écrit :
>>>>>>>> we should schedule a check that will rescale if
>>>>>>> min-parallelism-increase is met. Then, what is the use of
>>>>>>> scaling-interval.max timeout in that context ?
>>>>>>>
>>>>>>> To force a rescale if min-parallelism-increase is not met (but we
>>>>>>> could still run above the current parallelism).
>>>>>>>
>>>>>>> min-parallelism-increase is a trade-off between the cost of rescaling
>>>>>>> vs the performance benefit of the parallelism increase. Over time the
>>>>>>> balance tips more and more in favor of the parallelism increase, hence
>>>>>>> we should eventually rescale anyway even if the minimum isn't met, or
>>>>>>> at least give users the option to do so.
>>>>>>>
>>>>>>>> I meant the opposite: not having only the cooldown but having only
>>>>>>> the stabilization time. I must have missed something because what I
>>>>>>> wonder is: if every rescale entails a restart of the pipeline and
>>>>>>> every restart entails passing in waiting for resources state, then why
>>>>>>> introduce a cooldown when there is already at each rescale a stable
>>>>>>> resource timeout ?
>>>>>>>
>>>>>>> It is technically correct that the stable resource timeout can be used
>>>>>>> to limit the number of rescale operations per interval, however during
>>>>>>> that time the job isn't running, in contrast to the cooldown.
>>>>>>>
>>>>>>> Having both just gives you a lot more flexibility.
>>>>>>> "I want at most 1 rescale operation per hour, and wait at most 1
>>>>>>> minute for resource to stabilize when a rescale happens".
>>>>>>> You can't express this with only one of the options.
>>>>>>>
>>>>>>> On 20/06/2023 14:41, Etienne Chauchot wrote:
>>>>>>>> Hi Chesnay,
>>>>>>>>
>>>>>>>> Thanks for your feedback. Comments inline
>>>>>>>>
>>>>>>>> Le 16/06/2023 à 17:24, Chesnay Schepler a écrit :
>>>>>>>>> 1) Options specific to the adaptive scheduler should start with
>>>>>>>>> "jobmanager.adaptive-scheduler".
>>>>>>>> ok
>>>>>>>>
>>>>>>>>
>>>>>>>>> 2)
>>>>>>>>> There isn't /really /a notion of a "scaling event". The scheduler is
>>>>>>>>> informed about new/lost slots and job failures, and reacts
>>>>>>>>> accordingly by maybe rescaling the job.
>>>>>>>>> (sure, you can think of these as events, but you can think of
>>>>>>>>> practically everything as events)
>>>>>>>>>
>>>>>>>>> There shouldn't be a queue for events. All the scheduler should have
>>>>>>>>> to know is that the next rescale check is scheduled for time T,
>>>>>>>>> which in practice boils down to a flag and a scheduled action that
>>>>>>>>> runs Executing#maybeRescale.
>>>>>>>> Makes total sense, it's very simple like this. Thanks for the
>>>>>>>> clarification and pointer. After the related FLIPs, I'll look at the code
>>>>>>>> now.
>>>>>>>>
>>>>>>>>
>>>>>>>>> With that in mind, we also have to look at how we keep this state
>>>>>>>>> around. Presumably it is scoped to the current state, such that the
>>>>>>>>> cooldown is reset if a job fails.
>>>>>>>>> Maybe we should add a separate ExecutingWithCooldown state; not sure
>>>>>>>>> yet.
>>>>>>>> Yes, losing cooldown state and cooldown reset upon failure is what I
>>>>>>>> suggested in point 3 in previous email. Not sure either for a new
>>>>>>>> state, I'll figure it out after experimenting with the code. I'll
>>>>>>>> update the FLIP then.
>>>>>>>>
>>>>>>>>
>>>>>>>>> It would be good to clarify whether this FLIP only attempts to cover
>>>>>>>>> scale up operations, or also scale downs in case of slot losses.
>>>>>>>> When slots are lost, most of the time it is due to a TM loss so
>>>>>>>> there should be several slots lost at the same time but (hopefully)
>>>>>>>> only once. There should not be many scale downs in a row (but still
>>>>>>>> cascading failures can happen). I think, we should just protect
>>>>>>>> against having scale ups immediately following. For that, I think we
>>>>>>>> could just keep the current behavior of transitioning to Restarting
>>>>>>>> state and then back to Waiting for Resources state. This state will
>>>>>>>> protect us against scale ups immediately following failure/restart.
>>>>>>>>
>>>>>>>>
>>>>>>>>> We should also think about how it relates to the externalized
>>>>>>>>> declarative resource management. Should we always rescale
>>>>>>>>> immediately? Should we wait until the cooldown is over?
>>>>>>>> It relates to point 2, no ? we should rescale immediately only if
>>>>>>>> last rescale was done more than scaling-interval.min ago otherwise
>>>>>>>> schedule a rescale at last-rescale + scaling-interval.min time.
>>>>>>>>
>>>>>>>>
>>>>>>>>> Related to this, there's the min-parallelism-increase option, that
>>>>>>>>> if for example set to "2" restricts rescale operations to only occur
>>>>>>>>> if the parallelism increases by at least 2.
>>>>>>>> yes I saw that in the code
>>>>>>>>
>>>>>>>>
>>>>>>>>> Ideally however there would be a max timeout for this.
>>>>>>>>>
>>>>>>>>> As such we could maybe think about this a bit differently:
>>>>>>>>> Add 2 new options instead of 1:
>>>>>>>>> jobmanager.adaptive-scheduler.scaling-interval.min: The minimum time
>>>>>>>>> the scheduler will wait for the next effective rescale operations.
>>>>>>>>> jobmanager.adaptive-scheduler.scaling-interval.max: The maximum time
>>>>>>>>> the scheduler will wait for the next effective rescale operations.
>>>>>>>> At point 2, we said that when slots change (requirements change or
>>>>>>>> new slots available), if last rescale check (call to maybeRescale)
>>>>>>>> was done less than scaling-interval.min ago, we should schedule a
>>>>>>>> check that will rescale if min-parallelism-increase is met. Then,
>>>>>>>> what is the use of the scaling-interval.max timeout in that context?
>>>>>>>>
>>>>>>>>
>>>>>>>>> 3) It sounds fine that we lose the cooldown state, because imo we
>>>>>>>>> want to reset the cooldown anyway on job failures (because a job
>>>>>>>>> failure inherently implies a potential rescaling).
>>>>>>>> exactly.
>>>>>>>>
>>>>>>>>
>>>>>>>>> 4) The stabilization time isn't really redundant and serves a
>>>>>>>>> different use-case. The idea behind it is that if a user adds multiple
>>>>>>>>> TMs at once then we don't want to rescale immediately at the first
>>>>>>>>> received slot. Without the stabilization time the cooldown would
>>>>>>>>> actually cause bad behavior here, because not only would we rescale
>>>>>>>>> immediately upon receiving the minimum required slots to scale up,
>>>>>>>>> but we also wouldn't use the remaining slots just because the
>>>>>>>>> cooldown says so.
>>>>>>>> I meant the opposite: not having only the cooldown but having only
>>>>>>>> the stabilization time. I must have missed something because what I
>>>>>>>> wonder is: if every rescale entails a restart of the pipeline and
>>>>>>>> every restart entails passing in waiting for resources state, then
>>>>>>>> why introduce a cooldown when there is already at each rescale a
>>>>>>>> stable resource timeout ?
>>>>>>>>
>>>>>>>>
>>>>>>>> Best
>>>>>>>>
>>>>>>>> Etienne
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>> On 16/06/2023 15:47, Etienne Chauchot wrote:
>>>>>>>>>> Hi Robert,
>>>>>>>>>>
>>>>>>>>>> Thanks for your feedback. I don't know the scheduler part well
>>>>>>>>>> enough yet and I'm taking this ticket as a learning workshop.
>>>>>>>>>>
>>>>>>>>>> Regarding your comments:
>>>>>>>>>>
>>>>>>>>>> 1. Taking a look at the AdaptiveScheduler class which takes all its
>>>>>>>>>> configuration from the JobManagerOptions, and also to be consistent
>>>>>>>>>> with other parameters name, I'd suggest
>>>>>>>>>> /jobmanager.scheduler-scaling-cooldown-period/
>>>>>>>>>>
>>>>>>>>>> 2. I thought scaling events existed already and the scheduler
>>>>>>>>>> received them as mentioned in FLIP-160 (cf "Whenever the scheduler
>>>>>>>>>> is in the Executing state and receives new slots") or in FLIP-138
>>>>>>>>>> (cf "Whenever new slots are available the SlotPool notifies the
>>>>>>>>>> Scheduler"). If it is not the case (it is the scheduler who asks
>>>>>>>>>> for slots), then there is no need for storing scaling requests
>>>>>>>>>> indeed.
>>>>>>>>>> => I need a confirmation here
>>>>>>>>>>
>>>>>>>>>> 3. If we lose the JobManager, we lose both the AdaptiveScheduler
>>>>>>>>>> state and the CoolDownTimer state. So, upon recovery, it would be
>>>>>>>>>> as if there was no ongoing coolDown period. So, a first re-scale
>>>>>>>>>> could happen right away and it will start a coolDown period. A
>>>>>>>>>> second re-scale would have to wait for the end of this period.
>>>>>>>>>>
>>>>>>>>>> 4. When a pipeline is re-scaled, it is restarted. Upon restart, the
>>>>>>>>>> AdaptiveScheduler passes again in the "waiting for resources" state
>>>>>>>>>> as FLIP-160 suggests. If so, then it seems that the coolDown period
>>>>>>>>>> is kind of redundant with the resource-stabilization-timeout. I
>>>>>>>>>> guess it is not the case otherwise the FLINK-21883 ticket would not
>>>>>>>>>> have been created.
>>>>>>>>>>
>>>>>>>>>> => I need a confirmation here also.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Thanks for your views on point 2 and 4.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Best
>>>>>>>>>>
>>>>>>>>>> Etienne
>>>>>>>>>>
>>>>>>>>>> Le 15/06/2023 à 13:35, Robert Metzger a écrit :
>>>>>>>>>>> Thanks for the FLIP.
>>>>>>>>>>>
>>>>>>>>>>> Some comments:
>>>>>>>>>>> 1. Can you specify the full proposed configuration name? "
>>>>>>>>>>> scaling-cooldown-period" is probably not the full config name?
>>>>>>>>>>> 2. Why is the concept of scaling events and a scaling queue
>>>>>>>>>>> needed? If I
>>>>>>>>>>> remember correctly, the adaptive scheduler will just check how
>>> many
>>>>>>>>>>> TaskManagers are available and then adjust the execution graph
>>>>>>>>>>> accordingly.
>>>>>>>>>>> There's no need to store a number of scaling events. We just need
>>> to
>>>>>>>>>>> determine the time to trigger an adjustment of the execution
>>> graph.
>>>>>>>>>>> 3. What's the behavior wrt to JobManager failures (e.g. we lose
>>>>>>>>>>> the state
>>>>>>>>>>> of the Adaptive Scheduler?). My proposal would be to just reset
>>> the
>>>>>>>>>>> cooldown period, so after recovery of a JobManager, we have to
>>>>>>>>>>> wait at
>>>>>>>>>>> least for the cooldown period until further scaling operations are
>>>>>>>>>>> done.
>>>>>>>>>>> 4. What's the relationship to the
>>>>>>>>>>> "jobmanager.adaptive-scheduler.resource-stabilization-timeout"
>>>>>>>>>>> configuration?
>>>>>>>>>>>
>>>>>>>>>>> Thanks a lot for working on this!
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Robert
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Jun 14, 2023 at 3:38 PM Etienne
>>>>>>>>>>> Chauchot<ec...@apache.org>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>
>>>>>>>>>>>> @Yuxia, I updated the FLIP to include the aggregation of the
>>>>>>>>>>>> stacked operations that we discussed below. PTAL.
>>>>>>>>>>>>
>>>>>>>>>>>> Best
>>>>>>>>>>>>
>>>>>>>>>>>> Etienne
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Le 13/06/2023 à 16:31, Etienne Chauchot a écrit :
>>>>>>>>>>>>> Hi Yuxia,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for your feedback. The number of potentially stacked
>>>>>>>>>>>>> operations
>>>>>>>>>>>>> depends on the configured length of the cooldown period.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> The proposition in the FLIP is to add a minimum delay between 2
>>>>>>>>>>>>> scaling
>>>>>>>>>>>>> operations. But, indeed, an optimization could be to still stack
>>>>>>>>>>>>> the
>>>>>>>>>>>>> operations (that arrive during a cooldown period) but maybe not
>>>>>>>>>>>>> take
>>>>>>>>>>>>> only the last operation but rather aggregate them in order to
>>>>>>>>>>>>> end up
>>>>>>>>>>>>> with a single aggregated operation when the cooldown period
>>>>>>>>>>>>> ends. For
>>>>>>>>>>>>> example, let's say 3 taskManagers come up and 1 comes down
>>>>>>>>>>>>> during the
>>>>>>>>>>>>> cooldown period, we could generate a single operation of scale
>>>>>>>>>>>>> up +2
>>>>>>>>>>>>> when the period ends.
>>>>>>>>>>>>>
>>>>>>>>>>>>> As a side note regarding your comment on "it'll take a long time
>>>>>> to
>>>>>>>>>>>>> finish all", please keep in mind that the reactive mode (at
>>>>>>>>>>>>> least for
>>>>>>>>>>>>> now) is only available for streaming pipeline which are in
>>> essence
>>>>>>>>>>>>> infinite processing.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Another side note: when you mention "every taskManagers
>>>>>>>>>>>>> connecting",
>>>>>>>>>>>>> if you are referring to the start of the pipeline, please keep
>>>>>>>>>>>>> in mind
>>>>>>>>>>>>> that the adaptive scheduler has a "waiting for resources"
>>> timeout
>>>>>>>>>>>>> period before starting the pipeline in which all taskmanagers
>>>>>>>>>>>>> connect
>>>>>>>>>>>>> and the parallelism is decided.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best
>>>>>>>>>>>>>
>>>>>>>>>>>>> Etienne
>>>>>>>>>>>>>
>>>>>>>>>>>>> Le 13/06/2023 à 03:58, yuxia a écrit :
>>>>>>>>>>>>>> Hi, Etienne. Thanks for driving it. I have one question about
>>> the
>>>>>>>>>>>>>> mechanism of the cooldown timeout.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> From the Proposed Changes part, if a scaling event is
>>>>>>>>>>>>>> received and
>>>>>>>>>>>>>> it falls during the cooldown period, it'll be stacked to be
>>>>>>>>>>>>>> executed
>>>>>>>>>>>>>> after the period ends. Also, from the description of
>>>>>>>>>>>>>> FLINK-21883[1],
>>>>>>>>>>>>>> cooldown timeout is to avoid rescaling the job very frequently,
>>>>>>>>>>>>>> because TaskManagers are not all connecting at the same time.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> So, is it possible that every taskmanager connecting will
>>>>>>>>>>>>>> produce a scaling event that gets stacked with many other
>>>>>>>>>>>>>> scale-up events, so that it takes a long time to finish them
>>>>>>>>>>>>>> all? Can we just take the last event?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> [1]:https://issues.apache.org/jira/browse/FLINK-21883
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Best regards, Yuxia
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> ----- Original Message -----
>>>>>>>>>>>>>> From: "Etienne Chauchot"<ec...@apache.org>
>>>>>>>>>>>>>> To: "dev"<de...@flink.apache.org>, "Robert Metzger"<metrobert@gmail.com>
>>>>>>>>>>>>>> Sent: Monday, June 12, 2023, 11:34:25 PM
>>>>>>>>>>>>>> Subject: [DISCUSS] FLIP-322 Cooldown period for adaptive scheduler
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I’d like to start a discussion about FLIP-322 [1] which
>>>>>>>>>>>>>> introduces a
>>>>>>>>>>>>>> cooldown period for the adaptive scheduler.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I'd like to get your feedback especially @Robert as you opened
>>>>>> the
>>>>>>>>>>>>>> related ticket and worked on the reactive mode a lot.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>>
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-322+Cooldown+period+for+adaptive+scheduler
>>>>>>>>>>>>> Best
>>>>>>>>>>>>>> Etienne

Re: [DISCUSS] FLIP-322 Cooldown period for adaptive scheduler

Posted by Etienne Chauchot <ec...@apache.org>.
Hi all,

Thanks David for your suggestions. Comments inline.

Le 04/07/2023 à 13:35, David Morávek a écrit :
>> waiting 2 min between 2 requirements push seems ok to me
> This depends on the workload. Would you care if the cost of rescaling were
> close to zero (which it is for most out-of-the-box workloads)? In that case,
> it would be desirable to rescale more frequently, for example, if TMs join
> incrementally.
>
> Creating a value that covers everything is impossible unless it's
> self-tuning, so I'd prefer having a smooth experience for people trying
> things out (just imagine doing a demo at a conference) and having them
> opt in to longer cooldowns.
>
The users still have the ability to lower the cooldown period for high
workloads, but we could definitely set the default to a lower number.
I agree to favor lower numbers (for a smooth rescale experience) and to
consider higher numbers (for high workloads) as exceptions. But we still
need to agree on a suitable default for most cases: 30s?
> One idea to keep the timeouts lower while getting more balance would be
> restarting the cooldown period when new resources or requirements are
> received. This would also bring the cooldown's behavior closer to the
> resource-stabilization timeout. Would that make sense?


You mean that, if slots are received during the cooldown period, we
should do behavior (B) instead of the proposed behavior (A)?

A. schedule a rescale at lastRescale + cooldown

B. schedule a rescale at ** now ** + cooldown

It looks fine to me. It is even better because it avoids having 2
rescales scheduled at the same time if 2 slot changes arrive during the
same cooldown period.
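
To make the difference between (A) and (B) concrete, here is a minimal
Java sketch. It is not Flink code: the class CooldownSchedule and its
two methods are hypothetical names used only for illustration, and the
sketch only computes the point in time at which the rescale check would
fire.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper (not part of Flink) comparing the two options. */
final class CooldownSchedule {

    private CooldownSchedule() {}

    /** Option A: the check fires when the cooldown started by the last rescale ends. */
    static Instant optionA(Instant lastRescale, Duration cooldown) {
        return lastRescale.plus(cooldown);
    }

    /** Option B: a slot change received during the cooldown restarts the wait from now. */
    static Instant optionB(Instant now, Duration cooldown) {
        return now.plus(cooldown);
    }
}
```

For a last rescale at 12:00:00, a 30s cooldown and a slot change
received at 12:00:10, (A) fires at 12:00:30 and (B) at 12:00:40. With
(B), I assume the scheduler would replace a check that is already
pending rather than add a second one.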


Etienne


>
>> Depends on how you implement it. If you ignore all of shouldRescale, yes,
> but you shouldn't do that in the first place.


I agree, this is not what I planned to implement.


>
> That sounds great; let's go ahead and outline this in the FLIP.
>
> Best,
> D.
>
>
> On Tue, Jul 4, 2023 at 12:30 PM Etienne Chauchot<ec...@apache.org>
> wrote:
>
>> Hi all,
>>
>> Thanks David for your feedback. My comments are inline
>>
>> Le 04/07/2023 à 09:16, David Morávek a écrit :
>>>> They will struggle if they add new resources and nothing happens for 5
>>> minutes.
>>>
>>> The same applies if they start playing with FLIP-291 APIs. I'm wondering
>> if
>>> the cooldown makes sense there since it was the user's deliberate choice
>> to
>>> push new requirements. 🤔
>>
>> Sure, but remember that the initial rescale is always done immediately.
>> Only the time between 2 rescales is controlled by the cooldown period. I
>> don't see a user adding resources every 10s (your proposed default
>> value) or even with, let's say 2 min, waiting 2 min between 2
>> requirements push seems ok to me.
>>
>>
>>> Best,
>>> D.
>>>
>>> On Tue, Jul 4, 2023 at 9:11 AM David Morávek<dm...@apache.org>   wrote:
>>>
>>>> The FLIP reads sane to me. I'm unsure about the default values, though;
>> 5
>>>> minutes of wait time between rescales feels rather strict, and we should
>>>> rethink it to provide a better out-of-the-box experience.
>>>>
>>>> I'd focus on newcomers trying AS / Reactive Mode out. They will struggle
>>>> if they add new resources and nothing happens for 5 minutes. I'd suggest
>>>> defaulting to
>>>> *jobmanager.adaptive-scheduler.resource-stabilization-timeout* (which
>>>> defaults to 10s).
>>
>> If users add resources, the re-scale will happen right away. It is only
>> for next additions that they will have to wait for the coolDown period
>> to end.
>>
>> But anyway, we could lower the default value, I just took what Robert
>> suggested in the ticket.
>>
>>
>>>> I'm still struggling to grasp max interval (force rescale). Ignoring the
>>>> `AdaptiveScheduler#shouldRescale()` condition seems rather dangerous.
>>>> Wouldn't a simple case where you add a
>>>> new TM and remove it before the max interval is reached (so there is
>>>> nothing to do) result in an unnecessary job restart?
>> With current behavior (on master) : adding the TM will result in
>> restarting if the number of slots added leads to job parallelism
>> increase of more than 2. Then removing it can have 2 consequences:
>> either it is removed before the resource-stabilisation timeout and there
>> will be no restart. Or it is removed after this timeout (the job is in
>> Running state) and it will entail another restart and parallelism decrease.
>>
>> With the proposed behavior: what the scaling-interval.max will change is
>> only on the resource addition part: when the TM is added, if the time
>> since last rescale > scaling-interval.max, then a restart and
>> parallelism increase will be done even if it leads to a parallelism
>> increase < 2. The rest regarding TM removal does not change.
>>
>> => So, the real difference with the current behavior is ** if the slots
>> addition was too little ** : in the current behavior nothing happens. In
>> the new behavior nothing happens unless the addition arrives after
>> scaling-interval.max.
>>
>>
>> Best
>>
>> Etienne
>>
>>>> Best,
>>>> D.
>>>>
>>>> On Thu, Jun 29, 2023 at 3:43 PM Etienne Chauchot<ec...@apache.org>
>>>> wrote:
>>>>
>>>>> Thanks Chesnay for your feedback. I have updated the FLIP. I'll start a
>>>>> vote thread.
>>>>>
>>>>> Best
>>>>>
>>>>> Etienne
>>>>>
>>>>> Le 28/06/2023 à 11:49, Chesnay Schepler a écrit :
>>>>>>> we should schedule a check that will rescale if
>>>>>> min-parallelism-increase is met. Then, what is the use of the
>>>>>> scaling-interval.max timeout in that context?
>>>>>>
>>>>>> To force a rescale if min-parallelism-increase is not met (but we
>>>>>> could still run above the current parallelism).
>>>>>>
>>>>>> min-parallelism-increase is a trade-off between the cost of rescaling
>>>>>> vs the performance benefit of the parallelism increase. Over time the
>>>>>> balance tips more and more in favor of the parallelism increase, hence
>>>>>> we should eventually rescale anyway even if the minimum isn't met, or
>>>>>> at least give users the option to do so.
>>>>>>
>>>>>>> I meant the opposite: not having only the cooldown but having only
>>>>>> the stabilization time. I must have missed something because what I
>>>>>> wonder is: if every rescale entails a restart of the pipeline and
>>>>>> every restart entails passing in waiting for resources state, then why
>>>>>> introduce a cooldown when there is already at each rescale a stable
>>>>>> resource timeout ?
>>>>>>
>>>>>> It is technically correct that the stable resource timeout can be used
>>>>>> to limit the number of rescale operations per interval, however during
>>>>>> that time the job isn't running, in contrast to the cooldown.
>>>>>>
>>>>>> Having both just gives you a lot more flexibility.
>>>>>> "I want at most 1 rescale operation per hour, and wait at most 1
>>>>>> minute for resource to stabilize when a rescale happens".
>>>>>> You can't express this with only one of the options.
>>>>>>
>>>>>> On 20/06/2023 14:41, Etienne Chauchot wrote:
>>>>>>> Hi Chesnay,
>>>>>>>
>>>>>>> Thanks for your feedback. Comments inline
>>>>>>>
>>>>>>> Le 16/06/2023 à 17:24, Chesnay Schepler a écrit :
>>>>>>>> 1) Options specific to the adaptive scheduler should start with
>>>>>>>> "jobmanager.adaptive-scheduler".
>>>>>>> ok
>>>>>>>
>>>>>>>
>>>>>>>> 2)
>>>>>>>> There isn't /really /a notion of a "scaling event". The scheduler is
>>>>>>>> informed about new/lost slots and job failures, and reacts
>>>>>>>> accordingly by maybe rescaling the job.
>>>>>>>> (sure, you can think of these as events, but you can think of
>>>>>>>> practically everything as events)
>>>>>>>>
>>>>>>>> There shouldn't be a queue for events. All the scheduler should have
>>>>>>>> to know is that the next rescale check is scheduled for time T,
>>>>>>>> which in practice boils down to a flag and a scheduled action that
>>>>>>>> runs Executing#maybeRescale.
>>>>>>> Makes total sense, it's very simple like this. Thanks for the
>>>>>>> precision and pointer. After the related FLIPs, I'll look at the code
>>>>>>> now.
>>>>>>>
>>>>>>>
>>>>>>>> With that in mind, we also have to look at how we keep this state
>>>>>>>> around. Presumably it is scoped to the current state, such that the
>>>>>>>> cooldown is reset if a job fails.
>>>>>>>> Maybe we should add a separate ExecutingWithCooldown state; not sure
>>>>>>>> yet.
>>>>>>> Yes, losing the cooldown state and resetting the cooldown upon failure is
>>>>>>> what I suggested in point 3 of my previous email. Not sure either about a new
>>>>>>> state, I'll figure it out after experimenting with the code. I'll
>>>>>>> update the FLIP then.
>>>>>>>
>>>>>>>
>>>>>>>> It would be good to clarify whether this FLIP only attempts to cover
>>>>>>>> scale up operations, or also scale downs in case of slot losses.
>>>>>>> When slots are lost, most of the time it is due to a TM loss, so
>>>>>>> there should be several slots lost at the same time but (hopefully)
>>>>>>> only once. There should not be many scale downs in a row (but still
>>>>>>> cascading failures can happen). I think, we should just protect
>>>>>>> against having scale ups immediately following. For that, I think we
>>>>>>> could just keep the current behavior of transitioning to Restarting
>>>>>>> state and then back to Waiting for Resources state. This state will
>>>>>>> protect us against scale ups immediately following failure/restart.
>>>>>>>
>>>>>>>
>>>>>>>> We should also think about how it relates to the externalized
>>>>>>>> declarative resource management. Should we always rescale
>>>>>>>> immediately? Should we wait until the cooldown is over?
>>>>>>> It relates to point 2, no? We should rescale immediately only if the
>>>>>>> last rescale was done more than scaling-interval.min ago; otherwise,
>>>>>>> schedule a rescale at last-rescale + scaling-interval.min.
>>>>>>>
>>>>>>>
>>>>>>>> Related to this, there's the min-parallelism-increase option, that
>>>>>>>> if for example set to "2" restricts rescale operations to only occur
>>>>>>>> if the parallelism increases by at least 2.
>>>>>>> yes I saw that in the code
>>>>>>>
>>>>>>>
>>>>>>>> Ideally however there would be a max timeout for this.
>>>>>>>>
>>>>>>>> As such we could maybe think about this a bit differently:
>>>>>>>> Add 2 new options instead of 1:
>>>>>>>> jobmanager.adaptive-scheduler.scaling-interval.min: The minimum time
>>>>>>>> the scheduler will wait for the next effective rescale operations.
>>>>>>>> jobmanager.adaptive-scheduler.scaling-interval.max: The maximum time
>>>>>>>> the scheduler will wait for the next effective rescale operations.
>>>>>>> At point 2, we said that when slots change (requirements change or
>>>>>>> new slots available), if last rescale check (call to maybeRescale)
>>>>>>> was done less than scaling-interval.min ago, we should schedule a
>>>>>>> check that will rescale if min-parallelism-increase is met. Then,
>>>>>>> what is the use of the scaling-interval.max timeout in that context?
>>>>>>>
>>>>>>>
>>>>>>>> 3) It sounds fine that we lose the cooldown state, because imo we
>>>>>>>> want to reset the cooldown anyway on job failures (because a job
>>>>>>>> failure inherently implies a potential rescaling).
>>>>>>> exactly.
>>>>>>>
>>>>>>>
>>>>>>>> 4) The stabilization time isn't really redundant and serves a
>>>>>>>> different use-case. The idea behind it is that if a user adds multiple
>>>>>>>> TMs at once then we don't want to rescale immediately at the first
>>>>>>>> received slot. Without the stabilization time the cooldown would
>>>>>>>> actually cause bad behavior here, because not only would we rescale
>>>>>>>> immediately upon receiving the minimum required slots to scale up,
>>>>>>>> but we also wouldn't use the remaining slots just because the
>>>>>>>> cooldown says so.
>>>>>>> I meant the opposite: not having only the cooldown but having only
>>>>>>> the stabilization time. I must have missed something because what I
>>>>>>> wonder is: if every rescale entails a restart of the pipeline and
>>>>>>> every restart entails passing in waiting for resources state, then
>>>>>>> why introduce a cooldown when there is already at each rescale a
>>>>>>> stable resource timeout ?
>>>>>>>
>>>>>>>
>>>>>>> Best
>>>>>>>
>>>>>>> Etienne
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>> On 16/06/2023 15:47, Etienne Chauchot wrote:
>>>>>>>>> Hi Robert,
>>>>>>>>>
>>>>>>>>> Thanks for your feedback. I don't know the scheduler part well
>>>>>>>>> enough yet and I'm taking this ticket as a learning workshop.
>>>>>>>>>
>>>>>>>>> Regarding your comments:
>>>>>>>>>
>>>>>>>>> 1. Taking a look at the AdaptiveScheduler class which takes all its
>>>>>>>>> configuration from the JobManagerOptions, and also to be consistent
>>>>>>>>> with other parameters name, I'd suggest
>>>>>>>>> /jobmanager.scheduler-scaling-cooldown-period/
>>>>>>>>>
>>>>>>>>> 2. I thought scaling events existed already and the scheduler
>>>>>>>>> received them as mentioned in FLIP-160 (cf "Whenever the scheduler
>>>>>>>>> is in the Executing state and receives new slots") or in FLIP-138
>>>>>>>>> (cf "Whenever new slots are available the SlotPool notifies the
>>>>>>>>> Scheduler"). If it is not the case (it is the scheduler who asks
>>>>>>>>> for slots), then there is no need for storing scaling requests
>>>>>>>>> indeed.
>>>>>>>>> => I need a confirmation here
>>>>>>>>>
>>>>>>>>> 3. If we lose the JobManager, we lose both the AdaptiveScheduler
>>>>>>>>> state and the CoolDownTimer state. So, upon recovery, it would be
>>>>>>>>> as if there was no ongoing coolDown period. So, a first re-scale
>>>>>>>>> could happen right away and it will start a coolDown period. A
>>>>>>>>> second re-scale would have to wait for the end of this period.
>>>>>>>>>
>>>>>>>>> 4. When a pipeline is re-scaled, it is restarted. Upon restart, the
>>>>>>>>> AdaptiveScheduler passes again in the "waiting for resources" state
>>>>>>>>> as FLIP-160 suggests. If so, then it seems that the coolDown period
>>>>>>>>> is kind of redundant with the resource-stabilization-timeout. I
>>>>>>>>> guess it is not the case otherwise the FLINK-21883 ticket would not
>>>>>>>>> have been created.
>>>>>>>>>
>>>>>>>>> => I need a confirmation here also.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thanks for your views on point 2 and 4.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Best
>>>>>>>>>
>>>>>>>>> Etienne
>>>>>>>>>
>>>>>>>>> Le 15/06/2023 à 13:35, Robert Metzger a écrit :
>>>>>>>>>> Thanks for the FLIP.
>>>>>>>>>>
>>>>>>>>>> Some comments:
>>>>>>>>>> 1. Can you specify the full proposed configuration name? "
>>>>>>>>>> scaling-cooldown-period" is probably not the full config name?
>>>>>>>>>> 2. Why is the concept of scaling events and a scaling queue
>>>>>>>>>> needed? If I
>>>>>>>>>> remember correctly, the adaptive scheduler will just check how
>> many
>>>>>>>>>> TaskManagers are available and then adjust the execution graph
>>>>>>>>>> accordingly.
>>>>>>>>>> There's no need to store a number of scaling events. We just need
>> to
>>>>>>>>>> determine the time to trigger an adjustment of the execution
>> graph.
>>>>>>>>>> 3. What's the behavior wrt to JobManager failures (e.g. we lose
>>>>>>>>>> the state
>>>>>>>>>> of the Adaptive Scheduler?). My proposal would be to just reset
>> the
>>>>>>>>>> cooldown period, so after recovery of a JobManager, we have to
>>>>>>>>>> wait at
>>>>>>>>>> least for the cooldown period until further scaling operations are
>>>>>>>>>> done.
>>>>>>>>>> 4. What's the relationship to the
>>>>>>>>>> "jobmanager.adaptive-scheduler.resource-stabilization-timeout"
>>>>>>>>>> configuration?
>>>>>>>>>>
>>>>>>>>>> Thanks a lot for working on this!
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Robert
>>>>>>>>>>
>>>>>>>>>> On Wed, Jun 14, 2023 at 3:38 PM Etienne
>>>>>>>>>> Chauchot<ec...@apache.org>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi all,
>>>>>>>>>>>
>>>>>>>>>>> @Yuxia, I updated the FLIP to include the aggregation of the
>>>>>>>>>>> stacked operations that we discussed below. PTAL.
>>>>>>>>>>>
>>>>>>>>>>> Best
>>>>>>>>>>>
>>>>>>>>>>> Etienne
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Le 13/06/2023 à 16:31, Etienne Chauchot a écrit :
>>>>>>>>>>>> Hi Yuxia,
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for your feedback. The number of potentially stacked
>>>>>>>>>>>> operations
>>>>>>>>>>>> depends on the configured length of the cooldown period.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> The proposition in the FLIP is to add a minimum delay between 2
>>>>>>>>>>>> scaling
>>>>>>>>>>>> operations. But, indeed, an optimization could be to still stack
>>>>>>>>>>>> the
>>>>>>>>>>>> operations (that arrive during a cooldown period) but maybe not
>>>>>>>>>>>> take
>>>>>>>>>>>> only the last operation but rather aggregate them in order to
>>>>>>>>>>>> end up
>>>>>>>>>>>> with a single aggregated operation when the cooldown period
>>>>>>>>>>>> ends. For
>>>>>>>>>>>> example, let's say 3 taskManagers come up and 1 comes down
>>>>>>>>>>>> during the
>>>>>>>>>>>> cooldown period, we could generate a single operation of scale
>>>>>>>>>>>> up +2
>>>>>>>>>>>> when the period ends.
>>>>>>>>>>>>
>>>>>>>>>>>> As a side note regarding your comment on "it'll take a long time
>>>>> to
>>>>>>>>>>>> finish all", please keep in mind that the reactive mode (at
>>>>>>>>>>>> least for
>>>>>>>>>>>> now) is only available for streaming pipeline which are in
>> essence
>>>>>>>>>>>> infinite processing.
>>>>>>>>>>>>
>>>>>>>>>>>> Another side note: when you mention "every taskManagers
>>>>>>>>>>>> connecting",
>>>>>>>>>>>> if you are referring to the start of the pipeline, please keep
>>>>>>>>>>>> in mind
>>>>>>>>>>>> that the adaptive scheduler has a "waiting for resources"
>> timeout
>>>>>>>>>>>> period before starting the pipeline in which all taskmanagers
>>>>>>>>>>>> connect
>>>>>>>>>>>> and the parallelism is decided.
>>>>>>>>>>>>
>>>>>>>>>>>> Best
>>>>>>>>>>>>
>>>>>>>>>>>> Etienne
>>>>>>>>>>>>
>>>>>>>>>>>> Le 13/06/2023 à 03:58, yuxia a écrit :
>>>>>>>>>>>>> Hi, Etienne. Thanks for driving it. I have one question about
>> the
>>>>>>>>>>>>> mechanism of the cooldown timeout.
>>>>>>>>>>>>>
>>>>>>>>>>>>> From the Proposed Changes part, if a scaling event is
>>>>>>>>>>>>> received and
>>>>>>>>>>>>> it falls during the cooldown period, it'll be stacked to be
>>>>>>>>>>>>> executed
>>>>>>>>>>>>> after the period ends. Also, from the description of
>>>>>>>>>>>>> FLINK-21883[1],
>>>>>>>>>>>>> cooldown timeout is to avoid rescaling the job very frequently,
>>>>>>>>>>>>> because TaskManagers are not all connecting at the same time.
>>>>>>>>>>>>>
>>>>>>>>>>>>> So, is it possible that every taskmanager connecting will
>>>>>>>>>>>>> produce a scaling event that gets stacked with many other
>>>>>>>>>>>>> scale-up events, so that it takes a long time to finish them
>>>>>>>>>>>>> all? Can we just take the last event?
>>>>>>>>>>>>>
>>>>>>>>>>>>> [1]:https://issues.apache.org/jira/browse/FLINK-21883
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best regards, Yuxia
>>>>>>>>>>>>>
>>>>>>>>>>>>> ----- Original Message -----
>>>>>>>>>>>>> From: "Etienne Chauchot"<ec...@apache.org>
>>>>>>>>>>>>> To: "dev"<de...@flink.apache.org>, "Robert Metzger"<metrobert@gmail.com>
>>>>>>>>>>>>> Sent: Monday, June 12, 2023, 11:34:25 PM
>>>>>>>>>>>>> Subject: [DISCUSS] FLIP-322 Cooldown period for adaptive scheduler
>>>>>>>>>>>>>
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I’d like to start a discussion about FLIP-322 [1] which
>>>>>>>>>>>>> introduces a
>>>>>>>>>>>>> cooldown period for the adaptive scheduler.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I'd like to get your feedback especially @Robert as you opened
>>>>> the
>>>>>>>>>>>>> related ticket and worked on the reactive mode a lot.
>>>>>>>>>>>>>
>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-322+Cooldown+period+for+adaptive+scheduler
>>>>>>>>>>>> Best
>>>>>>>>>>>>> Etienne

Re: [DISCUSS] FLIP-322 Cooldown period for adaptive scheduler

Posted by David Morávek <dm...@apache.org>.
> waiting 2 min between 2 requirements push seems ok to me

This depends on the workload. Would you care if the cost of rescaling were
close to zero (which it is for most out-of-the-box workloads)? In that case,
it would be desirable to rescale more frequently, for example, if TMs join
incrementally.

Creating a value that covers everything is impossible unless it's
self-tuning, so I'd prefer having a smooth experience for people trying
things out (just imagine doing a demo at a conference) and having them
opt in to longer cooldowns.


One idea to keep the timeouts lower while getting more balance would be
restarting the cooldown period when new resources or requirements are
received. This would also bring the cooldown's behavior closer to the
resource-stabilization timeout. Would that make sense?

> Depends on how you implement it. If you ignore all of shouldRescale, yes,
but you shouldn't do that in the first place.

That sounds great; let's go ahead and outline this in the FLIP.
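
To outline what I understand from the above, here is a rough Java sketch
of the check. It is only an illustration, not the actual
AdaptiveScheduler code: RescaleDecision, its parameters and the
scale-up-only scope are assumptions of mine. The point is that the max
interval would only override the min-parallelism-increase check, and
would never force a restart when there is no higher parallelism to move
to.

```java
import java.time.Duration;

/** Hypothetical sketch of the scale-up decision; not the real scheduler logic. */
final class RescaleDecision {

    private RescaleDecision() {}

    static boolean shouldRescale(
            int currentParallelism,
            int availableParallelism,
            int minParallelismIncrease,
            Duration sinceLastRescale,
            Duration scalingIntervalMax) {
        int increase = availableParallelism - currentParallelism;
        if (increase <= 0) {
            // No better parallelism is available (e.g. a TM was added and
            // removed again), so there is nothing to do and no restart.
            return false;
        }
        if (increase >= minParallelismIncrease) {
            // The usual min-parallelism-increase condition is met.
            return true;
        }
        // Small increase: only rescale once the max interval has elapsed.
        return sinceLastRescale.compareTo(scalingIntervalMax) > 0;
    }
}
```

With this shape, adding a TM and removing it again before the check runs
leaves the increase at 0, so there is no restart no matter how long ago
the last rescale was.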

Best,
D.



Re: [DISCUSS] FLIP-322 Cooldown period for adaptive scheduler

Posted by Etienne Chauchot <ec...@apache.org>.
Hi all,

Thanks David for your feedback. My comments are inline

On 04/07/2023 at 09:16, David Morávek wrote:
>> They will struggle if they add new resources and nothing happens for 5
>> minutes.
>
> The same applies if they start playing with FLIP-291 APIs. I'm wondering if
> the cooldown makes sense there since it was the user's deliberate choice to
> push new requirements. 🤔


Sure, but remember that the initial rescale is always done immediately.
Only the time between 2 rescales is controlled by the cooldown period. I
don't see a user adding resources every 10s (your proposed default
value), and even with, say, 2 min, waiting 2 min between 2 requirement
pushes seems OK to me.


>
> Best,
> D.
>
> On Tue, Jul 4, 2023 at 9:11 AM David Morávek<dm...@apache.org>  wrote:
>
>> The FLIP reads sane to me. I'm unsure about the default values, though; 5
>> minutes of wait time between rescales feels rather strict, and we should
>> rethink it to provide a better out-of-the-box experience.
>>
>> I'd focus on newcomers trying AS / Reactive Mode out. They will struggle
>> if they add new resources and nothing happens for 5 minutes. I'd suggest
>> defaulting to
>> *jobmanager.adaptive-scheduler.resource-stabilization-timeout* (which
>> defaults to 10s).


If users add resources, the rescale will happen right away. It is only
for subsequent additions that they will have to wait for the cooldown
period to end.

But anyway, we could lower the default value; I just took what Robert
suggested in the ticket.


>>
>> I'm still struggling to grasp max interval (force rescale). Ignoring `AdaptiveScheduler#shouldRescale()`
>> condition seems rather dangerous. Wouldn't a simple case where you add a
>> new TM and remove it before the max interval is reached (so there is
>> nothing to do) result in an unnecessary job restart?

With the current behavior (on master): adding the TM results in a restart
if the number of slots added leads to a job parallelism increase of at
least 2 (min-parallelism-increase). Then removing it can have 2
consequences: either it is removed before the
resource-stabilization-timeout expires and there is no restart, or it is
removed after this timeout (the job is in Running state) and it entails
another restart and a parallelism decrease.

With the proposed behavior: scaling-interval.max only changes the
resource addition part. When the TM is added, if the time since the last
rescale > scaling-interval.max, then a restart and parallelism increase
are done even if the parallelism increase is < 2. The rest, regarding TM
removal, does not change.

=> So the real difference from the current behavior is ** if the slot
addition was too small **: in the current behavior nothing happens. In
the new behavior nothing happens unless the addition arrives after
scaling-interval.max.
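
The decision described above can be sketched as follows. This is only an
illustration under assumptions, not the actual AdaptiveScheduler code: the
class RescaleDecisionSketch, the Action enum and the method onSlotsAdded are
hypothetical names, and the scheduled check stands for a re-evaluation at
last-rescale + scaling-interval.min.

```java
import java.time.Duration;

// Hypothetical sketch of the proposed decision when slots are added;
// names and structure are illustrative, not Flink's implementation.
final class RescaleDecisionSketch {

    enum Action { RESCALE_NOW, SCHEDULE_CHECK, NOTHING }

    private final Duration scalingIntervalMin;   // cooldown between rescales
    private final Duration scalingIntervalMax;   // after this, force the rescale
    private final int minParallelismIncrease;

    RescaleDecisionSketch(
            Duration scalingIntervalMin,
            Duration scalingIntervalMax,
            int minParallelismIncrease) {
        this.scalingIntervalMin = scalingIntervalMin;
        this.scalingIntervalMax = scalingIntervalMax;
        this.minParallelismIncrease = minParallelismIncrease;
    }

    Action onSlotsAdded(Duration sinceLastRescale, int parallelismGain) {
        if (parallelismGain <= 0) {
            return Action.NOTHING; // no better parallelism available
        }
        if (sinceLastRescale.compareTo(scalingIntervalMin) < 0) {
            return Action.SCHEDULE_CHECK; // still in the cooldown period
        }
        if (parallelismGain >= minParallelismIncrease) {
            return Action.RESCALE_NOW; // same as the current behavior
        }
        if (sinceLastRescale.compareTo(scalingIntervalMax) > 0) {
            return Action.RESCALE_NOW; // small gain, but max interval exceeded
        }
        return Action.NOTHING; // small gain: nothing happens, as today
    }
}
```

For instance, with min-parallelism-increase = 2 and an assumed
scaling-interval.max of 1 hour, a gain of 1 is ignored 5 minutes after the
last rescale but triggers a rescale 2 hours after it.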


Best

Etienne

>>
>> Best,
>> D.
>>
>> On Thu, Jun 29, 2023 at 3:43 PM Etienne Chauchot<ec...@apache.org>
>> wrote:
>>
>>> Thanks Chesnay for your feedback. I have updated the FLIP. I'll start a
>>> vote thread.
>>>
>>> Best
>>>
>>> Etienne
>>>
>>> On 28/06/2023 at 11:49, Chesnay Schepler wrote:
>>>>> we should schedule a check that will rescale if
>>>>> min-parallelism-increase is met. Then, what is the use of
>>>>> scaling-interval.max timeout in that context?
>>>>
>>>> To force a rescale if min-parallelism-increase is not met (but we
>>>> could still run above the current parallelism).
>>>>
>>>> min-parallelism-increase is a trade-off between the cost of rescaling
>>>> vs the performance benefit of the parallelism increase. Over time the
>>>> balance tips more and more in favor of the parallelism increase, hence
>>>> we should eventually rescale anyway even if the minimum isn't met, or
>>>> at least give users the option to do so.
>>>>
>>>>> I meant the opposite: not having only the cooldown but having only
>>>>> the stabilization time. I must have missed something because what I
>>>>> wonder is: if every rescale entails a restart of the pipeline and
>>>>> every restart entails passing in waiting for resources state, then why
>>>>> introduce a cooldown when there is already at each rescale a stable
>>>>> resource timeout?
>>>>
>>>> It is technically correct that the stable resource timeout can be used
>>>> to limit the number of rescale operations per interval, however during
>>>> that time the job isn't running, in contrast to the cooldown.
>>>>
>>>> Having both just gives you a lot more flexibility.
>>>> "I want at most 1 rescale operation per hour, and wait at most 1
>>>> minute for resources to stabilize when a rescale happens".
>>>> You can't express this with only one of the options.
>>>>
>>>> On 20/06/2023 14:41, Etienne Chauchot wrote:
>>>>> Hi Chesnay,
>>>>>
>>>>> Thanks for your feedback. Comments inline
>>>>>
>>>>> On 16/06/2023 at 17:24, Chesnay Schepler wrote:
>>>>>> 1) Options specific to the adaptive scheduler should start with
>>>>>> "jobmanager.adaptive-scheduler".
>>>>>
>>>>> ok
>>>>>
>>>>>
>>>>>> 2)
>>>>>> There isn't /really /a notion of a "scaling event". The scheduler is
>>>>>> informed about new/lost slots and job failures, and reacts
>>>>>> accordingly by maybe rescaling the job.
>>>>>> (sure, you can think of these as events, but you can think of
>>>>>> practically everything as events)
>>>>>>
>>>>>> There shouldn't be a queue for events. All the scheduler should have
>>>>>> to know is that the next rescale check is scheduled for time T,
>>>>>> which in practice boils down to a flag and a scheduled action that
>>>>>> runs Executing#maybeRescale.
>>>>>
>>>>> Makes total sense; it's very simple this way. Thanks for the
>>>>> clarification and the pointer. Having read the related FLIPs, I'll
>>>>> look at the code now.
>>>>>
>>>>>
>>>>>> With that in mind, we also have to look at how we keep this state
>>>>>> around. Presumably it is scoped to the current state, such that the
>>>>>> cooldown is reset if a job fails.
>>>>>> Maybe we should add a separate ExecutingWithCooldown state; not sure
>>>>>> yet.
>>>>>
>>>>> Yes, losing the cooldown state and resetting the cooldown upon
>>>>> failure is what I suggested in point 3 of my previous email. I'm not
>>>>> sure about a new state either; I'll figure it out after experimenting
>>>>> with the code. I'll update the FLIP then.
>>>>>
>>>>>
>>>>>> It would be good to clarify whether this FLIP only attempts to cover
>>>>>> scale up operations, or also scale downs in case of slot losses.
>>>>>
>>>>> When slots are lost, most of the time it is due to a TM loss, so
>>>>> several slots should be lost at the same time but (hopefully) only
>>>>> once. There should not be many scale downs in a row (but cascading
>>>>> failures can still happen). I think we should just protect against
>>>>> scale ups that immediately follow. For that, I think we could just
>>>>> keep the current behavior of transitioning to the Restarting state
>>>>> and then back to the Waiting for Resources state. This state will
>>>>> protect us against scale ups immediately following a failure/restart.
>>>>>
>>>>>
>>>>>> We should also think about how it relates to the externalized
>>>>>> declarative resource management. Should we always rescale
>>>>>> immediately? Should we wait until the cooldown is over?
>>>>>
>>>>> It relates to point 2, no? We should rescale immediately only if the
>>>>> last rescale was done more than scaling-interval.min ago; otherwise,
>>>>> schedule a rescale at last-rescale + scaling-interval.min.
>>>>>
>>>>>
>>>>>> Related to this, there's the min-parallelism-increase option, that
>>>>>> if for example set to "2" restricts rescale operations to only occur
>>>>>> if the parallelism increases by at least 2.
>>>>>
>>>>> yes I saw that in the code
>>>>>
>>>>>
>>>>>> Ideally however there would be a max timeout for this.
>>>>>>
>>>>>> As such we could maybe think about this a bit differently:
>>>>>> Add 2 new options instead of 1:
>>>>>> jobmanager.adaptive-scheduler.scaling-interval.min: The minimum time
>>>>>> the scheduler will wait for the next effective rescale operations.
>>>>>> jobmanager.adaptive-scheduler.scaling-interval.max: The maximum time
>>>>>> the scheduler will wait for the next effective rescale operations.
>>>>>
>>>>> At point 2, we said that when slots change (requirements change or
>>>>> new slots available), if the last rescale check (call to maybeRescale)
>>>>> was done less than scaling-interval.min ago, we should schedule a
>>>>> check that will rescale if min-parallelism-increase is met. Then,
>>>>> what is the use of the scaling-interval.max timeout in that context?
>>>>>
>>>>>
>>>>>> 3) It sounds fine that we lose the cooldown state, because imo we
>>>>>> want to reset the cooldown anyway on job failures (because a job
>>>>>> failure inherently implies a potential rescaling).
>>>>>
>>>>> exactly.
>>>>>
>>>>>
>>>>>> 4) The stabilization time isn't really redundant and serves a
>>>>>> different use-case. The idea behind it is that if a user adds multiple
>>>>>> TMs at once then we don't want to rescale immediately at the first
>>>>>> received slot. Without the stabilization time the cooldown would
>>>>>> actually cause bad behavior here, because not only would we rescale
>>>>>> immediately upon receiving the minimum required slots to scale up,
>>>>>> but we also wouldn't use the remaining slots just because the
>>>>>> cooldown says so.
>>>>>
>>>>> I meant the opposite: not having only the cooldown, but having only
>>>>> the stabilization time. I must have missed something, because what I
>>>>> wonder is: if every rescale entails a restart of the pipeline and
>>>>> every restart entails passing through the waiting-for-resources state,
>>>>> then why introduce a cooldown when there is already a stable resource
>>>>> timeout at each rescale?
>>>>>
>>>>>
>>>>> Best
>>>>>
>>>>> Etienne
>>>>>
>>>>>
>>>>>
>>>>>> On 16/06/2023 15:47, Etienne Chauchot wrote:
>>>>>>> Hi Robert,
>>>>>>>
>>>>>>> Thanks for your feedback. I don't know the scheduler part well
>>>>>>> enough yet and I'm taking this ticket as a learning workshop.
>>>>>>>
>>>>>>> Regarding your comments:
>>>>>>>
>>>>>>> 1. Looking at the AdaptiveScheduler class, which takes all its
>>>>>>> configuration from the JobManagerOptions, and to be consistent
>>>>>>> with other parameter names, I'd suggest
>>>>>>> /jobmanager.scheduler-scaling-cooldown-period/
>>>>>>>
>>>>>>> 2. I thought scaling events existed already and the scheduler
>>>>>>> received them as mentioned in FLIP-160 (cf "Whenever the scheduler
>>>>>>> is in the Executing state and receives new slots") or in FLIP-138
>>>>>>> (cf "Whenever new slots are available the SlotPool notifies the
>>>>>>> Scheduler"). If it is not the case (it is the scheduler who asks
>>>>>>> for slots), then there is no need for storing scaling requests
>>>>>>> indeed.
>>>>>>> => I need a confirmation here
>>>>>>>
>>>>>>> 3. If we lose the JobManager, we lose both the AdaptiveScheduler
>>>>>>> state and the CoolDownTimer state. So, upon recovery, it would be
>>>>>>> as if there was no ongoing cooldown period. So, a first rescale
>>>>>>> could happen right away and it would start a cooldown period. A
>>>>>>> second rescale would have to wait for the end of this period.
>>>>>>>
>>>>>>> 4. When a pipeline is rescaled, it is restarted. Upon restart, the
>>>>>>> AdaptiveScheduler passes again through the "waiting for resources"
>>>>>>> state, as FLIP-160 suggests. If so, then it seems that the cooldown
>>>>>>> period is kind of redundant with the resource-stabilization-timeout.
>>>>>>> I guess it is not the case, otherwise the FLINK-21883 ticket would
>>>>>>> not have been created.
>>>>>>>
>>>>>>> => I need a confirmation here also.
>>>>>>>
>>>>>>>
>>>>>>> Thanks for your views on points 2 and 4.
>>>>>>>
>>>>>>>
>>>>>>> Best
>>>>>>>
>>>>>>> Etienne
>>>>>>>
>>>>>>> On 15/06/2023 at 13:35, Robert Metzger wrote:
>>>>>>>> Thanks for the FLIP.
>>>>>>>>
>>>>>>>> Some comments:
>>>>>>>> 1. Can you specify the full proposed configuration name? "
>>>>>>>> scaling-cooldown-period" is probably not the full config name?
>>>>>>>> 2. Why is the concept of scaling events and a scaling queue
>>>>>>>> needed? If I remember correctly, the adaptive scheduler will just
>>>>>>>> check how many TaskManagers are available and then adjust the
>>>>>>>> execution graph accordingly.
>>>>>>>> There's no need to store a number of scaling events. We just need to
>>>>>>>> determine the time to trigger an adjustment of the execution graph.
>>>>>>>> 3. What's the behavior wrt JobManager failures (e.g. we lose the
>>>>>>>> state of the Adaptive Scheduler)? My proposal would be to just reset
>>>>>>>> the cooldown period, so after recovery of a JobManager, we have to
>>>>>>>> wait at least for the cooldown period until further scaling
>>>>>>>> operations are done.
>>>>>>>> 4. What's the relationship to the
>>>>>>>> "jobmanager.adaptive-scheduler.resource-stabilization-timeout"
>>>>>>>> configuration?
>>>>>>>>
>>>>>>>> Thanks a lot for working on this!
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Robert
>>>>>>>>
>>>>>>>> On Wed, Jun 14, 2023 at 3:38 PM Etienne
>>>>>>>> Chauchot<ec...@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi all,
>>>>>>>>>
>>>>>>>>> @Yuxia, I updated the FLIP to include the aggregation of the
>>>>>>>>> stacked operations that we discussed below. PTAL.
>>>>>>>>>
>>>>>>>>> Best
>>>>>>>>>
>>>>>>>>> Etienne
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 13/06/2023 at 16:31, Etienne Chauchot wrote:
>>>>>>>>>> Hi Yuxia,
>>>>>>>>>>
>>>>>>>>>> Thanks for your feedback. The number of potentially stacked
>>>>>>>>>> operations depends on the configured length of the cooldown
>>>>>>>>>> period.
>>>>>>>>>>
>>>>>>>>>> The proposition in the FLIP is to add a minimum delay between 2
>>>>>>>>>> scaling operations. But, indeed, an optimization could be to still
>>>>>>>>>> stack the operations that arrive during a cooldown period but,
>>>>>>>>>> maybe, rather than take only the last operation, aggregate them in
>>>>>>>>>> order to end up with a single aggregated operation when the
>>>>>>>>>> cooldown period ends. For example, if 3 TaskManagers come up and 1
>>>>>>>>>> goes down during the cooldown period, we could generate a single
>>>>>>>>>> scale-up operation of +2 when the period ends.
>>>>>>>>>>
>>>>>>>>>> As a side note regarding your comment on "it'll take a long time
>>>>>>>>>> to finish all", please keep in mind that the reactive mode (at
>>>>>>>>>> least for now) is only available for streaming pipelines, which
>>>>>>>>>> are in essence infinite processing.
>>>>>>>>>>
>>>>>>>>>> Another side note: when you mention "every taskManagers
>>>>>>>>>> connecting", if you are referring to the start of the pipeline,
>>>>>>>>>> please keep in mind that the adaptive scheduler has a "waiting for
>>>>>>>>>> resources" timeout period before starting the pipeline, during
>>>>>>>>>> which all TaskManagers connect and the parallelism is decided.
>>>>>>>>>>
>>>>>>>>>> Best
>>>>>>>>>>
>>>>>>>>>> Etienne
>>>>>>>>>>
>>>>>>>>>> On 13/06/2023 at 03:58, yuxia wrote:
>>>>>>>>>>> Hi, Etienne. Thanks for driving it. I have one question about the
>>>>>>>>>>> mechanism of the cooldown timeout.
>>>>>>>>>>>
>>>>>>>>>>> From the Proposed Changes part, if a scaling event is received
>>>>>>>>>>> and it falls during the cooldown period, it'll be stacked to be
>>>>>>>>>>> executed after the period ends. Also, from the description of
>>>>>>>>>>> FLINK-21883 [1], the cooldown timeout is there to avoid rescaling
>>>>>>>>>>> the job very frequently, because TaskManagers are not all
>>>>>>>>>>> connecting at the same time.
>>>>>>>>>>>
>>>>>>>>>>> So, is it possible that every TaskManager connecting will produce
>>>>>>>>>>> a scaling event, and that these will be stacked as many scale-up
>>>>>>>>>>> events, which will take a long time to finish? Can we just take
>>>>>>>>>>> the last event?
>>>>>>>>>>>
>>>>>>>>>>> [1]:https://issues.apache.org/jira/browse/FLINK-21883
>>>>>>>>>>>
>>>>>>>>>>> Best regards, Yuxia
>>>>>>>>>>>
>>>>>>>>>>> ----- Original Message -----
>>>>>>>>>>> From: "Etienne Chauchot"<ec...@apache.org>
>>>>>>>>>>> To: "dev"<de...@flink.apache.org>, "Robert Metzger"<metrobert@gmail.com>
>>>>>>>>>>> Sent: Monday, June 12, 2023, 11:34:25 PM
>>>>>>>>>>> Subject: [DISCUSS] FLIP-322 Cooldown period for adaptive scheduler
>>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> I’d like to start a discussion about FLIP-322 [1] which
>>>>>>>>>>> introduces a
>>>>>>>>>>> cooldown period for the adaptive scheduler.
>>>>>>>>>>>
>>>>>>>>>>> I'd like to get your feedback especially @Robert as you opened the
>>>>>>>>>>> related ticket and worked on the reactive mode a lot.
>>>>>>>>>>>
>>>>>>>>>>> [1]
>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-322+Cooldown+period+for+adaptive+scheduler
>>>>>>>>>>>
>>>>>>>>>>> Best
>>>>>>>>>>>
>>>>>>>>>>> Etienne
>>>>>>
>>

Re: [DISCUSS] FLIP-322 Cooldown period for adaptive scheduler

Posted by David Morávek <dm...@apache.org>.
> They will struggle if they add new resources and nothing happens for 5
> minutes.

The same applies if they start playing with FLIP-291 APIs. I'm wondering if
the cooldown makes sense there since it was the user's deliberate choice to
push new requirements. 🤔

Best,
D.

On Tue, Jul 4, 2023 at 9:11 AM David Morávek <dm...@apache.org> wrote:

> The FLIP reads sane to me. I'm unsure about the default values, though; 5
> minutes of wait time between rescales feels rather strict, and we should
> rethink it to provide a better out-of-the-box experience.
>
> I'd focus on newcomers trying AS / Reactive Mode out. They will struggle
> if they add new resources and nothing happens for 5 minutes. I'd suggest
> defaulting to
> *jobmanager.adaptive-scheduler.resource-stabilization-timeout* (which
> defaults to 10s).
>
> I'm still struggling to grasp the max interval (forced rescale). Ignoring the
> `AdaptiveScheduler#shouldRescale()` condition seems rather dangerous. Wouldn't
> a simple case where you add a new TM and remove it before the max interval is
> reached (so there is nothing to do) result in an unnecessary job restart?
>
> Best,
> D.
>
> On Thu, Jun 29, 2023 at 3:43 PM Etienne Chauchot <ec...@apache.org>
> wrote:
>
>> Thanks Chesnay for your feedback. I have updated the FLIP. I'll start a
>> vote thread.
>>
>> Best
>>
>> Etienne
>>
>> On 28/06/2023 at 11:49, Chesnay Schepler wrote:
>> > > we should schedule a check that will rescale if
>> > > min-parallelism-increase is met. Then, what is the use of the
>> > > scaling-interval.max timeout in that context?
>> >
>> > To force a rescale if min-parallelism-increase is not met (but we
>> > could still run above the current parallelism).
>> >
>> > min-parallelism-increase is a trade-off between the cost of rescaling
>> > vs the performance benefit of the parallelism increase. Over time the
>> > balance tips more and more in favor of the parallelism increase, hence
>> > we should eventually rescale anyway even if the minimum isn't met, or
>> > at least give users the option to do so.
>> >
>> > > I meant the opposite: not having only the cooldown but having only
>> > > the stabilization time. I must have missed something because what I
>> > > wonder is: if every rescale entails a restart of the pipeline and
>> > > every restart entails passing in waiting for resources state, then why
>> > > introduce a cooldown when there is already at each rescale a stable
>> > > resource timeout?
>> >
>> > It is technically correct that the stable resource timeout can be used
>> > to limit the number of rescale operations per interval, however during
>> > that time the job isn't running, in contrast to the cooldown.
>> >
>> > Having both just gives you a lot more flexibility.
>> > "I want at most 1 rescale operation per hour, and wait at most 1
>> > minute for resource to stabilize when a rescale happens".
>> > You can't express this with only one of the options.
>> >
>> > On 20/06/2023 14:41, Etienne Chauchot wrote:
>> >> Hi Chesnay,
>> >>
>> >> Thanks for your feedback. Comments inline
>> >>
>> >> On 16/06/2023 at 17:24, Chesnay Schepler wrote:
>> >>> 1) Options specific to the adaptive scheduler should start with
>> >>> "jobmanager.adaptive-scheduler".
>> >>
>> >>
>> >> ok
>> >>
>> >>
>> >>> 2)
>> >>> There isn't /really /a notion of a "scaling event". The scheduler is
>> >>> informed about new/lost slots and job failures, and reacts
>> >>> accordingly by maybe rescaling the job.
>> >>> (sure, you can think of these as events, but you can think of
>> >>> practically everything as events)
>> >>>
>> >>> There shouldn't be a queue for events. All the scheduler should have
>> >>> to know is that the next rescale check is scheduled for time T,
>> >>> which in practice boils down to a flag and a scheduled action that
>> >>> runs Executing#maybeRescale.
>> >>
>> >>
>> >> Makes total sense, it's very simple like this. Thanks for the
>> >> clarification and pointer. After the related FLIPs, I'll look at the
>> >> code now.
>> >>
>> >>
>> >>> With that in mind, we also have to look at how we keep this state
>> >>> around. Presumably it is scoped to the current state, such that the
>> >>> cooldown is reset if a job fails.
>> >>> Maybe we should add a separate ExecutingWithCooldown state; not sure
>> >>> yet.
>> >>
>> >>
>> >> Yes, losing the cooldown state and resetting the cooldown upon failure
>> >> is what I suggested in point 3 of my previous email. I'm not sure about
>> >> a new state either; I'll figure it out after experimenting with the
>> >> code and update the FLIP then.
>> >>
>> >>
>> >>>
>> >>> It would be good to clarify whether this FLIP only attempts to cover
>> >>> scale up operations, or also scale downs in case of slot losses.
>> >>
>> >>
>> >> When slots are lost, most of the time it is due to a TM loss, so
>> >> several slots should be lost at the same time but (hopefully) only
>> >> once. There should not be many scale downs in a row (although
>> >> cascading failures can still happen). I think we should just protect
>> >> against scale ups immediately following a scale down. For that, I
>> >> think we could keep the current behavior of transitioning to the
>> >> Restarting state and then back to the Waiting for Resources state.
>> >> This state will protect us against scale ups immediately following a
>> >> failure/restart.
>> >>
>> >>
>> >>>
>> >>> We should also think about how it relates to the externalized
>> >>> declarative resource management. Should we always rescale
>> >>> immediately? Should we wait until the cooldown is over?
>> >>
>> >>
>> >> It relates to point 2, no? We should rescale immediately only if the
>> >> last rescale was done more than scaling-interval.min ago; otherwise we
>> >> schedule a rescale at last-rescale + scaling-interval.min.
>> >>
>> >>
>> >>> Related to this, there's the min-parallelism-increase option, that
>> >>> if for example set to "2" restricts rescale operations to only occur
>> >>> if the parallelism increases by at least 2.
>> >>
>> >>
>> >> yes I saw that in the code
>> >>
>> >>
>> >>> Ideally however there would be a max timeout for this.
>> >>>
>> >>> As such we could maybe think about this a bit differently:
>> >>> Add 2 new options instead of 1:
>> >>> jobmanager.adaptive-scheduler.scaling-interval.min: The minimum time
>> >>> the scheduler will wait for the next effective rescale operations.
>> >>> jobmanager.adaptive-scheduler.scaling-interval.max: The maximum time
>> >>> the scheduler will wait for the next effective rescale operations.
>> >>
>> >>
>> >> At point 2, we said that when slots change (requirements change or
>> >> new slots available), if last rescale check (call to maybeRescale)
>> >> was done less than scaling-interval.min ago, we should schedule a
>> >> check that will rescale if min-parallelism-increase is met. Then,
>> >> what is the use of the scaling-interval.max timeout in that context?
>> >>
>> >>
>> >>>
>> >>> 3) It sounds fine that we lose the cooldown state, because imo we
>> >>> want to reset the cooldown anyway on job failures (because a job
>> >>> failure inherently implies a potential rescaling).
>> >>
>> >>
>> >> exactly.
>> >>
>> >>
>> >>>
>> >>> 4) The stabilization time isn't really redundant and serves a
>> >>> different use-case. The idea behind it is that if a user adds multiple
>> >>> TMs at once then we don't want to rescale immediately at the first
>> >>> received slot. Without the stabilization time the cooldown would
>> >>> actually cause bad behavior here, because not only would we rescale
>> >>> immediately upon receiving the minimum required slots to scale up,
>> >>> but we also wouldn't use the remaining slots just because the
>> >>> cooldown says so.
>> >>
>> >>
>> >> I meant the opposite: not having only the cooldown but having only
>> >> the stabilization time. I must have missed something because what I
>> >> wonder is: if every rescale entails a restart of the pipeline and
>> >> every restart entails passing in waiting for resources state, then
>> >> why introduce a cooldown when there is already at each rescale a
>> >> stable resource timeout ?
>> >>
>> >>
>> >> Best
>> >>
>> >> Etienne
>> >>
>> >>
>> >>
>> >>>
>> >>> On 16/06/2023 15:47, Etienne Chauchot wrote:
>> >>>> Hi Robert,
>> >>>>
>> >>>> Thanks for your feedback. I don't know the scheduler part well
>> >>>> enough yet and I'm taking this ticket as a learning workshop.
>> >>>>
>> >>>> Regarding your comments:
>> >>>>
>> >>>> 1. Taking a look at the AdaptiveScheduler class which takes all its
>> >>>> configuration from the JobManagerOptions, and also to be consistent
>> >>>> with other parameters name, I'd suggest
>> >>>> /jobmanager.scheduler-scaling-cooldown-period/
>> >>>>
>> >>>> 2. I thought scaling events existed already and the scheduler
>> >>>> received them as mentioned in FLIP-160 (cf "Whenever the scheduler
>> >>>> is in the Executing state and receives new slots") or in FLIP-138
>> >>>> (cf "Whenever new slots are available the SlotPool notifies the
>> >>>> Scheduler"). If it is not the case (it is the scheduler who asks
>> >>>> for slots), then there is no need for storing scaling requests indeed.
>> >>>>
>> >>>> => I need a confirmation here
>> >>>>
>> >>>> 3. If we lose the JobManager, we lose both the AdaptiveScheduler
>> >>>> state and the CoolDownTimer state. So, upon recovery, it would be
>> >>>> as if there was no ongoing coolDown period. So, a first re-scale
>> >>>> could happen right away and it will start a coolDown period. A
>> >>>> second re-scale would have to wait for the end of this period.
>> >>>>
>> >>>> 4. When a pipeline is re-scaled, it is restarted. Upon restart, the
>> >>>> AdaptiveScheduler passes again in the "waiting for resources" state
>> >>>> as FLIP-160 suggests. If so, then it seems that the coolDown period
>> >>>> is kind of redundant with the resource-stabilization-timeout. I
>> >>>> guess it is not the case otherwise the FLINK-21883 ticket would not
>> >>>> have been created.
>> >>>>
>> >>>> => I need a confirmation here also.
>> >>>>
>> >>>>
>> >>>> Thanks for your views on point 2 and 4.
>> >>>>
>> >>>>
>> >>>> Best
>> >>>>
>> >>>> Etienne
>> >>>>
>> >>>> On 15/06/2023 at 13:35, Robert Metzger wrote:
>> >>>>> Thanks for the FLIP.
>> >>>>>
>> >>>>> Some comments:
>> >>>>> 1. Can you specify the full proposed configuration name? "
>> >>>>> scaling-cooldown-period" is probably not the full config name?
>> >>>>> 2. Why is the concept of scaling events and a scaling queue
>> >>>>> needed? If I
>> >>>>> remember correctly, the adaptive scheduler will just check how many
>> >>>>> TaskManagers are available and then adjust the execution graph
>> >>>>> accordingly.
>> >>>>> There's no need to store a number of scaling events. We just need to
>> >>>>> determine the time to trigger an adjustment of the execution graph.
>> >>>>> 3. What's the behavior wrt JobManager failures (e.g. we lose
>> >>>>> the state
>> >>>>> of the Adaptive Scheduler?). My proposal would be to just reset the
>> >>>>> cooldown period, so after recovery of a JobManager, we have to
>> >>>>> wait at
>> >>>>> least for the cooldown period until further scaling operations are
>> >>>>> done.
>> >>>>> 4. What's the relationship to the
>> >>>>> "jobmanager.adaptive-scheduler.resource-stabilization-timeout"
>> >>>>> configuration?
>> >>>>>
>> >>>>> Thanks a lot for working on this!
>> >>>>>
>> >>>>> Best,
>> >>>>> Robert
>> >>>>>
>> >>>>> On Wed, Jun 14, 2023 at 3:38 PM Etienne
>> >>>>> Chauchot<ec...@apache.org>
>> >>>>> wrote:
>> >>>>>
>> >>>>>> Hi all,
>> >>>>>>
>> >>>>>> @Yuxia, I updated the FLIP to include the aggregation of the stacked
>> >>>>>> operations that we discussed below. PTAL.
>> >>>>>>
>> >>>>>> Best
>> >>>>>>
>> >>>>>> Etienne
>> >>>>>>
>> >>>>>>
>> >>>>>> On 13/06/2023 at 16:31, Etienne Chauchot wrote:
>> >>>>>>> Hi Yuxia,
>> >>>>>>>
>> >>>>>>> Thanks for your feedback. The number of potentially stacked
>> >>>>>>> operations
>> >>>>>>> depends on the configured length of the cooldown period.
>> >>>>>>>
>> >>>>>>>
>> >>>>>>>
>> >>>>>>> The proposition in the FLIP is to add a minimum delay between 2
>> >>>>>>> scaling
>> >>>>>>> operations. But, indeed, an optimization could be to still stack
>> >>>>>>> the
>> >>>>>>> operations (that arrive during a cooldown period) but maybe not
>> >>>>>>> take
>> >>>>>>> only the last operation but rather aggregate them in order to
>> >>>>>>> end up
>> >>>>>>> with a single aggregated operation when the cooldown period
>> >>>>>>> ends. For
>> >>>>>>> example, let's say 3 taskManagers come up and 1 comes down
>> >>>>>>> during the
>> >>>>>>> cooldown period, we could generate a single operation of scale
>> >>>>>>> up +2
>> >>>>>>> when the period ends.
>> >>>>>>>
>> >>>>>>> As a side note regarding your comment on "it'll take a long time to
>> >>>>>>> finish all", please keep in mind that the reactive mode (at least for
>> >>>>>>> now) is only available for streaming pipelines, which are in essence
>> >>>>>>> infinite processing.
>> >>>>>>>
>> >>>>>>> Another side note: when you mention "every taskManagers
>> >>>>>>> connecting",
>> >>>>>>> if you are referring to the start of the pipeline, please keep
>> >>>>>>> in mind
>> >>>>>>> that the adaptive scheduler has a "waiting for resources" timeout
>> >>>>>>> period before starting the pipeline in which all taskmanagers
>> >>>>>>> connect
>> >>>>>>> and the parallelism is decided.
>> >>>>>>>
>> >>>>>>> Best
>> >>>>>>>
>> >>>>>>> Etienne
>> >>>>>>>
>> >>>>>>> On 13/06/2023 at 03:58, yuxia wrote:
>> >>>>>>>> Hi, Etienne. Thanks for driving it. I have one question about the
>> >>>>>>>> mechanism of the cooldown timeout.
>> >>>>>>>>
>> >>>>>>>> From the Proposed Changes part, if a scaling event is received and
>> >>>>>>>> it falls during the cooldown period, it'll be stacked to be executed
>> >>>>>>>> after the period ends. Also, from the description of FLINK-21883 [1],
>> >>>>>>>> the cooldown timeout is to avoid rescaling the job very frequently,
>> >>>>>>>> because TaskManagers do not all connect at the same time.
>> >>>>>>>>
>> >>>>>>>> So, is it possible that every taskmanager connecting will produce a
>> >>>>>>>> scaling event, and that these get stacked into many scale-up events,
>> >>>>>>>> so that it'll take a long time to finish all? Can we just take the
>> >>>>>>>> last event?
>> >>>>>>>>
>> >>>>>>>> [1]:https://issues.apache.org/jira/browse/FLINK-21883
>> >>>>>>>>
>> >>>>>>>> Best regards, Yuxia
>> >>>>>>>>
>> >>>>>>>> ----- Original Message -----
>> >>>>>>>> From: "Etienne Chauchot"<ec...@apache.org>
>> >>>>>>>> To: "dev"<de...@flink.apache.org>, "Robert Metzger"<metrobert@gmail.com>
>> >>>>>>>> Sent: Monday, June 12, 2023, 11:34:25 PM
>> >>>>>>>> Subject: [DISCUSS] FLIP-322 Cooldown period for adaptive scheduler
>> >>>>>>>>
>> >>>>>>>> Hi,
>> >>>>>>>>
>> >>>>>>>> I’d like to start a discussion about FLIP-322 [1] which
>> >>>>>>>> introduces a
>> >>>>>>>> cooldown period for the adaptive scheduler.
>> >>>>>>>>
>> >>>>>>>> I'd like to get your feedback especially @Robert as you opened the
>> >>>>>>>> related ticket and worked on the reactive mode a lot.
>> >>>>>>>>
>> >>>>>>>> [1]
>> >>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-322+Cooldown+period+for+adaptive+scheduler
>> >>>>>>>>
>> >>>>>>>> Best
>> >>>>>>>>
>> >>>>>>>> Etienne
>> >>>
>> >>>
>> >
>
>

Re: [DISCUSS] FLIP-322 Cooldown period for adaptive scheduler

Posted by David Morávek <dm...@apache.org>.
The FLIP reads sane to me. I'm unsure about the default values, though; 5
minutes of wait time between rescales feels rather strict, and we should
rethink it to provide a better out-of-the-box experience.

I'd focus on newcomers trying AS / Reactive Mode out. They will struggle if
they add new resources and nothing happens for 5 minutes. I'd suggest
defaulting to *jobmanager.adaptive-scheduler.resource-stabilization-timeout*
(which defaults to 10s).

I'm still struggling to grasp the max interval (forced rescale). Ignoring the
`AdaptiveScheduler#shouldRescale()` condition seems rather dangerous. Wouldn't
a simple case where you add a new TM and remove it before the max interval is
reached (so there is nothing to do) result in an unnecessary job restart?
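
To make the scenario concrete, here is a minimal sketch of one possible
ordering of the checks. Everything in it is hypothetical and only illustrates
the discussion: the class, the `decide` method, the `Decision` enum and the
sample durations are assumptions, not the actual AdaptiveScheduler code. The
parameters stand for the scaling-interval.min, scaling-interval.max and
min-parallelism-increase options as they are described in this thread.

```java
import java.time.Duration;

/** Hypothetical illustration only; not Flink's AdaptiveScheduler code. */
public final class RescaleDecisionSketch {

    public enum Decision {
        NO_CHANGE, WAIT_FOR_MIN_INTERVAL, RESCALE, WAIT_FOR_MAX_INTERVAL, FORCED_RESCALE
    }

    /**
     * @param sinceLastRescale time elapsed since the last rescale
     * @param minInterval assumed meaning of scaling-interval.min (the cooldown)
     * @param maxInterval assumed meaning of scaling-interval.max (forced rescale)
     * @param minIncrease assumed meaning of min-parallelism-increase
     */
    public static Decision decide(
            int currentParallelism,
            int availableParallelism,
            Duration sinceLastRescale,
            Duration minInterval,
            Duration maxInterval,
            int minIncrease) {
        // TM added and removed again: no better parallelism is on offer, so
        // neither timer may trigger a restart. Slot losses are assumed to go
        // through the failure/restart path and are not modelled here.
        if (availableParallelism <= currentParallelism) {
            return Decision.NO_CHANGE;
        }
        // Still cooling down: only a later check would be scheduled.
        if (sinceLastRescale.compareTo(minInterval) < 0) {
            return Decision.WAIT_FOR_MIN_INTERVAL;
        }
        // The increase is large enough to justify the cost of a restart.
        if (availableParallelism - currentParallelism >= minIncrease) {
            return Decision.RESCALE;
        }
        // Small increase: rescale anyway once the max interval has elapsed.
        if (sinceLastRescale.compareTo(maxInterval) >= 0) {
            return Decision.FORCED_RESCALE;
        }
        return Decision.WAIT_FOR_MAX_INTERVAL;
    }

    public static void main(String[] args) {
        Duration min = Duration.ofSeconds(30); // arbitrary sample value
        Duration max = Duration.ofMinutes(5);  // arbitrary sample value
        // TM added, then removed before the max interval was reached.
        System.out.println(decide(4, 4, Duration.ofMinutes(10), min, max, 2));
        // One extra slot, below min-parallelism-increase, max interval elapsed.
        System.out.println(decide(4, 5, Duration.ofMinutes(10), min, max, 2));
    }
}
```

With this ordering the add-then-remove case never restarts the job, because
the "is there anything to gain" check runs before either interval is
consulted. If the forced rescale skipped that first check, the restart I
describe above would happen.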

Best,
D.

On Thu, Jun 29, 2023 at 3:43 PM Etienne Chauchot <ec...@apache.org>
wrote:

> Thanks Chesnay for your feedback. I have updated the FLIP. I'll start a
> vote thread.
>
> Best
>
> Etienne
>
> On 28/06/2023 at 11:49, Chesnay Schepler wrote:
> > > we should schedule a check that will rescale if
> > > min-parallelism-increase is met. Then, what is the use of the
> > > scaling-interval.max timeout in that context?
> >
> > To force a rescale if min-parallelism-increase is not met (but we
> > could still run above the current parallelism).
> >
> > min-parallelism-increase is a trade-off between the cost of rescaling
> > vs the performance benefit of the parallelism increase. Over time the
> > balance tips more and more in favor of the parallelism increase, hence
> > we should eventually rescale anyway even if the minimum isn't met, or
> > at least give users the option to do so.
> >
> > > I meant the opposite: not having only the cooldown but having only
> > > the stabilization time. I must have missed something because what I
> > > wonder is: if every rescale entails a restart of the pipeline and
> > > every restart entails passing in waiting for resources state, then why
> > > introduce a cooldown when there is already at each rescale a stable
> > > resource timeout?
> >
> > It is technically correct that the stable resource timeout can be used
> > to limit the number of rescale operations per interval, however during
> > that time the job isn't running, in contrast to the cooldown.
> >
> > Having both just gives you a lot more flexibility.
> > "I want at most 1 rescale operation per hour, and wait at most 1
> > minute for resource to stabilize when a rescale happens".
> > You can't express this with only one of the options.
> >
> > On 20/06/2023 14:41, Etienne Chauchot wrote:
> >> Hi Chesnay,
> >>
> >> Thanks for your feedback. Comments inline
> >>
> >> On 16/06/2023 at 17:24, Chesnay Schepler wrote:
> >>> 1) Options specific to the adaptive scheduler should start with
> >>> "jobmanager.adaptive-scheduler".
> >>
> >>
> >> ok
> >>
> >>
> >>> 2)
> >>> There isn't /really /a notion of a "scaling event". The scheduler is
> >>> informed about new/lost slots and job failures, and reacts
> >>> accordingly by maybe rescaling the job.
> >>> (sure, you can think of these as events, but you can think of
> >>> practically everything as events)
> >>>
> >>> There shouldn't be a queue for events. All the scheduler should have
> >>> to know is that the next rescale check is scheduled for time T,
> >>> which in practice boils down to a flag and a scheduled action that
> >>> runs Executing#maybeRescale.
> >>
> >>
> >> Makes total sense, it's very simple like this. Thanks for the
> >> clarification and pointer. After the related FLIPs, I'll look at the
> >> code now.
> >>
> >>
> >>> With that in mind, we also have to look at how we keep this state
> >>> around. Presumably it is scoped to the current state, such that the
> >>> cooldown is reset if a job fails.
> >>> Maybe we should add a separate ExecutingWithCooldown state; not sure
> >>> yet.
> >>
> >>
> >> Yes, losing the cooldown state and resetting the cooldown upon failure
> >> is what I suggested in point 3 of my previous email. I'm not sure about
> >> a new state either; I'll figure it out after experimenting with the
> >> code and update the FLIP then.
> >>
> >>
> >>>
> >>> It would be good to clarify whether this FLIP only attempts to cover
> >>> scale up operations, or also scale downs in case of slot losses.
> >>
> >>
> >> When slots are lost, most of the time it is due to a TM loss, so
> >> several slots should be lost at the same time but (hopefully) only
> >> once. There should not be many scale downs in a row (although
> >> cascading failures can still happen). I think we should just protect
> >> against scale ups immediately following a scale down. For that, I
> >> think we could keep the current behavior of transitioning to the
> >> Restarting state and then back to the Waiting for Resources state.
> >> This state will protect us against scale ups immediately following a
> >> failure/restart.
> >>
> >>
> >>>
> >>> We should also think about how it relates to the externalized
> >>> declarative resource management. Should we always rescale
> >>> immediately? Should we wait until the cooldown is over?
> >>
> >>
> >> It relates to point 2, no? We should rescale immediately only if the
> >> last rescale was done more than scaling-interval.min ago; otherwise we
> >> schedule a rescale at last-rescale + scaling-interval.min.
> >>
> >>
> >>> Related to this, there's the min-parallelism-increase option, that
> >>> if for example set to "2" restricts rescale operations to only occur
> >>> if the parallelism increases by at least 2.
> >>
> >>
> >> yes I saw that in the code
> >>
> >>
> >>> Ideally however there would be a max timeout for this.
> >>>
> >>> As such we could maybe think about this a bit differently:
> >>> Add 2 new options instead of 1:
> >>> jobmanager.adaptive-scheduler.scaling-interval.min: The minimum time
> >>> the scheduler will wait for the next effective rescale operations.
> >>> jobmanager.adaptive-scheduler.scaling-interval.max: The maximum time
> >>> the scheduler will wait for the next effective rescale operations.
> >>
> >>
> >> At point 2, we said that when slots change (requirements change or
> >> new slots available), if last rescale check (call to maybeRescale)
> >> was done less than scaling-interval.min ago, we should schedule a
> >> check that will rescale if min-parallelism-increase is met. Then,
> >> what is the use of the scaling-interval.max timeout in that context?
> >>
> >>
> >>>
> >>> 3) It sounds fine that we lose the cooldown state, because imo we
> >>> want to reset the cooldown anyway on job failures (because a job
> >>> failure inherently implies a potential rescaling).
> >>
> >>
> >> exactly.
> >>
> >>
> >>>
> >>> 4) The stabilization time isn't really redundant and serves a
> >>> different use-case. The idea behind it is that if a user adds multiple
> >>> TMs at once then we don't want to rescale immediately at the first
> >>> received slot. Without the stabilization time the cooldown would
> >>> actually cause bad behavior here, because not only would we rescale
> >>> immediately upon receiving the minimum required slots to scale up,
> >>> but we also wouldn't use the remaining slots just because the
> >>> cooldown says so.
> >>
> >>
> >> I meant the opposite: not having only the cooldown but having only
> >> the stabilization time. I must have missed something because what I
> >> wonder is: if every rescale entails a restart of the pipeline and
> >> every restart entails passing in waiting for resources state, then
> >> why introduce a cooldown when there is already at each rescale a
> >> stable resource timeout ?
> >>
> >>
> >> Best
> >>
> >> Etienne
> >>
> >>
> >>
> >>>
> >>> On 16/06/2023 15:47, Etienne Chauchot wrote:
> >>>> Hi Robert,
> >>>>
> >>>> Thanks for your feedback. I don't know the scheduler part well
> >>>> enough yet and I'm taking this ticket as a learning workshop.
> >>>>
> >>>> Regarding your comments:
> >>>>
> >>>> 1. Taking a look at the AdaptiveScheduler class which takes all its
> >>>> configuration from the JobManagerOptions, and also to be consistent
> >>>> with other parameters name, I'd suggest
> >>>> /jobmanager.scheduler-scaling-cooldown-period/
> >>>>
> >>>> 2. I thought scaling events existed already and the scheduler
> >>>> received them as mentioned in FLIP-160 (cf "Whenever the scheduler
> >>>> is in the Executing state and receives new slots") or in FLIP-138
> >>>> (cf "Whenever new slots are available the SlotPool notifies the
> >>>> Scheduler"). If it is not the case (it is the scheduler who asks
> >>>> for slots), then there is no need for storing scaling requests indeed.
> >>>>
> >>>> => I need a confirmation here
> >>>>
> >>>> 3. If we lose the JobManager, we lose both the AdaptiveScheduler
> >>>> state and the CoolDownTimer state. So, upon recovery, it would be
> >>>> as if there was no ongoing coolDown period. So, a first re-scale
> >>>> could happen right away and it will start a coolDown period. A
> >>>> second re-scale would have to wait for the end of this period.
> >>>>
> >>>> 4. When a pipeline is re-scaled, it is restarted. Upon restart, the
> >>>> AdaptiveScheduler passes again in the "waiting for resources" state
> >>>> as FLIP-160 suggests. If so, then it seems that the coolDown period
> >>>> is kind of redundant with the resource-stabilization-timeout. I
> >>>> guess it is not the case otherwise the FLINK-21883 ticket would not
> >>>> have been created.
> >>>>
> >>>> => I need a confirmation here also.
> >>>>
> >>>>
> >>>> Thanks for your views on point 2 and 4.
> >>>>
> >>>>
> >>>> Best
> >>>>
> >>>> Etienne
> >>>>
> >>>> On 15/06/2023 at 13:35, Robert Metzger wrote:
> >>>>> Thanks for the FLIP.
> >>>>>
> >>>>> Some comments:
> >>>>> 1. Can you specify the full proposed configuration name? "
> >>>>> scaling-cooldown-period" is probably not the full config name?
> >>>>> 2. Why is the concept of scaling events and a scaling queue
> >>>>> needed? If I
> >>>>> remember correctly, the adaptive scheduler will just check how many
> >>>>> TaskManagers are available and then adjust the execution graph
> >>>>> accordingly.
> >>>>> There's no need to store a number of scaling events. We just need to
> >>>>> determine the time to trigger an adjustment of the execution graph.
> >>>>> 3. What's the behavior wrt JobManager failures (e.g. we lose
> >>>>> the state
> >>>>> of the Adaptive Scheduler?). My proposal would be to just reset the
> >>>>> cooldown period, so after recovery of a JobManager, we have to
> >>>>> wait at
> >>>>> least for the cooldown period until further scaling operations are
> >>>>> done.
> >>>>> 4. What's the relationship to the
> >>>>> "jobmanager.adaptive-scheduler.resource-stabilization-timeout"
> >>>>> configuration?
> >>>>>
> >>>>> Thanks a lot for working on this!
> >>>>>
> >>>>> Best,
> >>>>> Robert
> >>>>>
> >>>>> On Wed, Jun 14, 2023 at 3:38 PM Etienne
> >>>>> Chauchot<ec...@apache.org>
> >>>>> wrote:
> >>>>>
> >>>>>> Hi all,
> >>>>>>
> >>>>>> @Yuxia, I updated the FLIP to include the aggregation of the stacked
> >>>>>> operations that we discussed below. PTAL.
> >>>>>>
> >>>>>> Best
> >>>>>>
> >>>>>> Etienne
> >>>>>>
> >>>>>>
> >>>>>> On 13/06/2023 at 16:31, Etienne Chauchot wrote:
> >>>>>>> Hi Yuxia,
> >>>>>>>
> >>>>>>> Thanks for your feedback. The number of potentially stacked
> >>>>>>> operations
> >>>>>>> depends on the configured length of the cooldown period.
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> The proposition in the FLIP is to add a minimum delay between 2
> >>>>>>> scaling
> >>>>>>> operations. But, indeed, an optimization could be to still stack
> >>>>>>> the
> >>>>>>> operations (that arrive during a cooldown period) but maybe not
> >>>>>>> take
> >>>>>>> only the last operation but rather aggregate them in order to
> >>>>>>> end up
> >>>>>>> with a single aggregated operation when the cooldown period
> >>>>>>> ends. For
> >>>>>>> example, let's say 3 taskManagers come up and 1 comes down
> >>>>>>> during the
> >>>>>>> cooldown period, we could generate a single operation of scale
> >>>>>>> up +2
> >>>>>>> when the period ends.
> >>>>>>>
> >>>>>>> As a side note regarding your comment on "it'll take a long time to
> >>>>>>> finish all", please keep in mind that the reactive mode (at
> >>>>>>> least for
> >>>>>>> now) is only available for streaming pipelines, which are in essence
> >>>>>>> infinite processing.
> >>>>>>>
> >>>>>>> Another side note: when you mention "every taskManagers
> >>>>>>> connecting",
> >>>>>>> if you are referring to the start of the pipeline, please keep
> >>>>>>> in mind
> >>>>>>> that the adaptive scheduler has a "waiting for resources" timeout
> >>>>>>> period before starting the pipeline in which all taskmanagers
> >>>>>>> connect
> >>>>>>> and the parallelism is decided.
> >>>>>>>
> >>>>>>> Best
> >>>>>>>
> >>>>>>> Etienne
> >>>>>>>
> >>>>>>> On 13/06/2023 at 03:58, yuxia wrote:
> >>>>>>>> Hi, Etienne. Thanks for driving it. I have one question about the
> >>>>>>>> mechanism of the cooldown timeout.
> >>>>>>>>
> >>>>>>>> From the Proposed Changes part, if a scaling event is received and
> >>>>>>>> it falls during the cooldown period, it'll be stacked to be executed
> >>>>>>>> after the period ends. Also, from the description of FLINK-21883 [1],
> >>>>>>>> the cooldown timeout is to avoid rescaling the job very frequently,
> >>>>>>>> because TaskManagers do not all connect at the same time.
> >>>>>>>>
> >>>>>>>> So, is it possible that every taskmanager connecting will produce a
> >>>>>>>> scaling event, and that these get stacked into many scale-up events,
> >>>>>>>> so that it'll take a long time to finish all? Can we just take the
> >>>>>>>> last event?
> >>>>>>>>
> >>>>>>>> [1]:https://issues.apache.org/jira/browse/FLINK-21883
> >>>>>>>>
> >>>>>>>> Best regards, Yuxia
> >>>>>>>>
> >>>>>>>> ----- Original Message -----
> >>>>>>>> From: "Etienne Chauchot"<ec...@apache.org>
> >>>>>>>> To: "dev"<de...@flink.apache.org>, "Robert Metzger"<metrobert@gmail.com>
> >>>>>>>> Sent: Monday, June 12, 2023, 11:34:25 PM
> >>>>>>>> Subject: [DISCUSS] FLIP-322 Cooldown period for adaptive scheduler
> >>>>>>>>
> >>>>>>>> Hi,
> >>>>>>>>
> >>>>>>>> I’d like to start a discussion about FLIP-322 [1] which
> >>>>>>>> introduces a
> >>>>>>>> cooldown period for the adaptive scheduler.
> >>>>>>>>
> >>>>>>>> I'd like to get your feedback especially @Robert as you opened the
> >>>>>>>> related ticket and worked on the reactive mode a lot.
> >>>>>>>>
> >>>>>>>> [1]
> >>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-322+Cooldown+period+for+adaptive+scheduler
> >>>>>>>>
> >>>>>>>> Best
> >>>>>>>>
> >>>>>>>> Etienne
> >>>
> >>>
> >

Re: [DISCUSS] FLIP-322 Cooldown period for adaptive scheduler

Posted by Etienne Chauchot <ec...@apache.org>.
Thanks Chesnay for your feedback. I have updated the FLIP. I'll start a 
vote thread.

Best

Etienne

On 28/06/2023 11:49, Chesnay Schepler wrote:
> > we should schedule a check that will rescale if 
> min-parallelism-increase is met. Then, what is the use of the 
> scaling-interval.max timeout in that context?
>
> To force a rescale if min-parallelism-increase is not met (but we 
> could still run above the current parallelism).
>
> min-parallelism-increase is a trade-off between the cost of rescaling 
> vs the performance benefit of the parallelism increase. Over time the 
> balance tips more and more in favor of the parallelism increase, hence 
> we should eventually rescale anyway even if the minimum isn't met, or 
> at least give users the option to do so.
>
> > I meant the opposite: not having only the cooldown but having only 
> the stabilization time. I must have missed something because what I 
> wonder is: if every rescale entails a restart of the pipeline and 
> every restart entails passing in waiting for resources state, then why 
> introduce a cooldown when there is already at each rescale a stable 
> resource timeout ?
>
> It is technically correct that the stable resource timeout can be used 
> to limit the number of rescale operations per interval, however during 
> that time the job isn't running, in contrast to the cooldown.
>
> Having both just gives you a lot more flexibility.
> "I want at most 1 rescale operation per hour, and wait at most 1 
> minute for resources to stabilize when a rescale happens".
> You can't express this with only one of the options.
>
> On 20/06/2023 14:41, Etienne Chauchot wrote:
>> Hi Chesnay,
>>
>> Thanks for your feedback. Comments inline
>>
>> On 16/06/2023 17:24, Chesnay Schepler wrote:
>>> 1) Options specific to the adaptive scheduler should start with 
>>> "jobmanager.adaptive-scheduler".
>>
>>
>> ok
>>
>>
>>> 2)
>>> There isn't /really /a notion of a "scaling event". The scheduler is 
>>> informed about new/lost slots and job failures, and reacts 
>>> accordingly by maybe rescaling the job.
>>> (sure, you can think of these as events, but you can think of 
>>> practically everything as events)
>>>
>>> There shouldn't be a queue for events. All the scheduler should have 
>>> to know is that the next rescale check is scheduled for time T, 
>>> which in practice boils down to a flag and a scheduled action that 
>>> runs Executing#maybeRescale.
>>
>>
>> Makes total sense, it's very simple like this. Thanks for the 
>> precision and pointer. After the related FLIPs, I'll look at the code 
>> now.
>>
>>
>>> With that in mind, we also have to look at how we keep this state 
>>> around. Presumably it is scoped to the current state, such that the 
>>> cooldown is reset if a job fails.
>>> Maybe we should add a separate ExecutingWithCooldown state; not sure 
>>> yet.
>>
>>
>> Yes, losing cooldown state and cooldown reset upon failure is what I 
>> suggested in point 3 in previous email. Not sure either for a new 
>> state, I'll figure it out after experimenting with the code. I'll 
>> update the FLIP then.
>>
>>
>>>
>>> It would be good to clarify whether this FLIP only attempts to cover 
>>> scale up operations, or also scale downs in case of slot losses.
>>
>>
>> When slots are lost, most of the time it is due to a TM loss, so 
>> there should be several slots lost at the same time but (hopefully) 
>> only once. There should not be many scale downs in a row (but still 
>> cascading failures can happen). I think, we should just protect 
>> against having scale ups immediately following. For that, I think we 
>> could just keep the current behavior of transitioning to Restarting 
>> state and then back to Waiting for Resources state. This state will 
>> protect us against scale ups immediately following failure/restart.
>>
>>
>>>
>>> We should also think about how it relates to the externalized 
>>> declarative resource management. Should we always rescale 
>>> immediately? Should we wait until the cooldown is over?
>>
>>
>> It relates to point 2, no ? we should rescale immediately only if 
>> last rescale was done more than scaling-interval.min ago otherwise 
>> schedule a rescale at last-rescale + scaling-interval.min time.
>>
>>
>>> Related to this, there's the min-parallelism-increase option, that 
>>> if for example set to "2" restricts rescale operations to only occur 
>>> if the parallelism increases by at least 2.
>>
>>
>> yes I saw that in the code
>>
>>
>>> Ideally however there would be a max timeout for this.
>>>
>>> As such we could maybe think about this a bit differently:
>>> Add 2 new options instead of 1:
>>> jobmanager.adaptive-scheduler.scaling-interval.min: The minimum time 
>>> the scheduler will wait for the next effective rescale operations.
>>> jobmanager.adaptive-scheduler.scaling-interval.max: The maximum time 
>>> the scheduler will wait for the next effective rescale operations.
>>
>>
>> At point 2, we said that when slots change (requirements change or 
>> new slots available), if last rescale check (call to maybeRescale) 
>> was done less than scaling-interval.min ago, we should schedule a 
>> check that will rescale if min-parallelism-increase is met. Then, 
>> what is the use of scaling-interval.max timeout in that context?
>>
>>
>>>
>>> 3) It sounds fine that we lose the cooldown state, because imo we 
>>> want to reset the cooldown anyway on job failures (because a job 
>>> failure inherently implies a potential rescaling).
>>
>>
>> exactly.
>>
>>
>>>
>>> 4) The stabilization time isn't really redundant and serves a 
>>> different use-case. The idea behind it is that if a user adds multiple 
>>> TMs at once then we don't want to rescale immediately at the first 
>>> received slot. Without the stabilization time the cooldown would 
>>> actually cause bad behavior here, because not only would we rescale 
>>> immediately upon receiving the minimum required slots to scale up, 
>>> but we also wouldn't use the remaining slots just because the 
>>> cooldown says so.
>>
>>
>> I meant the opposite: not having only the cooldown but having only 
>> the stabilization time. I must have missed something because what I 
>> wonder is: if every rescale entails a restart of the pipeline and 
>> every restart entails passing in waiting for resources state, then 
>> why introduce a cooldown when there is already at each rescale a 
>> stable resource timeout ?
>>
>>
>> Best
>>
>> Etienne
>>
>>
>>
>>>
>>> On 16/06/2023 15:47, Etienne Chauchot wrote:
>>>> Hi Robert,
>>>>
>>>> Thanks for your feedback. I don't know the scheduler part well 
>>>> enough yet and I'm taking this ticket as a learning workshop.
>>>>
>>>> Regarding your comments:
>>>>
>>>> 1. Taking a look at the AdaptiveScheduler class which takes all its 
>>>> configuration from the JobManagerOptions, and also to be consistent 
>>>> with other parameters name, I'd suggest 
>>>> /jobmanager.scheduler-scaling-cooldown-period/
>>>>
>>>> 2. I thought scaling events existed already and the scheduler 
>>>> received them as mentioned in FLIP-160 (cf "Whenever the scheduler 
>>>> is in the Executing state and receives new slots") or in FLIP-138 
>>>> (cf "Whenever new slots are available the SlotPool notifies the 
>>>> Scheduler"). If it is not the case (it is the scheduler who asks 
>>>> for slots), then there is no need for storing scaling requests indeed.
>>>>
>>>> => I need a confirmation here
>>>>
>>>> 3. If we lose the JobManager, we lose both the AdaptiveScheduler 
>>>> state and the CoolDownTimer state. So, upon recovery, it would be 
>>>> as if there was no ongoing coolDown period. So, a first re-scale 
>>>> could happen right away and it will start a coolDown period. A 
>>>> second re-scale would have to wait for the end of this period.
>>>>
>>>> 4. When a pipeline is re-scaled, it is restarted. Upon restart, the 
>>>> AdaptiveScheduler passes again in the "waiting for resources" state 
>>>> as FLIP-160 suggests. If so, then it seems that the coolDown period 
>>>> is kind of redundant with the resource-stabilization-timeout. I 
>>>> guess it is not the case otherwise the FLINK-21883 ticket would not 
>>>> have been created.
>>>>
>>>> => I need a confirmation here also.
>>>>
>>>>
>>>> Thanks for your views on point 2 and 4.
>>>>
>>>>
>>>> Best
>>>>
>>>> Etienne
>>>>
>>>> On 15/06/2023 13:35, Robert Metzger wrote:
>>>>> Thanks for the FLIP.
>>>>>
>>>>> Some comments:
>>>>> 1. Can you specify the full proposed configuration name? "
>>>>> scaling-cooldown-period" is probably not the full config name?
>>>>> 2. Why is the concept of scaling events and a scaling queue 
>>>>> needed? If I
>>>>> remember correctly, the adaptive scheduler will just check how many
>>>>> TaskManagers are available and then adjust the execution graph 
>>>>> accordingly.
>>>>> There's no need to store a number of scaling events. We just need to
>>>>> determine the time to trigger an adjustment of the execution graph.
>>>>> 3. What's the behavior wrt to JobManager failures (e.g. we lose 
>>>>> the state
>>>>> of the Adaptive Scheduler?). My proposal would be to just reset the
>>>>> cooldown period, so after recovery of a JobManager, we have to 
>>>>> wait at
>>>>> least for the cooldown period until further scaling operations are 
>>>>> done.
>>>>> 4. What's the relationship to the
>>>>> "jobmanager.adaptive-scheduler.resource-stabilization-timeout"
>>>>> configuration?
>>>>>
>>>>> Thanks a lot for working on this!
>>>>>
>>>>> Best,
>>>>> Robert
>>>>>
>>>>> On Wed, Jun 14, 2023 at 3:38 PM Etienne 
>>>>> Chauchot<ec...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> @Yuxia, I updated the FLIP to include the aggregation of the stacked
>>>>>> operations that we discussed below. PTAL.
>>>>>>
>>>>>> Best
>>>>>>
>>>>>> Etienne
>>>>>>
>>>>>>
>>>>>> On 13/06/2023 16:31, Etienne Chauchot wrote:
>>>>>>> Hi Yuxia,
>>>>>>>
>>>>>>> Thanks for your feedback. The number of potentially stacked 
>>>>>>> operations
>>>>>>> depends on the configured length of the cooldown period.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> The proposition in the FLIP is to add a minimum delay between 2 
>>>>>>> scaling
>>>>>>> operations. But, indeed, an optimization could be to still stack 
>>>>>>> the
>>>>>>> operations (that arrive during a cooldown period) but maybe not 
>>>>>>> take
>>>>>>> only the last operation but rather aggregate them in order to 
>>>>>>> end up
>>>>>>> with a single aggregated operation when the cooldown period 
>>>>>>> ends. For
>>>>>>> example, let's say 3 taskManagers come up and 1 comes down 
>>>>>>> during the
>>>>>>> cooldown period, we could generate a single operation of scale 
>>>>>>> up +2
>>>>>>> when the period ends.
>>>>>>>
>>>>>>> As a side note regarding your comment on "it'll take a long time to
>>>>>>> finish all", please keep in mind that the reactive mode (at 
>>>>>>> least for
>>>>>>> now) is only available for streaming pipeline which are in essence
>>>>>>> infinite processing.
>>>>>>>
>>>>>>> Another side note: when you mention "every taskManagers 
>>>>>>> connecting",
>>>>>>> if you are referring to the start of the pipeline, please keep 
>>>>>>> in mind
>>>>>>> that the adaptive scheduler has a "waiting for resources" timeout
>>>>>>> period before starting the pipeline in which all taskmanagers 
>>>>>>> connect
>>>>>>> and the parallelism is decided.
>>>>>>>
>>>>>>> Best
>>>>>>>
>>>>>>> Etienne
>>>>>>>
>>>>>>> On 13/06/2023 03:58, yuxia wrote:
>>>>>>>> Hi, Etienne. Thanks for driving it. I have one question about the
>>>>>>>> mechanism of the cooldown timeout.
>>>>>>>>
>>>>>>>> From the Proposed Changes part, if a scaling event is received
>>>>>>>> during the cooldown period, it'll be stacked to be executed
>>>>>>>> after the period ends. Also, from the description of
>>>>>>>> FLINK-21883[1], the cooldown timeout is meant to avoid rescaling
>>>>>>>> the job very frequently, because TaskManagers do not all connect
>>>>>>>> at the same time.
>>>>>>>>
>>>>>>>> So, is it possible that every TaskManager connecting will produce
>>>>>>>> a scaling event, and that these will be stacked into many scale-up
>>>>>>>> events that take a long time to finish? Can we just take the
>>>>>>>> last event?
>>>>>>>>
>>>>>>>> [1]:https://issues.apache.org/jira/browse/FLINK-21883
>>>>>>>>
>>>>>>>> Best regards, Yuxia
>>>>>>>>
>>>>>>>> ----- Original Message -----
>>>>>>>> From: "Etienne Chauchot"<ec...@apache.org>
>>>>>>>> To: "dev"<de...@flink.apache.org>, "Robert Metzger"<me...@gmail.com>
>>>>>>>> Sent: Monday, June 12, 2023, 11:34:25 PM
>>>>>>>> Subject: [DISCUSS] FLIP-322 Cooldown period for adaptive scheduler
>>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I’d like to start a discussion about FLIP-322 [1] which 
>>>>>>>> introduces a
>>>>>>>> cooldown period for the adaptive scheduler.
>>>>>>>>
>>>>>>>> I'd like to get your feedback especially @Robert as you opened the
>>>>>>>> related ticket and worked on the reactive mode a lot.
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-322+Cooldown+period+for+adaptive+scheduler
>>>>>>>>
>>>>>>>> Best
>>>>>>>> Etienne
>>>
>>>
>

Re: [DISCUSS] FLIP-322 Cooldown period for adaptive scheduler

Posted by Chesnay Schepler <ch...@apache.org>.
 > we should schedule a check that will rescale if 
min-parallelism-increase is met. Then, what is the use of the 
scaling-interval.max timeout in that context?

To force a rescale if min-parallelism-increase is not met (but we could 
still run above the current parallelism).

min-parallelism-increase is a trade-off between the cost of rescaling vs 
the performance benefit of the parallelism increase. Over time the 
balance tips more and more in favor of the parallelism increase, hence 
we should eventually rescale anyway even if the minimum isn't met, or at 
least give users the option to do so.
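
The trade-off above can be sketched roughly as follows. This is only an illustration under assumptions: the class and method names are hypothetical and are not Flink's actual RescaleController API, and only the scale-up case is covered.

```java
import java.time.Duration;

/** Hypothetical helper for illustration; not part of Flink. */
final class RescaleDecisionSketch {

    /**
     * Decides whether a scale up should happen now.
     *
     * @param currentParallelism parallelism the job currently runs with
     * @param availableParallelism parallelism the available slots would allow
     * @param minParallelismIncrease value of min-parallelism-increase
     * @param sinceLastRescale time elapsed since the last rescale
     * @param scalingIntervalMax value of the proposed scaling-interval.max
     */
    static boolean shouldRescale(
            int currentParallelism,
            int availableParallelism,
            int minParallelismIncrease,
            Duration sinceLastRescale,
            Duration scalingIntervalMax) {
        int increase = availableParallelism - currentParallelism;
        if (increase <= 0) {
            return false; // no parallelism to gain
        }
        if (increase >= minParallelismIncrease) {
            return true; // the gain is considered worth the cost of a restart
        }
        // Small gain: only rescale once the maximum interval has elapsed.
        return sinceLastRescale.compareTo(scalingIntervalMax) >= 0;
    }
}
```

With min-parallelism-increase set to 2, a gain of 1 would be ignored at first and accepted once scaling-interval.max has passed since the last rescale.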

 > I meant the opposite: not having only the cooldown but having only 
the stabilization time. I must have missed something because what I 
wonder is: if every rescale entails a restart of the pipeline and every 
restart entails passing in waiting for resources state, then why 
introduce a cooldown when there is already at each rescale a stable 
resource timeout ?

It is technically correct that the stable resource timeout can be used 
to limit the number of rescale operations per interval; however, during 
that time the job isn't running, in contrast to the cooldown.

Having both just gives you a lot more flexibility.
"I want at most 1 rescale operation per hour, and wait at most 1 minute 
for resources to stabilize when a rescale happens".
You can't express this with only one of the options.
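
As a rough illustration of that example, the two settings could be written down like this. It is a sketch under assumptions: the class is hypothetical, scaling-interval.min is the option name proposed in this thread (not a released option at this point), and mapping "wait at most 1 minute" to resource-stabilization-timeout is my reading of the statement above.

```java
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical holder for illustration; not part of Flink. */
final class BothTimeoutsSketch {

    static Map<String, Duration> exampleSettings() {
        Map<String, Duration> settings = new LinkedHashMap<>();
        // Cooldown (name as proposed in this thread): at most one rescale per hour.
        // The job keeps running while the cooldown is active.
        settings.put(
                "jobmanager.adaptive-scheduler.scaling-interval.min",
                Duration.ofHours(1));
        // Stabilization: wait at most one minute for resources once a rescale
        // has started. The job is not running during this wait.
        settings.put(
                "jobmanager.adaptive-scheduler.resource-stabilization-timeout",
                Duration.ofMinutes(1));
        return settings;
    }
}
```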

On 20/06/2023 14:41, Etienne Chauchot wrote:
> Hi Chesnay,
>
> Thanks for your feedback. Comments inline
>
> On 16/06/2023 17:24, Chesnay Schepler wrote:
>> 1) Options specific to the adaptive scheduler should start with 
>> "jobmanager.adaptive-scheduler".
>
>
> ok
>
>
>> 2)
>> There isn't /really /a notion of a "scaling event". The scheduler is 
>> informed about new/lost slots and job failures, and reacts 
>> accordingly by maybe rescaling the job.
>> (sure, you can think of these as events, but you can think of 
>> practically everything as events)
>>
>> There shouldn't be a queue for events. All the scheduler should have 
>> to know is that the next rescale check is scheduled for time T, which 
>> in practice boils down to a flag and a scheduled action that runs 
>> Executing#maybeRescale.
>
>
> Makes total sense, it's very simple like this. Thanks for the precision 
> and pointer. After the related FLIPs, I'll look at the code now.
>
>
>> With that in mind, we also have to look at how we keep this state 
>> around. Presumably it is scoped to the current state, such that the 
>> cooldown is reset if a job fails.
>> Maybe we should add a separate ExecutingWithCooldown state; not sure 
>> yet.
>
>
> Yes, losing cooldown state and cooldown reset upon failure is what I 
> suggested in point 3 in previous email. Not sure either for a new 
> state, I'll figure it out after experimenting with the code. I'll 
> update the FLIP then.
>
>
>>
>> It would be good to clarify whether this FLIP only attempts to cover 
>> scale up operations, or also scale downs in case of slot losses.
>
>
> When slots are lost, most of the time it is due to a TM loss, so 
> there should be several slots lost at the same time but (hopefully) 
> only once. There should not be many scale downs in a row (but still 
> cascading failures can happen). I think, we should just protect 
> against having scale ups immediately following. For that, I think we 
> could just keep the current behavior of transitioning to Restarting 
> state and then back to Waiting for Resources state. This state will 
> protect us against scale ups immediately following failure/restart.
>
>
>>
>> We should also think about how it relates to the externalized 
>> declarative resource management. Should we always rescale 
>> immediately? Should we wait until the cooldown is over?
>
>
> It relates to point 2, no ? we should rescale immediately only if last 
> rescale was done more than scaling-interval.min ago otherwise schedule 
> a rescale at last-rescale + scaling-interval.min time.
>
>
>> Related to this, there's the min-parallelism-increase option, that if 
>> for example set to "2" restricts rescale operations to only occur if 
>> the parallelism increases by at least 2.
>
>
> yes I saw that in the code
>
>
>> Ideally however there would be a max timeout for this.
>>
>> As such we could maybe think about this a bit differently:
>> Add 2 new options instead of 1:
>> jobmanager.adaptive-scheduler.scaling-interval.min: The minimum time 
>> the scheduler will wait for the next effective rescale operations.
>> jobmanager.adaptive-scheduler.scaling-interval.max: The maximum time 
>> the scheduler will wait for the next effective rescale operations.
>
>
> At point 2, we said that when slots change (requirements change or new 
> slots available), if last rescale check (call to maybeRescale) was 
> done less than scaling-interval.min ago, we should schedule a check 
> that will rescale if min-parallelism-increase is met. Then, what is 
> the use of scaling-interval.max timeout in that context?
>
>
>>
>> 3) It sounds fine that we lose the cooldown state, because imo we 
>> want to reset the cooldown anyway on job failures (because a job 
>> failure inherently implies a potential rescaling).
>
>
> exactly.
>
>
>>
>> 4) The stabilization time isn't really redundant and serves a 
>> different use-case. The idea behind it is that if a user adds multiple 
>> TMs at once then we don't want to rescale immediately at the first 
>> received slot. Without the stabilization time the cooldown would 
>> actually cause bad behavior here, because not only would we rescale 
>> immediately upon receiving the minimum required slots to scale up, 
>> but we also wouldn't use the remaining slots just because the 
>> cooldown says so.
>
>
> I meant the opposite: not having only the cooldown but having only the 
> stabilization time. I must have missed something because what I wonder 
> is: if every rescale entails a restart of the pipeline and every 
> restart entails passing in waiting for resources state, then why 
> introduce a cooldown when there is already at each rescale a stable 
> resource timeout ?
>
>
> Best
>
> Etienne
>
>
>
>>
>> On 16/06/2023 15:47, Etienne Chauchot wrote:
>>> Hi Robert,
>>>
>>> Thanks for your feedback. I don't know the scheduler part well 
>>> enough yet and I'm taking this ticket as a learning workshop.
>>>
>>> Regarding your comments:
>>>
>>> 1. Taking a look at the AdaptiveScheduler class which takes all its 
>>> configuration from the JobManagerOptions, and also to be consistent 
>>> with other parameters name, I'd suggest 
>>> /jobmanager.scheduler-scaling-cooldown-period/
>>>
>>> 2. I thought scaling events existed already and the scheduler 
>>> received them as mentioned in FLIP-160 (cf "Whenever the scheduler 
>>> is in the Executing state and receives new slots") or in FLIP-138 
>>> (cf "Whenever new slots are available the SlotPool notifies the 
>>> Scheduler"). If it is not the case (it is the scheduler who asks for 
>>> slots), then there is no need for storing scaling requests indeed.
>>>
>>> => I need a confirmation here
>>>
>>> 3. If we lose the JobManager, we lose both the AdaptiveScheduler 
>>> state and the CoolDownTimer state. So, upon recovery, it would be as 
>>> if there was no ongoing coolDown period. So, a first re-scale could 
>>> happen right away and it will start a coolDown period. A second 
>>> re-scale would have to wait for the end of this period.
>>>
>>> 4. When a pipeline is re-scaled, it is restarted. Upon restart, the 
>>> AdaptiveScheduler passes again in the "waiting for resources" state 
>>> as FLIP-160 suggests. If so, then it seems that the coolDown period 
>>> is kind of redundant with the resource-stabilization-timeout. I 
>>> guess it is not the case otherwise the FLINK-21883 ticket would not 
>>> have been created.
>>>
>>> => I need a confirmation here also.
>>>
>>>
>>> Thanks for your views on point 2 and 4.
>>>
>>>
>>> Best
>>>
>>> Etienne
>>>
>>> On 15/06/2023 13:35, Robert Metzger wrote:
>>>> Thanks for the FLIP.
>>>>
>>>> Some comments:
>>>> 1. Can you specify the full proposed configuration name? "
>>>> scaling-cooldown-period" is probably not the full config name?
>>>> 2. Why is the concept of scaling events and a scaling queue needed? 
>>>> If I
>>>> remember correctly, the adaptive scheduler will just check how many
>>>> TaskManagers are available and then adjust the execution graph 
>>>> accordingly.
>>>> There's no need to store a number of scaling events. We just need to
>>>> determine the time to trigger an adjustment of the execution graph.
>>>> 3. What's the behavior wrt to JobManager failures (e.g. we lose the 
>>>> state
>>>> of the Adaptive Scheduler?). My proposal would be to just reset the
>>>> cooldown period, so after recovery of a JobManager, we have to wait at
>>>> least for the cooldown period until further scaling operations are 
>>>> done.
>>>> 4. What's the relationship to the
>>>> "jobmanager.adaptive-scheduler.resource-stabilization-timeout"
>>>> configuration?
>>>>
>>>> Thanks a lot for working on this!
>>>>
>>>> Best,
>>>> Robert
>>>>
>>>> On Wed, Jun 14, 2023 at 3:38 PM Etienne Chauchot<ec...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> @Yuxia, I updated the FLIP to include the aggregation of the stacked
>>>>> operations that we discussed below. PTAL.
>>>>>
>>>>> Best
>>>>>
>>>>> Etienne
>>>>>
>>>>>
>>>>> On 13/06/2023 16:31, Etienne Chauchot wrote:
>>>>>> Hi Yuxia,
>>>>>>
>>>>>> Thanks for your feedback. The number of potentially stacked 
>>>>>> operations
>>>>>> depends on the configured length of the cooldown period.
>>>>>>
>>>>>>
>>>>>>
>>>>>> The proposition in the FLIP is to add a minimum delay between 2 
>>>>>> scaling
>>>>>> operations. But, indeed, an optimization could be to still stack the
>>>>>> operations (that arrive during a cooldown period) but maybe not take
>>>>>> only the last operation but rather aggregate them in order to end up
>>>>>> with a single aggregated operation when the cooldown period ends. 
>>>>>> For
>>>>>> example, let's say 3 taskManagers come up and 1 comes down during 
>>>>>> the
>>>>>> cooldown period, we could generate a single operation of scale up +2
>>>>>> when the period ends.
>>>>>>
>>>>>> As a side note regarding your comment on "it'll take a long time to
>>>>>> finish all", please keep in mind that the reactive mode (at least 
>>>>>> for
>>>>>> now) is only available for streaming pipeline which are in essence
>>>>>> infinite processing.
>>>>>>
>>>>>> Another side note: when you mention "every taskManagers connecting",
>>>>>> if you are referring to the start of the pipeline, please keep in 
>>>>>> mind
>>>>>> that the adaptive scheduler has a "waiting for resources" timeout
>>>>>> period before starting the pipeline in which all taskmanagers 
>>>>>> connect
>>>>>> and the parallelism is decided.
>>>>>>
>>>>>> Best
>>>>>>
>>>>>> Etienne
>>>>>>
>>>>>> On 13/06/2023 03:58, yuxia wrote:
>>>>>>> Hi, Etienne. Thanks for driving it. I have one question about the
>>>>>>> mechanism of the cooldown timeout.
>>>>>>>
>>>>>>> From the Proposed Changes part, if a scaling event is received
>>>>>>> during the cooldown period, it'll be stacked to be executed
>>>>>>> after the period ends. Also, from the description of
>>>>>>> FLINK-21883[1], the cooldown timeout is meant to avoid rescaling
>>>>>>> the job very frequently, because TaskManagers do not all connect
>>>>>>> at the same time.
>>>>>>>
>>>>>>> So, is it possible that every TaskManager connecting will produce
>>>>>>> a scaling event, and that these will be stacked into many scale-up
>>>>>>> events that take a long time to finish? Can we just take the
>>>>>>> last event?
>>>>>>>
>>>>>>> [1]:https://issues.apache.org/jira/browse/FLINK-21883
>>>>>>>
>>>>>>> Best regards, Yuxia
>>>>>>>
>>>>>>> ----- Original Message -----
>>>>>>> From: "Etienne Chauchot"<ec...@apache.org>
>>>>>>> To: "dev"<de...@flink.apache.org>, "Robert Metzger"<me...@gmail.com>
>>>>>>> Sent: Monday, June 12, 2023, 11:34:25 PM
>>>>>>> Subject: [DISCUSS] FLIP-322 Cooldown period for adaptive scheduler
>>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I’d like to start a discussion about FLIP-322 [1] which 
>>>>>>> introduces a
>>>>>>> cooldown period for the adaptive scheduler.
>>>>>>>
>>>>>>> I'd like to get your feedback especially @Robert as you opened the
>>>>>>> related ticket and worked on the reactive mode a lot.
>>>>>>>
>>>>>>> [1]
>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-322+Cooldown+period+for+adaptive+scheduler
>>>>>>>
>>>>>>> Best
>>>>>>> Etienne
>>
>>


Re: [DISCUSS] FLIP-322 Cooldown period for adaptive scheduler

Posted by Etienne Chauchot <ec...@apache.org>.
Hi Chesnay,

Thanks for your feedback. Comments inline

On 16/06/2023 17:24, Chesnay Schepler wrote:
> 1) Options specific to the adaptive scheduler should start with 
> "jobmanager.adaptive-scheduler".


ok


> 2)
> There isn't /really /a notion of a "scaling event". The scheduler is 
> informed about new/lost slots and job failures, and reacts accordingly 
> by maybe rescaling the job.
> (sure, you can think of these as events, but you can think of 
> practically everything as events)
>
> There shouldn't be a queue for events. All the scheduler should have 
> to know is that the next rescale check is scheduled for time T, which 
> in practice boils down to a flag and a scheduled action that runs 
> Executing#maybeRescale.


Makes total sense, it's very simple this way. Thanks for the clarification 
and the pointer. Now that I've read the related FLIPs, I'll look at the code.
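
To check my understanding of "a flag and a scheduled action", here is a minimal sketch. Everything in it is an assumption for illustration: the class is hypothetical, the Runnable stands in for Executing#maybeRescale, and a plain ScheduledExecutorService replaces whatever executor the scheduler really uses.

```java
import java.time.Duration;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch; not the actual Executing state of the adaptive scheduler. */
final class ScheduledRescaleCheckSketch {
    private final ScheduledExecutorService executor;
    private final Runnable maybeRescale; // stand-in for Executing#maybeRescale
    private boolean checkScheduled; // the "flag"

    ScheduledRescaleCheckSketch(ScheduledExecutorService executor, Runnable maybeRescale) {
        this.executor = executor;
        this.maybeRescale = maybeRescale;
    }

    /** Called whenever slots are gained or lost while the cooldown is active. */
    synchronized void onResourcesChanged(Duration remainingCooldown) {
        if (checkScheduled) {
            return; // a check is already pending, so no event queue is needed
        }
        checkScheduled = true;
        executor.schedule(this::runCheck, remainingCooldown.toMillis(), TimeUnit.MILLISECONDS);
    }

    private void runCheck() {
        synchronized (this) {
            checkScheduled = false; // later changes may schedule a new check
        }
        // The check looks at the slots available now, whatever happened in between.
        maybeRescale.run();
    }
}
```

However many slot changes arrive during the cooldown, only one check runs when it ends.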


> With that in mind, we also have to look at how we keep this state 
> around. Presumably it is scoped to the current state, such that the 
> cooldown is reset if a job fails.
> Maybe we should add a separate ExecutingWithCooldown state; not sure yet.


Yes, losing the cooldown state and resetting the cooldown upon failure is 
what I suggested in point 3 of my previous email. I'm not sure about a new 
state either; I'll figure it out after experimenting with the code and 
update the FLIP then.


>
> It would be good to clarify whether this FLIP only attempts to cover 
> scale up operations, or also scale downs in case of slot losses.


When slots are lost, most of the time it is due to a TM loss, so several 
slots should be lost at the same time but (hopefully) only once. There 
should not be many scale downs in a row (although cascading failures can 
still happen). I think we should just protect against scale ups 
immediately following a scale down. For that, we could keep the current 
behavior of transitioning to the Restarting state and then back to the 
Waiting for Resources state. This state will protect us against scale ups 
immediately following a failure/restart.


>
> We should also think about how it relates to the externalized 
> declarative resource management. Should we always rescale immediately? 
> Should we wait until the cooldown is over?


It relates to point 2, no? We should rescale immediately only if the last 
rescale was done more than scaling-interval.min ago; otherwise we schedule 
a rescale at last-rescale + scaling-interval.min.
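
A small sketch of that rule, under assumptions: the class and method are hypothetical names of mine, and scaling-interval.min is the option as proposed in this thread.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper for illustration; not part of Flink. */
final class CooldownDelaySketch {

    /**
     * Returns how long to wait before the next rescale may happen.
     * Duration.ZERO means the cooldown is over and we can rescale immediately.
     */
    static Duration delayUntilNextRescale(
            Instant lastRescale, Instant now, Duration scalingIntervalMin) {
        Instant earliestNextRescale = lastRescale.plus(scalingIntervalMin);
        if (now.isBefore(earliestNextRescale)) {
            // Still in the cooldown: schedule at last-rescale + scaling-interval.min.
            return Duration.between(now, earliestNextRescale);
        }
        return Duration.ZERO;
    }
}
```

For example, with scaling-interval.min set to 30 seconds and a rescale that happened 10 seconds ago, the next rescale would be scheduled 20 seconds from now.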


> Related to this, there's the min-parallelism-increase option, that if 
> for example set to "2" restricts rescale operations to only occur if 
> the parallelism increases by at least 2.


Yes, I saw that in the code.


> Ideally however there would be a max timeout for this.
>
> As such we could maybe think about this a bit differently:
> Add 2 new options instead of 1:
> jobmanager.adaptive-scheduler.scaling-interval.min: The minimum time 
> the scheduler will wait for the next effective rescale operations.
> jobmanager.adaptive-scheduler.scaling-interval.max: The maximum time 
> the scheduler will wait for the next effective rescale operations.


At point 2, we said that when slots change (requirements change or new 
slots become available), if the last rescale check (call to maybeRescale) 
was done less than scaling-interval.min ago, we should schedule a check 
that will rescale if min-parallelism-increase is met. Then, what is the 
use of the scaling-interval.max timeout in that context?


>
> 3) It sounds fine that we lose the cooldown state, because imo we want 
> to reset the cooldown anyway on job failures (because a job failure 
> inherently implies a potential rescaling).


exactly.


>
> 4) The stabilization time isn't really redundant and serves a 
> different use-case. The idea behind it is that if a user adds multiple 
> TMs at once then we don't want to rescale immediately at the first 
> received slot. Without the stabilization time the cooldown would 
> actually cause bad behavior here, because not only would we rescale 
> immediately upon receiving the minimum required slots to scale up, but 
> we also wouldn't use the remaining slots just because the cooldown 
> says so.


I meant the opposite: not having only the cooldown, but having only the 
stabilization time. I must have missed something, because what I wonder 
is: if every rescale entails a restart of the pipeline, and every restart 
entails passing through the Waiting for Resources state, then why 
introduce a cooldown when each rescale is already subject to the resource 
stabilization timeout?


Best

Etienne



>
> On 16/06/2023 15:47, Etienne Chauchot wrote:
>> Hi Robert,
>>
>> Thanks for your feedback. I don't know the scheduler part well enough 
>> yet and I'm taking this ticket as a learning workshop.
>>
>> Regarding your comments:
>>
>> 1. Taking a look at the AdaptiveScheduler class which takes all its 
>> configuration from the JobManagerOptions, and also to be consistent 
>> with other parameters name, I'd suggest 
>> /jobmanager.scheduler-scaling-cooldown-period/
>>
>> 2. I thought scaling events existed already and the scheduler 
>> received them as mentioned in FLIP-160 (cf "Whenever the scheduler is 
>> in the Executing state and receives new slots") or in FLIP-138 (cf 
>> "Whenever new slots are available the SlotPool notifies the 
>> Scheduler"). If it is not the case (it is the scheduler who asks for 
>> slots), then there is no need for storing scaling requests indeed.
>>
>> => I need a confirmation here
>>
>> 3. If we lose the JobManager, we lose both the AdaptiveScheduler 
>> state and the CoolDownTimer state. So, upon recovery, it would be as 
>> if there was no ongoing coolDown period. So, a first re-scale could 
>> happen right away and it will start a coolDown period. A second 
>> re-scale would have to wait for the end of this period.
>>
>> 4. When a pipeline is re-scaled, it is restarted. Upon restart, the 
>> AdaptiveScheduler passes again in the "waiting for resources" state 
>> as FLIP-160 suggests. If so, then it seems that the coolDown period 
>> is kind of redundant with the resource-stabilization-timeout. I guess 
>> it is not the case otherwise the FLINK-21883 ticket would not have 
>> been created.
>>
>> => I need a confirmation here also.
>>
>>
>> Thanks for your views on point 2 and 4.
>>
>>
>> Best
>>
>> Etienne
>>
>> Le 15/06/2023 à 13:35, Robert Metzger a écrit :
>>> Thanks for the FLIP.
>>>
>>> Some comments:
>>> 1. Can you specify the full proposed configuration name? "
>>> scaling-cooldown-period" is probably not the full config name?
>>> 2. Why is the concept of scaling events and a scaling queue needed? 
>>> If I
>>> remember correctly, the adaptive scheduler will just check how many
>>> TaskManagers are available and then adjust the execution graph 
>>> accordingly.
>>> There's no need to store a number of scaling events. We just need to
>>> determine the time to trigger an adjustment of the execution graph.
>>> 3. What's the behavior wrt to JobManager failures (e.g. we lose the 
>>> state
>>> of the Adaptive Scheduler?). My proposal would be to just reset the
>>> cooldown period, so after recovery of a JobManager, we have to wait at
>>> least for the cooldown period until further scaling operations are 
>>> done.
>>> 4. What's the relationship to the
>>> "jobmanager.adaptive-scheduler.resource-stabilization-timeout"
>>> configuration?
>>>
>>> Thanks a lot for working on this!
>>>
>>> Best,
>>> Robert
>>>
>>> On Wed, Jun 14, 2023 at 3:38 PM Etienne Chauchot<ec...@apache.org>
>>> wrote:
>>>
>>>> Hi all,
>>>>
>>>> @Yukia,I updated the FLIP to include the aggregation of the staked
>>>> operations that we discussed below PTAL.
>>>>
>>>> Best
>>>>
>>>> Etienne
>>>>
>>>>
>>>> Le 13/06/2023 à 16:31, Etienne Chauchot a écrit :
>>>>> Hi Yuxia,
>>>>>
>>>>> Thanks for your feedback. The number of potentially stacked 
>>>>> operations
>>>>> depends on the configured length of the cooldown period.
>>>>>
>>>>>
>>>>>
>>>>> The proposition in the FLIP is to add a minimum delay between 2 
>>>>> scaling
>>>>> operations. But, indeed, an optimization could be to still stack the
>>>>> operations (that arrive during a cooldown period) but maybe not take
>>>>> only the last operation but rather aggregate them in order to end up
>>>>> with a single aggregated operation when the cooldown period ends. For
>>>>> example, let's say 3 taskManagers come up and 1 comes down during the
>>>>> cooldown period, we could generate a single operation of scale up +2
>>>>> when the period ends.
>>>>>
>>>>> As a side note regarding your comment on "it'll take a long time to
>>>>> finish all", please keep in mind that the reactive mode (at least for
>>>>> now) is only available for streaming pipeline which are in essence
>>>>> infinite processing.
>>>>>
>>>>> Another side note: when you mention "every taskManagers connecting",
>>>>> if you are referring to the start of the pipeline, please keep in 
>>>>> mind
>>>>> that the adaptive scheduler has a "waiting for resources" timeout
>>>>> period before starting the pipeline in which all taskmanagers connect
>>>>> and the parallelism is decided.
>>>>>
>>>>> Best
>>>>>
>>>>> Etienne
>>>>>
>>>>> Le 13/06/2023 à 03:58, yuxia a écrit :
>>>>>> Hi, Etienne. Thanks for driving it. I have one question about the
>>>>>> mechanism of the cooldown timeout.
>>>>>>
>>>>>>  From the Proposed Changes part, if a scalling event is received and
>>>>>> it falls during the cooldown period, it'll be stacked to be executed
>>>>>> after the period ends. Also, from the description of FLINK-21883[1],
>>>>>> cooldown timeout is to avoid rescaling the job very frequently,
>>>>>> because TaskManagers are not all connecting at the same time.
>>>>>>
>>>>>> So, is it possible that every taskmanager connecting will produce a
>>>>>> scalling event and it'll be stacked with many scale up event which
>>>>>> causes it'll take a long time to finish all? Can we just take the
>>>>>> last one event?
>>>>>>
>>>>>> [1]:https://issues.apache.org/jira/browse/FLINK-21883
>>>>>>
>>>>>> Best regards, Yuxia
>>>>>>
>>>>>> ----- 原始邮件 ----- 发件人: "Etienne Chauchot"<ec...@apache.org> 
>>>>>>
>>>>>> 收件人:
>>>>>> "dev"<de...@flink.apache.org>, "Robert Metzger"<me...@gmail.com>
>>>>>> 发送时间: 星期一, 2023年 6 月 12日 下午 11:34:25 主题: [DISCUSS] 
>>>>>> FLIP-322
>>>>>> Cooldown
>>>>>> period for adaptive scheduler
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I’d like to start a discussion about FLIP-322 [1] which introduces a
>>>>>> cooldown period for the adaptive scheduler.
>>>>>>
>>>>>> I'd like to get your feedback especially @Robert as you opened the
>>>>>> related ticket and worked on the reactive mode a lot.
>>>>>>
>>>>>> [1]
>>>>>>
>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-322+Cooldown+period+for+adaptive+scheduler 
>>>>
>>>>> Best
>>>>>> Etienne
>
>

Re: [DISCUSS] FLIP-322 Cooldown period for adaptive scheduler

Posted by Chesnay Schepler <ch...@apache.org>.
1) Options specific to the adaptive scheduler should start with 
"jobmanager.adaptive-scheduler".

2)
There isn't /really/ a notion of a "scaling event". The scheduler is 
informed about new/lost slots and job failures, and reacts accordingly 
by maybe rescaling the job.
(sure, you can think of these as events, but you can think of 
practically everything as events)

There shouldn't be a queue for events. All the scheduler should have to 
know is that the next rescale check is scheduled for time T, which in 
practice boils down to a flag and a scheduled action that runs 
Executing#maybeRescale.
With that in mind, we also have to look at how we keep this state 
around. Presumably it is scoped to the current state, such that the 
cooldown is reset if a job fails.
Maybe we should add a separate ExecutingWithCooldown state; not sure yet.
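
To make the "flag plus scheduled action" idea concrete, here is a minimal sketch in Java. It is not Flink code: CooldownTracker, its Decision enum and all method names are hypothetical, and the caller is assumed to run the real check (something like Executing#maybeRescale) whenever the sketch says a check may run. One instance per Executing state would give the "reset on job failure" behaviour for free.

```java
import java.time.Duration;

/**
 * Hypothetical helper, not part of Flink: tracks a cooldown with one
 * timestamp and one "check pending" flag instead of a queue of events.
 * Creating a fresh instance per Executing state resets the cooldown.
 */
final class CooldownTracker {
    enum Decision { CHECK_NOW, SCHEDULE_CHECK, CHECK_ALREADY_PENDING }

    private final long cooldownMillis;
    private long cooldownStartMillis;
    private boolean checkPending; // the "flag" from the discussion

    CooldownTracker(Duration cooldown, long stateEnteredMillis) {
        this.cooldownMillis = cooldown.toMillis();
        this.cooldownStartMillis = stateEnteredMillis;
    }

    /** Time T at which the next rescale check is allowed. */
    long nextCheckMillis() {
        return cooldownStartMillis + cooldownMillis;
    }

    /** Called whenever the scheduler learns about new or lost slots. */
    Decision onSlotsChanged(long nowMillis) {
        if (nowMillis >= nextCheckMillis()) {
            return Decision.CHECK_NOW; // cooldown is over, check right away
        }
        if (checkPending) {
            return Decision.CHECK_ALREADY_PENDING; // nothing to store
        }
        checkPending = true;
        return Decision.SCHEDULE_CHECK; // caller schedules one action at T
    }

    /** Called by the scheduled action when it fires at time T. */
    void onScheduledCheck() {
        checkPending = false;
    }

    /** Called after a rescale actually happened; restarts the cooldown. */
    void markRescaled(long nowMillis) {
        cooldownStartMillis = nowMillis;
    }
}
```

However many slot changes arrive during the cooldown, at most one scheduled action exists, and it looks at the slots available when it fires.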

It would be good to clarify whether this FLIP only attempts to cover 
scale up operations, or also scale downs in case of slot losses.

We should also think about how it relates to the externalized 
declarative resource management. Should we always rescale immediately? 
Should we wait until the cooldown is over?
Related to this, there's the min-parallelism-increase option which, if 
set to "2" for example, restricts rescale operations to cases where the 
parallelism increases by at least 2.
Ideally, however, there would be a max timeout for this.

As such we could maybe think about this a bit differently:
Add 2 new options instead of 1:
jobmanager.adaptive-scheduler.scaling-interval.min: The minimum time the 
scheduler will wait for the next effective rescale operation.
jobmanager.adaptive-scheduler.scaling-interval.max: The maximum time the 
scheduler will wait for the next effective rescale operation.
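
One possible reading of how the two intervals could interact with min-parallelism-increase, sketched in Java. This is an interpretation of the proposal above, not Flink's implementation: ScalingIntervalPolicy and shouldRescale are hypothetical names, the intervals are assumed to be measured from the last rescale, and only scale-ups are modelled.

```java
import java.time.Duration;

/** Hypothetical policy object; names and semantics are assumptions. */
final class ScalingIntervalPolicy {
    private final Duration minInterval;       // scaling-interval.min
    private final Duration maxInterval;       // scaling-interval.max
    private final int minParallelismIncrease; // min-parallelism-increase

    ScalingIntervalPolicy(Duration minInterval, Duration maxInterval,
                          int minParallelismIncrease) {
        this.minInterval = minInterval;
        this.maxInterval = maxInterval;
        this.minParallelismIncrease = minParallelismIncrease;
    }

    boolean shouldRescale(Duration sinceLastRescale,
                          int currentParallelism,
                          int availableParallelism) {
        int increase = availableParallelism - currentParallelism;
        if (increase <= 0) {
            return false; // scale-downs are out of scope for this sketch
        }
        if (sinceLastRescale.compareTo(minInterval) < 0) {
            return false; // still cooling down
        }
        if (increase >= minParallelismIncrease) {
            return true;  // large enough gain, rescale after the min interval
        }
        // Small gain: only rescale once the max interval has elapsed.
        return sinceLastRescale.compareTo(maxInterval) >= 0;
    }
}
```

With min = 30 s, max = 5 min and min-parallelism-increase = 2, a single extra slot would be ignored at 40 s but picked up at 5 min. Setting the min interval to zero would disable the cooldown.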

3) It sounds fine that we lose the cooldown state, because imo we want 
to reset the cooldown anyway on job failures (because a job failure 
inherently implies a potential rescaling).

4) The stabilization time isn't really redundant and serves a different 
use-case. The idea behind it is that if a user adds multiple TMs at once 
then we don't want to rescale immediately at the first received slot. 
Without the stabilization time the cooldown would actually cause bad 
behavior here, because not only would we rescale immediately upon 
receiving the minimum required slots to scale up, but we also wouldn't 
use the remaining slots just because the cooldown says so.
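
The effect described in point 4 can be illustrated with a deliberately simplified model in Java. It is a sketch, not the scheduler's actual logic: StabilizationModel is a hypothetical name, one slot is assumed to be enough to scale up, and the first rescale is assumed to fire once no new slot has arrived for the stabilization time.

```java
/** Simplified, hypothetical model of the stabilization time. */
final class StabilizationModel {
    /**
     * Returns how many slots the first rescale would use, given sorted
     * slot arrival times. Slots arriving after that rescale would have
     * to wait for the cooldown to end before being used.
     */
    static int slotsUsedAtFirstRescale(long[] arrivalMillis,
                                       long stabilizationMillis) {
        if (arrivalMillis.length == 0) {
            return 0;
        }
        long rescaleAt = arrivalMillis[0] + stabilizationMillis;
        int slots = 1;
        for (int i = 1; i < arrivalMillis.length
                && arrivalMillis[i] <= rescaleAt; i++) {
            slots++;
            // Each new slot restarts the stabilization wait.
            rescaleAt = arrivalMillis[i] + stabilizationMillis;
        }
        return slots;
    }
}
```

In this model, three slots arriving one second apart are all used with a 10 s stabilization time, whereas with no stabilization time only the first slot is used and the other two sit idle until the cooldown ends.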



Re: [DISCUSS] FLIP-322 Cooldown period for adaptive scheduler

Posted by Etienne Chauchot <ec...@apache.org>.
Hi Robert,

Thanks for your feedback. I don't know the scheduler part well enough 
yet and I'm taking this ticket as a learning workshop.

Regarding your comments:

1. Taking a look at the AdaptiveScheduler class, which takes all its 
configuration from the JobManagerOptions, and to be consistent with the 
names of other parameters, I'd suggest 
/jobmanager.scheduler-scaling-cooldown-period/

2. I thought scaling events existed already and the scheduler received 
them as mentioned in FLIP-160 (cf "Whenever the scheduler is in the 
Executing state and receives new slots") or in FLIP-138 (cf "Whenever 
new slots are available the SlotPool notifies the Scheduler"). If it is 
not the case (it is the scheduler who asks for slots), then there is no 
need for storing scaling requests indeed.

=> I need a confirmation here

3. If we lose the JobManager, we lose both the AdaptiveScheduler state 
and the CoolDownTimer state. So, upon recovery, it would be as if there 
was no ongoing cooldown period. A first rescale could therefore happen 
right away and would start a cooldown period. A second rescale would 
have to wait for the end of this period.

4. When a pipeline is rescaled, it is restarted. Upon restart, the 
AdaptiveScheduler goes through the "waiting for resources" state again, 
as FLIP-160 suggests. If so, then the cooldown period seems somewhat 
redundant with the resource-stabilization-timeout. I guess that is not 
the case, otherwise the FLINK-21883 ticket would not have been created.

=> I need a confirmation here also.


Thanks for your views on points 2 and 4.


Best

Etienne


Re: [DISCUSS] FLIP-322 Cooldown period for adaptive scheduler

Posted by Robert Metzger <rm...@apache.org>.
Thanks for the FLIP.

Some comments:
1. Can you specify the full proposed configuration name?
"scaling-cooldown-period" is probably not the full config name?
2. Why is the concept of scaling events and a scaling queue needed? If I
remember correctly, the adaptive scheduler will just check how many
TaskManagers are available and then adjust the execution graph accordingly.
There's no need to store a number of scaling events. We just need to
determine the time to trigger an adjustment of the execution graph.
3. What's the behavior wrt JobManager failures (e.g. we lose the state
of the Adaptive Scheduler)? My proposal would be to just reset the
cooldown period, so after recovery of a JobManager, we have to wait at
least for the cooldown period until further scaling operations are done.
4. What's the relationship to the
"jobmanager.adaptive-scheduler.resource-stabilization-timeout"
configuration?

Thanks a lot for working on this!

Best,
Robert


Re: [DISCUSS] FLIP-322 Cooldown period for adaptive scheduler

Posted by Etienne Chauchot <ec...@apache.org>.
Hi all,

@Yuxia, I updated the FLIP to include the aggregation of the stacked 
operations that we discussed below. PTAL.

Best

Etienne



Re: [DISCUSS] FLIP-322 Cooldown period for adaptive scheduler

Posted by Etienne Chauchot <ec...@apache.org>.
Hi Yuxia,

Thanks for your feedback. The number of potentially stacked operations 
depends on the configured length of the cooldown period.



The proposal in the FLIP is to add a minimum delay between two scaling
operations. But, indeed, an optimization could be to still stack the
operations that arrive during a cooldown period and, rather than taking
only the last one, aggregate them so as to end up with a single
aggregated operation when the cooldown period ends. For example, if 3
TaskManagers come up and 1 goes down during the cooldown period, we
could generate a single scale-up operation of +2 when the period ends.
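
As a tiny worked version of that example, here is a Java sketch of the aggregation. SlotChangeAggregator and netDelta are hypothetical names used only for illustration; each list entry stands for one TaskManager coming up (+1) or going down (-1) during the cooldown.

```java
import java.util.List;

/** Hypothetical helper illustrating the aggregation example. */
final class SlotChangeAggregator {
    /** Sums the stacked changes into one net scaling operation. */
    static int netDelta(List<Integer> stackedChanges) {
        int net = 0;
        for (int change : stackedChanges) {
            net += change;
        }
        return net;
    }
}
```

For the case in the text, netDelta(List.of(1, 1, 1, -1)) yields 2, i.e. a single scale-up of +2 at the end of the cooldown.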

As a side note regarding your comment on "it'll take a long time to 
finish all", please keep in mind that the reactive mode (at least for 
now) is only available for streaming pipelines, which by nature process 
data indefinitely.

Another side note: when you mention "every taskManagers connecting", if 
you are referring to the start of the pipeline, please keep in mind that 
the adaptive scheduler has a "waiting for resources" timeout period 
before starting the pipeline in which all taskmanagers connect and the 
parallelism is decided.

Best

Etienne


Re: [DISCUSS] FLIP-322 Cooldown period for adaptive scheduler

Posted by yuxia <lu...@alumni.sjtu.edu.cn>.
Hi, Etienne.
Thanks for driving it.
I have one question about the mechanism of the cooldown timeout.

From the Proposed Changes part, if a scaling event is received and it falls during the cooldown period, it'll be stacked to be executed after the period ends.
Also, from the description of FLINK-21883[1], the cooldown timeout is there to avoid rescaling the job very frequently, because TaskManagers do not all connect at the same time.

So, is it possible that every TaskManager connecting will produce a scaling event, and that these will be stacked into many scale-up events that take a long time to finish?
Can we just take the last event?

[1]: https://issues.apache.org/jira/browse/FLINK-21883

Best regards,
Yuxia

----- Original Message -----
From: "Etienne Chauchot" <ec...@apache.org>
To: "dev" <de...@flink.apache.org>, "Robert Metzger" <me...@gmail.com>
Sent: Monday, June 12, 2023, 11:34:25 PM
Subject: [DISCUSS] FLIP-322 Cooldown period for adaptive scheduler

Hi,

I’d like to start a discussion about FLIP-322 [1] which introduces a 
cooldown period for the adaptive scheduler.

I'd like to get your feedback especially @Robert as you opened the 
related ticket and worked on the reactive mode a lot.

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-322+Cooldown+period+for+adaptive+scheduler

Best

Etienne