Posted to dev@flink.apache.org by Robert Metzger <rm...@apache.org> on 2021/01/22 16:41:50 UTC

[DISCUSS] FLIP-159: Reactive Mode

Hi all,

Till started a discussion about FLIP-160: Declarative scheduler [1] earlier
today. The first major feature based on that effort will be FLIP-159:
Reactive Mode. It allows users to operate Flink so that it reactively
scales the job up or down depending on the provided resources: adding
TaskManagers will scale the job up, removing them will scale it down again.

Here's the link to the Wiki:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-159%3A+Reactive+Mode

We are very excited to hear your feedback about the proposal!

Best,
Robert

[1]
https://lists.apache.org/thread.html/r604a01f739639e2a5f093fbe7894c172125530332747ecf6990a6ce4%40%3Cdev.flink.apache.org%3E
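
The scaling behaviour described in this post can be sketched in a few lines of Python. This is only an illustration, not Flink code: the function name, the operator names and the slots-per-TaskManager figure are made up, and it assumes default slot sharing (one slot can run one subtask of every operator) plus a per-operator maxParallelism cap, which comes up later in the thread.

```python
def reactive_parallelism(task_managers, slots_per_tm, max_parallelism):
    """Return the parallelism each operator would get in this toy model.

    Assumption: with slot sharing, the job can use every registered slot,
    and each operator is capped by its own maxParallelism.
    """
    available_slots = task_managers * slots_per_tm
    return {op: min(available_slots, cap) for op, cap in max_parallelism.items()}


# Hypothetical job: operator name -> maxParallelism
job = {"source": 4, "window": 128, "sink": 1}

for tms in (2, 3, 1):  # add a TaskManager, then remove two
    print(tms, "TaskManagers ->", reactive_parallelism(tms, 4, job))
```

An operator with maxParallelism 1 (the "sink" above) stays at 1 no matter how many TaskManagers join, while the uncapped operators follow the number of available slots.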

Re: [DISCUSS] FLIP-159: Reactive Mode

Posted by Robert Metzger <rm...@apache.org>.
The discussion has been open for 6 days, and it seems that all questions
and concerns raised so far have been addressed.

I will start a VOTE thread for FLIP-159 now.

On Tue, Jan 26, 2021 at 3:45 PM Yang Wang <da...@gmail.com> wrote:

> Thanks Robert and Till for the thorough explanation.
>
> Now I understand the key difference between reactive mode and auto scaling
> mode.
> For the latter, we could dynamically adjust the desired value based on
> monitoring the metrics (e.g. cpu, memory, latency, delay, etc.). Since
> reactive mode is simpler and a special case of auto scaling mode, I am also
> fine with the current scope of this FLIP.
>
>
> Best,
> Yang
>
> > > > > > >
> > > > > > > Xintong Song <to...@gmail.com> wrote on Mon, Jan 25, 2021 at 10:29 AM:
> > > > > > >
> > > > > > > > Thanks for preparing the FLIP and starting the discussion, Robert.
> > > > > > > >
> > > > > > > > ## Cluster vs. Job configuration
> > > > > > > > As I have commented on the FLIP-160 discussion thread [1], I'm a bit
> > > > > > > > unsure about activating the reactive execution mode via a cluster level
> > > > > > > > configuration option. I'm aware that in the first step this feature does
> > > > > > > > not support session clusters. However, I think that does not mean it
> > > > > > > > won't be supported in future. In that case, the cluster configuration
> > > > > > > > option will limit us from having jobs running with different execution
> > > > > > > > modes in the same session cluster.
> > > > > > > >
> > > > > > > > ## Active resource managers
> > > > > > > > According to the FLIP, this feature explicitly does not support active
> > > > > > > > resource managers. IIUC, this is because in this feature the job
> > > > > > > > requests an infinite amount of resources, which would flood Kubernetes /
> > > > > > > > Yarn / Mesos with an unreasonably large number of resource requests. If
> > > > > > > > this is the only concern, I'd like to bring the configuration option
> > > > > > > > `slotmanager.number-of-slots.max` to your attention. This feature allows
> > > > > > > > putting an upper limit on the total number of slots the Flink cluster
> > > > > > > > uses, preventing active resource managers from allocating too many
> > > > > > > > resources from Kubernetes / Yarn / Mesos. Unless there are other concerns
> > > > > > > > that I overlooked, I think it would be nicer for the reactive mode to
> > > > > > > > also support active resource managers, with the additional requirement to
> > > > > > > > explicitly configure the max slots.
> > > > > > > >
> > > > > > > > Thank you~
> > > > > > > >
> > > > > > > > Xintong Song
> > > > > > > >
> > > > > > > >
> > > > > > > > [1]
> > > > > > > > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-160-Declarative-scheduler-td48165.html
> > > > > > > >
> > > > > > > > On Sat, Jan 23, 2021 at 5:59 AM Steven Wu <stevenz3wu@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Thanks a lot for the proposal, Robert and Till.
> > > > > > > > >
> > > > > > > > > > No fixed parallelism for any of the operators
> > > > > > > > >
> > > > > > > > > Regarding this limitation, can the scheduler only adjust the default
> > > > > > > > > parallelism? If some operators set parallelism explicitly (like always
> > > > > > > > > 1), just leave them unchanged.
> > > > > > > > >
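
To make the `slotmanager.number-of-slots.max` suggestion quoted above concrete, here is a toy model in Python. It is not Flink code; `slots_to_request` is a made-up helper and the numbers are arbitrary. It only shows what a cap does to an infinite resource demand: the demand is always reduced to exactly the cap.

```python
import math


def slots_to_request(desired_slots, max_slots):
    """Slots an active resource manager would ask for, limited by max_slots."""
    return min(desired_slots, max_slots)


REACTIVE_DEMAND = math.inf  # reactive mode asks for "as much as possible"
MAX_SLOTS = 50              # stands in for slotmanager.number-of-slots.max

print(slots_to_request(REACTIVE_DEMAND, MAX_SLOTS))  # the cap
print(slots_to_request(12, MAX_SLOTS))               # a finite demand is kept
```

Because an infinite demand never drops below the cap, such a cluster would sit at the cap regardless of load, which is the objection raised against this option elsewhere in the thread.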

Re: [DISCUSS] FLIP-159: Reactive Mode

Posted by Yang Wang <da...@gmail.com>.
Thanks Robert and Till for the thorough explanation.

Now I understand the key difference between reactive mode and auto scaling mode.
For the latter, we could dynamically adjust the desired value based on
monitoring the metrics (e.g. cpu, memory, latency, delay, etc.). Since
reactive mode is simpler and a special case of auto scaling mode, I am also
fine with the current scope of this FLIP.


Best,
Yang
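
The distinction summarised in this message can be written down as a small Python sketch. Everything here is hypothetical: `executed_parallelism`, `desired_from_lag` and the lag-based rule are illustrative stand-ins, not part of FLIP-159 or of any Flink API. Reactive mode corresponds to a desired value of infinity, while an autoscaler derives the desired value from metrics.

```python
import math


def executed_parallelism(desired_slots, available_slots):
    """Run at the desired parallelism, or as large as the resources allow."""
    return min(desired_slots, available_slots)


def desired_from_lag(records_behind, records_per_slot):
    """Hypothetical autoscaling rule: one slot per records_per_slot of lag."""
    return max(1, math.ceil(records_behind / records_per_slot))


available = 6

# Reactive mode: the desired value is "infinity", so all available slots are used.
print(executed_parallelism(math.inf, available))

# Autoscaling: the desired value follows a metric and may end up below or
# above what is currently available.
for lag in (25_000, 90_000):
    desired = desired_from_lag(lag, 10_000)
    print(lag, desired, executed_parallelism(desired, available))
```

Seen this way, reactive mode is the special case in which the policy always returns infinity.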

Till Rohrmann <tr...@apache.org> wrote on Tue, Jan 26, 2021 at 6:09 PM:

> Thanks a lot for all the feedback Steven, Yang Wang and Xintong. I have a
> few more comments to add.
>
> # Keep it simple and stupid
>
> As Robert said we would like to keep the new feature initially as simple as
> possible in order to quickly implement it. Once we have a basic
> implementation, we want to reach out to our users to try it out and give us
> feedback on the future development direction. We hope to create a better
> feature involving our users as early as possible. That's why the proposal
> is quite barebone.
>
> Yang Wang's proposal to make the timeout value configurable sounds very
> simple to me and might already improve usability big time w/o causing a lot
> of implementation effort. Hence, I think it would be a good idea to include
> this feature into the initial design.
>
> # Cluster vs. job configuration
>
> I am not entirely sure whether the execution mode is a job level
> configuration. I think about it more like a deployment/execution/run option
> because one and the same job A can be executed with a fixed parallelism on
> a session cluster or using the reactive mode. Unfortunately, we don't have
> this kind of distinction at the moment. Consequently, the idea was to first
> introduce a cluster level configuration which might be ignored or cause a
> fatal error when being used with the wrong deployment.
>
> # Active resource managers
>
> As Robert explained, the reactive mode is not designed for active resource
> managers. However, by activating only the declarative scheduler for an
> active deployment, we should be able to run a job (streaming jobs only for
> the time being) even if the active RM could not allocate all the required
> resources as you've described Xintong.
>
> # Auto-scaling
>
> Auto-scaling, which allows Flink jobs to control their resources, will be a
> super helpful feature. I believe that we can build auto-scaling using the
> declarative scheduler similar to how the reactive mode uses it. The main
> difference between auto-scaling and the reactive mode is that the Flink
> job needs to decide on the desired number of slots. In the reactive mode we
> say that the desired value is "infinity" whereas for an auto-scaled job,
> the job is able to dynamically adjust the desired value. However,
> auto-scaling will not be part of this FLIP.
>
> Cheers,
> Till
>
> On Tue, Jan 26, 2021 at 10:36 AM Xintong Song <to...@gmail.com>
> wrote:
>
> > Thanks for the explanation, Robert.
> >
> > Now I see how these things are expected to be supported in steps.
> >
> > I think you are right. Demanding a fixed finite amount of resources can be
> > considered as a special case of `ScalingPolicy`. I'm now good with the
> > current scope of reactive mode as a first step, and support active resource
> > managers with autoscaling mode after stabilizing FLIP-159/160.
> >
> > Thank you~
> >
> > Xintong Song
> >
> >
> >
> > On Tue, Jan 26, 2021 at 5:00 PM Robert Metzger <rm...@apache.org>
> > wrote:
> >
> > > Thanks for your thoughts Xintong! What you are writing is very valuable
> > > feedback for me, as I have limited experience with real-world deployments.
> > > It seems that autoscaling support is a really important follow up.
> > >
> > > ## active resource managers
> > >
> > > I guess you can consider reactive mode a special case of the more generic
> > > autoscaling mode. Once we extend the interfaces in the declarative
> > > scheduler to allow autoscaling mode, the scenarios you are describing are
> > > possible.
> > > We already had some ideas for some extended interfaces that would cover a
> > > great variety of cases. We could allow the policy to determine the number
> > > of desired slots, and propose a parallelism assignment based on that to the
> > > policy. This would also work with making calls to external services
> > > to decide the scale etc.
> > > However implementing FLIP-159 and FLIP-160 might take quite a bit of time
> > > to stabilize all the corner cases. Once that is done, we'll publish a FLIP
> > > with an advanced interface for autoscaling.
> > >
> > > On Tue, Jan 26, 2021 at 2:56 AM Xintong Song <to...@gmail.com>
> > > wrote:
> > >
> > > > ## configuration option
> > > >
> > > > I see your point that autoscaling mode might be more suitable for session
> > > > clusters. It doesn't change that `execution-mode` could be a job-level
> > > > configuration. But I'm good with keeping it cluster-level and marking it
> > > > experimental at the moment, so we can change it later if needed for the
> > > > autoscaling mode.
> > > >
> > > > ## active resource managers
> > > >
> > > > I'm a bit confused about the boundary between reactive mode and autoscaling
> > > > mode.
> > > > - Reactive mode requests an infinite amount of resources, and executes at
> > > > the largest parallelism that is possible with the available resources.
> > > > - Autoscaling mode dynamically adjusts resource demand, and executes at a
> > > > parallelism that is either demanded or as large as possible if the
> > > > demanded parallelism cannot be reached.
> > > > - What about something in between? A job is not capable of dynamically
> > > > adjusting the resource demand and requests a fixed finite amount of
> > > > resources, and still wants to be executed with as large a parallelism as
> > > > possible if the demanded parallelism cannot be reached?
> > > >
> > > > It's quite common that a job may temporarily not get as many resources as
> > > > desired, due to other higher-priority jobs running in the
> > > > Kubernetes/Yarn/Mesos cluster. In such cases, currently either the user
> > > > needs to configure the job with a different parallelism, or the job cannot
> > > > be executed. It would be helpful if the job could execute with a lower
> > > > parallelism, and automatically scale up to the original desired
> > > > parallelism when more resources become available.
> > > >
> > > >
> > > > For Yarn, there's comprehensive queue based resource quota management,
> > > > where how many resources each job gets is closely related to other jobs'
> > > > resource requirements. For Kubernetes, while the default kube-scheduler
> > > > does not have such mature multi-tenant support, there are other projects
> > > > (e.g., Apache YuniKorn [1]) that can bring similar scheduling
> > > > capabilities to Kubernetes.
> > > >
> > > >
> > > > Thank you~
> > > >
> > > > Xintong Song
> > > >
> > > >
> > > > [1] https://yunikorn.apache.org/
> > > >
> > > > On Mon, Jan 25, 2021 at 4:48 PM Robert Metzger <rm...@apache.org>
> > > > wrote:
> > > >
> > > > > Thank you very much for the comments so far.
> > > > >
> > > > > @Steven:
> > > > >
> > > > > > No fixed parallelism for any of the operators
> > > > > >
> > > > > > Regarding this limitation, can the scheduler only adjust the default
> > > > > > parallelism? if some operators set parallelism explicitly (like always
> > > > > > 1), just leave them unchanged.
> > > > >
> > > > >
> > > > > We will respect the configured maxParallelism for that purpose. If you have
> > > > > an operator that is not intended to run in parallel, you can set
> > > > > maxParallelism=1.
> > > > >
> > > > > @Xintong:
> > > > >
> > > > > > the cluster configuration option will limit us from having jobs running
> > > > > > with different execution modes in the same session cluster.
> > > > >
> > > > >
> > > > > I'm not sure if it makes sense to support reactive mode in a session
> > > > > cluster ever. For an autoscaling mode, it probably makes sense (as we can
> > > > > just combine the resource requests from all running jobs, and distribute
> > > > > the available resources proportional to the requested resources).
> > > > >
> > > > > I will state more clearly in the FLIP that the configuration options should
> > > > > be marked as experimental.
> > > > >
> > > > > > Active resource managers
> > > > > >
> > > > > > [...]
> > > > > >
> > > > > > If this is the only concern, I'd like to bring the configuration option
> > > > > > `slotmanager.number-of-slots.max` to your attention.
> > > > >
> > > > >
> > > > > I understand and agree that it would be really nice to support active
> > > > > resource managers with the new scheduler right away. In my opinion,
> > > > > reactive mode will never be really supported by active resource managers,
> > > > > as this is a contradiction with the idea of reactive mode: It is explicitly
> > > > > designed to allow controlling the cluster from the outside (similar to
> > > > > Kafka Streams, where you add and remove capacity for scaling). Integration
> > > > > with active resource managers should be added in an autoscaling mode, based
> > > > > on the declarative scheduler.
> > > > > I've considered the slotmanager.number-of-slots.max option as well, but it
> > > > > basically means that your cluster will always immediately scale up
> > > > > to slotmanager.number-of-slots.max and stick to that value, even if those
> > > > > resources are not needed.
> > > > > On YARN, it would be pretty difficult or even impossible to control the
> > > > > scale of such a Flink deployment from the outside (using a queue with the
> > > > > capacity scheduler won't work, as changes to queues require restarts).
> > > > > On K8s, one would have to build a custom tool that finds the deployment
> > > > > created by Flink and adjusts it. Then, it's probably easier to just create
> > > > > a standalone deployment on K8s.
> > > > >
> > > > > @Yang:
> > > > >
> > > > > > It will be better to make the 10 seconds to be configurable.
> > > > >
> > > > >
> > > > > I agree that it is pretty bold to have such an important configuration
> > > > > parameter hardcoded. We proposed it like this to keep the first
> > > > > implementation as simple as possible.
> > > > > But if we see that basically everybody is asking for this, or if we have
> > > > > time left at the end of the release cycle, we'll make it configurable.
> > > > >
> > > > >
> > > > > > but also the ScalingPolicy is not exposed to the users now
> > > > >
> > > > >
> > > > > Exposing the ScalingPolicy to the user is very high on our priority list,
> > > > > but we want to keep the first version as simple as possible, to be able to
> > > > > deliver the overall feature in time, and to collect some initial user
> > > > > feedback before coming up with an interface we want to expose.
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > On Mon, Jan 25, 2021 at 8:37 AM Yang Wang <da...@gmail.com>
> > > wrote:
> > > > >
> > > > > > Thanks Robert for creating this FLIP and starting the discussion.
> > > > > >
> > > > > > This is a great start point to make Flink work with auto scaling
> > > > service.
> > > > > > The reactive mode
> > > > > > is very useful in containerized environment(e.g. docker,
> > Kubernetes).
> > > > For
> > > > > > example, combined
> > > > > > with Kubernetes "Horizontal Pod Autoscaler"[1], the TaskManagers
> > > could
> > > > be
> > > > > > started/released
> > > > > > dynamically based on the system metrics(e.g. cpu, memory) and
> > custom
> > > > > > metrics(e.g. delay, latency).
> > > > > >
> > > > > >
> > > > > > > Once the job has started running, and a TaskManager is lost, it
> > > will
> > > > > wait
> > > > > > > for 10 seconds for the
> > > > > >
> > > > > > TaskManager to re-appear.
> > > > > >
> > > > > > It will be better to make the 10 seconds to be configurable.
> > > According
> > > > to
> > > > > > our production experience
> > > > > > on Kubernetes, 10 seconds is not enough for a pod to be
> relaunched.
> > > > Maybe
> > > > > > this is also a specific
> > > > > > case whether the resource is stable or not.
> > > > > >
> > > > > > > Active ResourceManager
> > > > > >
> > > > > > IIUC, the reason why reactive mode could not work with active
> > > resource
> > > > > > manager is not only
> > > > > > about requesting infinite amount of resources, but also the
> > > > ScalingPolicy
> > > > > > is not exposed to the
> > > > > > users now. ScalingPolicy could be the bridge between reactive
> mode
> > > and
> > > > > > active resource manager.
> > > > > > User could have their own auto scaling service, which monitor the
> > > Flink
> > > > > > metrics and then update
> > > > > > the ScalingPolicy(e.g. parallelism 10 -> 20). Then the active
> > > resource
> > > > > > manager could allocate these
> > > > > > TaskManagers.
> > > > > > But it is out the scope of this FLIP, I really expect this could
> be
> > > > done
> > > > > in
> > > > > > the future. And it will be another
> > > > > > great step to make Flink auto scalable.
> > > > > >
> > > > > >
> > > > > >
> > > > > > [1].
> > > > > >
> > > > >
> > > >
> > >
> >
> https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/
> > > > > >
> > > > > >
> > > > > > Best,
> > > > > > Yang
> > > > > >
> > > > > >
> > > > > > Xintong Song <to...@gmail.com> 于2021年1月25日周一 上午10:29写道:
> > > > > >
> > > > > > > Thanks for preparing the FLIP and starting the discussion,
> > Robert.
> > > > > > >
> > > > > > > ## Cluster vs. Job configuration
> > > > > > > As I have commented on the FLIP-160 discussion thread [1], I'm
> a
> > > bit
> > > > > > unsure
> > > > > > > about activating the reactive execution mode via a cluster
> level
> > > > > > > configuration option. I'm aware that in the first step this
> > feature
> > > > > does
> > > > > > > not support session clusters. However, I think that does not
> mean
> > > it
> > > > > > won't
> > > > > > > be supported in future. In that case, the cluster configuration
> > > > option
> > > > > > will
> > > > > > > limit us from having jobs running with different execution
> modes
> > in
> > > > the
> > > > > > > same session cluster.
> > > > > > >
> > > > > > > ## Active resource managers
> > > > > > > According to the FLIP, this feature explicitly does not support
> > > > active
> > > > > > > resource managers. IIUC, this is because when in this feature
> the
> > > job
> > > > > > > requests an infinite amount of resources, which would flood
> > > > Kubernetes
> > > > > /
> > > > > > > Yarn / Mesos with unreasonably large number of resource
> requests.
> > > If
> > > > > this
> > > > > > > is the only concern, I'd like to bring the configuration option
> > > > > > > `slotmanager.number-of-slots.max` to your attention. This
> feature
> > > > > allows
> > > > > > > putting an upper limit to the total number of slots the Flink
> > > cluster
> > > > > > uses,
> > > > > > > preventing active resource managers from allocating too many
> > > > resources
> > > > > > from
> > > > > > > Kubernetes / Yarn / Mesos. Unless there are other concerns
> that I
> > > > > > > overlooked, I think it would be nicer for the reactive mode to
> > also
> > > > > > support
> > > > > > > active resource managers, with the additional requirement to
> > > > explicitly
> > > > > > > configure the max slots.
> > > > > > >
> > > > > > > Thank you~
> > > > > > >
> > > > > > > Xintong Song
> > > > > > >
> > > > > > >
> > > > > > > [1]
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-160-Declarative-scheduler-td48165.html
> > > > > > >
> > > > > > > On Sat, Jan 23, 2021 at 5:59 AM Steven Wu <
> stevenz3wu@gmail.com>
> > > > > wrote:
> > > > > > >
> > > > > > > > Thanks a lot for the proposal, Robert and Till.
> > > > > > > >
> > > > > > > > > No fixed parallelism for any of the operators
> > > > > > > >
> > > > > > > > Regarding this limitation, can the scheduler only adjust the
> > > > default
> > > > > > > > parallelism? if some operators set parallelism explicitly
> (like
> > > > > always
> > > > > > > 1),
> > > > > > > > just leave them unchanged.
> > > > > > > >
> > > > > > > >
> > > > > > > > On Fri, Jan 22, 2021 at 8:42 AM Robert Metzger <
> > > > rmetzger@apache.org>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi all,
> > > > > > > > >
> > > > > > > > > Till started a discussion about FLIP-160: Declarative
> > scheduler
> > > > [1]
> > > > > > > > earlier
> > > > > > > > > today, the first major feature based on that effort will be
> > > > > FLIP-159:
> > > > > > > > > Reactive Mode. It allows users to operate Flink in a way
> that
> > > it
> > > > > > > > reactively
> > > > > > > > > scales the job up or down depending on the provided
> > resources:
> > > > > adding
> > > > > > > > > TaskManagers will scale the job up, removing them will
> scale
> > it
> > > > > down
> > > > > > > > again.
> > > > > > > > >
> > > > > > > > > Here's the link to the Wiki:
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-159%3A+Reactive+Mode
> > > > > > > > >
> > > > > > > > > We are very excited to hear your feedback about the
> proposal!
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > Robert
> > > > > > > > >
> > > > > > > > > [1]
> > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://lists.apache.org/thread.html/r604a01f739639e2a5f093fbe7894c172125530332747ecf6990a6ce4%40%3Cdev.flink.apache.org%3E
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: [DISCUSS] FLIP-159: Reactive Mode

Posted by Till Rohrmann <tr...@apache.org>.
Thanks a lot for all the feedback Steven, Yang Wang and Xintong. I have a
few more comments to add.

# Keep it simple and stupid

As Robert said, we would like to keep the new feature as simple as possible
initially so that we can implement it quickly. Once we have a basic
implementation, we want to reach out to our users to try it out and give us
feedback on the future development direction. We hope to create a better
feature by involving our users as early as possible. That's why the proposal
is quite bare-bones.

Yang Wang's proposal to make the timeout value configurable sounds very
simple to me and might already improve usability considerably without much
implementation effort. Hence, I think it would be a good idea to include
this feature in the initial design.
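
To illustrate why the timeout matters, here is a minimal sketch of the
decision it controls. Everything in it is an assumption made for
illustration: the function name, its parameters, and the simplification
that the job simply continues with the remaining slots once the timeout
expires. Only the 10 second default comes from the proposal; none of this
is actual Flink code.

```python
from typing import Optional

# The value that is hardcoded in the current proposal.
DEFAULT_TIMEOUT_S = 10.0


def slots_after_loss(
    slots_before: int,
    lost_slots: int,
    reappeared_after_s: Optional[float],
    timeout_s: float = DEFAULT_TIMEOUT_S,
) -> int:
    """Return the number of slots the job continues with after a TaskManager loss.

    reappeared_after_s is the time the TaskManager needed to come back,
    or None if it never came back. All names here are hypothetical.
    """
    # The TaskManager came back in time: keep running at the old scale.
    if reappeared_after_s is not None and reappeared_after_s <= timeout_s:
        return slots_before
    # Otherwise the job has to continue with the slots that are left.
    return slots_before - lost_slots


# A pod that needs 30 seconds to be relaunched (hypothetical numbers):
print(slots_after_loss(8, 2, reappeared_after_s=30.0))                  # default timeout
print(slots_after_loss(8, 2, reappeared_after_s=30.0, timeout_s=60.0))  # configured timeout
```

With the hardcoded default, the job in this sketch scales down even though
the pod would have come back; with a larger configured timeout it keeps
its original scale.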

# Cluster vs. job configuration

I am not entirely sure whether the execution mode is a job-level
configuration. I think of it more as a deployment/execution/run option
because one and the same job A can be executed with a fixed parallelism on
a session cluster or using the reactive mode. Unfortunately, we don't have
this kind of distinction at the moment. Consequently, the idea was to first
introduce a cluster-level configuration which might be ignored or cause a
fatal error when used with the wrong deployment.

# Active resource managers

As Robert explained, the reactive mode is not designed for active resource
managers. However, by activating only the declarative scheduler for an
active deployment, we should be able to run a job (streaming jobs only for
the time being) even if the active RM could not allocate all the required
resources, as you've described, Xintong.

# Auto-scaling

Auto-scaling, which allows Flink jobs to control their resources, will be a
super helpful feature. I believe that we can build auto-scaling using the
declarative scheduler similar to how the reactive mode uses it. The main
difference between auto-scaling and the reactive mode is that, with
auto-scaling, the Flink job needs to decide on the desired number of slots.
In the reactive mode we say that the desired value is "infinity", whereas an
auto-scaled job is able to dynamically adjust the desired value. However,
auto-scaling will not be part of this FLIP.
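
As a rough illustration of the "desired value" idea, the sketch below
computes the parallelism a job would run with. It is not scheduler code:
the function and constant names are made up, and it assumes for simplicity
that one slot hosts one parallel instance and that the configured
maxParallelism is the only other upper bound.

```python
import math

# Reactive mode: the desired number of slots is "infinity".
REACTIVE_DESIRED_SLOTS = math.inf


def effective_parallelism(
    desired_slots: float, available_slots: int, max_parallelism: int
) -> int:
    """Parallelism the job runs with (hypothetical helper, not a Flink API).

    The job never runs wider than it desires, than the cluster can offer,
    or than maxParallelism allows.
    """
    return int(min(desired_slots, available_slots, max_parallelism))


# Reactive mode uses whatever the cluster provides.
print(effective_parallelism(REACTIVE_DESIRED_SLOTS, available_slots=4, max_parallelism=128))
print(effective_parallelism(REACTIVE_DESIRED_SLOTS, available_slots=8, max_parallelism=128))

# An auto-scaled job picks a finite desired value and may leave slots unused.
print(effective_parallelism(6, available_slots=8, max_parallelism=128))
```

In this picture the only thing auto-scaling adds is that `desired_slots`
becomes a finite value the job can change at runtime.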

Cheers,
Till

On Tue, Jan 26, 2021 at 10:36 AM Xintong Song <to...@gmail.com> wrote:

> Thanks for the explanation, Robert.
>
> Now I see how these things are expected to be supported in steps.
>
> I think you are right. Demanding a fixed finite amount of resources can be
> considered as a special case of `ScalingPolicy`. I'm now good with the
> current scope of reactive mode as a first step, and support active resource
> managers with autoscaling mode after stabilizing FLIP-159/160.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Tue, Jan 26, 2021 at 5:00 PM Robert Metzger <rm...@apache.org>
> wrote:
>
> > Thanks for your thoughts Xintong! What you are writing is very valuable
> > feedback for me, as I have limited experience with real-world
> deployments.
> > It seems that autoscaling support is a really important follow up.
> >
> > ## active resource managers
> >
> > I guess you can consider reactive mode a special case of the more generic
> > autoscaling mode. Once we extend the interfaces in the declarative
> > scheduler to allow autoscaling mode, the scenarios you are describing are
> > possible.
> > We already had some ideas for some extended interfaces that would cover a
> > great variety of cases. We could allow the policy to determine the number
> > of desired slots, and propose a parallelism assignment based on that to
> the
> > policy. This would also work with making calls to external services
> > to decide the scale etc.
> > However implementing FLIP-159 and FLIP-160 might take quite a bit of time
> > to stabilize all the corner cases. Once that is done, we'll publish a
> FLIP
> > with an advanced interface for autoscaling.
> >
> > On Tue, Jan 26, 2021 at 2:56 AM Xintong Song <to...@gmail.com>
> > wrote:
> >
> > > ## configuration option
> > >
> > > I see your point that autoscaling mode might be more suitable for
> session
> > > clusters. It doesn't change that `execution-mode` could be a job-level
> > > configuration. But I'm good with keeping it cluster-level and marking
> it
> > > experimental at the moment, so we can change it later if needed for the
> > > autoscaling mode.
> > >
> > > ## active resource managers
> > >
> > > I'm a bit confused about the boundary between reactive mode and
> > autoscaling
> > > mode.
> > > - Reactive mode requests an infinite amount of resources, and executes
> at
> > > the largest parallelism that is possible with the available resources.
> > > - Autoscaling mode dynamically adjusts resource demand, and executes
> at a
> > > parallelism that is either demanded or as large as possible if the
> > > demanded parallelism cannot be reached.
> > > - What about something in between? A job is not capable of dynamically
> > > adjusting the resource demand and requests a fixed finite amount of
> > > resources, and still wants to be executed with as large parallelisms as
> > > possible if the demanded parallelism cannot be reached?
> > >
> > > It's quite common that a job may temporarily not get as much resources
> as
> > > desired, due to running of other higher priority jobs in the
> > > Kubernetes/Yarn/Mesos cluster. In such cases, currently either the user
> > > needs to configure the job with a different parallelism, or the job
> > cannot
> > > be executed. It would be helpful if the job can execute with a lower
> > > parallelism, and automatically scales up to the original desired
> > > parallelism when more resources become available.
> > >
> > >
> > > For Yarn, there's comprehensive queue based resource quota management,
> > > where how many resources each job gets are closely related to other
> jobs'
> > > resource requirements. For Kubernetes, while the default kube-scheduler
> > > does not have such mature multi-tenant support, there are other
> projects
> > > (e.g., Apache YuniKorn [1]) that can bring the similar scheduling
> > > capability to Kubernetes
> > >
> > >
> > > Thank you~
> > >
> > > Xintong Song
> > >
> > >
> > > [1] https://yunikorn.apache.org/
> > >
> > > On Mon, Jan 25, 2021 at 4:48 PM Robert Metzger <rm...@apache.org>
> > > wrote:
> > >
> > > > Thank you very much for the comments so far.
> > > >
> > > > @Steven:
> > > >
> > > > No fixed parallelism for any of the operators
> > > > >
> > > > > Regarding this limitation, can the scheduler only adjust the
> default
> > > > > parallelism? if some operators set parallelism explicitly (like
> > always
> > > > 1),
> > > > > just leave them unchanged.
> > > >
> > > >
> > > > We will respect the configured maxParallelism for that purpose. If
> you
> > > have
> > > > an operator that is not intended to run in parallel, you can set
> > > > > maxParallelism=1.
> > > >
> > > > @Xintong:
> > > >
> > > > the cluster configuration option will limit us from having jobs
> running
> > > > > with different execution modes in the same session cluster.
> > > >
> > > >
> > > > I'm not sure if it makes sense to support reactive mode in a session
> > > > cluster ever. For an autoscaling mode, it probably makes sense (as we
> > can
> > > > just combine the resource requests from all running jobs, and
> > distribute
> > > > the available resources proportional to the requested resources).
> > > >
> > > > I will state more clearly in the FLIP that the configuration options
> > > should
> > > > be marked as experimental.
> > > >
> > > > Active resource managers
> > > >
> > > > [...]
> > > >
> > > > If this is the only concern, I'd like to bring the configuration
> option
> > > > > `slotmanager.number-of-slots.max` to your attention.
> > > >
> > > >
> > > > I understand and agree that it would be really nice to support active
> > > > resource managers with the new scheduler right away. In my opinion,
> > > > reactive mode will never be really supported by active resource
> > managers,
> > > > as this is a contradiction with the idea of reactive mode: It is
> > > explicitly
> > > > designed to allow controlling the cluster from the outside (similar
> to
> > > > Kafka streams, where you add and remove capacity for scaling).
> > > Integration
> > > > > with active resource managers should be added in an autoscaling mode,
> > > based
> > > > on the declarative scheduler.
> > > > I've considered the slotmanager.number-of-slots.max option as well,
> but
> > > it
> > > > basically means that your cluster will always immediately scale up
> > > > to slotmanager.number-of-slots.max and stick to that value, even if
> > those
> > > > resources are not needed.
> > > > On YARN, it would be pretty difficult or even impossible to control
> the
> > > > scale of such a Flink deployment from the outside (using a queue with
> > the
> > > > capacity scheduler won't work, as changes to queues require restarts)
> > > > On K8s, one would have to build a custom tool that finds the
> deployment
> > > > created by Flink and adjusts it. Then, it's probably easier to just
> > > create
> > > > a standalone deployment on K8s.
> > > >
> > > > @Yang:
> > > >
> > > > It will be better to make the 10 seconds to be configurable.
> > > >
> > > >
> > > > I agree that it is pretty bold to have such an important
> configuration
> > > > parameter hardcoded. We proposed it like this to keep the first
> > > > implementation as simple as possible.
> > > > But if we see that basically everybody is asking for this, or if we
> > have
> > > > time left at the end of the release cycle, we'll make it
> configurable.
> > > >
> > > >
> > > > but also the ScalingPolicy is not exposed to the users now
> > > >
> > > >
> > > > Exposing the ScalingPolicy to the user is very high on our priority
> > list,
> > > > but we want to keep the first version as simple as possible, to be
> able
> > > to
> > > > deliver the overall feature in time, and to collect some initial user
> > > > feedback before coming up with an interface we want to expose.
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > On Mon, Jan 25, 2021 at 8:37 AM Yang Wang <da...@gmail.com>
> > wrote:
> > > >
> > > > > Thanks Robert for creating this FLIP and starting the discussion.
> > > > >
> > > > > This is a great start point to make Flink work with auto scaling
> > > service.
> > > > > The reactive mode
> > > > > is very useful in containerized environment(e.g. docker,
> Kubernetes).
> > > For
> > > > > example, combined
> > > > > with Kubernetes "Horizontal Pod Autoscaler"[1], the TaskManagers
> > could
> > > be
> > > > > started/released
> > > > > dynamically based on the system metrics(e.g. cpu, memory) and
> custom
> > > > > metrics(e.g. delay, latency).
> > > > >
> > > > >
> > > > > > Once the job has started running, and a TaskManager is lost, it
> > will
> > > > wait
> > > > > > for 10 seconds for the
> > > > >
> > > > > TaskManager to re-appear.
> > > > >
> > > > > It will be better to make the 10 seconds to be configurable.
> > According
> > > to
> > > > > our production experience
> > > > > on Kubernetes, 10 seconds is not enough for a pod to be relaunched.
> > > Maybe
> > > > > this is also a specific
> > > > > case whether the resource is stable or not.
> > > > >
> > > > > > Active ResourceManager
> > > > >
> > > > > IIUC, the reason why reactive mode could not work with active
> > resource
> > > > > manager is not only
> > > > > about requesting infinite amount of resources, but also the
> > > ScalingPolicy
> > > > > is not exposed to the
> > > > > users now. ScalingPolicy could be the bridge between reactive mode
> > and
> > > > > active resource manager.
> > > > > User could have their own auto scaling service, which monitor the
> > Flink
> > > > > metrics and then update
> > > > > the ScalingPolicy(e.g. parallelism 10 -> 20). Then the active
> > resource
> > > > > manager could allocate these
> > > > > TaskManagers.
> > > > > But it is out the scope of this FLIP, I really expect this could be
> > > done
> > > > in
> > > > > the future. And it will be another
> > > > > great step to make Flink auto scalable.
> > > > >
> > > > >
> > > > >
> > > > > [1].
> > > > >
> > > >
> > >
> >
> https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/
> > > > >
> > > > >
> > > > > Best,
> > > > > Yang
> > > > >
> > > > >
> > > > > Xintong Song <to...@gmail.com> 于2021年1月25日周一 上午10:29写道:
> > > > >
> > > > > > Thanks for preparing the FLIP and starting the discussion,
> Robert.
> > > > > >
> > > > > > ## Cluster vs. Job configuration
> > > > > > As I have commented on the FLIP-160 discussion thread [1], I'm a
> > bit
> > > > > unsure
> > > > > > about activating the reactive execution mode via a cluster level
> > > > > > configuration option. I'm aware that in the first step this
> feature
> > > > does
> > > > > > not support session clusters. However, I think that does not mean
> > it
> > > > > won't
> > > > > > be supported in future. In that case, the cluster configuration
> > > option
> > > > > will
> > > > > > limit us from having jobs running with different execution modes
> in
> > > the
> > > > > > same session cluster.
> > > > > >
> > > > > > ## Active resource managers
> > > > > > According to the FLIP, this feature explicitly does not support
> > > active
> > > > > > resource managers. IIUC, this is because when in this feature the
> > job
> > > > > > requests an infinite amount of resources, which would flood
> > > Kubernetes
> > > > /
> > > > > > Yarn / Mesos with unreasonably large number of resource requests.
> > If
> > > > this
> > > > > > is the only concern, I'd like to bring the configuration option
> > > > > > `slotmanager.number-of-slots.max` to your attention. This feature
> > > > allows
> > > > > > putting an upper limit to the total number of slots the Flink
> > cluster
> > > > > uses,
> > > > > > preventing active resource managers from allocating too many
> > > resources
> > > > > from
> > > > > > Kubernetes / Yarn / Mesos. Unless there are other concerns that I
> > > > > > overlooked, I think it would be nicer for the reactive mode to
> also
> > > > > support
> > > > > > active resource managers, with the additional requirement to
> > > explicitly
> > > > > > configure the max slots.
> > > > > >
> > > > > > Thank you~
> > > > > >
> > > > > > Xintong Song
> > > > > >
> > > > > >
> > > > > > [1]
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-160-Declarative-scheduler-td48165.html
> > > > > >
> > > > > > On Sat, Jan 23, 2021 at 5:59 AM Steven Wu <st...@gmail.com>
> > > > wrote:
> > > > > >
> > > > > > > Thanks a lot for the proposal, Robert and Till.
> > > > > > >
> > > > > > > > No fixed parallelism for any of the operators
> > > > > > >
> > > > > > > Regarding this limitation, can the scheduler only adjust the
> > > default
> > > > > > > parallelism? if some operators set parallelism explicitly (like
> > > > always
> > > > > > 1),
> > > > > > > just leave them unchanged.
> > > > > > >
> > > > > > >
> > > > > > > On Fri, Jan 22, 2021 at 8:42 AM Robert Metzger <
> > > rmetzger@apache.org>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi all,
> > > > > > > >
> > > > > > > > Till started a discussion about FLIP-160: Declarative
> scheduler
> > > [1]
> > > > > > > earlier
> > > > > > > > today, the first major feature based on that effort will be
> > > > FLIP-159:
> > > > > > > > Reactive Mode. It allows users to operate Flink in a way that
> > it
> > > > > > > reactively
> > > > > > > > scales the job up or down depending on the provided
> resources:
> > > > adding
> > > > > > > > TaskManagers will scale the job up, removing them will scale
> it
> > > > down
> > > > > > > again.
> > > > > > > >
> > > > > > > > Here's the link to the Wiki:
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-159%3A+Reactive+Mode
> > > > > > > >
> > > > > > > > We are very excited to hear your feedback about the proposal!
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Robert
> > > > > > > >
> > > > > > > > [1]
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://lists.apache.org/thread.html/r604a01f739639e2a5f093fbe7894c172125530332747ecf6990a6ce4%40%3Cdev.flink.apache.org%3E
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: [DISCUSS] FLIP-159: Reactive Mode

Posted by Xintong Song <to...@gmail.com>.
Thanks for the explanation, Robert.

Now I see how these things are expected to be supported in steps.

I think you are right. Demanding a fixed finite amount of resources can be
considered a special case of `ScalingPolicy`. I'm now good with the
current scope of reactive mode as a first step, and with supporting active
resource managers in autoscaling mode after FLIP-159/160 have stabilized.

Thank you~

Xintong Song



On Tue, Jan 26, 2021 at 5:00 PM Robert Metzger <rm...@apache.org> wrote:

> Thanks for your thoughts Xintong! What you are writing is very valuable
> feedback for me, as I have limited experience with real-world deployments.
> It seems that autoscaling support is a really important follow up.
>
> ## active resource managers
>
> I guess you can consider reactive mode a special case of the more generic
> autoscaling mode. Once we extend the interfaces in the declarative
> scheduler to allow autoscaling mode, the scenarios you are describing are
> possible.
> We already had some ideas for some extended interfaces that would cover a
> great variety of cases. We could allow the policy to determine the number
> of desired slots, and propose a parallelism assignment based on that to the
> policy. This would also work with making calls to external services
> to decide the scale etc.
> However implementing FLIP-159 and FLIP-160 might take quite a bit of time
> to stabilize all the corner cases. Once that is done, we'll publish a FLIP
> with an advanced interface for autoscaling.
>
> On Tue, Jan 26, 2021 at 2:56 AM Xintong Song <to...@gmail.com>
> wrote:
>
> > ## configuration option
> >
> > I see your point that autoscaling mode might be more suitable for session
> > clusters. It doesn't change that `execution-mode` could be a job-level
> > configuration. But I'm good with keeping it cluster-level and marking it
> > experimental at the moment, so we can change it later if needed for the
> > autoscaling mode.
> >
> > ## active resource managers
> >
> > I'm a bit confused about the boundary between reactive mode and
> autoscaling
> > mode.
> > - Reactive mode requests an infinite amount of resources, and executes at
> > the largest parallelism that is possible with the available resources.
> > - Autoscaling mode dynamically adjusts resource demand, and executes at a
> > parallelism that is either demanded or as large as possible if the
> > demanded parallelism cannot be reached.
> > - What about something in between? A job is not capable of dynamically
> > adjusting the resource demand and requests a fixed finite amount of
> > resources, and still wants to be executed with as large parallelisms as
> > possible if the demanded parallelism cannot be reached?
> >
> > It's quite common that a job may temporarily not get as much resources as
> > desired, due to running of other higher priority jobs in the
> > Kubernetes/Yarn/Mesos cluster. In such cases, currently either the user
> > needs to configure the job with a different parallelism, or the job
> cannot
> > be executed. It would be helpful if the job can execute with a lower
> > parallelism, and automatically scales up to the original desired
> > parallelism when more resources become available.
> >
> >
> > For Yarn, there's comprehensive queue based resource quota management,
> > where how many resources each job gets are closely related to other jobs'
> > resource requirements. For Kubernetes, while the default kube-scheduler
> > does not have such mature multi-tenant support, there are other projects
> > (e.g., Apache YuniKorn [1]) that can bring the similar scheduling
> > capability to Kubernetes
> >
> >
> > Thank you~
> >
> > Xintong Song
> >
> >
> > [1] https://yunikorn.apache.org/
> >
> > On Mon, Jan 25, 2021 at 4:48 PM Robert Metzger <rm...@apache.org>
> > wrote:
> >
> > > Thank you very much for the comments so far.
> > >
> > > @Steven:
> > >
> > > No fixed parallelism for any of the operators
> > > >
> > > > Regarding this limitation, can the scheduler only adjust the default
> > > > parallelism? if some operators set parallelism explicitly (like
> always
> > > 1),
> > > > just leave them unchanged.
> > >
> > >
> > > We will respect the configured maxParallelism for that purpose. If you
> > have
> > > an operator that is not intended to run in parallel, you can set
> > > > maxParallelism=1.
> > >
> > > @Xintong:
> > >
> > > the cluster configuration option will limit us from having jobs running
> > > > with different execution modes in the same session cluster.
> > >
> > >
> > > I'm not sure if it makes sense to support reactive mode in a session
> > > cluster ever. For an autoscaling mode, it probably makes sense (as we
> can
> > > just combine the resource requests from all running jobs, and
> distribute
> > > the available resources proportional to the requested resources).
> > >
> > > I will state more clearly in the FLIP that the configuration options
> > should
> > > be marked as experimental.
> > >
> > > Active resource managers
> > >
> > > [...]
> > >
> > > If this is the only concern, I'd like to bring the configuration option
> > > > `slotmanager.number-of-slots.max` to your attention.
> > >
> > >
> > > I understand and agree that it would be really nice to support active
> > > resource managers with the new scheduler right away. In my opinion,
> > > reactive mode will never be really supported by active resource
> managers,
> > > as this is a contradiction with the idea of reactive mode: It is
> > explicitly
> > > designed to allow controlling the cluster from the outside (similar to
> > > Kafka streams, where you add and remove capacity for scaling).
> > Integration
> > > > with active resource managers should be added in an autoscaling mode,
> > based
> > > on the declarative scheduler.
> > > I've considered the slotmanager.number-of-slots.max option as well, but
> > it
> > > basically means that your cluster will always immediately scale up
> > > to slotmanager.number-of-slots.max and stick to that value, even if
> those
> > > resources are not needed.
> > > On YARN, it would be pretty difficult or even impossible to control the
> > > scale of such a Flink deployment from the outside (using a queue with
> the
> > > capacity scheduler won't work, as changes to queues require restarts)
> > > On K8s, one would have to build a custom tool that finds the deployment
> > > created by Flink and adjusts it. Then, it's probably easier to just
> > create
> > > a standalone deployment on K8s.
> > >
> > > @Yang:
> > >
> > > It will be better to make the 10 seconds to be configurable.
> > >
> > >
> > > I agree that it is pretty bold to have such an important configuration
> > > parameter hardcoded. We proposed it like this to keep the first
> > > implementation as simple as possible.
> > > But if we see that basically everybody is asking for this, or if we
> have
> > > time left at the end of the release cycle, we'll make it configurable.
> > >
> > >
> > > but also the ScalingPolicy is not exposed to the users now
> > >
> > >
> > > Exposing the ScalingPolicy to the user is very high on our priority
> list,
> > > but we want to keep the first version as simple as possible, to be able
> > to
> > > deliver the overall feature in time, and to collect some initial user
> > > feedback before coming up with an interface we want to expose.
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > On Mon, Jan 25, 2021 at 8:37 AM Yang Wang <da...@gmail.com>
> wrote:
> > >
> > > > Thanks Robert for creating this FLIP and starting the discussion.
> > > >
> > > > This is a great start point to make Flink work with auto scaling
> > service.
> > > > The reactive mode
> > > > is very useful in containerized environment(e.g. docker, Kubernetes).
> > For
> > > > example, combined
> > > > with Kubernetes "Horizontal Pod Autoscaler"[1], the TaskManagers
> could
> > be
> > > > started/released
> > > > dynamically based on the system metrics(e.g. cpu, memory) and custom
> > > > metrics(e.g. delay, latency).
> > > >
> > > >
> > > > > Once the job has started running, and a TaskManager is lost, it
> will
> > > wait
> > > > > for 10 seconds for the
> > > >
> > > > TaskManager to re-appear.
> > > >
> > > > It will be better to make the 10 seconds to be configurable.
> According
> > to
> > > > our production experience
> > > > on Kubernetes, 10 seconds is not enough for a pod to be relaunched.
> > Maybe
> > > > this is also a specific
> > > > case whether the resource is stable or not.
> > > >
> > > > > Active ResourceManager
> > > >
> > > > IIUC, the reason why reactive mode could not work with active
> resource
> > > > manager is not only
> > > > about requesting infinite amount of resources, but also the
> > ScalingPolicy
> > > > is not exposed to the
> > > > users now. ScalingPolicy could be the bridge between reactive mode
> and
> > > > active resource manager.
> > > > User could have their own auto scaling service, which monitor the
> Flink
> > > > metrics and then update
> > > > the ScalingPolicy(e.g. parallelism 10 -> 20). Then the active
> resource
> > > > manager could allocate these
> > > > TaskManagers.
> > > > But it is out the scope of this FLIP, I really expect this could be
> > done
> > > in
> > > > the future. And it will be another
> > > > great step to make Flink auto scalable.
> > > >
> > > >
> > > >
> > > > [1].
> > > >
> > >
> >
> https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/
> > > >
> > > >
> > > > Best,
> > > > Yang
> > > >
> > > >
> > > > Xintong Song <to...@gmail.com> 于2021年1月25日周一 上午10:29写道:
> > > >
> > > > > Thanks for preparing the FLIP and starting the discussion, Robert.
> > > > >
> > > > > ## Cluster vs. Job configuration
> > > > > As I have commented on the FLIP-160 discussion thread [1], I'm a
> bit
> > > > unsure
> > > > > about activating the reactive execution mode via a cluster level
> > > > > configuration option. I'm aware that in the first step this feature
> > > does
> > > > > not support session clusters. However, I think that does not mean
> it
> > > > won't
> > > > > be supported in future. In that case, the cluster configuration
> > option
> > > > will
> > > > > limit us from having jobs running with different execution modes in
> > the
> > > > > same session cluster.
> > > > >
> > > > > ## Active resource managers
> > > > > According to the FLIP, this feature explicitly does not support
> > active
> > > > > resource managers. IIUC, this is because when in this feature the
> job
> > > > > requests an infinite amount of resources, which would flood
> > Kubernetes
> > > /
> > > > > Yarn / Mesos with unreasonably large number of resource requests.
> If
> > > this
> > > > > is the only concern, I'd like to bring the configuration option
> > > > > `slotmanager.number-of-slots.max` to your attention. This feature
> > > allows
> > > > > putting an upper limit to the total number of slots the Flink
> cluster
> > > > uses,
> > > > > preventing active resource managers from allocating too many
> > resources
> > > > from
> > > > > Kubernetes / Yarn / Mesos. Unless there are other concerns that I
> > > > > overlooked, I think it would be nicer for the reactive mode to also
> > > > support
> > > > > active resource managers, with the additional requirement to
> > explicitly
> > > > > configure the max slots.
> > > > >
> > > > > Thank you~
> > > > >
> > > > > Xintong Song
> > > > >
> > > > >
> > > > > [1]
> > > > >
> > > > >
> > > >
> > >
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-160-Declarative-scheduler-td48165.html
> > > > >
> > > > > On Sat, Jan 23, 2021 at 5:59 AM Steven Wu <st...@gmail.com>
> > > wrote:
> > > > >
> > > > > > Thanks a lot for the proposal, Robert and Till.
> > > > > >
> > > > > > > No fixed parallelism for any of the operators
> > > > > >
> > > > > > Regarding this limitation, can the scheduler only adjust the
> > default
> > > > > > parallelism? if some operators set parallelism explicitly (like
> > > always
> > > > > 1),
> > > > > > just leave them unchanged.
> > > > > >
> > > > > >
> > > > > > On Fri, Jan 22, 2021 at 8:42 AM Robert Metzger <
> > rmetzger@apache.org>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi all,
> > > > > > >
> > > > > > > Till started a discussion about FLIP-160: Declarative scheduler
> > [1]
> > > > > > earlier
> > > > > > > today, the first major feature based on that effort will be
> > > FLIP-159:
> > > > > > > Reactive Mode. It allows users to operate Flink in a way that
> it
> > > > > > reactively
> > > > > > > scales the job up or down depending on the provided resources:
> > > adding
> > > > > > > TaskManagers will scale the job up, removing them will scale it
> > > down
> > > > > > again.
> > > > > > >
> > > > > > > Here's the link to the Wiki:
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-159%3A+Reactive+Mode
> > > > > > >
> > > > > > > We are very excited to hear your feedback about the proposal!
> > > > > > >
> > > > > > > Best,
> > > > > > > Robert
> > > > > > >
> > > > > > > [1]
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://lists.apache.org/thread.html/r604a01f739639e2a5f093fbe7894c172125530332747ecf6990a6ce4%40%3Cdev.flink.apache.org%3E
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: [DISCUSS] FLIP-159: Reactive Mode

Posted by Robert Metzger <rm...@apache.org>.
Thanks for your thoughts Xintong! What you are writing is very valuable
feedback for me, as I have limited experience with real-world deployments.
It seems that autoscaling support is a really important follow-up.

## active resource managers

I guess you can consider reactive mode a special case of the more generic
autoscaling mode. Once we extend the interfaces in the declarative
scheduler to allow autoscaling mode, the scenarios you are describing are
possible.
We already had some ideas for extended interfaces that would cover a
great variety of cases. We could allow the policy to determine the number
of desired slots, and then propose a parallelism assignment based on that
number to the policy. This would also work with calls to external services
to decide the scale, etc.
However, implementing FLIP-159 and FLIP-160 might take quite a bit of time
to stabilize all the corner cases. Once that is done, we'll publish a FLIP
with an advanced interface for autoscaling.
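
To make the idea a bit more concrete, here is a minimal sketch of what such a
policy interaction could look like. Apart from the name ScalingPolicy, everything
in it (the method names, FixedTargetPolicy, propose_assignment, the operator
names) is hypothetical and only meant for illustration; the real interface is
not defined in this thread and may look quite different.

```python
from typing import Dict, Protocol


class ScalingPolicy(Protocol):
    """Hypothetical shape of a policy; not an existing Flink interface."""

    def desired_slots(self, current_slots: int) -> int: ...

    def accept(self, proposed_parallelism: Dict[str, int]) -> bool: ...


class FixedTargetPolicy:
    """Toy policy: always asks for a fixed number of slots."""

    def __init__(self, target_slots: int) -> None:
        self.target_slots = target_slots

    def desired_slots(self, current_slots: int) -> int:
        # A real policy could call an external service here to decide the scale.
        return self.target_slots

    def accept(self, proposed_parallelism: Dict[str, int]) -> bool:
        # Accept any proposal in which every operator can still run.
        return all(p >= 1 for p in proposed_parallelism.values())


def propose_assignment(max_parallelism: Dict[str, int], slots: int) -> Dict[str, int]:
    """Assumed scheduler side: give each operator `slots`, capped by its maxParallelism."""
    return {op: min(slots, cap) for op, cap in max_parallelism.items()}


policy: ScalingPolicy = FixedTargetPolicy(target_slots=20)
slots = policy.desired_slots(current_slots=10)
proposal = propose_assignment({"source": 4, "map": 128, "sink": 1}, slots)
print(proposal, policy.accept(proposal))
```

The policy first states how many slots it wants, and the scheduler then comes
back with a concrete parallelism per operator that the policy can accept or
reject.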

On Tue, Jan 26, 2021 at 2:56 AM Xintong Song <to...@gmail.com> wrote:

> ## configuration option
>
> I see your point that autoscaling mode might be more suitable for session
> clusters. It doesn't change that `execution-mode` could be a job-level
> configuration. But I'm good with keeping it cluster-level and marking it
> experimental at the moment, so we can change it later if needed for the
> autoscaling mode.
>
> ## active resource managers
>
> I'm a bit confused about the boundary between reactive mode and autoscaling
> mode.
> - Reactive mode requests an infinite amount of resources, and executes at
> the largest parallelism that is possible with the available resources.
> - Autoscaling mode dynamically adjusts resource demand, and executes at a
> parallelism that is either demanded or as large as possible if the
> demanded parallelism cannot be reached.
> - What about something in between? A job is not capable of dynamically
> adjusting the resource demand and requests a fixed finite amount of
> resources, and still wants to be executed with as large parallelisms as
> possible if the demanded parallelism cannot be reached?
>
> It's quite common that a job may temporarily not get as much resources as
> desired, due to running of other higher priority jobs in the
> Kubernetes/Yarn/Mesos cluster. In such cases, currently either the user
> needs to configure the job with a different parallelism, or the job cannot
> be executed. It would be helpful if the job can execute with a lower
> parallelism, and automatically scales up to the original desired
> parallelism when more resources become available.
>
>
> For Yarn, there's comprehensive queue based resource quota management,
> where how many resources each job gets are closely related to other jobs'
> resource requirements. For Kubernetes, while the default kube-scheduler
> does not have such mature multi-tenant support, there are other projects
> (e.g., Apache YuniKorn [1]) that can bring the similar scheduling
> capability to Kubernetes
>
>
> Thank you~
>
> Xintong Song
>
>
> [1] https://yunikorn.apache.org/
>
> On Mon, Jan 25, 2021 at 4:48 PM Robert Metzger <rm...@apache.org>
> wrote:
>
> > Thank you very much for the comments so far.
> >
> > @Steven:
> >
> > No fixed parallelism for any of the operators
> > >
> > > Regarding this limitation, can the scheduler only adjust the default
> > > parallelism? if some operators set parallelism explicitly (like always
> > 1),
> > > just leave them unchanged.
> >
> >
> > We will respect the configured maxParallelism for that purpose. If you
> have
> > an operator that is not intended to run in parallel, you can set
> > maxParallelism=1.
> >
> > @Xintong:
> >
> > the cluster configuration option will limit us from having jobs running
> > > with different execution modes in the same session cluster.
> >
> >
> > I'm not sure if it makes sense to support reactive mode in a session
> > cluster ever. For an autoscaling mode, it probably makes sense (as we can
> > just combine the resource requests from all running jobs, and distribute
> > the available resources proportional to the requested resources).
> >
> > I will state more clearly in the FLIP that the configuration options
> should
> > be marked as experimental.
> >
> > Active resource managers
> >
> > [...]
> >
> > If this is the only concern, I'd like to bring the configuration option
> > > `slotmanager.number-of-slots.max` to your attention.
> >
> >
> > I understand and agree that it would be really nice to support active
> > resource managers with the new scheduler right away. In my opinion,
> > reactive mode will never be really supported by active resource managers,
> > as this is a contradiction with the idea of reactive mode: It is
> explicitly
> > designed to allow controlling the cluster from the outside (similar to
> > Kafka streams, where you add and remove capacity for scaling).
> Integration
> > with active resource managers should be added in a autoscaling mode,
> based
> > on the declarative scheduler.
> > I've considered the slotmanager.number-of-slots.max option as well, but
> it
> > basically means that your cluster will always immediately scale up
> > to slotmanager.number-of-slots.max and stick to that value, even if those
> > resources are not needed.
> > On YARN, it would be pretty difficult or even impossible to control the
> > scale of such a Flink deployment from the outside (using a queue with the
> > capacity scheduler won't work, as changes to queues require restarts)
> > On K8s, one would have to build a custom tool that finds the deployment
> > created by Flink and adjusts it. Then, it's probably easier to just
> create
> > a standalone deployment on K8s.
> >
> > @Yang:
> >
> > It will be better to make the 10 seconds to be configurable.
> >
> >
> > I agree that it is pretty bold to have such an important configuration
> > parameter hardcoded. We proposed it like this to keep the first
> > implementation as simple as possible.
> > But if we see that basically everybody is asking for this, or if we have
> > time left at the end of the release cycle, we'll make it configurable.
> >
> >
> > but also the ScalingPolicy is not exposed to the users now
> >
> >
> > Exposing the ScalingPolicy to the user is very high on our priority list,
> > but we want to keep the first version as simple as possible, to be able
> to
> > deliver the overall feature in time, and to collect some initial user
> > feedback before coming up with an interface we want to expose.
> >
> >
> >
> >
> >
> >
> >
> > On Mon, Jan 25, 2021 at 8:37 AM Yang Wang <da...@gmail.com> wrote:
> >
> > > Thanks Robert for creating this FLIP and starting the discussion.
> > >
> > > This is a great start point to make Flink work with auto scaling
> service.
> > > The reactive mode
> > > is very useful in containerized environment(e.g. docker, Kubernetes).
> For
> > > example, combined
> > > with Kubernetes "Horizontal Pod Autoscaler"[1], the TaskManagers could
> be
> > > started/released
> > > dynamically based on the system metrics(e.g. cpu, memory) and custom
> > > metrics(e.g. delay, latency).
> > >
> > >
> > > > Once the job has started running, and a TaskManager is lost, it will
> > wait
> > > > for 10 seconds for the
> > >
> > > TaskManager to re-appear.
> > >
> > > It will be better to make the 10 seconds to be configurable. According
> to
> > > our production experience
> > > on Kubernetes, 10 seconds is not enough for a pod to be relaunched.
> Maybe
> > > this is also a specific
> > > case whether the resource is stable or not.
> > >
> > > > Active ResourceManager
> > >
> > > IIUC, the reason why reactive mode could not work with active resource
> > > manager is not only
> > > about requesting infinite amount of resources, but also the
> ScalingPolicy
> > > is not exposed to the
> > > users now. ScalingPolicy could be the bridge between reactive mode and
> > > active resource manager.
> > > User could have their own auto scaling service, which monitor the Flink
> > > metrics and then update
> > > the ScalingPolicy(e.g. parallelism 10 -> 20). Then the active resource
> > > manager could allocate these
> > > TaskManagers.
> > > But it is out the scope of this FLIP, I really expect this could be
> done
> > in
> > > the future. And it will be another
> > > great step to make Flink auto scalable.
> > >
> > >
> > >
> > > [1].
> > >
> >
> https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/
> > >
> > >
> > > Best,
> > > Yang
> > >
> > >
> > > Xintong Song <to...@gmail.com> 于2021年1月25日周一 上午10:29写道:
> > >
> > > > Thanks for preparing the FLIP and starting the discussion, Robert.
> > > >
> > > > ## Cluster vs. Job configuration
> > > > As I have commented on the FLIP-160 discussion thread [1], I'm a bit
> > > unsure
> > > > about activating the reactive execution mode via a cluster level
> > > > configuration option. I'm aware that in the first step this feature
> > does
> > > > not support session clusters. However, I think that does not mean it
> > > won't
> > > > be supported in future. In that case, the cluster configuration
> option
> > > will
> > > > limit us from having jobs running with different execution modes in
> the
> > > > same session cluster.
> > > >
> > > > ## Active resource managers
> > > > According to the FLIP, this feature explicitly does not support
> active
> > > > resource managers. IIUC, this is because when in this feature the job
> > > > requests an infinite amount of resources, which would flood
> Kubernetes
> > /
> > > > Yarn / Mesos with unreasonably large number of resource requests. If
> > this
> > > > is the only concern, I'd like to bring the configuration option
> > > > `slotmanager.number-of-slots.max` to your attention. This feature
> > allows
> > > > putting an upper limit to the total number of slots the Flink cluster
> > > uses,
> > > > preventing active resource managers from allocating too many
> resources
> > > from
> > > > Kubernetes / Yarn / Mesos. Unless there are other concerns that I
> > > > overlooked, I think it would be nicer for the reactive mode to also
> > > support
> > > > active resource managers, with the additional requirement to
> explicitly
> > > > configure the max slots.
> > > >
> > > > Thank you~
> > > >
> > > > Xintong Song
> > > >
> > > >
> > > > [1]
> > > >
> > > >
> > >
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-160-Declarative-scheduler-td48165.html
> > > >
> > > > On Sat, Jan 23, 2021 at 5:59 AM Steven Wu <st...@gmail.com>
> > wrote:
> > > >
> > > > > Thanks a lot for the proposal, Robert and Till.
> > > > >
> > > > > > No fixed parallelism for any of the operators
> > > > >
> > > > > Regarding this limitation, can the scheduler only adjust the
> default
> > > > > parallelism? if some operators set parallelism explicitly (like
> > always
> > > > 1),
> > > > > just leave them unchanged.
> > > > >
> > > > >
> > > > > On Fri, Jan 22, 2021 at 8:42 AM Robert Metzger <
> rmetzger@apache.org>
> > > > > wrote:
> > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > Till started a discussion about FLIP-160: Declarative scheduler
> [1]
> > > > > earlier
> > > > > > today, the first major feature based on that effort will be
> > FLIP-159:
> > > > > > Reactive Mode. It allows users to operate Flink in a way that it
> > > > > reactively
> > > > > > scales the job up or down depending on the provided resources:
> > adding
> > > > > > TaskManagers will scale the job up, removing them will scale it
> > down
> > > > > again.
> > > > > >
> > > > > > Here's the link to the Wiki:
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-159%3A+Reactive+Mode
> > > > > >
> > > > > > We are very excited to hear your feedback about the proposal!
> > > > > >
> > > > > > Best,
> > > > > > Robert
> > > > > >
> > > > > > [1]
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://lists.apache.org/thread.html/r604a01f739639e2a5f093fbe7894c172125530332747ecf6990a6ce4%40%3Cdev.flink.apache.org%3E
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: [DISCUSS] FLIP-159: Reactive Mode

Posted by Xintong Song <to...@gmail.com>.
## configuration option

I see your point that autoscaling mode might be more suitable for session
clusters. It doesn't change that `execution-mode` could be a job-level
configuration. But I'm good with keeping it cluster-level and marking it
experimental at the moment, so we can change it later if needed for the
autoscaling mode.

## active resource managers

I'm a bit confused about the boundary between reactive mode and autoscaling
mode.
- Reactive mode requests an infinite amount of resources, and executes at
the largest parallelism that is possible with the available resources.
- Autoscaling mode dynamically adjusts resource demand, and executes at a
parallelism that is either demanded or as large as possible if the
demanded parallelism cannot be reached.
- What about something in between? A job is not capable of dynamically
adjusting the resource demand and requests a fixed finite amount of
resources, but still wants to be executed with as large a parallelism as
possible if the demanded parallelism cannot be reached?

It's quite common that a job may temporarily not get as many resources as
desired, because other higher-priority jobs are running in the
Kubernetes/Yarn/Mesos cluster. In such cases, currently either the user
needs to configure the job with a different parallelism, or the job cannot
be executed. It would be helpful if the job could execute with a lower
parallelism, and automatically scale up to the original desired
parallelism when more resources become available.
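
The difference between the cases above can be written down as a small model.
This is only a sketch: target_parallelism and the slot numbers are made up for
illustration and are not part of Flink. Reactive mode is modeled as an
unbounded demand, the "in between" case as a fixed demand that degrades while
fewer slots are available.

```python
import math


def target_parallelism(demanded: float, available_slots: int) -> int:
    """Run at the demanded parallelism, or as large as the available slots allow."""
    return int(min(demanded, available_slots))


# Slots available to the job over time, e.g. while higher-priority jobs come and go.
available_over_time = [4, 6, 16]

# Reactive mode: demand is unbounded, so the job always uses everything it is given.
reactive = [target_parallelism(math.inf, s) for s in available_over_time]

# "In between": a fixed demand of 10, lowered while fewer slots are available.
fixed_demand = [target_parallelism(10, s) for s in available_over_time]

print(reactive)      # [4, 6, 16]
print(fixed_demand)  # [4, 6, 10]
```

In this model the fixed-demand job starts at parallelism 4 and returns to its
desired parallelism of 10 once enough slots are free, without ever growing
beyond it.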


For Yarn, there's comprehensive queue-based resource quota management,
where how many resources each job gets is closely related to other jobs'
resource requirements. For Kubernetes, while the default kube-scheduler
does not have such mature multi-tenant support, there are other projects
(e.g., Apache YuniKorn [1]) that can bring similar scheduling
capabilities to Kubernetes.


Thank you~

Xintong Song


[1] https://yunikorn.apache.org/

On Mon, Jan 25, 2021 at 4:48 PM Robert Metzger <rm...@apache.org> wrote:

> Thank you very much for the comments so far.
>
> @Steven:
>
> No fixed parallelism for any of the operators
> >
> > Regarding this limitation, can the scheduler only adjust the default
> > parallelism? if some operators set parallelism explicitly (like always
> 1),
> > just leave them unchanged.
>
>
> We will respect the configured maxParallelism for that purpose. If you have
> an operator that is not intended to run in parallel, you can set
> maxParallelism=1.
>
> @Xintong:
>
> the cluster configuration option will limit us from having jobs running
> > with different execution modes in the same session cluster.
>
>
> I'm not sure if it makes sense to support reactive mode in a session
> cluster ever. For an autoscaling mode, it probably makes sense (as we can
> just combine the resource requests from all running jobs, and distribute
> the available resources proportional to the requested resources).
>
> I will state more clearly in the FLIP that the configuration options should
> be marked as experimental.
>
> Active resource managers
>
> [...]
>
> If this is the only concern, I'd like to bring the configuration option
> > `slotmanager.number-of-slots.max` to your attention.
>
>
> I understand and agree that it would be really nice to support active
> resource managers with the new scheduler right away. In my opinion,
> reactive mode will never be really supported by active resource managers,
> as this is a contradiction with the idea of reactive mode: It is explicitly
> designed to allow controlling the cluster from the outside (similar to
> Kafka streams, where you add and remove capacity for scaling). Integration
> with active resource managers should be added in a autoscaling mode, based
> on the declarative scheduler.
> I've considered the slotmanager.number-of-slots.max option as well, but it
> basically means that your cluster will always immediately scale up
> to slotmanager.number-of-slots.max and stick to that value, even if those
> resources are not needed.
> On YARN, it would be pretty difficult or even impossible to control the
> scale of such a Flink deployment from the outside (using a queue with the
> capacity scheduler won't work, as changes to queues require restarts)
> On K8s, one would have to build a custom tool that finds the deployment
> created by Flink and adjusts it. Then, it's probably easier to just create
> a standalone deployment on K8s.
>
> @Yang:
>
> It will be better to make the 10 seconds to be configurable.
>
>
> I agree that it is pretty bold to have such an important configuration
> parameter hardcoded. We proposed it like this to keep the first
> implementation as simple as possible.
> But if we see that basically everybody is asking for this, or if we have
> time left at the end of the release cycle, we'll make it configurable.
>
>
> but also the ScalingPolicy is not exposed to the users now
>
>
> Exposing the ScalingPolicy to the user is very high on our priority list,
> but we want to keep the first version as simple as possible, to be able to
> deliver the overall feature in time, and to collect some initial user
> feedback before coming up with an interface we want to expose.
>
>
>
>
>
>
>
> On Mon, Jan 25, 2021 at 8:37 AM Yang Wang <da...@gmail.com> wrote:
>
> > Thanks Robert for creating this FLIP and starting the discussion.
> >
> > This is a great start point to make Flink work with auto scaling service.
> > The reactive mode
> > is very useful in containerized environment(e.g. docker, Kubernetes). For
> > example, combined
> > with Kubernetes "Horizontal Pod Autoscaler"[1], the TaskManagers could be
> > started/released
> > dynamically based on the system metrics(e.g. cpu, memory) and custom
> > metrics(e.g. delay, latency).
> >
> >
> > > Once the job has started running, and a TaskManager is lost, it will
> wait
> > > for 10 seconds for the
> >
> > TaskManager to re-appear.
> >
> > It will be better to make the 10 seconds to be configurable. According to
> > our production experience
> > on Kubernetes, 10 seconds is not enough for a pod to be relaunched. Maybe
> > this is also a specific
> > case whether the resource is stable or not.
> >
> > > Active ResourceManager
> >
> > IIUC, the reason why reactive mode could not work with active resource
> > manager is not only
> > about requesting infinite amount of resources, but also the ScalingPolicy
> > is not exposed to the
> > users now. ScalingPolicy could be the bridge between reactive mode and
> > active resource manager.
> > User could have their own auto scaling service, which monitor the Flink
> > metrics and then update
> > the ScalingPolicy(e.g. parallelism 10 -> 20). Then the active resource
> > manager could allocate these
> > TaskManagers.
> > But it is out the scope of this FLIP, I really expect this could be done
> in
> > the future. And it will be another
> > great step to make Flink auto scalable.
> >
> >
> >
> > [1].
> >
> https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/
> >
> >
> > Best,
> > Yang
> >
> >
> > Xintong Song <to...@gmail.com> 于2021年1月25日周一 上午10:29写道:
> >
> > > Thanks for preparing the FLIP and starting the discussion, Robert.
> > >
> > > ## Cluster vs. Job configuration
> > > As I have commented on the FLIP-160 discussion thread [1], I'm a bit
> > unsure
> > > about activating the reactive execution mode via a cluster level
> > > configuration option. I'm aware that in the first step this feature
> does
> > > not support session clusters. However, I think that does not mean it
> > won't
> > > be supported in future. In that case, the cluster configuration option
> > will
> > > limit us from having jobs running with different execution modes in the
> > > same session cluster.
> > >
> > > ## Active resource managers
> > > According to the FLIP, this feature explicitly does not support active
> > > resource managers. IIUC, this is because when in this feature the job
> > > requests an infinite amount of resources, which would flood Kubernetes
> /
> > > Yarn / Mesos with unreasonably large number of resource requests. If
> this
> > > is the only concern, I'd like to bring the configuration option
> > > `slotmanager.number-of-slots.max` to your attention. This feature
> allows
> > > putting an upper limit to the total number of slots the Flink cluster
> > uses,
> > > preventing active resource managers from allocating too many resources
> > from
> > > Kubernetes / Yarn / Mesos. Unless there are other concerns that I
> > > overlooked, I think it would be nicer for the reactive mode to also
> > support
> > > active resource managers, with the additional requirement to explicitly
> > > configure the max slots.
> > >
> > > Thank you~
> > >
> > > Xintong Song
> > >
> > >
> > > [1]
> > >
> > >
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-160-Declarative-scheduler-td48165.html
> > >
> > > On Sat, Jan 23, 2021 at 5:59 AM Steven Wu <st...@gmail.com>
> wrote:
> > >
> > > > Thanks a lot for the proposal, Robert and Till.
> > > >
> > > > > No fixed parallelism for any of the operators
> > > >
> > > > Regarding this limitation, can the scheduler only adjust the default
> > > > parallelism? if some operators set parallelism explicitly (like
> always
> > > 1),
> > > > just leave them unchanged.
> > > >
> > > >
> > > > On Fri, Jan 22, 2021 at 8:42 AM Robert Metzger <rm...@apache.org>
> > > > wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > Till started a discussion about FLIP-160: Declarative scheduler [1]
> > > > earlier
> > > > > today, the first major feature based on that effort will be
> FLIP-159:
> > > > > Reactive Mode. It allows users to operate Flink in a way that it
> > > > reactively
> > > > > scales the job up or down depending on the provided resources:
> adding
> > > > > TaskManagers will scale the job up, removing them will scale it
> down
> > > > again.
> > > > >
> > > > > Here's the link to the Wiki:
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-159%3A+Reactive+Mode
> > > > >
> > > > > We are very excited to hear your feedback about the proposal!
> > > > >
> > > > > Best,
> > > > > Robert
> > > > >
> > > > > [1]
> > > > >
> > > > >
> > > >
> > >
> >
> https://lists.apache.org/thread.html/r604a01f739639e2a5f093fbe7894c172125530332747ecf6990a6ce4%40%3Cdev.flink.apache.org%3E
> > > > >
> > > >
> > >
> >
>

Re: [DISCUSS] FLIP-159: Reactive Mode

Posted by Robert Metzger <rm...@apache.org>.
Thank you very much for the comments so far.

@Steven:

> No fixed parallelism for any of the operators
>
> Regarding this limitation, can the scheduler only adjust the default
> parallelism? if some operators set parallelism explicitly (like always 1),
> just leave them unchanged.


We will respect the configured maxParallelism for that purpose. If you have
an operator that is not intended to run in parallel, you can set
maxParallelism=1.
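
As a rough illustration of that rule (a sketch only, not the scheduler code;
the function name and the simple slot-counting model are assumptions made for
this example), the parallelism of an operator in reactive mode can be thought
of as the number of available slots, capped by the operator's maxParallelism:

```python
def reactive_parallelism(available_slots: int, max_parallelism: int) -> int:
    """Illustrative model: use every available slot, but never exceed maxParallelism."""
    if available_slots < 1 or max_parallelism < 1:
        raise ValueError("need at least one slot and maxParallelism >= 1")
    return min(available_slots, max_parallelism)


# An operator with maxParallelism=1 stays at 1 regardless of the cluster size.
print(reactive_parallelism(available_slots=8, max_parallelism=1))    # 1
# An operator with a large maxParallelism simply follows the available slots.
print(reactive_parallelism(available_slots=8, max_parallelism=128))  # 8
```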

@Xintong:

> the cluster configuration option will limit us from having jobs running
> with different execution modes in the same session cluster.


I'm not sure if it makes sense to support reactive mode in a session
cluster ever. For an autoscaling mode, it probably makes sense (as we can
just combine the resource requests from all running jobs, and distribute
the available resources proportional to the requested resources).
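
A minimal sketch of what "proportional to the requested resources" could mean
for a session cluster in an autoscaling mode. The function distribute_slots,
the job names and the largest-remainder rounding are assumptions chosen for
this example; nothing like this is specified in the FLIP.

```python
from typing import Dict


def distribute_slots(requested: Dict[str, int], available: int) -> Dict[str, int]:
    """Split `available` slots across jobs proportionally to their requests."""
    total = sum(requested.values())
    if total <= available:
        # Enough slots for everybody: every job gets what it asked for.
        return dict(requested)
    # Start with the rounded-down proportional share of each job.
    shares = {job: available * req // total for job, req in requested.items()}
    # Hand out the slots lost to rounding, largest remainder first.
    leftover = available - sum(shares.values())
    by_remainder = sorted(
        requested, key=lambda job: (available * requested[job]) % total, reverse=True
    )
    for job in by_remainder[:leftover]:
        shares[job] += 1
    return shares


print(distribute_slots({"job-a": 10, "job-b": 30}, available=20))
# {'job-a': 5, 'job-b': 15}
```

Other rounding or fairness rules would work equally well here; the point is
only that the combined demand of all jobs is known and can be scaled down to
what the cluster currently offers.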

I will state more clearly in the FLIP that the configuration options should
be marked as experimental.

> Active resource managers
>
> [...]
>
> If this is the only concern, I'd like to bring the configuration option
> `slotmanager.number-of-slots.max` to your attention.


I understand and agree that it would be really nice to support active
resource managers with the new scheduler right away. In my opinion,
reactive mode will never be really supported by active resource managers,
as this is a contradiction with the idea of reactive mode: It is explicitly
designed to allow controlling the cluster from the outside (similar to
Kafka Streams, where you add and remove capacity for scaling). Integration
with active resource managers should be added in an autoscaling mode, based
on the declarative scheduler.
I've considered the slotmanager.number-of-slots.max option as well, but it
basically means that your cluster will always immediately scale up
to slotmanager.number-of-slots.max and stick to that value, even if those
resources are not needed.
On YARN, it would be pretty difficult or even impossible to control the
scale of such a Flink deployment from the outside (using a queue with the
capacity scheduler won't work, as changes to queues require restarts).
On K8s, one would have to build a custom tool that finds the deployment
created by Flink and adjusts it. Then, it's probably easier to just create
a standalone deployment on K8s.

@Yang:

> It will be better to make the 10 seconds to be configurable.


I agree that it is pretty bold to have such an important configuration
parameter hardcoded. We proposed it like this to keep the first
implementation as simple as possible.
But if we see that basically everybody is asking for this, or if we have
time left at the end of the release cycle, we'll make it configurable.
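
For illustration, a configurable version of this wait could be modeled roughly
as below. This is a sketch under assumptions: ResourceWaitPolicy, its field
name and the idea that the job rescales to the remaining resources once the
wait expires are made up for this example and are not taken from the FLIP.

```python
from dataclasses import dataclass


@dataclass
class ResourceWaitPolicy:
    """How long to wait for a lost TaskManager before giving up on it."""

    # 10 seconds is the hard-coded value in the current proposal.
    timeout_seconds: float = 10.0

    def should_rescale(self, seconds_since_loss: float) -> bool:
        """True once the lost TaskManager has been gone for at least the timeout."""
        return seconds_since_loss >= self.timeout_seconds


# With the default, a pod that needs 30 seconds to come back is not waited for.
print(ResourceWaitPolicy().should_rescale(30))                    # True
# A longer, user-provided timeout would keep waiting in the same situation.
print(ResourceWaitPolicy(timeout_seconds=60).should_rescale(30))  # False
```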


> but also the ScalingPolicy is not exposed to the users now


Exposing the ScalingPolicy to the user is very high on our priority list,
but we want to keep the first version as simple as possible, to be able to
deliver the overall feature in time, and to collect some initial user
feedback before coming up with an interface we want to expose.








Re: [DISCUSS] FLIP-159: Reactive Mode

Posted by Yang Wang <da...@gmail.com>.
Thanks Robert for creating this FLIP and starting the discussion.

This is a great starting point for making Flink work with an auto scaling
service. Reactive mode is very useful in containerized environments (e.g.
Docker, Kubernetes). For example, combined with the Kubernetes "Horizontal
Pod Autoscaler" [1], TaskManagers could be started and released dynamically
based on system metrics (e.g. CPU, memory) and custom metrics (e.g. delay,
latency).
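
To make the autoscaler idea concrete, here is a minimal sketch of the
replica calculation that the Kubernetes documentation gives for the
Horizontal Pod Autoscaler: desired = ceil(current * currentMetric /
targetMetric). The class and method names are made up for illustration,
the min/max clamping stands in for the HPA's minReplicas/maxReplicas
settings, and the real autoscaler applies further rules (such as a
tolerance band) that are left out here.

```java
/** Illustrative only: mirrors the documented HPA formula, not Flink or Kubernetes code. */
class TaskManagerReplicaSketch {

    /**
     * Computes ceil(currentReplicas * currentMetric / targetMetric) and
     * clamps the result to the range [minReplicas, maxReplicas].
     */
    static int desiredReplicas(int currentReplicas, double currentMetric,
                               double targetMetric, int minReplicas, int maxReplicas) {
        if (targetMetric <= 0) {
            throw new IllegalArgumentException("targetMetric must be positive");
        }
        int desired = (int) Math.ceil(currentReplicas * (currentMetric / targetMetric));
        return Math.max(minReplicas, Math.min(maxReplicas, desired));
    }

    public static void main(String[] args) {
        // 4 TaskManager pods at 90% average CPU with a 60% target.
        System.out.println(desiredReplicas(4, 90.0, 60.0, 1, 10));
    }
}
```

In reactive mode, Flink itself would not run this calculation. It would
only react to the TaskManagers that the autoscaler adds or removes.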


> Once the job has started running, and a TaskManager is lost, it will wait
> for 10 seconds for the TaskManager to re-appear.

It would be better to make the 10 seconds configurable. In our production
experience on Kubernetes, 10 seconds is not enough for a pod to be
relaunched. Whether it is enough may also depend on how stable the
resources are.
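
As a rough sketch of what a configurable wait could look like, the
decision can be reduced to comparing the time since the TaskManager was
lost against a configured duration. Everything below is hypothetical: the
class, the method, and the idea of passing the wait as a parameter are
assumptions for illustration, not an existing Flink option.

```java
import java.time.Duration;

/** Hypothetical sketch; none of these names exist in Flink. */
class TaskManagerLossSketch {

    /** The fixed value quoted from the FLIP, used here as the default. */
    static final Duration DEFAULT_WAIT = Duration.ofSeconds(10);

    /**
     * Returns true once the lost TaskManager has been missing for at least
     * the configured wait, i.e. the job should stop waiting and scale down.
     */
    static boolean shouldScaleDown(Duration sinceLoss, Duration configuredWait) {
        return sinceLoss.compareTo(configuredWait) >= 0;
    }

    public static void main(String[] args) {
        Duration podRestart = Duration.ofSeconds(30); // assumed pod relaunch time
        System.out.println(shouldScaleDown(podRestart, DEFAULT_WAIT));
        System.out.println(shouldScaleDown(podRestart, Duration.ofSeconds(60)));
    }
}
```

With the 10 second default, a pod that needs 30 seconds to come back
triggers a scale-down first. A longer configured wait would avoid that
extra rescale.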

> Active ResourceManager

IIUC, the reason why reactive mode cannot work with active resource
managers is not only that it would request an infinite amount of resources,
but also that the ScalingPolicy is not exposed to users yet. ScalingPolicy
could be the bridge between reactive mode and the active resource manager.
Users could have their own auto scaling service, which monitors the Flink
metrics and then updates the ScalingPolicy (e.g. parallelism 10 -> 20). The
active resource manager could then allocate the required TaskManagers.
This is out of the scope of this FLIP, but I really hope it can be done in
the future. It would be another great step towards making Flink auto
scalable.



[1] https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/


Best,
Yang



Re: [DISCUSS] FLIP-159: Reactive Mode

Posted by Xintong Song <to...@gmail.com>.
Thanks for preparing the FLIP and starting the discussion, Robert.

## Cluster vs. Job configuration
As I have commented on the FLIP-160 discussion thread [1], I'm a bit unsure
about activating the reactive execution mode via a cluster level
configuration option. I'm aware that in the first step this feature does
not support session clusters. However, I think that does not mean it won't
be supported in the future. In that case, the cluster configuration option
will prevent us from having jobs running with different execution modes in
the same session cluster.

## Active resource managers
According to the FLIP, this feature explicitly does not support active
resource managers. IIUC, this is because with this feature the job requests
an infinite amount of resources, which would flood Kubernetes / Yarn /
Mesos with an unreasonably large number of resource requests. If this is
the only concern, I'd like to bring the configuration option
`slotmanager.number-of-slots.max` to your attention. This option allows
putting an upper limit on the total number of slots the Flink cluster uses,
preventing active resource managers from allocating too many resources from
Kubernetes / Yarn / Mesos. Unless there are other concerns that I
overlooked, I think it would be nicer for the reactive mode to also support
active resource managers, with the additional requirement to explicitly
configure the max slots.
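
The effect of such an upper limit can be sketched in a few lines. This is
only an illustration of the arithmetic, not Flink's slot manager: the
class and method names are invented, an "infinite" request is modeled as
Integer.MAX_VALUE, and the slots-per-TaskManager value is an assumed
input.

```java
/** Illustrative arithmetic only; not Flink's SlotManager implementation. */
class SlotCapSketch {

    /** Stand-in for the "infinite" resource request of reactive mode. */
    static final int UNBOUNDED = Integer.MAX_VALUE;

    /** Caps the number of requested slots at the configured maximum. */
    static int slotsToRequest(int desiredSlots, int maxSlots) {
        if (maxSlots <= 0) {
            throw new IllegalArgumentException("maxSlots must be positive");
        }
        return Math.min(desiredSlots, maxSlots);
    }

    /** Number of TaskManagers needed to provide the capped slot count. */
    static int taskManagersToRequest(int desiredSlots, int maxSlots, int slotsPerTaskManager) {
        if (slotsPerTaskManager <= 0) {
            throw new IllegalArgumentException("slotsPerTaskManager must be positive");
        }
        long capped = slotsToRequest(desiredSlots, maxSlots);
        // Round up using long arithmetic so a very large cap cannot overflow.
        return (int) ((capped + slotsPerTaskManager - 1) / slotsPerTaskManager);
    }

    public static void main(String[] args) {
        // An unbounded request with a cap of 20 slots and 4 slots per TaskManager.
        System.out.println(slotsToRequest(UNBOUNDED, 20));
        System.out.println(taskManagersToRequest(UNBOUNDED, 20, 4));
    }
}
```

With a cap in place, the request sent to Kubernetes / Yarn / Mesos stays
bounded no matter how much the job asks for.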

Thank you~

Xintong Song


[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-160-Declarative-scheduler-td48165.html


Re: [DISCUSS] FLIP-159: Reactive Mode

Posted by Steven Wu <st...@gmail.com>.
Thanks a lot for the proposal, Robert and Till.

> No fixed parallelism for any of the operators

Regarding this limitation, could the scheduler adjust only the default
parallelism? If some operators set their parallelism explicitly (e.g.
always 1), it could just leave them unchanged.
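
A minimal sketch of the behavior I have in mind, just to illustrate the
question. This is not what the FLIP specifies. The class, the UNSET
marker, and the operator names are all made up for the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of the suggestion; not the scheduler proposed in the FLIP. */
class ParallelismSketch {

    /** Marker used in this sketch for "no explicit parallelism set". */
    static final int UNSET = -1;

    /**
     * Operators without an explicit parallelism are scaled to the available
     * slots; operators with an explicit parallelism keep their value.
     */
    static Map<String, Integer> resolve(Map<String, Integer> declared, int availableSlots) {
        Map<String, Integer> resolved = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> entry : declared.entrySet()) {
            int parallelism = entry.getValue();
            resolved.put(entry.getKey(), parallelism == UNSET ? availableSlots : parallelism);
        }
        return resolved;
    }

    public static void main(String[] args) {
        Map<String, Integer> declared = new LinkedHashMap<>();
        declared.put("source", UNSET);
        declared.put("map", UNSET);
        declared.put("sink", 1); // explicitly pinned to 1
        System.out.println(resolve(declared, 8));
    }
}
```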

