You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Zhu Zhu (Jira)" <ji...@apache.org> on 2020/05/18 11:30:00 UTC

[jira] [Updated] (FLINK-15031) Calculate required shuffle memory before allocating slots if resources are specified

     [ https://issues.apache.org/jira/browse/FLINK-15031?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhu Zhu updated FLINK-15031:
----------------------------
    Fix Version/s:     (was: 1.11.0)
                   1.12.0

> Calculate required shuffle memory before allocating slots if resources are specified
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-15031
>                 URL: https://issues.apache.org/jira/browse/FLINK-15031
>             Project: Flink
>          Issue Type: Task
>          Components: Runtime / Coordination
>    Affects Versions: 1.10.0
>            Reporter: Zhu Zhu
>            Assignee: Zhu Zhu
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.12.0
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> In cases where resources are specified, we expect each operator to declare required resources before using them. In this way, no resource related error should happen if resources are not used beyond what was declared. This ensures a deployed task would not fail due to insufficient resources in TM, which may result in unnecessary failures and may even cause a job hanging forever, failing repeatedly on deploying tasks to a TM with insufficient resources.
> Shuffle memory is the last missing piece for this goal at the moment. Minimum network buffers are required by tasks to work. Currently a task is possible to be deployed to a TM with insufficient network buffers, and fails on launching.
> To avoid that, we should calculate required network memory for a task/SlotSharingGroup before allocating a slot for it.
> The required shuffle memory can be derived from the number of required network buffers. The number of buffers required by a task (ExecutionVertex) is
> {code:java}
> exclusive buffers for input channels(i.e. numInputChannel * buffersPerChannel) + required buffers for result partition buffer pool(currently is numberOfSubpartitions + 1)
> {code}
> Note that this is for the {{NettyShuffleService}} case. For custom shuffle services, currently there is no way to get the required shuffle memory of a task.
> To make it simple under dynamic slot sharing, the required shuffle memory for a task should be the max required shuffle memory of all {{ExecutionVertex}} of the same {{ExecutionJobVertex}}. And the required shuffle memory for a slot sharing group should be the sum of shuffle memory for each {{ExecutionJobVertex}} instance within.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)