You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by Maximilian Michels <mx...@apache.org> on 2018/10/11 18:16:51 UTC

Spreading Tasks across TaskManagers

Hi everyone,

I've recently come across a cluster scheduling problem users are facing. 
Clusters where TaskManagers have more slots than the parallelism 
(#tm_slots > job_parallelism), tend to schedule all job tasks on a 
single TaskManager.

This is not good for spreading load and has been discussed in FLINK-1003 
[1] and the other duplicate JIRA issues.

I know that this is not really an issue if the cluster is created 
exclusively for the Job, or if the number of slots per Taskmanager is 
smaller than the parallelism. However, this seems like a rather easy 
improvement to the Scheduler which would have a huge impact on performance.

On the JIRA issue page it has been mentioned that this was put on hold 
to work on dynamic scaling first.

Now that the basic building blocks for dynamic scaling are in place, do 
you think it would be possible to tackle FLINK-1003?

Thanks,
Max


[1] https://issues.apache.org/jira/browse/FLINK-1003

Re: Spreading Tasks across TaskManagers

Posted by Piotr Nowojski <pi...@data-artisans.com>.
Hi,

> In general, I'm not 100% sure whether spreading out tasks is always the
> best strategy. Especially if you have a network heavy job co-locating tasks
> on the same TM could have benefits over spreading the tasks out.

Definitely spreading out tasks is not always the best, but I would guess that it’s better more often then not. With some simplification (like rack locality for example) if you add more machines your max total network throughput also goes up.

Also increased network traffic is only visible with very low number of nodes. Going from 2 to 4 nodes increases the fraction of remote writes from 2/4 up to 3/4 (so network usage goes up by 50%), while going from 10 to 20 nodes increases the fraction from 18/20 up to only 19/20 (negligible 5.5% increase). 

Trying to limit the number of machines running the job may be important with hundreds/thousands of machines and multiple running jobs, where failures are becoming quite common and when one wants to limit the impact of a single node failure to just one job.

Piotrek

> On 16 Oct 2018, at 14:55, Till Rohrmann <tr...@apache.org> wrote:
> 
> Yes, the ResourceSpec is not yet fully functional. The idea is to allow the
> user to specify how many resources an operator needs. Depending on these
> requirements, the RM should allocate slots which can fulfill these
> requirements.
> 
> Cheers,
> Till
> 
> On Tue, Oct 16, 2018 at 2:29 PM Maximilian Michels <mx...@apache.org> wrote:
> 
>>> the community is currently working on Flink's scheduler component [1]
>> That sounds great! I agree that spreading tasks across the nodes is not
>> always desirable but it would be nice to give users an option to provide
>> hints to the scheduler. The location aware bulk scheduling you mentioned
>> would be useful.
>> 
>> Today, there is already the option to assign Resources to a
>> StreamTransformation. From a quick test, it seems like those resource
>> specifications are not honored yet.
>> 
>> -Max
>> 
>> On 13.10.18 01:41, Thomas Weise wrote:
>>> Hi Till,
>>> 
>>> Thanks for the pointer, glad that this is being worked on.
>>> 
>>> It almost looks like the non deterministic distribution behavior started
>>> with 1.5.x (?) and that surprised us.
>>> 
>>> https://issues.apache.org/jira/browse/BEAM-5713
>>> 
>>> I agree that there is no one strategy that fits every use case. If an
>>> application is limited by a resource per machine that the scheduler does
>>> not understand (like let's say CPU or disk I/O), then it would be nice to
>>> have a way to hint that round-robin distribution is desired (or achieve
>> the
>>> same through anti-affinity or resource constraints).
>>> 
>>> Thanks,
>>> Thomas
>>> 
>>> 
>>> 
>>> On Fri, Oct 12, 2018 at 2:06 AM Till Rohrmann <tr...@apache.org>
>> wrote:
>>> 
>>>> Hi Max,
>>>> 
>>>> the community is currently working on Flink's scheduler component [1].
>> One
>>>> of the things we want to enable in the future is bulk scheduling. With
>>>> this, it should also be possible to add strategies how to distribute
>> tasks
>>>> across multiple TMs (spreading vs. co-locating).
>>>> 
>>>> In general, I'm not 100% sure whether spreading out tasks is always the
>>>> best strategy. Especially if you have a network heavy job co-locating
>> tasks
>>>> on the same TM could have benefits over spreading the tasks out.
>>>> 
>>>> [1] https://issues.apache.org/jira/browse/FLINK-10429
>>>> 
>>>> Cheers,
>>>> Till
>>>> 
>>>> On Thu, Oct 11, 2018 at 8:16 PM Maximilian Michels <mx...@apache.org>
>> wrote:
>>>> 
>>>>> Hi everyone,
>>>>> 
>>>>> I've recently come across a cluster scheduling problem users are
>> facing.
>>>>> Clusters where TaskManagers have more slots than the parallelism
>>>>> (#tm_slots > job_parallelism), tend to schedule all job tasks on a
>>>>> single TaskManager.
>>>>> 
>>>>> This is not good for spreading load and has been discussed in
>> FLINK-1003
>>>>> [1] and the other duplicate JIRA issues.
>>>>> 
>>>>> I know that this is not really an issue if the cluster is created
>>>>> exclusively for the Job, or if the number of slots per Taskmanager is
>>>>> smaller than the parallelism. However, this seems like a rather easy
>>>>> improvement to the Scheduler which would have a huge impact on
>>>> performance.
>>>>> 
>>>>> On the JIRA issue page it has been mentioned that this was put on hold
>>>>> to work on dynamic scaling first.
>>>>> 
>>>>> Now that the basic building blocks for dynamic scaling are in place, do
>>>>> you think it would be possible to tackle FLINK-1003?
>>>>> 
>>>>> Thanks,
>>>>> Max
>>>>> 
>>>>> 
>>>>> [1] https://issues.apache.org/jira/browse/FLINK-1003
>>>>> 
>>>> 
>>> 
>> 


Re: Spreading Tasks across TaskManagers

Posted by Till Rohrmann <tr...@apache.org>.
Yes, the ResourceSpec is not yet fully functional. The idea is to allow the
user to specify how many resources an operator needs. Depending on these
requirements, the RM should allocate slots which can fulfill these
requirements.

Cheers,
Till

On Tue, Oct 16, 2018 at 2:29 PM Maximilian Michels <mx...@apache.org> wrote:

> > the community is currently working on Flink's scheduler component [1]
> That sounds great! I agree that spreading tasks across the nodes is not
> always desirable but it would be nice to give users an option to provide
> hints to the scheduler. The location aware bulk scheduling you mentioned
> would be useful.
>
> Today, there is already the option to assign Resources to a
> StreamTransformation. From a quick test, it seems like those resource
> specifications are not honored yet.
>
> -Max
>
> On 13.10.18 01:41, Thomas Weise wrote:
> > Hi Till,
> >
> > Thanks for the pointer, glad that this is being worked on.
> >
> > It almost looks like the non deterministic distribution behavior started
> > with 1.5.x (?) and that surprised us.
> >
> > https://issues.apache.org/jira/browse/BEAM-5713
> >
> > I agree that there is no one strategy that fits every use case. If an
> > application is limited by a resource per machine that the scheduler does
> > not understand (like let's say CPU or disk I/O), then it would be nice to
> > have a way to hint that round-robin distribution is desired (or achieve
> the
> > same through anti-affinity or resource constraints).
> >
> > Thanks,
> > Thomas
> >
> >
> >
> > On Fri, Oct 12, 2018 at 2:06 AM Till Rohrmann <tr...@apache.org>
> wrote:
> >
> >> Hi Max,
> >>
> >> the community is currently working on Flink's scheduler component [1].
> One
> >> of the things we want to enable in the future is bulk scheduling. With
> >> this, it should also be possible to add strategies how to distribute
> tasks
> >> across multiple TMs (spreading vs. co-locating).
> >>
> >> In general, I'm not 100% sure whether spreading out tasks is always the
> >> best strategy. Especially if you have a network heavy job co-locating
> tasks
> >> on the same TM could have benefits over spreading the tasks out.
> >>
> >> [1] https://issues.apache.org/jira/browse/FLINK-10429
> >>
> >> Cheers,
> >> Till
> >>
> >> On Thu, Oct 11, 2018 at 8:16 PM Maximilian Michels <mx...@apache.org>
> wrote:
> >>
> >>> Hi everyone,
> >>>
> >>> I've recently come across a cluster scheduling problem users are
> facing.
> >>> Clusters where TaskManagers have more slots than the parallelism
> >>> (#tm_slots > job_parallelism), tend to schedule all job tasks on a
> >>> single TaskManager.
> >>>
> >>> This is not good for spreading load and has been discussed in
> FLINK-1003
> >>> [1] and the other duplicate JIRA issues.
> >>>
> >>> I know that this is not really an issue if the cluster is created
> >>> exclusively for the Job, or if the number of slots per Taskmanager is
> >>> smaller than the parallelism. However, this seems like a rather easy
> >>> improvement to the Scheduler which would have a huge impact on
> >> performance.
> >>>
> >>> On the JIRA issue page it has been mentioned that this was put on hold
> >>> to work on dynamic scaling first.
> >>>
> >>> Now that the basic building blocks for dynamic scaling are in place, do
> >>> you think it would be possible to tackle FLINK-1003?
> >>>
> >>> Thanks,
> >>> Max
> >>>
> >>>
> >>> [1] https://issues.apache.org/jira/browse/FLINK-1003
> >>>
> >>
> >
>

Re: Spreading Tasks across TaskManagers

Posted by Maximilian Michels <mx...@apache.org>.
> the community is currently working on Flink's scheduler component [1]
That sounds great! I agree that spreading tasks across the nodes is not 
always desirable but it would be nice to give users an option to provide 
hints to the scheduler. The location aware bulk scheduling you mentioned 
would be useful.

Today, there is already the option to assign Resources to a 
StreamTransformation. From a quick test, it seems like those resource 
specifications are not honored yet.

-Max

On 13.10.18 01:41, Thomas Weise wrote:
> Hi Till,
> 
> Thanks for the pointer, glad that this is being worked on.
> 
> It almost looks like the non deterministic distribution behavior started
> with 1.5.x (?) and that surprised us.
> 
> https://issues.apache.org/jira/browse/BEAM-5713
> 
> I agree that there is no one strategy that fits every use case. If an
> application is limited by a resource per machine that the scheduler does
> not understand (like let's say CPU or disk I/O), then it would be nice to
> have a way to hint that round-robin distribution is desired (or achieve the
> same through anti-affinity or resource constraints).
> 
> Thanks,
> Thomas
> 
> 
> 
> On Fri, Oct 12, 2018 at 2:06 AM Till Rohrmann <tr...@apache.org> wrote:
> 
>> Hi Max,
>>
>> the community is currently working on Flink's scheduler component [1]. One
>> of the things we want to enable in the future is bulk scheduling. With
>> this, it should also be possible to add strategies how to distribute tasks
>> across multiple TMs (spreading vs. co-locating).
>>
>> In general, I'm not 100% sure whether spreading out tasks is always the
>> best strategy. Especially if you have a network heavy job co-locating tasks
>> on the same TM could have benefits over spreading the tasks out.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-10429
>>
>> Cheers,
>> Till
>>
>> On Thu, Oct 11, 2018 at 8:16 PM Maximilian Michels <mx...@apache.org> wrote:
>>
>>> Hi everyone,
>>>
>>> I've recently come across a cluster scheduling problem users are facing.
>>> Clusters where TaskManagers have more slots than the parallelism
>>> (#tm_slots > job_parallelism), tend to schedule all job tasks on a
>>> single TaskManager.
>>>
>>> This is not good for spreading load and has been discussed in FLINK-1003
>>> [1] and the other duplicate JIRA issues.
>>>
>>> I know that this is not really an issue if the cluster is created
>>> exclusively for the Job, or if the number of slots per Taskmanager is
>>> smaller than the parallelism. However, this seems like a rather easy
>>> improvement to the Scheduler which would have a huge impact on
>> performance.
>>>
>>> On the JIRA issue page it has been mentioned that this was put on hold
>>> to work on dynamic scaling first.
>>>
>>> Now that the basic building blocks for dynamic scaling are in place, do
>>> you think it would be possible to tackle FLINK-1003?
>>>
>>> Thanks,
>>> Max
>>>
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-1003
>>>
>>
> 

Re: Spreading Tasks across TaskManagers

Posted by Thomas Weise <th...@apache.org>.
Hi Till,

Thanks for the pointer, glad that this is being worked on.

It almost looks like the non deterministic distribution behavior started
with 1.5.x (?) and that surprised us.

https://issues.apache.org/jira/browse/BEAM-5713

I agree that there is no one strategy that fits every use case. If an
application is limited by a resource per machine that the scheduler does
not understand (like let's say CPU or disk I/O), then it would be nice to
have a way to hint that round-robin distribution is desired (or achieve the
same through anti-affinity or resource constraints).

Thanks,
Thomas



On Fri, Oct 12, 2018 at 2:06 AM Till Rohrmann <tr...@apache.org> wrote:

> Hi Max,
>
> the community is currently working on Flink's scheduler component [1]. One
> of the things we want to enable in the future is bulk scheduling. With
> this, it should also be possible to add strategies how to distribute tasks
> across multiple TMs (spreading vs. co-locating).
>
> In general, I'm not 100% sure whether spreading out tasks is always the
> best strategy. Especially if you have a network heavy job co-locating tasks
> on the same TM could have benefits over spreading the tasks out.
>
> [1] https://issues.apache.org/jira/browse/FLINK-10429
>
> Cheers,
> Till
>
> On Thu, Oct 11, 2018 at 8:16 PM Maximilian Michels <mx...@apache.org> wrote:
>
> > Hi everyone,
> >
> > I've recently come across a cluster scheduling problem users are facing.
> > Clusters where TaskManagers have more slots than the parallelism
> > (#tm_slots > job_parallelism), tend to schedule all job tasks on a
> > single TaskManager.
> >
> > This is not good for spreading load and has been discussed in FLINK-1003
> > [1] and the other duplicate JIRA issues.
> >
> > I know that this is not really an issue if the cluster is created
> > exclusively for the Job, or if the number of slots per Taskmanager is
> > smaller than the parallelism. However, this seems like a rather easy
> > improvement to the Scheduler which would have a huge impact on
> performance.
> >
> > On the JIRA issue page it has been mentioned that this was put on hold
> > to work on dynamic scaling first.
> >
> > Now that the basic building blocks for dynamic scaling are in place, do
> > you think it would be possible to tackle FLINK-1003?
> >
> > Thanks,
> > Max
> >
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-1003
> >
>

Re: Spreading Tasks across TaskManagers

Posted by Till Rohrmann <tr...@apache.org>.
Hi Max,

the community is currently working on Flink's scheduler component [1]. One
of the things we want to enable in the future is bulk scheduling. With
this, it should also be possible to add strategies how to distribute tasks
across multiple TMs (spreading vs. co-locating).

In general, I'm not 100% sure whether spreading out tasks is always the
best strategy. Especially if you have a network heavy job co-locating tasks
on the same TM could have benefits over spreading the tasks out.

[1] https://issues.apache.org/jira/browse/FLINK-10429

Cheers,
Till

On Thu, Oct 11, 2018 at 8:16 PM Maximilian Michels <mx...@apache.org> wrote:

> Hi everyone,
>
> I've recently come across a cluster scheduling problem users are facing.
> Clusters where TaskManagers have more slots than the parallelism
> (#tm_slots > job_parallelism), tend to schedule all job tasks on a
> single TaskManager.
>
> This is not good for spreading load and has been discussed in FLINK-1003
> [1] and the other duplicate JIRA issues.
>
> I know that this is not really an issue if the cluster is created
> exclusively for the Job, or if the number of slots per Taskmanager is
> smaller than the parallelism. However, this seems like a rather easy
> improvement to the Scheduler which would have a huge impact on performance.
>
> On the JIRA issue page it has been mentioned that this was put on hold
> to work on dynamic scaling first.
>
> Now that the basic building blocks for dynamic scaling are in place, do
> you think it would be possible to tackle FLINK-1003?
>
> Thanks,
> Max
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-1003
>