You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2022/09/14 14:54:12 UTC
[flink] 03/05: [FLINK-29101] Filter the the execution vertex finished event for non-finished ConsumedPartitionGroup
This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit f78fcb5ed05c9caa58e3e973ae7de1049671a0ed
Author: Weijie Guo <re...@163.com>
AuthorDate: Fri Sep 2 17:56:02 2022 +0800
[FLINK-29101] Filter the the execution vertex finished event for non-finished ConsumedPartitionGroup
---
.../scheduler/strategy/PipelinedRegionSchedulingStrategy.java | 10 ++++++++--
1 file changed, 8 insertions(+), 2 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
index 1b4bf431648..df463636810 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
@@ -161,10 +161,15 @@ public class PipelinedRegionSchedulingStrategy implements SchedulingStrategy {
}
}
- private Set<SchedulingPipelinedRegion> getDownstreamRegionsOfVertex(
+ private Set<SchedulingPipelinedRegion> getBlockingDownstreamRegionsOfVertex(
SchedulingExecutionVertex executionVertex) {
return IterableUtils.toStream(executionVertex.getProducedResults())
+ .filter(partition -> !partition.getResultType().canBePipelinedConsumed())
.flatMap(partition -> partition.getConsumedPartitionGroups().stream())
+ .filter(
+ group ->
+ crossRegionConsumedPartitionGroups.contains(group)
+ || group.areAllPartitionsFinished())
.flatMap(
partitionGroup ->
partitionGroupConsumerRegions
@@ -208,7 +213,8 @@ public class PipelinedRegionSchedulingStrategy implements SchedulingStrategy {
final ExecutionVertexID executionVertexId, final ExecutionState executionState) {
if (executionState == ExecutionState.FINISHED) {
maybeScheduleRegions(
- getDownstreamRegionsOfVertex(schedulingTopology.getVertex(executionVertexId)));
+ getBlockingDownstreamRegionsOfVertex(
+ schedulingTopology.getVertex(executionVertexId)));
}
}