Posted to user@flink.apache.org by KristoffSC <kr...@gmail.com> on 2020/01/08 17:06:17 UTC

Flink Job cluster scalability

Hi all,
I must say I'm very impressed by Flink and what it can do.

I have been playing around with Flink operator parallelism and scalability,
and I have a few questions regarding this subject.

My setup is:
1. Flink 1.9.1
2. Docker Job Cluster, where each Task Manager has only one task slot; I'm
following [1]
3. env setup:
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1000, 1000));
env.setParallelism(1);
env.setMaxParallelism(128);
env.enableCheckpointing(10 * 60 * 1000);

Please mind that I am using operator chaining here. 

My pipeline setup:
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2311/Capture2.png> 


As you can see, I have 7 operators (a few of them were actually chained, and
this is fine), with different parallelism levels. This gives me 23 tasks in
total.
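
To make those numbers concrete, below is a minimal sketch of how such a
topology could be declared. The operator names and the per-operator
parallelism split are purely illustrative (this is not my real job); they
are only chosen so that seven operators add up to 23 tasks while no single
operator has a parallelism higher than 6:

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TopologySketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1000, 1000));
        env.setParallelism(1);
        env.setMaxParallelism(128);
        env.enableCheckpointing(10 * 60 * 1000);

        // Illustrative split: 1 + 2 + 6 + 6 + 6 + 1 + 1 = 23 tasks in total,
        // max parallelism of any single operator = 6. With chaining enabled,
        // Flink chains some of these operators together, as in my job.
        env.generateSequence(0, 1_000_000)                  // source,  parallelism 1
           .map(x -> x + 1).setParallelism(2)               // "parse",  parallelism 2
           .keyBy(x -> x % 5000)                            // ~5000 unique keys, as in my tests
           .map(x -> x * 2).setParallelism(6)               // "enrich", parallelism 6
           .map(x -> x - 1).setParallelism(6)               // "score",  parallelism 6
           .filter(x -> x >= 0).setParallelism(6)           // "filter", parallelism 6
           .map(x -> "value=" + x).setParallelism(1)        // "format", parallelism 1
           .print().setParallelism(1);                      // sink,     parallelism 1

        env.execute("topology-sketch");
    }
}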


I've noticed that with the "one task manager = one task slot" approach I need
6 task slots/task managers to be able to start this pipeline.

<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2311/Capture1.png> 

If the number of task slots is lower than 6, the job is scheduled but never
starts.

With 6 task slots everything works fine, and I must say that I'm very
impressed with the way Flink balanced data between task slots. Data was
distributed very evenly across operator instances/tasks.

In this setup (7 operators, 23 tasks and 6 task slots), some task slots have
to be shared by more than one operator. While inspecting the UI I found
examples of such operators. This is what I was expecting, though.

However, I was a little surprised after I added one additional task manager
(and hence one new task slot):

<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2311/Capture3.png> 

After adding the new resources, Flink did not rebalance/redistribute the
graph, so this host was sitting there doing nothing. Even after putting some
load on the cluster, this node was still not used.

 
*After doing this exercise I have a few questions:*

1. It seems that the number of task slots must be equal to or greater than
the maximum parallelism used in the pipeline. In my case that was 6. When I
changed the parallelism of one of the operators to 7, I needed 7 task slots
(task managers in my setup) to even start the job.
Is this the case?

2. What can I do to use the extra node that was spawned while the job was
running?
In other words, if I see that one of my nodes is under too much load, what
can I do? Please mind that I'm using a keyBy/hashing function in my pipeline,
and in my tests I had around 5000 unique keys.

I tried to use the REST API to call "rescale", but I got this response:
/302 {"errors":["Rescaling is temporarily disabled. See FLINK-12312."]}/

Thanks.

[1]
https://github.com/apache/flink/blob/release-1.9/flink-container/docker/README.md




Re: Flink Job cluster scalability

Posted by Yangze Guo <ka...@gmail.com>.
Hi KristoffSC

As Zhu said, Flink enables slot sharing [1] by default. This feature has
nothing to do with the resources of your cluster; its benefits are described
in [1] as well. In other words, Flink will not detect how many slots your
cluster has and adjust its behavior to that number. If you want to make the
best use of your cluster, you can either increase the parallelism of the
vertex that has the largest parallelism, or "disable" slot sharing as
described in [2]. IMO, the first way matches your purpose.
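
For example, the first way is essentially just raising the parallelism of
that vertex to the number of slots you now have and resubmitting the job.
A rough sketch (the pipeline below is only a stand-in for the real one, and
the operator with parallelism 7 stands for whichever vertex currently has
the largest parallelism):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BumpHeaviestVertex {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env.generateSequence(0, 1_000_000)
           .keyBy(x -> x % 5000)
           .map(x -> x * 2)
           .setParallelism(7)   // was 6; the 7th parallel instance can now be
                                // scheduled onto the newly added task slot
           .print()
           .setParallelism(1);

        env.execute("bump-heaviest-vertex");
    }
}

Note that a running job will not pick this up by itself; you have to take a
savepoint, stop the job and resubmit it with the new parallelism.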

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/concepts/runtime.html#task-slots-and-resources
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/#task-chaining-and-resource-groups

Best,
Yangze Guo


Re: Flink Job cluster scalability

Posted by KristoffSC <kr...@gmail.com>.
Hi Zhu Zhu,
well, in my last test I did not change the job config, so I did not change
the parallelism level of any operator and I did not change the policy
regarding slot sharing (it stays at the default). Operator chaining is set to
true, without any extra actions like "start new chain", "disable chain", etc.
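
Just to be explicit about what I mean by "extra actions", these are the
chaining knobs I did not touch (a generic sketch, not my actual job):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ChainingKnobsSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // env.disableOperatorChaining();  // would turn chaining off for the whole job

        env.generateSequence(0, 100)
           .map(x -> x + 1)
           .startNewChain()                // start a new chain at this operator
           .filter(x -> x % 2 == 0)
           .disableChaining()              // never chain this operator with its neighbours
           .print();

        env.execute("chaining-knobs-sketch");
    }
}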

What I assumed, however, is that Flink would try to find the most efficient
way to use the available resources during job submission.

In the first case, where I had only 6 task managers (which matches the max
parallelism of my JobVertex), Flink reused some task slots. Adding extra task
slots was not effective, for the reason described by David. This is
understandable.

However, I was assuming that if I submitted my job on a cluster that has
more than 6 task managers, Flink would not share task slots by default. That
did not happen; Flink deployed the job in the same way regardless of the
extra resources.


So the conclusion is that simply resubmitting the job will not work in this
case, and actually I can't have any certainty that it will, since in my case
Flink still reuses task slots.

If this were a production case, I would have to do a test job submission on
a testing environment and potentially change the job itself: not the config,
but adding slot sharing groups, etc.
So in a production case I would not be able to react quickly; I would have
to deploy a new version of my app/job, which could be problematic.





Re: Flink Job cluster scalability

Posted by Zhu Zhu <re...@gmail.com>.
Hi KristoffSC,

Did you increase the parallelism of the vertex that has the largest
parallelism?
Or did you explicitly put tasks into different slot sharing groups?
With the default slot sharing, the number of slots required/used equals the
max parallelism of a JobVertex, which is 6 in your case.


Re: Flink Job cluster scalability

Posted by KristoffSC <kr...@gmail.com>.
Thank you David and Zhu Zhu,
this helps a lot.

I have follow-up questions, though.

Having this:
/"Instead the Job must be stopped via a savepoint and restarted with a new
parallelism"/

and the slot sharing [1] feature, I got the impression that if I started my
cluster with more than 6 task slots, Flink would try to deploy tasks across
all resources, trying to use all available resources during job submission.

I did two tests with my original job.
1. I started a Job Cluster with 7 task slots (7 task managers, since in this
case 1 task manager has one task slot).
2. I started a Session Cluster with 28 task slots in total. In this case I
had 7 task managers with 4 task slots each.

For case 1, I used the "FLINK_JOB" variable as stated in [2]. For case 2, I
submitted my job from the UI after Flink became operational.


In both cases the job used only 6 task slots, so it was still reusing task
slots. I had gotten the impression that it would try to use as many of the
available resources as it could.

What do you think about this?


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/concepts/runtime.html#task-slots-and-resources
[2]
https://github.com/apache/flink/blob/release-1.9/flink-container/docker/README.md









Re: Flink Job cluster scalability

Posted by David Maddison <ma...@gmail.com>.
Hi KristoffSC,

As Zhu Zhu explained, Flink does not currently auto-scale a Job as new
resources become available. Instead the Job must be stopped via a savepoint
and restarted with a new parallelism (the old rescale CLI experiment used to
perform this).
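
For reference, the manual flow looks roughly like this with the standard
CLI (the savepoint directory, savepoint path, job id and jar name below are
just placeholders):

# Take a savepoint and cancel the running job ("cancel with savepoint").
flink cancel -s hdfs:///flink/savepoints <jobId>

# Resubmit from that savepoint with a new default parallelism. Any
# parallelism set explicitly in the job code still takes precedence.
flink run -s hdfs:///flink/savepoints/savepoint-<id> -p 7 my-job.jar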

Making Flink reactive to new resources and auto-scaling jobs is something
I'm currently very interested in. An approach for changing Flink to support
this has previously been outlined/discussed in FLINK-10407
(https://issues.apache.org/jira/browse/FLINK-10407).

/David/


Re: Flink Job cluster scalability

Posted by Zhu Zhu <re...@gmail.com>.
Hi KristoffSC,

Each task needs a slot to run. However, Flink enables slot sharing [1] by
default, so that one slot can host one parallel instance of each task in a
job; that's why your job can start with only 6 slots.
Different parallel instances of the same task cannot share a slot, which is
why you need at least 6 slots to run your job.

You can put tasks into different slot sharing groups via
'.slotSharingGroup(xxx)' to force certain tasks not to share slots. This
prevents the tasks from burdening each other; however, the job will then
need more slots to start.
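
A minimal sketch of that, with a made-up group name "heavy" (downstream
operators inherit the slot sharing group of their input unless they set
their own):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SlotSharingGroupSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // The source stays in the "default" group; the map operator and the
        // sink downstream of it now occupy slots of their own.
        env.generateSequence(0, 1_000_000)
           .map(x -> x * 2)
           .setParallelism(6)
           .slotSharingGroup("heavy")
           .print()
           .setParallelism(1);

        env.execute("slot-sharing-group-sketch");
    }
}

In this toy example the job needs 1 slot for the "default" group plus 6 for
"heavy", i.e. 7 slots in total instead of 6.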

So, for your questions:
#1 Yes.
#2 At the moment, you will need to resubmit your job with the adjusted
parallelism. The rescale CLI was experimental and was temporarily removed [2].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/concepts/runtime.html#task-slots-and-resources
[2]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/DISCUSS-Temporarily-remove-support-for-job-rescaling-via-CLI-action-quot-modify-quot-td27447.html

Thanks,
Zhu Zhu
