You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Zhu Zhu (Jira)" <ji...@apache.org> on 2019/12/05 09:04:00 UTC

[jira] [Comment Edited] (FLINK-15031) Calculate required shuffle memory cases before allocating slots in resources specified

    [ https://issues.apache.org/jira/browse/FLINK-15031?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16988603#comment-16988603 ] 

Zhu Zhu edited comment on FLINK-15031 at 12/5/19 9:03 AM:
----------------------------------------------------------

Thanks [~trohrmann] for the feedbacks. I can work on it.

Agreed that it would be better to let the {{ShuffleService}} determine the shuffle memory required for a vertex. So I'd like to introduce a interface 

{code:java}
MemorySize getShuffleMemoryForTask(int numberOfInputChannels, int numberOfResultSubpartitions);
{code}

in {{ShuffleMaster}} with a default returning value {{MemorySize.ZERO}}. The {{NettyShuffleMaster}} can then easily implement it by returning {{numInputChannel * buffersPerChannel (from config) + numberOfSubpartitions + 1}}. 

Another part of change would be in scheduler components. I'd like to introduce a {{ResourceRequirementsRetriever}} to generate the final ResourceProfiles regarding shuffle memory for vertices/groups and serve the querying.





was (Author: zhuzh):
Thanks [~trohrmann] for the feedbacks. I can work on it.

Agreed that it would be better to let the {{ShuffleService}} to determine the shuffle memory required for a vertex. So I'd like to introduce a interface 

{code:java}
MemorySize getShuffleMemoryForTask(int numberOfInputChannels, int numberOfResultSubpartitions);
{code}

in {{ShuffleMaster}} with a default returning value {{MemorySize.ZERO}}. The {{NettyShuffleMaster}} can then easily implement it by returning {{numInputChannel * buffersPerChannel (from config) + numberOfSubpartitions + 1}}. 

Another part of change would be in scheduler components. I'd like to introduce a {{ResourceRequirementsRetriever}} to generate the final ResourceProfiles regarding shuffle memory for vertices/groups and serve the querying.




> Calculate required shuffle memory cases before allocating slots in resources specified
> --------------------------------------------------------------------------------------
>
>                 Key: FLINK-15031
>                 URL: https://issues.apache.org/jira/browse/FLINK-15031
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Coordination
>    Affects Versions: 1.10.0
>            Reporter: Zhu Zhu
>            Priority: Major
>             Fix For: 1.10.0
>
>
> In resources specified cases, we expect each operator to declare required resources and before using them. In this way, no resource related error should happen if no resource is used more than declared. This ensures a deployed task would not fail due to insufficient resources in TM. This may result in unnecessary failures and may even cause a job hanging forever, failing repeatedly on deploying tasks to a TM with insufficient resources.
> Shuffle memory is the last missing piece for this goal at the moment. Minimum network buffers are required by tasks to work. Currently a task is possible to be deployed to a TM with insufficient network buffers, and fails on launching.
> To avoid that, we should calculate required network memory for a task/SlotSharingGroup before allocating a slot for it.
> The required shuffle memory can be derived from the number of required network buffers. The number of buffers required by a task (ExecutionVertex) is 
> {code:java}
> exclusive buffers for input channels(i.e. numInputChannel * buffersPerChannel) + required buffers for result partition buffer pool(currently is numberOfSubpartitions + 1)
> {code}
> Note that this is for the {{NettyShuffleService}} case. For custom shuffle services, currently there is no way to get the required shuffle memory of a task.
> To make it simple under dynamic slot sharing, the required shuffle memory for a task should be the max required shuffle memory of all {{ExecutionVertex}} of the same {{ExecutionJobVertex}}. And the required shuffle memory for a slot sharing group should be the sum of shuffle memory for each {{ExecutionJobVertex}} instance within.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)