You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2022/09/14 14:54:12 UTC

[flink] 03/05: [FLINK-29101] Filter the execution vertex finished event for non-finished ConsumedPartitionGroup

This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f78fcb5ed05c9caa58e3e973ae7de1049671a0ed
Author: Weijie Guo <re...@163.com>
AuthorDate: Fri Sep 2 17:56:02 2022 +0800

    [FLINK-29101] Filter the execution vertex finished event for non-finished ConsumedPartitionGroup
---
 .../scheduler/strategy/PipelinedRegionSchedulingStrategy.java  | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
index 1b4bf431648..df463636810 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
@@ -161,10 +161,15 @@ public class PipelinedRegionSchedulingStrategy implements SchedulingStrategy {
         }
     }
 
-    private Set<SchedulingPipelinedRegion> getDownstreamRegionsOfVertex(
+    private Set<SchedulingPipelinedRegion> getBlockingDownstreamRegionsOfVertex(
             SchedulingExecutionVertex executionVertex) {
         return IterableUtils.toStream(executionVertex.getProducedResults())
+                .filter(partition -> !partition.getResultType().canBePipelinedConsumed())
                 .flatMap(partition -> partition.getConsumedPartitionGroups().stream())
+                .filter(
+                        group ->
+                                crossRegionConsumedPartitionGroups.contains(group)
+                                        || group.areAllPartitionsFinished())
                 .flatMap(
                         partitionGroup ->
                                 partitionGroupConsumerRegions
@@ -208,7 +213,8 @@ public class PipelinedRegionSchedulingStrategy implements SchedulingStrategy {
             final ExecutionVertexID executionVertexId, final ExecutionState executionState) {
         if (executionState == ExecutionState.FINISHED) {
             maybeScheduleRegions(
-                    getDownstreamRegionsOfVertex(schedulingTopology.getVertex(executionVertexId)));
+                    getBlockingDownstreamRegionsOfVertex(
+                            schedulingTopology.getVertex(executionVertexId)));
         }
     }