Posted to dev@flink.apache.org by "Zhu Zhu (JIRA)" <ji...@apache.org> on 2018/11/20 09:59:00 UTC

[jira] [Created] (FLINK-10945) Avoid resource deadlocks for finite stream jobs when resources are limited

Zhu Zhu created FLINK-10945:
-------------------------------

             Summary: Avoid resource deadlocks for finite stream jobs when resources are limited
                 Key: FLINK-10945
                 URL: https://issues.apache.org/jira/browse/FLINK-10945
             Project: Flink
          Issue Type: Improvement
          Components: Distributed Coordination
    Affects Versions: 1.7.1
            Reporter: Zhu Zhu


Currently *resource deadlocks* can happen to finite stream jobs (or batch jobs) when resources are limited, in the following 2 cases:
 # Task Y is a pipelined downstream task of task X. When X takes all resources (slots), Y cannot acquire slots to start, so back pressure prevents X from finishing.
 # Task Y is an upstream task of task X. When X takes all resources (slots) and Y cannot start, X cannot finish because some of its inputs are not finished.

 

We can avoid case 1 by setting all edges to BLOCKING, which avoids pipelined back pressure. However, case 2 cannot be avoided this way, as X (the downstream task) is launched as soon as any of its input results is ready.

In detail: say task X has BLOCKING upstream tasks Y and Z. X can be launched when Z finishes, even though Y has not been launched yet. This pre-launch behaviour can be beneficial when there are plenty of resources, since X can start processing data from Z before Y finishes. However, resource deadlocks may happen when resources are limited, e.g. in small sessions.
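
The X/Y/Z example can be reproduced with a toy model. The sketch below is not Flink code: the class and method names are made up for illustration, and it assumes that Z is launched first, that X is launched as soon as any one input has finished, and that X is preferred over Y when both are waiting for a slot.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model of the X/Y/Z example above; none of this is Flink code. */
final class PreLaunchDeadlockDemo {

    /**
     * Simulates a job where X consumes the BLOCKING results of Y and Z.
     * Returns true if every task finishes, false if the job gets stuck.
     */
    static boolean completes(int slots) {
        // Assumed launch preference: Z first, then X, then Y.
        List<String> waiting = new ArrayList<>(Arrays.asList("Z", "X", "Y"));
        Set<String> running = new HashSet<>();
        Set<String> finished = new HashSet<>();

        boolean progress = true;
        while (progress && finished.size() < 3) {
            progress = false;

            // Launch waiting tasks while free slots remain.
            for (String task : new ArrayList<>(waiting)) {
                // X is pre-launched once ANY of its inputs has finished.
                boolean launchable = !task.equals("X")
                        || finished.contains("Y") || finished.contains("Z");
                if (launchable && running.size() < slots) {
                    waiting.remove(task);
                    running.add(task);
                    progress = true;
                }
            }

            // A running task finishes (and frees its slot) only once ALL
            // of its inputs have finished. Y and Z have no inputs.
            for (String task : new ArrayList<>(running)) {
                boolean inputsDone = !task.equals("X")
                        || (finished.contains("Y") && finished.contains("Z"));
                if (inputsDone) {
                    running.remove(task);
                    finished.add(task);
                    progress = true;
                }
            }
        }
        return finished.size() == 3;
    }
}
```

Under these assumptions, completes(2) returns true because Y and Z run side by side before X starts, while completes(1) returns false: X occupies the only slot after Z finishes, so Y can never start and X can never finish.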

 

I’d propose introducing a constraint named *InputDependencyConstraint* to control the scheduling of vertices. It has 2 values:
 # *ANY*. The vertex can be scheduled when any of its inputs is consumable.
 # *ALL*. The vertex can be scheduled when all of its inputs are consumable.

Here one input corresponds to an *IntermediateResult*. The constraint can be configured for *a specific vertex or job-wide*. Making *ANY* the job-wide default keeps job scheduling behaving the same way as in the current version.
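
A minimal sketch of how such a constraint might look is given below. Only the names InputDependencyConstraint, ANY and ALL come from this proposal. The canSchedule and resolve methods, the ConstraintResolver class, and the handling of vertices without inputs are assumptions made for illustration, not existing Flink API.

```java
import java.util.List;

/** Hypothetical sketch of the proposed constraint; not existing Flink API. */
enum InputDependencyConstraint {
    ANY,
    ALL;

    /**
     * Decides whether a vertex may be scheduled.
     * inputsConsumable holds one flag per input (IntermediateResult).
     */
    boolean canSchedule(List<Boolean> inputsConsumable) {
        if (inputsConsumable.isEmpty()) {
            return true; // assumption: a vertex without inputs is always schedulable
        }
        boolean any = false;
        boolean all = true;
        for (boolean consumable : inputsConsumable) {
            any |= consumable;
            all &= consumable;
        }
        return this == ANY ? any : all;
    }
}

/** Hypothetical helper: a per-vertex setting overrides the job-wide one. */
final class ConstraintResolver {
    static InputDependencyConstraint resolve(
            InputDependencyConstraint vertexLevel,  // null means "not set for this vertex"
            InputDependencyConstraint jobWide) {
        return vertexLevel != null ? vertexLevel : jobWide;
    }
}
```

For the earlier example where Z has finished but Y has not, the input flags of X would be (false, true): ANY allows X to be scheduled, ALL holds it back until Y's result is consumable too.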



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)