Posted to dev@flink.apache.org by "Zhilong Hong (Jira)" <ji...@apache.org> on 2021/04/01 03:45:00 UTC

[jira] [Created] (FLINK-22077) Wrong way to calculate cross-region ConsumedPartitionGroups in PipelinedRegionSchedulingStrategy

Zhilong Hong created FLINK-22077:
------------------------------------

             Summary: Wrong way to calculate cross-region ConsumedPartitionGroups in PipelinedRegionSchedulingStrategy
                 Key: FLINK-22077
                 URL: https://issues.apache.org/jira/browse/FLINK-22077
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Coordination
    Affects Versions: 1.13.0
            Reporter: Zhilong Hong
             Fix For: 1.13.0


h3. Introduction

The way cross-region ConsumedPartitionGroups are calculated in {{PipelinedRegionSchedulingStrategy}} is wrong. It slows down {{onExecutionStateChange}}, raising its complexity from O(N) to O(N^2), and the semantics of "cross-region" that it implements are also incorrect.
h3. Details

In {{PipelinedRegionSchedulingStrategy#startScheduling}}, we need to schedule all regions with no external blocking edges, i.e., the source regions. To decrease the complexity, we schedule all the regions that have no external BLOCKING ConsumedPartitionGroups.

However, in the case illustrated in FLINK-22017, region 2 has a ConsumedPartitionGroup that contains both internal and external blocking IntermediateResultPartitions. If we pick a single partition to represent the entire ConsumedPartitionGroup, we may pick the internal one and treat the entire group as internal. Region 2 will then be scheduled.

As region 1 is not finished, region 2 cannot transition to running. A deadlock may happen if there are not enough resources for both regions.

To fix this, we introduced cross-region ConsumedPartitionGroups in FLINK-21330. The regions that have ConsumedPartitionGroups with both internal and external blocking IntermediateResultPartitions are recorded. When {{startScheduling}} is called, these ConsumedPartitionGroups are treated as external, and region 2 is not scheduled.

However, the current implementation of cross-region groups is wrong. Any ConsumedPartitionGroup that has multiple producer regions is treated as a cross-region group, which is not the logic described above. As a result, all ALL-TO-ALL BLOCKING ConsumedPartitionGroups are treated as cross-region groups, since their producers are in different regions (each producer has its own region). This raises the complexity from O(N) to O(N^2) for ALL-TO-ALL BLOCKING edges.
h3. Solution

To correctly calculate the cross-region ConsumedPartitionGroups, we can calculate the producer regions for all ConsumedPartitionGroups, and then iterate over all the regions and their ConsumedPartitionGroups. If a ConsumedPartitionGroup has two or more producer regions, and those producer regions include the current region, it is a cross-region ConsumedPartitionGroup. This matches the correct semantics and makes sure ALL-TO-ALL BLOCKING ConsumedPartitionGroups are not treated as cross-region ones. This fix also decreases the complexity from O(N^2) to O(N). I think it is necessary to include this bug fix in release 1.13.
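
The difference between the two rules can be sketched in a few lines of Java. This is only an illustration under simplifying assumptions, not the Flink implementation: regions are plain integers, ConsumedPartitionGroups and partitions are plain strings, and the class name, method names, and the toy topology in {{main}} are all hypothetical. Group G mimics the kind of group described for region 2 (one internal and one external partition), and group H mimics an ALL-TO-ALL BLOCKING edge whose producers each sit in their own region.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical, simplified model; none of these names exist in Flink.
class CrossRegionGroupsSketch {

    // Step 1: compute the set of producer regions once per group.
    static Map<String, Set<Integer>> producerRegionsOfGroups(
            Map<String, List<String>> partitionsOfGroup,
            Map<String, Integer> producerRegionOfPartition) {
        Map<String, Set<Integer>> result = new HashMap<>();
        for (Map.Entry<String, List<String>> e : partitionsOfGroup.entrySet()) {
            Set<Integer> regions = new HashSet<>();
            for (String partition : e.getValue()) {
                regions.add(producerRegionOfPartition.get(partition));
            }
            result.put(e.getKey(), regions);
        }
        return result;
    }

    // Rule criticized in this issue: any group with multiple producer regions.
    static Set<String> multiProducerRule(Map<String, Set<Integer>> producerRegions) {
        Set<String> flagged = new TreeSet<>();
        for (Map.Entry<String, Set<Integer>> e : producerRegions.entrySet()) {
            if (e.getValue().size() >= 2) {
                flagged.add(e.getKey());
            }
        }
        return flagged;
    }

    // Step 2 of the proposed rule: iterate regions and their consumed groups;
    // a group is cross-region only if it has two or more producer regions
    // AND the consuming region is one of them.
    static Set<String> proposedRule(
            Map<String, Set<Integer>> producerRegions,
            Map<Integer, List<String>> consumedGroupsOfRegion) {
        Set<String> flagged = new TreeSet<>();
        for (Map.Entry<Integer, List<String>> e : consumedGroupsOfRegion.entrySet()) {
            for (String group : e.getValue()) {
                Set<Integer> producers = producerRegions.get(group);
                if (producers.size() >= 2 && producers.contains(e.getKey())) {
                    flagged.add(group);
                }
            }
        }
        return flagged;
    }

    public static void main(String[] args) {
        // G: consumed by region 2, produced partly by region 1 and partly by region 2.
        // H: all-to-all style, produced by regions 10 and 11, consumed by regions 12 and 13.
        Map<String, List<String>> partitionsOfGroup =
                Map.of("G", List.of("p1", "p2"), "H", List.of("pa1", "pa2"));
        Map<String, Integer> producerRegionOfPartition =
                Map.of("p1", 1, "p2", 2, "pa1", 10, "pa2", 11);
        Map<Integer, List<String>> consumedGroupsOfRegion =
                Map.of(2, List.of("G"), 12, List.of("H"), 13, List.of("H"));

        Map<String, Set<Integer>> producers =
                producerRegionsOfGroups(partitionsOfGroup, producerRegionOfPartition);
        System.out.println("multi-producer rule: " + multiProducerRule(producers));
        System.out.println("proposed rule:       "
                + proposedRule(producers, consumedGroupsOfRegion));
    }
}
```

In this toy topology the multi-producer rule flags both G and H, while the proposed rule flags only G, because neither consumer region of H is among its producer regions.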



--
This message was sent by Atlassian Jira
(v8.3.4#803005)