You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2022/09/14 14:54:14 UTC
[flink] 05/05: [FLINK-29101] ignore non canBePipelined consumedPartitionGroup in getAllSchedulableRegions.
This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit c0165a8a7e3ccc6e82df7a30c67497a10e281153
Author: Weijie Guo <re...@163.com>
AuthorDate: Mon Sep 5 17:49:04 2022 +0800
[FLINK-29101] ignore non canBePipelined consumedPartitionGroup in getAllSchedulableRegions.
This closes #20739
---
.../scheduler/strategy/PipelinedRegionSchedulingStrategy.java | 5 +++++
1 file changed, 5 insertions(+)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
index a5b0482dbb4..3ccce83a5fd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
@@ -248,6 +248,11 @@ public class PipelinedRegionSchedulingStrategy implements SchedulingStrategy {
.getOrDefault(currentRegion, Collections.emptySet())
.forEach(
(producedPartitionGroup) -> {
+ if (!producedPartitionGroup
+ .getResultPartitionType()
+ .canBePipelinedConsumed()) {
+ return;
+ }
// If this group has been visited, there is no need
// to repeat the determination.
if (visitedConsumedPartitionGroups.contains(