You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by Talat Uyarer <tu...@paloaltonetworks.com> on 2022/05/23 06:32:32 UTC

About Native Deployment's Autoscaling implementation

Hi,
I am working on auto scaling support for native deployments. Today Flink
provides Reactive mode however it only runs on standalone deployments. We
use Kubernetes native deployment. So I want to increase or decrease job
resources for our streamin jobs. Recent Flip-138 and Flip-160 are very
useful to achieve this goal. I started reading code of Flink JobManager,
AdaptiveScheduler and DeclarativeSlotPool etc.

My assumption is Required Resources will be calculated on AdaptiveScheduler
whenever the scheduler receives a heartbeat from a task manager by calling
public void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot)
method.

I checked TaskExecutorToJobManagerHeartbeatPayload class however I only see
*accumulatorReport* and *executionDeploymentReport* . Do you have any
suggestions to collect metrics from TaskManagers ? Should I add metrics on
TaskExecutorToJobManagerHeartbeatPayload ?

I am open to another suggestion for this. Whenever I finalize my
investigation. I will create a FLIP for more detailed implementation.

Thanks for your help in advance.
Talat

Re: About Native Deployment's Autoscaling implementation

Posted by Yang Wang <da...@gmail.com>.
Hi Talat,

Using sub resources for the auto scaling makes a lot of sense to me.

Could you be more specific why you think changing task manager count will
> not work for native deployment ?


The native K8s integration is using active resourcemanager. It means that
the TaskManager count will be calculated by *parallelism / numTaskSlot*. If
we want to add more TaskManager pods, then we need to increase the
parallelism.
Not like the standalone deployment, there's no way to directly configure
the TaskManager count.

Given that we could not set the replicas of TaskManager pods when using
native K8s integration, we need to calculate and configure the parallelism
via [min/max]*replicas * numTaskSlots*. See the prototype in Gyula's PR[1].
So the problem is how could we change the parallelism without creating the
Flink application again. We do not have a restAPI for this.

[1]. https://github.com/apache/flink-kubernetes-operator/pull/227

Best,
Yang

Talat Uyarer <tu...@paloaltonetworks.com> 于2022年6月1日周三 08:34写道:

> Hi Yang and Gyula,
>
> Yang, Could you give a little bit more information ?  What prevents us
> from changing task managers' count ? I am aware of ActiveResourceManager of
> Flink. But Flink only calls resources when it initializes a cluster.
> If we set
>
>    - jobmanager.scheduler: adaptive
>    - cluster.declarative-resource-management.enabled: true
>
> While deploying a Flink Native cluster. Even though it is native
> deployment. Flink will be able to add task manager add/remove behavior.
> Because basically adding/removing a task manager is similar to recovering a
> failed task manager.
>
> Could you be more specific why you think changing task manager count will
> not work for native deployment ? I will not use reactive-mode. Scaling up
> or down will be handled by HPA. We will define sub sources.[1]
> Users will give us starting points such as replicaCount and max count such
> as maxRecplicaCount. Flink clusters will be initialized by replicaCount for
> TaskManager.
>
> Gyula, I want to make HPA part of FlinkDeployment. And introduce auto
> scaling settings such as metric service endpoints and some other default
> settings such as threshold etc. to reduce complexity. Let me start
> implementing something after Yang's answer. When users enable autoscaling
> we need to also set scheduler and declarative resource management settings
> behind the scenes.
>
> Thanks
>
> [1]
> https://kubernetes.io/docs/tasks/extend-kubernetes/custom-resources/custom-resource-definitions/#scale-subresource
>
> On Mon, May 30, 2022 at 2:25 AM Yang Wang <da...@gmail.com> wrote:
>
>> >
>> > I thought we could enable Adaptive Scheduler, so adding or removing a
>> task
>> > manager is the same as restarting a job when we use an adaptive
>> scheduler.
>> > Do I miss anything ?
>>
>>
>> It is true for standalone mode since adding/removing a TaskManager pod is
>> fully controlled by users(or external tools).
>> But it is not valid for native K8s integration[1]. Currently, we could not
>> dynamically change the TaskManager pods once the job is running.
>>
>> I really hope the HPA could work for both standalone and native mode.
>>
>>
>> [1].
>> https://urldefense.com/v3/__https://flink.apache.org/2021/02/10/native-k8s-with-ha.html__;!!Mt_FR42WkD9csi9Y!fSD-IVwNQGjF2jX2ExbTekF48yBxK1zCCVa3T-thC0h0G6Q_X5CWKiOnr7yYf8pgzdOCFer91CODppTow0VgfaqMAuI$
>>
>> Best,
>> Yang
>>
>> Gyula Fóra <gy...@gmail.com> 于2022年5月30日周一 12:23写道:
>>
>> > Hi Talat!
>> >
>> > Sorry for the late reply, I have been busy with some fixes for the
>> release
>> > and travelling.
>> >
>> > I think the prometheus metrics integration sounds like a great idea that
>> > would cover the needs of most users.
>> > This way users can also integrate easily with the custom Flink metrics
>> too.
>> >
>> > maxReplicas: We could add this easily to the taskManager resource specs
>> >
>> > Nice workflow picture, I would love to include this in the docs later.
>> One
>> > minor comment, should the HPA be outside of the FlinkDeployment box?
>> >
>> > Cheers,
>> > Gyula
>> >
>> > On Wed, May 25, 2022 at 7:50 PM Talat Uyarer <
>> tuyarer@paloaltonetworks.com>
>> > wrote:
>> >
>> >> Hi Yang,
>> >>
>> >> I thought we could enable Adaptive Scheduler, so adding or removing a
>> task
>> >> manager is the same as restarting a job when we use an adaptive
>> scheduler.
>> >> Do I miss anything ?
>> >>
>> >> Thanks
>> >>
>> >> On Tue, May 24, 2022 at 8:16 PM Yang Wang <da...@gmail.com>
>> wrote:
>> >>
>> >> > Thanks for the interesting discussion.
>> >> >
>> >> > Compared with reactive mode, leveraging the
>> flink-kubernetes-operator to
>> >> > do the job restarting/upgrading is another solution for auto-scaling.
>> >> > Given that fully restarting a Flink application on K8s is not too
>> slow,
>> >> > this is a reasonable way.
>> >> > Really hope we could get some progress in such area.
>> >> >
>> >> > Best,
>> >> > Yang
>> >> >
>> >> > Gyula Fóra <gy...@gmail.com> 于2022年5月25日周三 09:04写道:
>> >> >
>> >> >> Hi Talat!
>> >> >>
>> >> >> It would be great to have a HPA that works based on some flink
>> >> >> throughput/backlog metrics. I wonder how you are going to access the
>> >> Flink
>> >> >> metrics in the HPA, we might need some integration with the k8s
>> metrics
>> >> >> system.
>> >> >> In any case whether we need a FLIP or not depends on the
>> complexity, if
>> >> >> it's simple then we can go without a FLIP.
>> >> >>
>> >> >> Cheers,
>> >> >> Gyula
>> >> >>
>> >> >> On Tue, May 24, 2022 at 12:26 PM Talat Uyarer <
>> >> >> tuyarer@paloaltonetworks.com>
>> >> >> wrote:
>> >> >>
>> >> >> > Hi Gyula,
>> >> >> >
>> >> >> > This seems very promising for initial scaling. We are using Flink
>> >> >> > Kubernetes Operators. Most probably we are very early adapters for
>> >> it :)
>> >> >> > Let me try it. Get back to you soon.
>> >> >> >
>> >> >> > My plan is building a general purpose CPU and backlog/throughput
>> base
>> >> >> > autoscaling for Flink. I can create a Custom Open Source HPA on
>> top
>> >> of
>> >> >> your
>> >> >> > changes. Do I need to create a FLIP for it ?
>> >> >> >
>> >> >> > Just general information about us Today we use another execution
>> env.
>> >> >> if
>> >> >> > the Job scheduler does not support autoscaling. Having a HPA
>> works if
>> >> >> your
>> >> >> > sources are well balanced. If there is uneven distribution on
>> >> sources,
>> >> >> > Having auto scaling feature on scheduler can help better
>> utilization.
>> >> >> But
>> >> >> > this is not urgent. We can start using your PR at least for a
>> while.
>> >> >> >
>> >> >> > Thanks
>> >> >> >
>> >> >> > On Mon, May 23, 2022 at 4:10 AM Gyula Fóra <gy...@gmail.com>
>> >> >> wrote:
>> >> >> >
>> >> >> >> Hi Talat!
>> >> >> >>
>> >> >> >> One other approach that we are investigating currently is
>> combining
>> >> >> the Flink
>> >> >> >> Kubernetes Operator
>> >> >> >> <
>> >> >>
>> >>
>> https://urldefense.com/v3/__https://github.com/apache/flink-kubernetes-operator__;!!Mt_FR42WkD9csi9Y!ZNBiCduZFUmuQI7_9M48gQnkxBkrLEVOIPRWZY0ad0xmltbQ6G2stlfsiw6q9bGi5fctVF2RS1YNL5EkV9nZZNwfcA$
>> >> >
>> >> >> with
>> >> >> >> the K8S scaling capabilities (Horizontal Pod autoscaler)
>> >> >> >>
>> >> >> >> In this approach the HPA monitors the Taskmanager pods directly
>> and
>> >> can
>> >> >> >> modify the FlinkDeployment resource replica number to trigger a
>> >> >> stateful
>> >> >> >> job scale-up/down through the operator.
>> >> >> >> Obviously not as nice as the reactive mode but it works with the
>> >> >> current
>> >> >> >> Kubernetes Native implementation easily. It is also theoretically
>> >> >> possible
>> >> >> >> to integrate this with other custom Flink metrics but we haven't
>> >> >> tested yet.
>> >> >> >>
>> >> >> >> I have a created a POC pull request that showcases these
>> >> capabilities:
>> >> >> >>
>> https://urldefense.com/v3/__https://github.com/apache/flink-kubernetes-operator/pull/227__;!!Mt_FR42WkD9csi9Y!fSD-IVwNQGjF2jX2ExbTekF48yBxK1zCCVa3T-thC0h0G6Q_X5CWKiOnr7yYf8pgzdOCFer91CODppTow0VgG7Oirow$
>> >> >> <
>> >>
>> https://urldefense.com/v3/__https://github.com/apache/flink-kubernetes-operator/pull/227__;!!Mt_FR42WkD9csi9Y!fRMcRbMNo0XDxKDkicgzx2z_yTAz5ma2xADfvhHG6fJowdZ-vvzDrawMA5VmsD2W8BCo-5SA5FfWTiAnkw6b-_3CvRI$
>> >> >
>> >> >> >> <
>> >> >>
>> >>
>> https://urldefense.com/v3/__https://github.com/apache/flink-kubernetes-operator/pull/227__;!!Mt_FR42WkD9csi9Y!ZNBiCduZFUmuQI7_9M48gQnkxBkrLEVOIPRWZY0ad0xmltbQ6G2stlfsiw6q9bGi5fctVF2RS1YNL5EkV9nKHxgshA$
>> >> >> >
>> >> >> >>
>> >> >> >> If you are interested it would be nice if you could check it out
>> and
>> >> >> >> provide feedback, we will get back to refining this after our
>> >> current
>> >> >> >> ongoing release.
>> >> >> >>
>> >> >> >> Cheers,
>> >> >> >> Gyula
>> >> >> >>
>> >> >> >> On Mon, May 23, 2022 at 12:23 AM David Morávek <dm...@apache.org>
>> >> >> wrote:
>> >> >> >>
>> >> >> >>> Hi Talat,
>> >> >> >>>
>> >> >> >>> This is definitely an interesting and rather complex topic.
>> >> >> >>>
>> >> >> >>> Few unstructured thoughts / notes / questions:
>> >> >> >>>
>> >> >> >>> - The main struggle has always been that it's hard to come up
>> with
>> >> a
>> >> >> >>> generic one-size-fits-it-all metrics for autoscaling.
>> >> >> >>>   - Flink doesn't have knowledge of the external environment
>> (eg.
>> >> >> >>> capacity
>> >> >> >>> planning on the cluster, no notion of pre-emption), so it can
>> not
>> >> >> really
>> >> >> >>> make a qualified decision in some cases.
>> >> >> >>>   - ^ the above goes along the same reasoning as why we don't
>> >> support
>> >> >> >>> reactive mode with the session cluster (multi-job scheduling)
>> >> >> >>> - The re-scaling decision logic most likely needs to be
>> pluggable
>> >> from
>> >> >> >>> the
>> >> >> >>> above reasons
>> >> >> >>>   - We're in general fairly concerned about running any user
>> code
>> >> in
>> >> >> JM
>> >> >> >>> for
>> >> >> >>> stability reasons.
>> >> >> >>>   - The most flexible option would be allowing to set the
>> desired
>> >> >> >>> parallelism via rest api and leave the scaling decision to an
>> >> external
>> >> >> >>> process, which could be reused for both standalone and "active"
>> >> >> >>> deployment
>> >> >> >>> modes (there is actually a prototype by Till, that allows this
>> [1])
>> >> >> >>>
>> >> >> >>> How do you intend to make an autoscaling decision? Also note
>> that
>> >> the
>> >> >> >>> re-scaling is still a fairly expensive operation (especially
>> with
>> >> >> large
>> >> >> >>> state), so you need to make sure autoscaler doesn't oscillate
>> and
>> >> >> doesn't
>> >> >> >>> re-scale too often (this is also something that could vary from
>> >> >> workload
>> >> >> >>> to
>> >> >> >>> workload).
>> >> >> >>>
>> >> >> >>> Note on the metrics question with an auto-scaler living in the
>> JM:
>> >> >> >>> - We shouldn't really collect the metrics into the JM, but
>> instead
>> >> JM
>> >> >> can
>> >> >> >>> pull then from TMs directly on-demand (basically the same thing
>> and
>> >> >> >>> external auto-scaler would do).
>> >> >> >>>
>> >> >> >>> Looking forward to your thoughts
>> >> >> >>>
>> >> >> >>> [1]
>> https://urldefense.com/v3/__https://github.com/tillrohrmann/flink/commits/autoscaling__;!!Mt_FR42WkD9csi9Y!fSD-IVwNQGjF2jX2ExbTekF48yBxK1zCCVa3T-thC0h0G6Q_X5CWKiOnr7yYf8pgzdOCFer91CODppTow0Vg382rLOI$
>> >> >> <
>> >>
>> https://urldefense.com/v3/__https://github.com/tillrohrmann/flink/commits/autoscaling__;!!Mt_FR42WkD9csi9Y!fRMcRbMNo0XDxKDkicgzx2z_yTAz5ma2xADfvhHG6fJowdZ-vvzDrawMA5VmsD2W8BCo-5SA5FfWTiAnkw6bc0Id0Zg$
>> >> >
>> >> >> >>> <
>> >> >>
>> >>
>> https://urldefense.com/v3/__https://github.com/tillrohrmann/flink/commits/autoscaling__;!!Mt_FR42WkD9csi9Y!ZNBiCduZFUmuQI7_9M48gQnkxBkrLEVOIPRWZY0ad0xmltbQ6G2stlfsiw6q9bGi5fctVF2RS1YNL5EkV9khvi_Fng$
>> >> >> >
>> >> >> >>>
>> >> >> >>> Best,
>> >> >> >>> D.
>> >> >> >>>
>> >> >> >>> On Mon, May 23, 2022 at 8:32 AM Talat Uyarer <
>> >> >> >>> tuyarer@paloaltonetworks.com>
>> >> >> >>> wrote:
>> >> >> >>>
>> >> >> >>> > Hi,
>> >> >> >>> > I am working on auto scaling support for native deployments.
>> >> Today
>> >> >> >>> Flink
>> >> >> >>> > provides Reactive mode however it only runs on standalone
>> >> >> deployments.
>> >> >> >>> We
>> >> >> >>> > use Kubernetes native deployment. So I want to increase or
>> >> decrease
>> >> >> job
>> >> >> >>> > resources for our streamin jobs. Recent Flip-138 and Flip-160
>> are
>> >> >> very
>> >> >> >>> > useful to achieve this goal. I started reading code of Flink
>> >> >> >>> JobManager,
>> >> >> >>> > AdaptiveScheduler and DeclarativeSlotPool etc.
>> >> >> >>> >
>> >> >> >>> > My assumption is Required Resources will be calculated on
>> >> >> >>> AdaptiveScheduler
>> >> >> >>> > whenever the scheduler receives a heartbeat from a task
>> manager
>> >> by
>> >> >> >>> calling
>> >> >> >>> > public void updateAccumulators(AccumulatorSnapshot
>> >> >> accumulatorSnapshot)
>> >> >> >>> > method.
>> >> >> >>> >
>> >> >> >>> > I checked TaskExecutorToJobManagerHeartbeatPayload class
>> however
>> >> I
>> >> >> >>> only see
>> >> >> >>> > *accumulatorReport* and *executionDeploymentReport* . Do you
>> have
>> >> >> any
>> >> >> >>> > suggestions to collect metrics from TaskManagers ? Should I
>> add
>> >> >> >>> metrics on
>> >> >> >>> > TaskExecutorToJobManagerHeartbeatPayload ?
>> >> >> >>> >
>> >> >> >>> > I am open to another suggestion for this. Whenever I finalize
>> my
>> >> >> >>> > investigation. I will create a FLIP for more detailed
>> >> >> implementation.
>> >> >> >>> >
>> >> >> >>> > Thanks for your help in advance.
>> >> >> >>> > Talat
>> >> >> >>> >
>> >> >> >>>
>> >> >> >>
>> >> >>
>> >> >
>> >>
>> >
>>
>

Re: About Native Deployment's Autoscaling implementation

Posted by Talat Uyarer <tu...@paloaltonetworks.com>.
Hi Yang and Gyula,

Yang, Could you give a little bit more information ?  What prevents us from
changing task managers' count ? I am aware of ActiveResourceManager of
Flink. But Flink only calls resources when it initializes a cluster.
If we set

   - jobmanager.scheduler: adaptive
   - cluster.declarative-resource-management.enabled: true

While deploying a Flink Native cluster. Even though it is native
deployment. Flink will be able to add task manager add/remove behavior.
Because basically adding/removing a task manager is similar to recovering a
failed task manager.

Could you be more specific why you think changing task manager count will
not work for native deployment ? I will not use reactive-mode. Scaling up
or down will be handled by HPA. We will define sub sources.[1]
Users will give us starting points such as replicaCount and max count such
as maxRecplicaCount. Flink clusters will be initialized by replicaCount for
TaskManager.

Gyula, I want to make HPA part of FlinkDeployment. And introduce auto
scaling settings such as metric service endpoints and some other default
settings such as threshold etc. to reduce complexity. Let me start
implementing something after Yang's answer. When users enable autoscaling
we need to also set scheduler and declarative resource management settings
behind the scenes.

Thanks

[1]
https://kubernetes.io/docs/tasks/extend-kubernetes/custom-resources/custom-resource-definitions/#scale-subresource

On Mon, May 30, 2022 at 2:25 AM Yang Wang <da...@gmail.com> wrote:

> >
> > I thought we could enable Adaptive Scheduler, so adding or removing a
> task
> > manager is the same as restarting a job when we use an adaptive
> scheduler.
> > Do I miss anything ?
>
>
> It is true for standalone mode since adding/removing a TaskManager pod is
> fully controlled by users(or external tools).
> But it is not valid for native K8s integration[1]. Currently, we could not
> dynamically change the TaskManager pods once the job is running.
>
> I really hope the HPA could work for both standalone and native mode.
>
>
> [1].
> https://urldefense.com/v3/__https://flink.apache.org/2021/02/10/native-k8s-with-ha.html__;!!Mt_FR42WkD9csi9Y!fSD-IVwNQGjF2jX2ExbTekF48yBxK1zCCVa3T-thC0h0G6Q_X5CWKiOnr7yYf8pgzdOCFer91CODppTow0VgfaqMAuI$
>
> Best,
> Yang
>
> Gyula Fóra <gy...@gmail.com> 于2022年5月30日周一 12:23写道:
>
> > Hi Talat!
> >
> > Sorry for the late reply, I have been busy with some fixes for the
> release
> > and travelling.
> >
> > I think the prometheus metrics integration sounds like a great idea that
> > would cover the needs of most users.
> > This way users can also integrate easily with the custom Flink metrics
> too.
> >
> > maxReplicas: We could add this easily to the taskManager resource specs
> >
> > Nice workflow picture, I would love to include this in the docs later.
> One
> > minor comment, should the HPA be outside of the FlinkDeployment box?
> >
> > Cheers,
> > Gyula
> >
> > On Wed, May 25, 2022 at 7:50 PM Talat Uyarer <
> tuyarer@paloaltonetworks.com>
> > wrote:
> >
> >> Hi Yang,
> >>
> >> I thought we could enable Adaptive Scheduler, so adding or removing a
> task
> >> manager is the same as restarting a job when we use an adaptive
> scheduler.
> >> Do I miss anything ?
> >>
> >> Thanks
> >>
> >> On Tue, May 24, 2022 at 8:16 PM Yang Wang <da...@gmail.com>
> wrote:
> >>
> >> > Thanks for the interesting discussion.
> >> >
> >> > Compared with reactive mode, leveraging the flink-kubernetes-operator
> to
> >> > do the job restarting/upgrading is another solution for auto-scaling.
> >> > Given that fully restarting a Flink application on K8s is not too
> slow,
> >> > this is a reasonable way.
> >> > Really hope we could get some progress in such area.
> >> >
> >> > Best,
> >> > Yang
> >> >
> >> > Gyula Fóra <gy...@gmail.com> 于2022年5月25日周三 09:04写道:
> >> >
> >> >> Hi Talat!
> >> >>
> >> >> It would be great to have a HPA that works based on some flink
> >> >> throughput/backlog metrics. I wonder how you are going to access the
> >> Flink
> >> >> metrics in the HPA, we might need some integration with the k8s
> metrics
> >> >> system.
> >> >> In any case whether we need a FLIP or not depends on the complexity,
> if
> >> >> it's simple then we can go without a FLIP.
> >> >>
> >> >> Cheers,
> >> >> Gyula
> >> >>
> >> >> On Tue, May 24, 2022 at 12:26 PM Talat Uyarer <
> >> >> tuyarer@paloaltonetworks.com>
> >> >> wrote:
> >> >>
> >> >> > Hi Gyula,
> >> >> >
> >> >> > This seems very promising for initial scaling. We are using Flink
> >> >> > Kubernetes Operators. Most probably we are very early adapters for
> >> it :)
> >> >> > Let me try it. Get back to you soon.
> >> >> >
> >> >> > My plan is building a general purpose CPU and backlog/throughput
> base
> >> >> > autoscaling for Flink. I can create a Custom Open Source HPA on top
> >> of
> >> >> your
> >> >> > changes. Do I need to create a FLIP for it ?
> >> >> >
> >> >> > Just general information about us Today we use another execution
> env.
> >> >> if
> >> >> > the Job scheduler does not support autoscaling. Having a HPA works
> if
> >> >> your
> >> >> > sources are well balanced. If there is uneven distribution on
> >> sources,
> >> >> > Having auto scaling feature on scheduler can help better
> utilization.
> >> >> But
> >> >> > this is not urgent. We can start using your PR at least for a
> while.
> >> >> >
> >> >> > Thanks
> >> >> >
> >> >> > On Mon, May 23, 2022 at 4:10 AM Gyula Fóra <gy...@gmail.com>
> >> >> wrote:
> >> >> >
> >> >> >> Hi Talat!
> >> >> >>
> >> >> >> One other approach that we are investigating currently is
> combining
> >> >> the Flink
> >> >> >> Kubernetes Operator
> >> >> >> <
> >> >>
> >>
> https://urldefense.com/v3/__https://github.com/apache/flink-kubernetes-operator__;!!Mt_FR42WkD9csi9Y!ZNBiCduZFUmuQI7_9M48gQnkxBkrLEVOIPRWZY0ad0xmltbQ6G2stlfsiw6q9bGi5fctVF2RS1YNL5EkV9nZZNwfcA$
> >> >
> >> >> with
> >> >> >> the K8S scaling capabilities (Horizontal Pod autoscaler)
> >> >> >>
> >> >> >> In this approach the HPA monitors the Taskmanager pods directly
> and
> >> can
> >> >> >> modify the FlinkDeployment resource replica number to trigger a
> >> >> stateful
> >> >> >> job scale-up/down through the operator.
> >> >> >> Obviously not as nice as the reactive mode but it works with the
> >> >> current
> >> >> >> Kubernetes Native implementation easily. It is also theoretically
> >> >> possible
> >> >> >> to integrate this with other custom Flink metrics but we haven't
> >> >> tested yet.
> >> >> >>
> >> >> >> I have a created a POC pull request that showcases these
> >> capabilities:
> >> >> >>
> https://urldefense.com/v3/__https://github.com/apache/flink-kubernetes-operator/pull/227__;!!Mt_FR42WkD9csi9Y!fSD-IVwNQGjF2jX2ExbTekF48yBxK1zCCVa3T-thC0h0G6Q_X5CWKiOnr7yYf8pgzdOCFer91CODppTow0VgG7Oirow$
> >> >> <
> >>
> https://urldefense.com/v3/__https://github.com/apache/flink-kubernetes-operator/pull/227__;!!Mt_FR42WkD9csi9Y!fRMcRbMNo0XDxKDkicgzx2z_yTAz5ma2xADfvhHG6fJowdZ-vvzDrawMA5VmsD2W8BCo-5SA5FfWTiAnkw6b-_3CvRI$
> >> >
> >> >> >> <
> >> >>
> >>
> https://urldefense.com/v3/__https://github.com/apache/flink-kubernetes-operator/pull/227__;!!Mt_FR42WkD9csi9Y!ZNBiCduZFUmuQI7_9M48gQnkxBkrLEVOIPRWZY0ad0xmltbQ6G2stlfsiw6q9bGi5fctVF2RS1YNL5EkV9nKHxgshA$
> >> >> >
> >> >> >>
> >> >> >> If you are interested it would be nice if you could check it out
> and
> >> >> >> provide feedback, we will get back to refining this after our
> >> current
> >> >> >> ongoing release.
> >> >> >>
> >> >> >> Cheers,
> >> >> >> Gyula
> >> >> >>
> >> >> >> On Mon, May 23, 2022 at 12:23 AM David Morávek <dm...@apache.org>
> >> >> wrote:
> >> >> >>
> >> >> >>> Hi Talat,
> >> >> >>>
> >> >> >>> This is definitely an interesting and rather complex topic.
> >> >> >>>
> >> >> >>> Few unstructured thoughts / notes / questions:
> >> >> >>>
> >> >> >>> - The main struggle has always been that it's hard to come up
> with
> >> a
> >> >> >>> generic one-size-fits-it-all metrics for autoscaling.
> >> >> >>>   - Flink doesn't have knowledge of the external environment (eg.
> >> >> >>> capacity
> >> >> >>> planning on the cluster, no notion of pre-emption), so it can not
> >> >> really
> >> >> >>> make a qualified decision in some cases.
> >> >> >>>   - ^ the above goes along the same reasoning as why we don't
> >> support
> >> >> >>> reactive mode with the session cluster (multi-job scheduling)
> >> >> >>> - The re-scaling decision logic most likely needs to be pluggable
> >> from
> >> >> >>> the
> >> >> >>> above reasons
> >> >> >>>   - We're in general fairly concerned about running any user code
> >> in
> >> >> JM
> >> >> >>> for
> >> >> >>> stability reasons.
> >> >> >>>   - The most flexible option would be allowing to set the desired
> >> >> >>> parallelism via rest api and leave the scaling decision to an
> >> external
> >> >> >>> process, which could be reused for both standalone and "active"
> >> >> >>> deployment
> >> >> >>> modes (there is actually a prototype by Till, that allows this
> [1])
> >> >> >>>
> >> >> >>> How do you intend to make an autoscaling decision? Also note that
> >> the
> >> >> >>> re-scaling is still a fairly expensive operation (especially with
> >> >> large
> >> >> >>> state), so you need to make sure autoscaler doesn't oscillate and
> >> >> doesn't
> >> >> >>> re-scale too often (this is also something that could vary from
> >> >> workload
> >> >> >>> to
> >> >> >>> workload).
> >> >> >>>
> >> >> >>> Note on the metrics question with an auto-scaler living in the
> JM:
> >> >> >>> - We shouldn't really collect the metrics into the JM, but
> instead
> >> JM
> >> >> can
> >> >> >>> pull then from TMs directly on-demand (basically the same thing
> and
> >> >> >>> external auto-scaler would do).
> >> >> >>>
> >> >> >>> Looking forward to your thoughts
> >> >> >>>
> >> >> >>> [1]
> https://urldefense.com/v3/__https://github.com/tillrohrmann/flink/commits/autoscaling__;!!Mt_FR42WkD9csi9Y!fSD-IVwNQGjF2jX2ExbTekF48yBxK1zCCVa3T-thC0h0G6Q_X5CWKiOnr7yYf8pgzdOCFer91CODppTow0Vg382rLOI$
> >> >> <
> >>
> https://urldefense.com/v3/__https://github.com/tillrohrmann/flink/commits/autoscaling__;!!Mt_FR42WkD9csi9Y!fRMcRbMNo0XDxKDkicgzx2z_yTAz5ma2xADfvhHG6fJowdZ-vvzDrawMA5VmsD2W8BCo-5SA5FfWTiAnkw6bc0Id0Zg$
> >> >
> >> >> >>> <
> >> >>
> >>
> https://urldefense.com/v3/__https://github.com/tillrohrmann/flink/commits/autoscaling__;!!Mt_FR42WkD9csi9Y!ZNBiCduZFUmuQI7_9M48gQnkxBkrLEVOIPRWZY0ad0xmltbQ6G2stlfsiw6q9bGi5fctVF2RS1YNL5EkV9khvi_Fng$
> >> >> >
> >> >> >>>
> >> >> >>> Best,
> >> >> >>> D.
> >> >> >>>
> >> >> >>> On Mon, May 23, 2022 at 8:32 AM Talat Uyarer <
> >> >> >>> tuyarer@paloaltonetworks.com>
> >> >> >>> wrote:
> >> >> >>>
> >> >> >>> > Hi,
> >> >> >>> > I am working on auto scaling support for native deployments.
> >> Today
> >> >> >>> Flink
> >> >> >>> > provides Reactive mode however it only runs on standalone
> >> >> deployments.
> >> >> >>> We
> >> >> >>> > use Kubernetes native deployment. So I want to increase or
> >> decrease
> >> >> job
> >> >> >>> > resources for our streamin jobs. Recent Flip-138 and Flip-160
> are
> >> >> very
> >> >> >>> > useful to achieve this goal. I started reading code of Flink
> >> >> >>> JobManager,
> >> >> >>> > AdaptiveScheduler and DeclarativeSlotPool etc.
> >> >> >>> >
> >> >> >>> > My assumption is Required Resources will be calculated on
> >> >> >>> AdaptiveScheduler
> >> >> >>> > whenever the scheduler receives a heartbeat from a task manager
> >> by
> >> >> >>> calling
> >> >> >>> > public void updateAccumulators(AccumulatorSnapshot
> >> >> accumulatorSnapshot)
> >> >> >>> > method.
> >> >> >>> >
> >> >> >>> > I checked TaskExecutorToJobManagerHeartbeatPayload class
> however
> >> I
> >> >> >>> only see
> >> >> >>> > *accumulatorReport* and *executionDeploymentReport* . Do you
> have
> >> >> any
> >> >> >>> > suggestions to collect metrics from TaskManagers ? Should I add
> >> >> >>> metrics on
> >> >> >>> > TaskExecutorToJobManagerHeartbeatPayload ?
> >> >> >>> >
> >> >> >>> > I am open to another suggestion for this. Whenever I finalize
> my
> >> >> >>> > investigation. I will create a FLIP for more detailed
> >> >> implementation.
> >> >> >>> >
> >> >> >>> > Thanks for your help in advance.
> >> >> >>> > Talat
> >> >> >>> >
> >> >> >>>
> >> >> >>
> >> >>
> >> >
> >>
> >
>

Re: About Native Deployment's Autoscaling implementation

Posted by Yang Wang <da...@gmail.com>.
>
> I thought we could enable Adaptive Scheduler, so adding or removing a task
> manager is the same as restarting a job when we use an adaptive scheduler.
> Do I miss anything ?


It is true for standalone mode since adding/removing a TaskManager pod is
fully controlled by users(or external tools).
But it is not valid for native K8s integration[1]. Currently, we could not
dynamically change the TaskManager pods once the job is running.

I really hope the HPA could work for both standalone and native mode.


[1]. https://flink.apache.org/2021/02/10/native-k8s-with-ha.html

Best,
Yang

Gyula Fóra <gy...@gmail.com> 于2022年5月30日周一 12:23写道:

> Hi Talat!
>
> Sorry for the late reply, I have been busy with some fixes for the release
> and travelling.
>
> I think the prometheus metrics integration sounds like a great idea that
> would cover the needs of most users.
> This way users can also integrate easily with the custom Flink metrics too.
>
> maxReplicas: We could add this easily to the taskManager resource specs
>
> Nice workflow picture, I would love to include this in the docs later. One
> minor comment, should the HPA be outside of the FlinkDeployment box?
>
> Cheers,
> Gyula
>
> On Wed, May 25, 2022 at 7:50 PM Talat Uyarer <tu...@paloaltonetworks.com>
> wrote:
>
>> Hi Yang,
>>
>> I thought we could enable Adaptive Scheduler, so adding or removing a task
>> manager is the same as restarting a job when we use an adaptive scheduler.
>> Do I miss anything ?
>>
>> Thanks
>>
>> On Tue, May 24, 2022 at 8:16 PM Yang Wang <da...@gmail.com> wrote:
>>
>> > Thanks for the interesting discussion.
>> >
>> > Compared with reactive mode, leveraging the flink-kubernetes-operator to
>> > do the job restarting/upgrading is another solution for auto-scaling.
>> > Given that fully restarting a Flink application on K8s is not too slow,
>> > this is a reasonable way.
>> > Really hope we could get some progress in such area.
>> >
>> > Best,
>> > Yang
>> >
>> > Gyula Fóra <gy...@gmail.com> 于2022年5月25日周三 09:04写道:
>> >
>> >> Hi Talat!
>> >>
>> >> It would be great to have a HPA that works based on some flink
>> >> throughput/backlog metrics. I wonder how you are going to access the
>> Flink
>> >> metrics in the HPA, we might need some integration with the k8s metrics
>> >> system.
>> >> In any case whether we need a FLIP or not depends on the complexity, if
>> >> it's simple then we can go without a FLIP.
>> >>
>> >> Cheers,
>> >> Gyula
>> >>
>> >> On Tue, May 24, 2022 at 12:26 PM Talat Uyarer <
>> >> tuyarer@paloaltonetworks.com>
>> >> wrote:
>> >>
>> >> > Hi Gyula,
>> >> >
>> >> > This seems very promising for initial scaling. We are using Flink
>> >> > Kubernetes Operators. Most probably we are very early adapters for
>> it :)
>> >> > Let me try it. Get back to you soon.
>> >> >
>> >> > My plan is building a general purpose CPU and backlog/throughput base
>> >> > autoscaling for Flink. I can create a Custom Open Source HPA on top
>> of
>> >> your
>> >> > changes. Do I need to create a FLIP for it ?
>> >> >
>> >> > Just general information about us Today we use another execution env.
>> >> if
>> >> > the Job scheduler does not support autoscaling. Having a HPA works if
>> >> your
>> >> > sources are well balanced. If there is uneven distribution on
>> sources,
>> >> > Having auto scaling feature on scheduler can help better utilization.
>> >> But
>> >> > this is not urgent. We can start using your PR at least for a while.
>> >> >
>> >> > Thanks
>> >> >
>> >> > On Mon, May 23, 2022 at 4:10 AM Gyula Fóra <gy...@gmail.com>
>> >> wrote:
>> >> >
>> >> >> Hi Talat!
>> >> >>
>> >> >> One other approach that we are investigating currently is combining
>> >> the Flink
>> >> >> Kubernetes Operator
>> >> >> <
>> >>
>> https://urldefense.com/v3/__https://github.com/apache/flink-kubernetes-operator__;!!Mt_FR42WkD9csi9Y!ZNBiCduZFUmuQI7_9M48gQnkxBkrLEVOIPRWZY0ad0xmltbQ6G2stlfsiw6q9bGi5fctVF2RS1YNL5EkV9nZZNwfcA$
>> >
>> >> with
>> >> >> the K8S scaling capabilities (Horizontal Pod autoscaler)
>> >> >>
>> >> >> In this approach the HPA monitors the Taskmanager pods directly and
>> can
>> >> >> modify the FlinkDeployment resource replica number to trigger a
>> >> stateful
>> >> >> job scale-up/down through the operator.
>> >> >> Obviously not as nice as the reactive mode but it works with the
>> >> current
>> >> >> Kubernetes Native implementation easily. It is also theoretically
>> >> possible
>> >> >> to integrate this with other custom Flink metrics but we haven't
>> >> tested yet.
>> >> >>
>> >> >> I have a created a POC pull request that showcases these
>> capabilities:
>> >> >> https://github.com/apache/flink-kubernetes-operator/pull/227
>> >> <
>> https://urldefense.com/v3/__https://github.com/apache/flink-kubernetes-operator/pull/227__;!!Mt_FR42WkD9csi9Y!fRMcRbMNo0XDxKDkicgzx2z_yTAz5ma2xADfvhHG6fJowdZ-vvzDrawMA5VmsD2W8BCo-5SA5FfWTiAnkw6b-_3CvRI$
>> >
>> >> >> <
>> >>
>> https://urldefense.com/v3/__https://github.com/apache/flink-kubernetes-operator/pull/227__;!!Mt_FR42WkD9csi9Y!ZNBiCduZFUmuQI7_9M48gQnkxBkrLEVOIPRWZY0ad0xmltbQ6G2stlfsiw6q9bGi5fctVF2RS1YNL5EkV9nKHxgshA$
>> >> >
>> >> >>
>> >> >> If you are interested it would be nice if you could check it out and
>> >> >> provide feedback, we will get back to refining this after our
>> current
>> >> >> ongoing release.
>> >> >>
>> >> >> Cheers,
>> >> >> Gyula
>> >> >>
>> >> >> On Mon, May 23, 2022 at 12:23 AM David Morávek <dm...@apache.org>
>> >> wrote:
>> >> >>
>> >> >>> Hi Talat,
>> >> >>>
>> >> >>> This is definitely an interesting and rather complex topic.
>> >> >>>
>> >> >>> Few unstructured thoughts / notes / questions:
>> >> >>>
>> >> >>> - The main struggle has always been that it's hard to come up with
>> a
>> >> >>> generic one-size-fits-it-all metrics for autoscaling.
>> >> >>>   - Flink doesn't have knowledge of the external environment (eg.
>> >> >>> capacity
>> >> >>> planning on the cluster, no notion of pre-emption), so it can not
>> >> really
>> >> >>> make a qualified decision in some cases.
>> >> >>>   - ^ the above goes along the same reasoning as why we don't
>> support
>> >> >>> reactive mode with the session cluster (multi-job scheduling)
>> >> >>> - The re-scaling decision logic most likely needs to be pluggable
>> from
>> >> >>> the
>> >> >>> above reasons
>> >> >>>   - We're in general fairly concerned about running any user code
>> in
>> >> JM
>> >> >>> for
>> >> >>> stability reasons.
>> >> >>>   - The most flexible option would be allowing to set the desired
>> >> >>> parallelism via rest api and leave the scaling decision to an
>> external
>> >> >>> process, which could be reused for both standalone and "active"
>> >> >>> deployment
>> >> >>> modes (there is actually a prototype by Till, that allows this [1])
>> >> >>>
>> >> >>> How do you intend to make an autoscaling decision? Also note that
>> the
>> >> >>> re-scaling is still a fairly expensive operation (especially with
>> >> large
>> >> >>> state), so you need to make sure autoscaler doesn't oscillate and
>> >> doesn't
>> >> >>> re-scale too often (this is also something that could vary from
>> >> workload
>> >> >>> to
>> >> >>> workload).
>> >> >>>
>> >> >>> Note on the metrics question with an auto-scaler living in the JM:
>> >> >>> - We shouldn't really collect the metrics into the JM, but instead
>> JM
>> >> can
>> >> >>> pull then from TMs directly on-demand (basically the same thing and
>> >> >>> external auto-scaler would do).
>> >> >>>
>> >> >>> Looking forward to your thoughts
>> >> >>>
>> >> >>> [1] https://github.com/tillrohrmann/flink/commits/autoscaling
>> >> <
>> https://urldefense.com/v3/__https://github.com/tillrohrmann/flink/commits/autoscaling__;!!Mt_FR42WkD9csi9Y!fRMcRbMNo0XDxKDkicgzx2z_yTAz5ma2xADfvhHG6fJowdZ-vvzDrawMA5VmsD2W8BCo-5SA5FfWTiAnkw6bc0Id0Zg$
>> >
>> >> >>> <
>> >>
>> https://urldefense.com/v3/__https://github.com/tillrohrmann/flink/commits/autoscaling__;!!Mt_FR42WkD9csi9Y!ZNBiCduZFUmuQI7_9M48gQnkxBkrLEVOIPRWZY0ad0xmltbQ6G2stlfsiw6q9bGi5fctVF2RS1YNL5EkV9khvi_Fng$
>> >> >
>> >> >>>
>> >> >>> Best,
>> >> >>> D.
>> >> >>>
>> >> >>> On Mon, May 23, 2022 at 8:32 AM Talat Uyarer <
>> >> >>> tuyarer@paloaltonetworks.com>
>> >> >>> wrote:
>> >> >>>
>> >> >>> > Hi,
>> >> >>> > I am working on auto scaling support for native deployments.
>> Today
>> >> >>> Flink
>> >> >>> > provides Reactive mode however it only runs on standalone
>> >> deployments.
>> >> >>> We
>> >> >>> > use Kubernetes native deployment. So I want to increase or
>> decrease
>> >> job
>> >> >>> > resources for our streamin jobs. Recent Flip-138 and Flip-160 are
>> >> very
>> >> >>> > useful to achieve this goal. I started reading code of Flink
>> >> >>> JobManager,
>> >> >>> > AdaptiveScheduler and DeclarativeSlotPool etc.
>> >> >>> >
>> >> >>> > My assumption is Required Resources will be calculated on
>> >> >>> AdaptiveScheduler
>> >> >>> > whenever the scheduler receives a heartbeat from a task manager
>> by
>> >> >>> calling
>> >> >>> > public void updateAccumulators(AccumulatorSnapshot
>> >> accumulatorSnapshot)
>> >> >>> > method.
>> >> >>> >
>> >> >>> > I checked TaskExecutorToJobManagerHeartbeatPayload class however
>> I
>> >> >>> only see
>> >> >>> > *accumulatorReport* and *executionDeploymentReport* . Do you have
>> >> any
>> >> >>> > suggestions to collect metrics from TaskManagers ? Should I add
>> >> >>> metrics on
>> >> >>> > TaskExecutorToJobManagerHeartbeatPayload ?
>> >> >>> >
>> >> >>> > I am open to another suggestion for this. Whenever I finalize my
>> >> >>> > investigation. I will create a FLIP for more detailed
>> >> implementation.
>> >> >>> >
>> >> >>> > Thanks for your help in advance.
>> >> >>> > Talat
>> >> >>> >
>> >> >>>
>> >> >>
>> >>
>> >
>>
>

Re: About Native Deployment's Autoscaling implementation

Posted by Gyula Fóra <gy...@gmail.com>.
Hi Talat!

Sorry for the late reply, I have been busy with some fixes for the release
and travelling.

I think the prometheus metrics integration sounds like a great idea that
would cover the needs of most users.
This way users can also integrate easily with the custom Flink metrics too.

maxReplicas: We could add this easily to the taskManager resource specs

Nice workflow picture, I would love to include this in the docs later. One
minor comment, should the HPA be outside of the FlinkDeployment box?

Cheers,
Gyula

On Wed, May 25, 2022 at 7:50 PM Talat Uyarer <tu...@paloaltonetworks.com>
wrote:

> Hi Yang,
>
> I thought we could enable Adaptive Scheduler, so adding or removing a task
> manager is the same as restarting a job when we use an adaptive scheduler.
> Do I miss anything ?
>
> Thanks
>
> On Tue, May 24, 2022 at 8:16 PM Yang Wang <da...@gmail.com> wrote:
>
> > Thanks for the interesting discussion.
> >
> > Compared with reactive mode, leveraging the flink-kubernetes-operator to
> > do the job restarting/upgrading is another solution for auto-scaling.
> > Given that fully restarting a Flink application on K8s is not too slow,
> > this is a reasonable way.
> > Really hope we could get some progress in such area.
> >
> > Best,
> > Yang
> >
> > Gyula Fóra <gy...@gmail.com> 于2022年5月25日周三 09:04写道:
> >
> >> Hi Talat!
> >>
> >> It would be great to have a HPA that works based on some flink
> >> throughput/backlog metrics. I wonder how you are going to access the
> Flink
> >> metrics in the HPA, we might need some integration with the k8s metrics
> >> system.
> >> In any case whether we need a FLIP or not depends on the complexity, if
> >> it's simple then we can go without a FLIP.
> >>
> >> Cheers,
> >> Gyula
> >>
> >> On Tue, May 24, 2022 at 12:26 PM Talat Uyarer <
> >> tuyarer@paloaltonetworks.com>
> >> wrote:
> >>
> >> > Hi Gyula,
> >> >
> >> > This seems very promising for initial scaling. We are using Flink
> >> > Kubernetes Operators. Most probably we are very early adapters for it
> :)
> >> > Let me try it. Get back to you soon.
> >> >
> >> > My plan is building a general purpose CPU and backlog/throughput base
> >> > autoscaling for Flink. I can create a Custom Open Source HPA on top of
> >> your
> >> > changes. Do I need to create a FLIP for it ?
> >> >
> >> > Just general information about us Today we use another execution env.
> >> if
> >> > the Job scheduler does not support autoscaling. Having a HPA works if
> >> your
> >> > sources are well balanced. If there is uneven distribution on sources,
> >> > Having auto scaling feature on scheduler can help better utilization.
> >> But
> >> > this is not urgent. We can start using your PR at least for a while.
> >> >
> >> > Thanks
> >> >
> >> > On Mon, May 23, 2022 at 4:10 AM Gyula Fóra <gy...@gmail.com>
> >> wrote:
> >> >
> >> >> Hi Talat!
> >> >>
> >> >> One other approach that we are investigating currently is combining
> >> the Flink
> >> >> Kubernetes Operator
> >> >> <
> >>
> https://urldefense.com/v3/__https://github.com/apache/flink-kubernetes-operator__;!!Mt_FR42WkD9csi9Y!ZNBiCduZFUmuQI7_9M48gQnkxBkrLEVOIPRWZY0ad0xmltbQ6G2stlfsiw6q9bGi5fctVF2RS1YNL5EkV9nZZNwfcA$
> >
> >> with
> >> >> the K8S scaling capabilities (Horizontal Pod autoscaler)
> >> >>
> >> >> In this approach the HPA monitors the Taskmanager pods directly and
> can
> >> >> modify the FlinkDeployment resource replica number to trigger a
> >> stateful
> >> >> job scale-up/down through the operator.
> >> >> Obviously not as nice as the reactive mode but it works with the
> >> current
> >> >> Kubernetes Native implementation easily. It is also theoretically
> >> possible
> >> >> to integrate this with other custom Flink metrics but we haven't
> >> tested yet.
> >> >>
> >> >> I have a created a POC pull request that showcases these
> capabilities:
> >> >> https://github.com/apache/flink-kubernetes-operator/pull/227
> >> <
> https://urldefense.com/v3/__https://github.com/apache/flink-kubernetes-operator/pull/227__;!!Mt_FR42WkD9csi9Y!fRMcRbMNo0XDxKDkicgzx2z_yTAz5ma2xADfvhHG6fJowdZ-vvzDrawMA5VmsD2W8BCo-5SA5FfWTiAnkw6b-_3CvRI$
> >
> >> >> <
> >>
> https://urldefense.com/v3/__https://github.com/apache/flink-kubernetes-operator/pull/227__;!!Mt_FR42WkD9csi9Y!ZNBiCduZFUmuQI7_9M48gQnkxBkrLEVOIPRWZY0ad0xmltbQ6G2stlfsiw6q9bGi5fctVF2RS1YNL5EkV9nKHxgshA$
> >> >
> >> >>
> >> >> If you are interested it would be nice if you could check it out and
> >> >> provide feedback, we will get back to refining this after our current
> >> >> ongoing release.
> >> >>
> >> >> Cheers,
> >> >> Gyula
> >> >>
> >> >> On Mon, May 23, 2022 at 12:23 AM David Morávek <dm...@apache.org>
> >> wrote:
> >> >>
> >> >>> Hi Talat,
> >> >>>
> >> >>> This is definitely an interesting and rather complex topic.
> >> >>>
> >> >>> Few unstructured thoughts / notes / questions:
> >> >>>
> >> >>> - The main struggle has always been that it's hard to come up with a
> >> >>> generic one-size-fits-it-all metrics for autoscaling.
> >> >>>   - Flink doesn't have knowledge of the external environment (eg.
> >> >>> capacity
> >> >>> planning on the cluster, no notion of pre-emption), so it can not
> >> really
> >> >>> make a qualified decision in some cases.
> >> >>>   - ^ the above goes along the same reasoning as why we don't
> support
> >> >>> reactive mode with the session cluster (multi-job scheduling)
> >> >>> - The re-scaling decision logic most likely needs to be pluggable
> from
> >> >>> the
> >> >>> above reasons
> >> >>>   - We're in general fairly concerned about running any user code in
> >> JM
> >> >>> for
> >> >>> stability reasons.
> >> >>>   - The most flexible option would be allowing to set the desired
> >> >>> parallelism via rest api and leave the scaling decision to an
> external
> >> >>> process, which could be reused for both standalone and "active"
> >> >>> deployment
> >> >>> modes (there is actually a prototype by Till, that allows this [1])
> >> >>>
> >> >>> How do you intend to make an autoscaling decision? Also note that
> the
> >> >>> re-scaling is still a fairly expensive operation (especially with
> >> large
> >> >>> state), so you need to make sure autoscaler doesn't oscillate and
> >> doesn't
> >> >>> re-scale too often (this is also something that could vary from
> >> workload
> >> >>> to
> >> >>> workload).
> >> >>>
> >> >>> Note on the metrics question with an auto-scaler living in the JM:
> >> >>> - We shouldn't really collect the metrics into the JM, but instead
> JM
> >> can
> >> >>> pull then from TMs directly on-demand (basically the same thing and
> >> >>> external auto-scaler would do).
> >> >>>
> >> >>> Looking forward to your thoughts
> >> >>>
> >> >>> [1] https://github.com/tillrohrmann/flink/commits/autoscaling
> >> <
> https://urldefense.com/v3/__https://github.com/tillrohrmann/flink/commits/autoscaling__;!!Mt_FR42WkD9csi9Y!fRMcRbMNo0XDxKDkicgzx2z_yTAz5ma2xADfvhHG6fJowdZ-vvzDrawMA5VmsD2W8BCo-5SA5FfWTiAnkw6bc0Id0Zg$
> >
> >> >>> <
> >>
> https://urldefense.com/v3/__https://github.com/tillrohrmann/flink/commits/autoscaling__;!!Mt_FR42WkD9csi9Y!ZNBiCduZFUmuQI7_9M48gQnkxBkrLEVOIPRWZY0ad0xmltbQ6G2stlfsiw6q9bGi5fctVF2RS1YNL5EkV9khvi_Fng$
> >> >
> >> >>>
> >> >>> Best,
> >> >>> D.
> >> >>>
> >> >>> On Mon, May 23, 2022 at 8:32 AM Talat Uyarer <
> >> >>> tuyarer@paloaltonetworks.com>
> >> >>> wrote:
> >> >>>
> >> >>> > Hi,
> >> >>> > I am working on auto scaling support for native deployments. Today
> >> >>> Flink
> >> >>> > provides Reactive mode however it only runs on standalone
> >> deployments.
> >> >>> We
> >> >>> > use Kubernetes native deployment. So I want to increase or
> decrease
> >> job
> >> >>> > resources for our streamin jobs. Recent Flip-138 and Flip-160 are
> >> very
> >> >>> > useful to achieve this goal. I started reading code of Flink
> >> >>> JobManager,
> >> >>> > AdaptiveScheduler and DeclarativeSlotPool etc.
> >> >>> >
> >> >>> > My assumption is Required Resources will be calculated on
> >> >>> AdaptiveScheduler
> >> >>> > whenever the scheduler receives a heartbeat from a task manager by
> >> >>> calling
> >> >>> > public void updateAccumulators(AccumulatorSnapshot
> >> accumulatorSnapshot)
> >> >>> > method.
> >> >>> >
> >> >>> > I checked TaskExecutorToJobManagerHeartbeatPayload class however I
> >> >>> only see
> >> >>> > *accumulatorReport* and *executionDeploymentReport* . Do you have
> >> any
> >> >>> > suggestions to collect metrics from TaskManagers ? Should I add
> >> >>> metrics on
> >> >>> > TaskExecutorToJobManagerHeartbeatPayload ?
> >> >>> >
> >> >>> > I am open to another suggestion for this. Whenever I finalize my
> >> >>> > investigation. I will create a FLIP for more detailed
> >> implementation.
> >> >>> >
> >> >>> > Thanks for your help in advance.
> >> >>> > Talat
> >> >>> >
> >> >>>
> >> >>
> >>
> >
>

Re: About Native Deployment's Autoscaling implementation

Posted by Talat Uyarer <tu...@paloaltonetworks.com>.
Hi Yang,

I thought we could enable Adaptive Scheduler, so adding or removing a task
manager is the same as restarting a job when we use an adaptive scheduler.
Do I miss anything ?

Thanks

On Tue, May 24, 2022 at 8:16 PM Yang Wang <da...@gmail.com> wrote:

> Thanks for the interesting discussion.
>
> Compared with reactive mode, leveraging the flink-kubernetes-operator to
> do the job restarting/upgrading is another solution for auto-scaling.
> Given that fully restarting a Flink application on K8s is not too slow,
> this is a reasonable way.
> Really hope we could get some progress in such area.
>
> Best,
> Yang
>
> Gyula Fóra <gy...@gmail.com> 于2022年5月25日周三 09:04写道:
>
>> Hi Talat!
>>
>> It would be great to have a HPA that works based on some flink
>> throughput/backlog metrics. I wonder how you are going to access the Flink
>> metrics in the HPA, we might need some integration with the k8s metrics
>> system.
>> In any case whether we need a FLIP or not depends on the complexity, if
>> it's simple then we can go without a FLIP.
>>
>> Cheers,
>> Gyula
>>
>> On Tue, May 24, 2022 at 12:26 PM Talat Uyarer <
>> tuyarer@paloaltonetworks.com>
>> wrote:
>>
>> > Hi Gyula,
>> >
>> > This seems very promising for initial scaling. We are using Flink
>> > Kubernetes Operators. Most probably we are very early adapters for it :)
>> > Let me try it. Get back to you soon.
>> >
>> > My plan is building a general purpose CPU and backlog/throughput base
>> > autoscaling for Flink. I can create a Custom Open Source HPA on top of
>> your
>> > changes. Do I need to create a FLIP for it ?
>> >
>> > Just general information about us Today we use another execution env.
>> if
>> > the Job scheduler does not support autoscaling. Having a HPA works if
>> your
>> > sources are well balanced. If there is uneven distribution on sources,
>> > Having auto scaling feature on scheduler can help better utilization.
>> But
>> > this is not urgent. We can start using your PR at least for a while.
>> >
>> > Thanks
>> >
>> > On Mon, May 23, 2022 at 4:10 AM Gyula Fóra <gy...@gmail.com>
>> wrote:
>> >
>> >> Hi Talat!
>> >>
>> >> One other approach that we are investigating currently is combining
>> the Flink
>> >> Kubernetes Operator
>> >> <
>> https://urldefense.com/v3/__https://github.com/apache/flink-kubernetes-operator__;!!Mt_FR42WkD9csi9Y!ZNBiCduZFUmuQI7_9M48gQnkxBkrLEVOIPRWZY0ad0xmltbQ6G2stlfsiw6q9bGi5fctVF2RS1YNL5EkV9nZZNwfcA$>
>> with
>> >> the K8S scaling capabilities (Horizontal Pod autoscaler)
>> >>
>> >> In this approach the HPA monitors the Taskmanager pods directly and can
>> >> modify the FlinkDeployment resource replica number to trigger a
>> stateful
>> >> job scale-up/down through the operator.
>> >> Obviously not as nice as the reactive mode but it works with the
>> current
>> >> Kubernetes Native implementation easily. It is also theoretically
>> possible
>> >> to integrate this with other custom Flink metrics but we haven't
>> tested yet.
>> >>
>> >> I have a created a POC pull request that showcases these capabilities:
>> >> https://github.com/apache/flink-kubernetes-operator/pull/227
>> <https://urldefense.com/v3/__https://github.com/apache/flink-kubernetes-operator/pull/227__;!!Mt_FR42WkD9csi9Y!fRMcRbMNo0XDxKDkicgzx2z_yTAz5ma2xADfvhHG6fJowdZ-vvzDrawMA5VmsD2W8BCo-5SA5FfWTiAnkw6b-_3CvRI$>
>> >> <
>> https://urldefense.com/v3/__https://github.com/apache/flink-kubernetes-operator/pull/227__;!!Mt_FR42WkD9csi9Y!ZNBiCduZFUmuQI7_9M48gQnkxBkrLEVOIPRWZY0ad0xmltbQ6G2stlfsiw6q9bGi5fctVF2RS1YNL5EkV9nKHxgshA$
>> >
>> >>
>> >> If you are interested it would be nice if you could check it out and
>> >> provide feedback, we will get back to refining this after our current
>> >> ongoing release.
>> >>
>> >> Cheers,
>> >> Gyula
>> >>
>> >> On Mon, May 23, 2022 at 12:23 AM David Morávek <dm...@apache.org>
>> wrote:
>> >>
>> >>> Hi Talat,
>> >>>
>> >>> This is definitely an interesting and rather complex topic.
>> >>>
>> >>> Few unstructured thoughts / notes / questions:
>> >>>
>> >>> - The main struggle has always been that it's hard to come up with a
>> >>> generic one-size-fits-it-all metrics for autoscaling.
>> >>>   - Flink doesn't have knowledge of the external environment (eg.
>> >>> capacity
>> >>> planning on the cluster, no notion of pre-emption), so it can not
>> really
>> >>> make a qualified decision in some cases.
>> >>>   - ^ the above goes along the same reasoning as why we don't support
>> >>> reactive mode with the session cluster (multi-job scheduling)
>> >>> - The re-scaling decision logic most likely needs to be pluggable from
>> >>> the
>> >>> above reasons
>> >>>   - We're in general fairly concerned about running any user code in
>> JM
>> >>> for
>> >>> stability reasons.
>> >>>   - The most flexible option would be allowing to set the desired
>> >>> parallelism via rest api and leave the scaling decision to an external
>> >>> process, which could be reused for both standalone and "active"
>> >>> deployment
>> >>> modes (there is actually a prototype by Till, that allows this [1])
>> >>>
>> >>> How do you intend to make an autoscaling decision? Also note that the
>> >>> re-scaling is still a fairly expensive operation (especially with
>> large
>> >>> state), so you need to make sure autoscaler doesn't oscillate and
>> doesn't
>> >>> re-scale too often (this is also something that could vary from
>> workload
>> >>> to
>> >>> workload).
>> >>>
>> >>> Note on the metrics question with an auto-scaler living in the JM:
>> >>> - We shouldn't really collect the metrics into the JM, but instead JM
>> can
>> >>> pull then from TMs directly on-demand (basically the same thing and
>> >>> external auto-scaler would do).
>> >>>
>> >>> Looking forward to your thoughts
>> >>>
>> >>> [1] https://github.com/tillrohrmann/flink/commits/autoscaling
>> <https://urldefense.com/v3/__https://github.com/tillrohrmann/flink/commits/autoscaling__;!!Mt_FR42WkD9csi9Y!fRMcRbMNo0XDxKDkicgzx2z_yTAz5ma2xADfvhHG6fJowdZ-vvzDrawMA5VmsD2W8BCo-5SA5FfWTiAnkw6bc0Id0Zg$>
>> >>> <
>> https://urldefense.com/v3/__https://github.com/tillrohrmann/flink/commits/autoscaling__;!!Mt_FR42WkD9csi9Y!ZNBiCduZFUmuQI7_9M48gQnkxBkrLEVOIPRWZY0ad0xmltbQ6G2stlfsiw6q9bGi5fctVF2RS1YNL5EkV9khvi_Fng$
>> >
>> >>>
>> >>> Best,
>> >>> D.
>> >>>
>> >>> On Mon, May 23, 2022 at 8:32 AM Talat Uyarer <
>> >>> tuyarer@paloaltonetworks.com>
>> >>> wrote:
>> >>>
>> >>> > Hi,
>> >>> > I am working on auto scaling support for native deployments. Today
>> >>> Flink
>> >>> > provides Reactive mode however it only runs on standalone
>> deployments.
>> >>> We
>> >>> > use Kubernetes native deployment. So I want to increase or decrease
>> job
>> >>> > resources for our streamin jobs. Recent Flip-138 and Flip-160 are
>> very
>> >>> > useful to achieve this goal. I started reading code of Flink
>> >>> JobManager,
>> >>> > AdaptiveScheduler and DeclarativeSlotPool etc.
>> >>> >
>> >>> > My assumption is Required Resources will be calculated on
>> >>> AdaptiveScheduler
>> >>> > whenever the scheduler receives a heartbeat from a task manager by
>> >>> calling
>> >>> > public void updateAccumulators(AccumulatorSnapshot
>> accumulatorSnapshot)
>> >>> > method.
>> >>> >
>> >>> > I checked TaskExecutorToJobManagerHeartbeatPayload class however I
>> >>> only see
>> >>> > *accumulatorReport* and *executionDeploymentReport* . Do you have
>> any
>> >>> > suggestions to collect metrics from TaskManagers ? Should I add
>> >>> metrics on
>> >>> > TaskExecutorToJobManagerHeartbeatPayload ?
>> >>> >
>> >>> > I am open to another suggestion for this. Whenever I finalize my
>> >>> > investigation. I will create a FLIP for more detailed
>> implementation.
>> >>> >
>> >>> > Thanks for your help in advance.
>> >>> > Talat
>> >>> >
>> >>>
>> >>
>>
>

Re: About Native Deployment's Autoscaling implementation

Posted by Yang Wang <da...@gmail.com>.
Thanks for the interesting discussion.

Compared with reactive mode, leveraging the flink-kubernetes-operator to do
the job restarting/upgrading is another solution for auto-scaling.
Given that fully restarting a Flink application on K8s is not too slow,
this is a reasonable way.
Really hope we could get some progress in such area.

Best,
Yang

Gyula Fóra <gy...@gmail.com> 于2022年5月25日周三 09:04写道:

> Hi Talat!
>
> It would be great to have a HPA that works based on some flink
> throughput/backlog metrics. I wonder how you are going to access the Flink
> metrics in the HPA, we might need some integration with the k8s metrics
> system.
> In any case whether we need a FLIP or not depends on the complexity, if
> it's simple then we can go without a FLIP.
>
> Cheers,
> Gyula
>
> On Tue, May 24, 2022 at 12:26 PM Talat Uyarer <
> tuyarer@paloaltonetworks.com>
> wrote:
>
> > Hi Gyula,
> >
> > This seems very promising for initial scaling. We are using Flink
> > Kubernetes Operators. Most probably we are very early adapters for it :)
> > Let me try it. Get back to you soon.
> >
> > My plan is building a general purpose CPU and backlog/throughput base
> > autoscaling for Flink. I can create a Custom Open Source HPA on top of
> your
> > changes. Do I need to create a FLIP for it ?
> >
> > Just general information about us Today we use another execution env.  if
> > the Job scheduler does not support autoscaling. Having a HPA works if
> your
> > sources are well balanced. If there is uneven distribution on sources,
> > Having auto scaling feature on scheduler can help better utilization. But
> > this is not urgent. We can start using your PR at least for a while.
> >
> > Thanks
> >
> > On Mon, May 23, 2022 at 4:10 AM Gyula Fóra <gy...@gmail.com> wrote:
> >
> >> Hi Talat!
> >>
> >> One other approach that we are investigating currently is combining the
> Flink
> >> Kubernetes Operator
> >> <
> https://urldefense.com/v3/__https://github.com/apache/flink-kubernetes-operator__;!!Mt_FR42WkD9csi9Y!ZNBiCduZFUmuQI7_9M48gQnkxBkrLEVOIPRWZY0ad0xmltbQ6G2stlfsiw6q9bGi5fctVF2RS1YNL5EkV9nZZNwfcA$>
> with
> >> the K8S scaling capabilities (Horizontal Pod autoscaler)
> >>
> >> In this approach the HPA monitors the Taskmanager pods directly and can
> >> modify the FlinkDeployment resource replica number to trigger a stateful
> >> job scale-up/down through the operator.
> >> Obviously not as nice as the reactive mode but it works with the current
> >> Kubernetes Native implementation easily. It is also theoretically
> possible
> >> to integrate this with other custom Flink metrics but we haven't tested
> yet.
> >>
> >> I have a created a POC pull request that showcases these capabilities:
> >> https://github.com/apache/flink-kubernetes-operator/pull/227
> >> <
> https://urldefense.com/v3/__https://github.com/apache/flink-kubernetes-operator/pull/227__;!!Mt_FR42WkD9csi9Y!ZNBiCduZFUmuQI7_9M48gQnkxBkrLEVOIPRWZY0ad0xmltbQ6G2stlfsiw6q9bGi5fctVF2RS1YNL5EkV9nKHxgshA$
> >
> >>
> >> If you are interested it would be nice if you could check it out and
> >> provide feedback, we will get back to refining this after our current
> >> ongoing release.
> >>
> >> Cheers,
> >> Gyula
> >>
> >> On Mon, May 23, 2022 at 12:23 AM David Morávek <dm...@apache.org> wrote:
> >>
> >>> Hi Talat,
> >>>
> >>> This is definitely an interesting and rather complex topic.
> >>>
> >>> Few unstructured thoughts / notes / questions:
> >>>
> >>> - The main struggle has always been that it's hard to come up with a
> >>> generic one-size-fits-it-all metrics for autoscaling.
> >>>   - Flink doesn't have knowledge of the external environment (eg.
> >>> capacity
> >>> planning on the cluster, no notion of pre-emption), so it can not
> really
> >>> make a qualified decision in some cases.
> >>>   - ^ the above goes along the same reasoning as why we don't support
> >>> reactive mode with the session cluster (multi-job scheduling)
> >>> - The re-scaling decision logic most likely needs to be pluggable from
> >>> the
> >>> above reasons
> >>>   - We're in general fairly concerned about running any user code in JM
> >>> for
> >>> stability reasons.
> >>>   - The most flexible option would be allowing to set the desired
> >>> parallelism via rest api and leave the scaling decision to an external
> >>> process, which could be reused for both standalone and "active"
> >>> deployment
> >>> modes (there is actually a prototype by Till, that allows this [1])
> >>>
> >>> How do you intend to make an autoscaling decision? Also note that the
> >>> re-scaling is still a fairly expensive operation (especially with large
> >>> state), so you need to make sure autoscaler doesn't oscillate and
> doesn't
> >>> re-scale too often (this is also something that could vary from
> workload
> >>> to
> >>> workload).
> >>>
> >>> Note on the metrics question with an auto-scaler living in the JM:
> >>> - We shouldn't really collect the metrics into the JM, but instead JM
> can
> >>> pull then from TMs directly on-demand (basically the same thing and
> >>> external auto-scaler would do).
> >>>
> >>> Looking forward to your thoughts
> >>>
> >>> [1] https://github.com/tillrohrmann/flink/commits/autoscaling
> >>> <
> https://urldefense.com/v3/__https://github.com/tillrohrmann/flink/commits/autoscaling__;!!Mt_FR42WkD9csi9Y!ZNBiCduZFUmuQI7_9M48gQnkxBkrLEVOIPRWZY0ad0xmltbQ6G2stlfsiw6q9bGi5fctVF2RS1YNL5EkV9khvi_Fng$
> >
> >>>
> >>> Best,
> >>> D.
> >>>
> >>> On Mon, May 23, 2022 at 8:32 AM Talat Uyarer <
> >>> tuyarer@paloaltonetworks.com>
> >>> wrote:
> >>>
> >>> > Hi,
> >>> > I am working on auto scaling support for native deployments. Today
> >>> Flink
> >>> > provides Reactive mode however it only runs on standalone
> deployments.
> >>> We
> >>> > use Kubernetes native deployment. So I want to increase or decrease
> job
> >>> > resources for our streamin jobs. Recent Flip-138 and Flip-160 are
> very
> >>> > useful to achieve this goal. I started reading code of Flink
> >>> JobManager,
> >>> > AdaptiveScheduler and DeclarativeSlotPool etc.
> >>> >
> >>> > My assumption is Required Resources will be calculated on
> >>> AdaptiveScheduler
> >>> > whenever the scheduler receives a heartbeat from a task manager by
> >>> calling
> >>> > public void updateAccumulators(AccumulatorSnapshot
> accumulatorSnapshot)
> >>> > method.
> >>> >
> >>> > I checked TaskExecutorToJobManagerHeartbeatPayload class however I
> >>> only see
> >>> > *accumulatorReport* and *executionDeploymentReport* . Do you have any
> >>> > suggestions to collect metrics from TaskManagers ? Should I add
> >>> metrics on
> >>> > TaskExecutorToJobManagerHeartbeatPayload ?
> >>> >
> >>> > I am open to another suggestion for this. Whenever I finalize my
> >>> > investigation. I will create a FLIP for more detailed implementation.
> >>> >
> >>> > Thanks for your help in advance.
> >>> > Talat
> >>> >
> >>>
> >>
>

Re: About Native Deployment's Autoscaling implementation

Posted by Talat Uyarer <tu...@paloaltonetworks.com>.
Hi Gyula,

I did some investigation. Kubernetes developers suggest not using the
kubernetes metric system for application specific metrics. [1] Currently
only possible workflow is over prometheus. Prometheus is used widely on
Kubernetes deployments. Kubernetes metrics provides Custom Metrics API,
each task manager exposes its own metrics and HPA uses those metrics to
make autoscaling decisions.
Google Dataflow uses Source metrics [2] We need to get maxReplica count for
a Flink Job. I drow in general workflow in here [3]

What are your thoughts on this?

Talat

[1] https://github.com/kubernetes-sigs/metrics-server#use-cases
[2]
https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#custom-unbounded-sources
[3]
https://docs.google.com/drawings/d/1m_SjabXN-EX-0R9zbbS9cXxH3JV2r257K6aRG3Zyd4Q/edit?usp=sharing

On Tue, May 24, 2022 at 6:02 PM Gyula Fóra <gy...@gmail.com> wrote:

> Hi Talat!
>
> It would be great to have a HPA that works based on some flink
> throughput/backlog metrics. I wonder how you are going to access the Flink
> metrics in the HPA, we might need some integration with the k8s metrics
> system.
> In any case whether we need a FLIP or not depends on the complexity, if
> it's simple then we can go without a FLIP.
>
> Cheers,
> Gyula
>
> On Tue, May 24, 2022 at 12:26 PM Talat Uyarer <
> tuyarer@paloaltonetworks.com> wrote:
>
>> Hi Gyula,
>>
>> This seems very promising for initial scaling. We are using Flink
>> Kubernetes Operators. Most probably we are very early adapters for it :)
>> Let me try it. Get back to you soon.
>>
>> My plan is building a general purpose CPU and backlog/throughput base
>> autoscaling for Flink. I can create a Custom Open Source HPA on top of your
>> changes. Do I need to create a FLIP for it ?
>>
>> Just general information about us Today we use another execution env.  if
>> the Job scheduler does not support autoscaling. Having a HPA works if your
>> sources are well balanced. If there is uneven distribution on sources,
>> Having auto scaling feature on scheduler can help better utilization. But
>> this is not urgent. We can start using your PR at least for a while.
>>
>> Thanks
>>
>> On Mon, May 23, 2022 at 4:10 AM Gyula Fóra <gy...@gmail.com> wrote:
>>
>>> Hi Talat!
>>>
>>> One other approach that we are investigating currently is combining the Flink
>>> Kubernetes Operator
>>> <https://urldefense.com/v3/__https://github.com/apache/flink-kubernetes-operator__;!!Mt_FR42WkD9csi9Y!ZNBiCduZFUmuQI7_9M48gQnkxBkrLEVOIPRWZY0ad0xmltbQ6G2stlfsiw6q9bGi5fctVF2RS1YNL5EkV9nZZNwfcA$> with
>>> the K8S scaling capabilities (Horizontal Pod autoscaler)
>>>
>>> In this approach the HPA monitors the Taskmanager pods directly and can
>>> modify the FlinkDeployment resource replica number to trigger a stateful
>>> job scale-up/down through the operator.
>>> Obviously not as nice as the reactive mode but it works with the current
>>> Kubernetes Native implementation easily. It is also theoretically possible
>>> to integrate this with other custom Flink metrics but we haven't tested yet.
>>>
>>> I have a created a POC pull request that showcases these capabilities:
>>> https://github.com/apache/flink-kubernetes-operator/pull/227
>>> <https://urldefense.com/v3/__https://github.com/apache/flink-kubernetes-operator/pull/227__;!!Mt_FR42WkD9csi9Y!ZNBiCduZFUmuQI7_9M48gQnkxBkrLEVOIPRWZY0ad0xmltbQ6G2stlfsiw6q9bGi5fctVF2RS1YNL5EkV9nKHxgshA$>
>>>
>>> If you are interested it would be nice if you could check it out and
>>> provide feedback, we will get back to refining this after our current
>>> ongoing release.
>>>
>>> Cheers,
>>> Gyula
>>>
>>> On Mon, May 23, 2022 at 12:23 AM David Morávek <dm...@apache.org> wrote:
>>>
>>>> Hi Talat,
>>>>
>>>> This is definitely an interesting and rather complex topic.
>>>>
>>>> Few unstructured thoughts / notes / questions:
>>>>
>>>> - The main struggle has always been that it's hard to come up with a
>>>> generic one-size-fits-it-all metrics for autoscaling.
>>>>   - Flink doesn't have knowledge of the external environment (eg.
>>>> capacity
>>>> planning on the cluster, no notion of pre-emption), so it can not really
>>>> make a qualified decision in some cases.
>>>>   - ^ the above goes along the same reasoning as why we don't support
>>>> reactive mode with the session cluster (multi-job scheduling)
>>>> - The re-scaling decision logic most likely needs to be pluggable from
>>>> the
>>>> above reasons
>>>>   - We're in general fairly concerned about running any user code in JM
>>>> for
>>>> stability reasons.
>>>>   - The most flexible option would be allowing to set the desired
>>>> parallelism via rest api and leave the scaling decision to an external
>>>> process, which could be reused for both standalone and "active"
>>>> deployment
>>>> modes (there is actually a prototype by Till, that allows this [1])
>>>>
>>>> How do you intend to make an autoscaling decision? Also note that the
>>>> re-scaling is still a fairly expensive operation (especially with large
>>>> state), so you need to make sure autoscaler doesn't oscillate and
>>>> doesn't
>>>> re-scale too often (this is also something that could vary from
>>>> workload to
>>>> workload).
>>>>
>>>> Note on the metrics question with an auto-scaler living in the JM:
>>>> - We shouldn't really collect the metrics into the JM, but instead JM
>>>> can
>>>> pull then from TMs directly on-demand (basically the same thing and
>>>> external auto-scaler would do).
>>>>
>>>> Looking forward to your thoughts
>>>>
>>>> [1] https://github.com/tillrohrmann/flink/commits/autoscaling
>>>> <https://urldefense.com/v3/__https://github.com/tillrohrmann/flink/commits/autoscaling__;!!Mt_FR42WkD9csi9Y!ZNBiCduZFUmuQI7_9M48gQnkxBkrLEVOIPRWZY0ad0xmltbQ6G2stlfsiw6q9bGi5fctVF2RS1YNL5EkV9khvi_Fng$>
>>>>
>>>> Best,
>>>> D.
>>>>
>>>> On Mon, May 23, 2022 at 8:32 AM Talat Uyarer <
>>>> tuyarer@paloaltonetworks.com>
>>>> wrote:
>>>>
>>>> > Hi,
>>>> > I am working on auto scaling support for native deployments. Today
>>>> Flink
>>>> > provides Reactive mode however it only runs on standalone
>>>> deployments. We
>>>> > use Kubernetes native deployment. So I want to increase or decrease
>>>> job
>>>> > resources for our streamin jobs. Recent Flip-138 and Flip-160 are very
>>>> > useful to achieve this goal. I started reading code of Flink
>>>> JobManager,
>>>> > AdaptiveScheduler and DeclarativeSlotPool etc.
>>>> >
>>>> > My assumption is Required Resources will be calculated on
>>>> AdaptiveScheduler
>>>> > whenever the scheduler receives a heartbeat from a task manager by
>>>> calling
>>>> > public void updateAccumulators(AccumulatorSnapshot
>>>> accumulatorSnapshot)
>>>> > method.
>>>> >
>>>> > I checked TaskExecutorToJobManagerHeartbeatPayload class however I
>>>> only see
>>>> > *accumulatorReport* and *executionDeploymentReport* . Do you have any
>>>> > suggestions to collect metrics from TaskManagers ? Should I add
>>>> metrics on
>>>> > TaskExecutorToJobManagerHeartbeatPayload ?
>>>> >
>>>> > I am open to another suggestion for this. Whenever I finalize my
>>>> > investigation. I will create a FLIP for more detailed implementation.
>>>> >
>>>> > Thanks for your help in advance.
>>>> > Talat
>>>> >
>>>>
>>>

Re: About Native Deployment's Autoscaling implementation

Posted by Gyula Fóra <gy...@gmail.com>.
Hi Talat!

It would be great to have a HPA that works based on some flink
throughput/backlog metrics. I wonder how you are going to access the Flink
metrics in the HPA, we might need some integration with the k8s metrics
system.
In any case whether we need a FLIP or not depends on the complexity, if
it's simple then we can go without a FLIP.

Cheers,
Gyula

On Tue, May 24, 2022 at 12:26 PM Talat Uyarer <tu...@paloaltonetworks.com>
wrote:

> Hi Gyula,
>
> This seems very promising for initial scaling. We are using Flink
> Kubernetes Operators. Most probably we are very early adapters for it :)
> Let me try it. Get back to you soon.
>
> My plan is building a general purpose CPU and backlog/throughput base
> autoscaling for Flink. I can create a Custom Open Source HPA on top of your
> changes. Do I need to create a FLIP for it ?
>
> Just general information about us Today we use another execution env.  if
> the Job scheduler does not support autoscaling. Having a HPA works if your
> sources are well balanced. If there is uneven distribution on sources,
> Having auto scaling feature on scheduler can help better utilization. But
> this is not urgent. We can start using your PR at least for a while.
>
> Thanks
>
> On Mon, May 23, 2022 at 4:10 AM Gyula Fóra <gy...@gmail.com> wrote:
>
>> Hi Talat!
>>
>> One other approach that we are investigating currently is combining the Flink
>> Kubernetes Operator
>> <https://urldefense.com/v3/__https://github.com/apache/flink-kubernetes-operator__;!!Mt_FR42WkD9csi9Y!ZNBiCduZFUmuQI7_9M48gQnkxBkrLEVOIPRWZY0ad0xmltbQ6G2stlfsiw6q9bGi5fctVF2RS1YNL5EkV9nZZNwfcA$> with
>> the K8S scaling capabilities (Horizontal Pod autoscaler)
>>
>> In this approach the HPA monitors the Taskmanager pods directly and can
>> modify the FlinkDeployment resource replica number to trigger a stateful
>> job scale-up/down through the operator.
>> Obviously not as nice as the reactive mode but it works with the current
>> Kubernetes Native implementation easily. It is also theoretically possible
>> to integrate this with other custom Flink metrics but we haven't tested yet.
>>
>> I have a created a POC pull request that showcases these capabilities:
>> https://github.com/apache/flink-kubernetes-operator/pull/227
>> <https://urldefense.com/v3/__https://github.com/apache/flink-kubernetes-operator/pull/227__;!!Mt_FR42WkD9csi9Y!ZNBiCduZFUmuQI7_9M48gQnkxBkrLEVOIPRWZY0ad0xmltbQ6G2stlfsiw6q9bGi5fctVF2RS1YNL5EkV9nKHxgshA$>
>>
>> If you are interested it would be nice if you could check it out and
>> provide feedback, we will get back to refining this after our current
>> ongoing release.
>>
>> Cheers,
>> Gyula
>>
>> On Mon, May 23, 2022 at 12:23 AM David Morávek <dm...@apache.org> wrote:
>>
>>> Hi Talat,
>>>
>>> This is definitely an interesting and rather complex topic.
>>>
>>> Few unstructured thoughts / notes / questions:
>>>
>>> - The main struggle has always been that it's hard to come up with a
>>> generic one-size-fits-it-all metrics for autoscaling.
>>>   - Flink doesn't have knowledge of the external environment (eg.
>>> capacity
>>> planning on the cluster, no notion of pre-emption), so it can not really
>>> make a qualified decision in some cases.
>>>   - ^ the above goes along the same reasoning as why we don't support
>>> reactive mode with the session cluster (multi-job scheduling)
>>> - The re-scaling decision logic most likely needs to be pluggable from
>>> the
>>> above reasons
>>>   - We're in general fairly concerned about running any user code in JM
>>> for
>>> stability reasons.
>>>   - The most flexible option would be allowing to set the desired
>>> parallelism via rest api and leave the scaling decision to an external
>>> process, which could be reused for both standalone and "active"
>>> deployment
>>> modes (there is actually a prototype by Till, that allows this [1])
>>>
>>> How do you intend to make an autoscaling decision? Also note that the
>>> re-scaling is still a fairly expensive operation (especially with large
>>> state), so you need to make sure autoscaler doesn't oscillate and doesn't
>>> re-scale too often (this is also something that could vary from workload
>>> to
>>> workload).
>>>
>>> Note on the metrics question with an auto-scaler living in the JM:
>>> - We shouldn't really collect the metrics into the JM, but instead JM can
>>> pull then from TMs directly on-demand (basically the same thing and
>>> external auto-scaler would do).
>>>
>>> Looking forward to your thoughts
>>>
>>> [1] https://github.com/tillrohrmann/flink/commits/autoscaling
>>> <https://urldefense.com/v3/__https://github.com/tillrohrmann/flink/commits/autoscaling__;!!Mt_FR42WkD9csi9Y!ZNBiCduZFUmuQI7_9M48gQnkxBkrLEVOIPRWZY0ad0xmltbQ6G2stlfsiw6q9bGi5fctVF2RS1YNL5EkV9khvi_Fng$>
>>>
>>> Best,
>>> D.
>>>
>>> On Mon, May 23, 2022 at 8:32 AM Talat Uyarer <
>>> tuyarer@paloaltonetworks.com>
>>> wrote:
>>>
>>> > Hi,
>>> > I am working on auto scaling support for native deployments. Today
>>> Flink
>>> > provides Reactive mode however it only runs on standalone deployments.
>>> We
>>> > use Kubernetes native deployment. So I want to increase or decrease job
>>> > resources for our streamin jobs. Recent Flip-138 and Flip-160 are very
>>> > useful to achieve this goal. I started reading code of Flink
>>> JobManager,
>>> > AdaptiveScheduler and DeclarativeSlotPool etc.
>>> >
>>> > My assumption is Required Resources will be calculated on
>>> AdaptiveScheduler
>>> > whenever the scheduler receives a heartbeat from a task manager by
>>> calling
>>> > public void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot)
>>> > method.
>>> >
>>> > I checked TaskExecutorToJobManagerHeartbeatPayload class however I
>>> only see
>>> > *accumulatorReport* and *executionDeploymentReport* . Do you have any
>>> > suggestions to collect metrics from TaskManagers ? Should I add
>>> metrics on
>>> > TaskExecutorToJobManagerHeartbeatPayload ?
>>> >
>>> > I am open to another suggestion for this. Whenever I finalize my
>>> > investigation. I will create a FLIP for more detailed implementation.
>>> >
>>> > Thanks for your help in advance.
>>> > Talat
>>> >
>>>
>>

Re: About Native Deployment's Autoscaling implementation

Posted by Talat Uyarer <tu...@paloaltonetworks.com>.
Hi Gyula,

This seems very promising for initial scaling. We are using Flink
Kubernetes Operators. Most probably we are very early adapters for it :)
Let me try it. Get back to you soon.

My plan is building a general purpose CPU and backlog/throughput base
autoscaling for Flink. I can create a Custom Open Source HPA on top of your
changes. Do I need to create a FLIP for it ?

Just general information about us Today we use another execution env.  if
the Job scheduler does not support autoscaling. Having a HPA works if your
sources are well balanced. If there is uneven distribution on sources,
Having auto scaling feature on scheduler can help better utilization. But
this is not urgent. We can start using your PR at least for a while.

Thanks

On Mon, May 23, 2022 at 4:10 AM Gyula Fóra <gy...@gmail.com> wrote:

> Hi Talat!
>
> One other approach that we are investigating currently is combining the Flink
> Kubernetes Operator
> <https://urldefense.com/v3/__https://github.com/apache/flink-kubernetes-operator__;!!Mt_FR42WkD9csi9Y!ZNBiCduZFUmuQI7_9M48gQnkxBkrLEVOIPRWZY0ad0xmltbQ6G2stlfsiw6q9bGi5fctVF2RS1YNL5EkV9nZZNwfcA$> with
> the K8S scaling capabilities (Horizontal Pod autoscaler)
>
> In this approach the HPA monitors the Taskmanager pods directly and can
> modify the FlinkDeployment resource replica number to trigger a stateful
> job scale-up/down through the operator.
> Obviously not as nice as the reactive mode but it works with the current
> Kubernetes Native implementation easily. It is also theoretically possible
> to integrate this with other custom Flink metrics but we haven't tested yet.
>
> I have a created a POC pull request that showcases these capabilities:
> https://github.com/apache/flink-kubernetes-operator/pull/227
> <https://urldefense.com/v3/__https://github.com/apache/flink-kubernetes-operator/pull/227__;!!Mt_FR42WkD9csi9Y!ZNBiCduZFUmuQI7_9M48gQnkxBkrLEVOIPRWZY0ad0xmltbQ6G2stlfsiw6q9bGi5fctVF2RS1YNL5EkV9nKHxgshA$>
>
> If you are interested it would be nice if you could check it out and
> provide feedback, we will get back to refining this after our current
> ongoing release.
>
> Cheers,
> Gyula
>
> On Mon, May 23, 2022 at 12:23 AM David Morávek <dm...@apache.org> wrote:
>
>> Hi Talat,
>>
>> This is definitely an interesting and rather complex topic.
>>
>> Few unstructured thoughts / notes / questions:
>>
>> - The main struggle has always been that it's hard to come up with a
>> generic one-size-fits-it-all metrics for autoscaling.
>>   - Flink doesn't have knowledge of the external environment (eg. capacity
>> planning on the cluster, no notion of pre-emption), so it can not really
>> make a qualified decision in some cases.
>>   - ^ the above goes along the same reasoning as why we don't support
>> reactive mode with the session cluster (multi-job scheduling)
>> - The re-scaling decision logic most likely needs to be pluggable from the
>> above reasons
>>   - We're in general fairly concerned about running any user code in JM
>> for
>> stability reasons.
>>   - The most flexible option would be allowing to set the desired
>> parallelism via rest api and leave the scaling decision to an external
>> process, which could be reused for both standalone and "active" deployment
>> modes (there is actually a prototype by Till, that allows this [1])
>>
>> How do you intend to make an autoscaling decision? Also note that the
>> re-scaling is still a fairly expensive operation (especially with large
>> state), so you need to make sure autoscaler doesn't oscillate and doesn't
>> re-scale too often (this is also something that could vary from workload
>> to
>> workload).
>>
>> Note on the metrics question with an auto-scaler living in the JM:
>> - We shouldn't really collect the metrics into the JM, but instead JM can
>> pull then from TMs directly on-demand (basically the same thing and
>> external auto-scaler would do).
>>
>> Looking forward to your thoughts
>>
>> [1] https://github.com/tillrohrmann/flink/commits/autoscaling
>> <https://urldefense.com/v3/__https://github.com/tillrohrmann/flink/commits/autoscaling__;!!Mt_FR42WkD9csi9Y!ZNBiCduZFUmuQI7_9M48gQnkxBkrLEVOIPRWZY0ad0xmltbQ6G2stlfsiw6q9bGi5fctVF2RS1YNL5EkV9khvi_Fng$>
>>
>> Best,
>> D.
>>
>> On Mon, May 23, 2022 at 8:32 AM Talat Uyarer <
>> tuyarer@paloaltonetworks.com>
>> wrote:
>>
>> > Hi,
>> > I am working on auto scaling support for native deployments. Today Flink
>> > provides Reactive mode however it only runs on standalone deployments.
>> We
>> > use Kubernetes native deployment. So I want to increase or decrease job
>> > resources for our streamin jobs. Recent Flip-138 and Flip-160 are very
>> > useful to achieve this goal. I started reading code of Flink JobManager,
>> > AdaptiveScheduler and DeclarativeSlotPool etc.
>> >
>> > My assumption is Required Resources will be calculated on
>> AdaptiveScheduler
>> > whenever the scheduler receives a heartbeat from a task manager by
>> calling
>> > public void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot)
>> > method.
>> >
>> > I checked TaskExecutorToJobManagerHeartbeatPayload class however I only
>> see
>> > *accumulatorReport* and *executionDeploymentReport* . Do you have any
>> > suggestions to collect metrics from TaskManagers ? Should I add metrics
>> on
>> > TaskExecutorToJobManagerHeartbeatPayload ?
>> >
>> > I am open to another suggestion for this. Whenever I finalize my
>> > investigation. I will create a FLIP for more detailed implementation.
>> >
>> > Thanks for your help in advance.
>> > Talat
>> >
>>
>

Re: About Native Deployment's Autoscaling implementation

Posted by Gyula Fóra <gy...@gmail.com>.
Hi Talat!

One other approach that we are investigating currently is combining the Flink
Kubernetes Operator <https://github.com/apache/flink-kubernetes-operator> with
the K8S scaling capabilities (Horizontal Pod autoscaler)

In this approach the HPA monitors the Taskmanager pods directly and can
modify the FlinkDeployment resource replica number to trigger a stateful
job scale-up/down through the operator.
Obviously not as nice as the reactive mode but it works with the current
Kubernetes Native implementation easily. It is also theoretically possible
to integrate this with other custom Flink metrics but we haven't tested yet.

I have a created a POC pull request that showcases these capabilities:
https://github.com/apache/flink-kubernetes-operator/pull/227

If you are interested it would be nice if you could check it out and
provide feedback, we will get back to refining this after our current
ongoing release.

Cheers,
Gyula

On Mon, May 23, 2022 at 12:23 AM David Morávek <dm...@apache.org> wrote:

> Hi Talat,
>
> This is definitely an interesting and rather complex topic.
>
> Few unstructured thoughts / notes / questions:
>
> - The main struggle has always been that it's hard to come up with a
> generic one-size-fits-it-all metrics for autoscaling.
>   - Flink doesn't have knowledge of the external environment (eg. capacity
> planning on the cluster, no notion of pre-emption), so it can not really
> make a qualified decision in some cases.
>   - ^ the above goes along the same reasoning as why we don't support
> reactive mode with the session cluster (multi-job scheduling)
> - The re-scaling decision logic most likely needs to be pluggable from the
> above reasons
>   - We're in general fairly concerned about running any user code in JM for
> stability reasons.
>   - The most flexible option would be allowing to set the desired
> parallelism via rest api and leave the scaling decision to an external
> process, which could be reused for both standalone and "active" deployment
> modes (there is actually a prototype by Till, that allows this [1])
>
> How do you intend to make an autoscaling decision? Also note that the
> re-scaling is still a fairly expensive operation (especially with large
> state), so you need to make sure autoscaler doesn't oscillate and doesn't
> re-scale too often (this is also something that could vary from workload to
> workload).
>
> Note on the metrics question with an auto-scaler living in the JM:
> - We shouldn't really collect the metrics into the JM, but instead JM can
> pull then from TMs directly on-demand (basically the same thing and
> external auto-scaler would do).
>
> Looking forward to your thoughts
>
> [1] https://github.com/tillrohrmann/flink/commits/autoscaling
>
> Best,
> D.
>
> On Mon, May 23, 2022 at 8:32 AM Talat Uyarer <tuyarer@paloaltonetworks.com
> >
> wrote:
>
> > Hi,
> > I am working on auto scaling support for native deployments. Today Flink
> > provides Reactive mode however it only runs on standalone deployments. We
> > use Kubernetes native deployment. So I want to increase or decrease job
> > resources for our streamin jobs. Recent Flip-138 and Flip-160 are very
> > useful to achieve this goal. I started reading code of Flink JobManager,
> > AdaptiveScheduler and DeclarativeSlotPool etc.
> >
> > My assumption is Required Resources will be calculated on
> AdaptiveScheduler
> > whenever the scheduler receives a heartbeat from a task manager by
> calling
> > public void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot)
> > method.
> >
> > I checked TaskExecutorToJobManagerHeartbeatPayload class however I only
> see
> > *accumulatorReport* and *executionDeploymentReport* . Do you have any
> > suggestions to collect metrics from TaskManagers ? Should I add metrics
> on
> > TaskExecutorToJobManagerHeartbeatPayload ?
> >
> > I am open to another suggestion for this. Whenever I finalize my
> > investigation. I will create a FLIP for more detailed implementation.
> >
> > Thanks for your help in advance.
> > Talat
> >
>

Re: About Native Deployment's Autoscaling implementation

Posted by David Morávek <dm...@apache.org>.
Hi Talat,

This is definitely an interesting and rather complex topic.

Few unstructured thoughts / notes / questions:

- The main struggle has always been that it's hard to come up with a
generic one-size-fits-it-all metrics for autoscaling.
  - Flink doesn't have knowledge of the external environment (eg. capacity
planning on the cluster, no notion of pre-emption), so it can not really
make a qualified decision in some cases.
  - ^ the above goes along the same reasoning as why we don't support
reactive mode with the session cluster (multi-job scheduling)
- The re-scaling decision logic most likely needs to be pluggable from the
above reasons
  - We're in general fairly concerned about running any user code in JM for
stability reasons.
  - The most flexible option would be allowing to set the desired
parallelism via rest api and leave the scaling decision to an external
process, which could be reused for both standalone and "active" deployment
modes (there is actually a prototype by Till, that allows this [1])

How do you intend to make an autoscaling decision? Also note that the
re-scaling is still a fairly expensive operation (especially with large
state), so you need to make sure autoscaler doesn't oscillate and doesn't
re-scale too often (this is also something that could vary from workload to
workload).

Note on the metrics question with an auto-scaler living in the JM:
- We shouldn't really collect the metrics into the JM, but instead JM can
pull then from TMs directly on-demand (basically the same thing and
external auto-scaler would do).

Looking forward to your thoughts

[1] https://github.com/tillrohrmann/flink/commits/autoscaling

Best,
D.

On Mon, May 23, 2022 at 8:32 AM Talat Uyarer <tu...@paloaltonetworks.com>
wrote:

> Hi,
> I am working on auto scaling support for native deployments. Today Flink
> provides Reactive mode however it only runs on standalone deployments. We
> use Kubernetes native deployment. So I want to increase or decrease job
> resources for our streamin jobs. Recent Flip-138 and Flip-160 are very
> useful to achieve this goal. I started reading code of Flink JobManager,
> AdaptiveScheduler and DeclarativeSlotPool etc.
>
> My assumption is Required Resources will be calculated on AdaptiveScheduler
> whenever the scheduler receives a heartbeat from a task manager by calling
> public void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot)
> method.
>
> I checked TaskExecutorToJobManagerHeartbeatPayload class however I only see
> *accumulatorReport* and *executionDeploymentReport* . Do you have any
> suggestions to collect metrics from TaskManagers ? Should I add metrics on
> TaskExecutorToJobManagerHeartbeatPayload ?
>
> I am open to another suggestion for this. Whenever I finalize my
> investigation. I will create a FLIP for more detailed implementation.
>
> Thanks for your help in advance.
> Talat
>