Posted to user@flink.apache.org by Andrey Zagrebin <az...@apache.org> on 2020/01/15 09:53:08 UTC

Re: [DISCUSS] decentralized scheduling strategy is needed

Hi HuWeihua,

I think your issue should be resolved in 1.9.2 and 1.10 (not yet released,
but in progress).
You can check the related Jira ticket [1].

Best,
Andrey

[1] https://jira.apache.org/jira/browse/FLINK-12122

On Wed, Jan 15, 2020 at 10:08 AM HuWeihua <hu...@gmail.com> wrote:

> Hi all,
> We encountered some problems while upgrading from Flink 1.5 to Flink
> 1.9. Flink's scheduling strategy has changed: Flink 1.9 prefers centralized
> scheduling, while Flink 1.5 prefers decentralized scheduling. This change
> has caused a resource imbalance and blocked our upgrade plan. We have
> thousands of jobs that need to be upgraded.
>
> For example, consider a job with 10 sources and 100 sinks. Each source
> needs 1 core and each sink needs 0.1 core.
> We run this job on Yarn with numberOfTaskSlots set to 10 and
> yarn.containers.vcores set to 2.
>
> With Flink 1.5:
> Each TaskManager runs 1 source and 9 sinks, which need 1.9 cores in
> total, so the job works very well with this configuration. The scheduling
> result is shown in Figure 1.
> With Flink 1.9:
> The 10 sources are scheduled to one TaskManager and the 100 sinks
> are scheduled to 10 other TaskManagers. The scheduling result is shown
> in Figure 2.
> In this scenario, the TaskManager that runs the sources needs 10 cores,
> while the other TaskManagers need 1 core each. But all TaskManagers must
> be configured identically, so we need 11 TaskManagers with 10 cores each.
> This wastes (10-2)*11 = 88 cores compared with Flink 1.5.
>
> In addition to the wasted resources, we also encountered other problems
> caused by the centralized scheduling strategy:
>
>    1. Network bandwidth. Tasks of the same type are scheduled to the same
>    TaskManager, causing too much network traffic on that machine.
>
>    2. Some jobs need to sink to a local agent. With centralized
>    scheduling, the insufficient processing capacity of a single machine
>    causes a consumption backlog.
>
> In summary, we think a decentralized scheduling strategy is necessary.
>
>
> Figure 1. Flink 1.5 schedule results
>
> Figure 2. Flink 1.9 schedule results
>
>
>
> Best
> Weihua Hu
>
>
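
The core arithmetic in the quoted example can be checked with a small sketch. This is only an illustration of the numbers given in the message, not Flink's scheduler: it assumes every task occupies its own slot (no slot sharing), that all TaskManagers get the same whole number of vcores sized for the busiest one, and the function names (packed_placement, spread_placement, cluster_cores) are hypothetical.

```python
import math

SLOTS_PER_TM = 10      # numberOfTaskSlots from the example
NUM_SOURCES = 10
NUM_SINKS = 100
SOURCE_CORES = 1.0     # cores needed by one source
SINK_CORES = 0.1       # cores needed by one sink


def packed_placement():
    """Tasks of the same type fill one TaskManager after another (the 1.9 behaviour described)."""
    tasks = [SOURCE_CORES] * NUM_SOURCES + [SINK_CORES] * NUM_SINKS
    return [tasks[i:i + SLOTS_PER_TM] for i in range(0, len(tasks), SLOTS_PER_TM)]


def spread_placement():
    """Sources are spread round-robin, sinks fill the remaining slots (the 1.5 behaviour described)."""
    num_tms = math.ceil((NUM_SOURCES + NUM_SINKS) / SLOTS_PER_TM)
    tms = [[] for _ in range(num_tms)]
    for i in range(NUM_SOURCES):
        tms[i % num_tms].append(SOURCE_CORES)
    sinks_left = NUM_SINKS
    for tm in tms:
        while len(tm) < SLOTS_PER_TM and sinks_left > 0:
            tm.append(SINK_CORES)
            sinks_left -= 1
    return tms


def cluster_cores(placement):
    """All TaskManagers are sized identically, for the most loaded one, rounded up to whole vcores."""
    busiest = max(round(sum(tm), 6) for tm in placement)  # round() hides float noise such as 1.9000000000000004
    return math.ceil(busiest) * len(placement)


packed = cluster_cores(packed_placement())
spread = cluster_cores(spread_placement())
print(packed, spread, packed - spread)
```

Under these assumptions the packed layout needs 11 TaskManagers sized for 10 cores, the spread layout needs 11 TaskManagers sized for 2 cores, and the difference matches the (10-2)*11 figure in the message.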

Re: [DISCUSS] decentralized scheduling strategy is needed

Posted by Till Rohrmann <tr...@apache.org>.
Thanks for reporting the issue, HuWeihua. Choosing the right scheduling
strategy when using Yarn with potentially infinite resources can be quite
hard because you don't know over how many TaskExecutors the tasks should
be distributed. It becomes easier if one can configure the minimum
number of TaskExecutors a cluster should always have. This is currently
being discussed and I hope that we can complete this feature for the next
release.

Cheers,
Till

On Wed, Jan 15, 2020 at 11:29 AM HuWeihua <hu...@gmail.com> wrote:

> Hi, Andrey
>
> Thanks for your response.
>
> I have checked this Jira ticket and I think it works in standalone mode,
> where the TaskManagers have been started before tasks are scheduled.
> But we are currently running Flink on Yarn in per-job cluster mode.
>
> I noticed that this issue has already been raised. I will keep watching
> this ticket.
>
> Thanks again.
>
> Best
> Weihua Hu
>
>
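
The point about a minimum number of TaskExecutors can be illustrated with a toy model. It is not Flink's scheduler or resource manager: it assumes a "least used first" choice among the executors that already exist and requests a new executor only when none has a free slot. The function name schedule and its parameters are hypothetical.

```python
def schedule(num_tasks, slots_per_executor, min_executors):
    """Toy model: return the number of used slots per executor after placing all tasks."""
    executors = [0] * min_executors  # executors started before any task is scheduled
    for _ in range(num_tasks):
        free = [i for i, used in enumerate(executors) if used < slots_per_executor]
        if not free:
            # No free slot anywhere: request one more executor on demand.
            executors.append(0)
            free = [len(executors) - 1]
        # Spread out: pick the executor with the fewest used slots.
        target = min(free, key=lambda i: executors[i])
        executors[target] += 1
    return executors


# On-demand only: each executor is filled completely before the next one exists.
print(schedule(20, 10, 0))
# Four executors started up front: the same tasks spread evenly.
print(schedule(20, 10, 4))
```

In this model a spread-out preference has nothing to spread over when executors are created one at a time, which is why knowing a minimum executor count up front makes the choice easier.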

Re: [DISCUSS] decentralized scheduling strategy is needed

Posted by HuWeihua <hu...@gmail.com>.
Hi, Andrey

Thanks for your response.

I have checked this Jira ticket and I think it works in standalone mode, where the TaskManagers have been started before tasks are scheduled.
But we are currently running Flink on Yarn in per-job cluster mode.

I noticed that this issue has already been raised. I will keep watching this ticket.

Thanks again.

Best
Weihua Hu


