Posted to commits@flink.apache.org by xt...@apache.org on 2022/09/14 14:54:14 UTC

[flink] 05/05: [FLINK-29101] ignore non canBePipelined consumedPartitionGroup in getAllSchedulableRegions.

This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c0165a8a7e3ccc6e82df7a30c67497a10e281153
Author: Weijie Guo <re...@163.com>
AuthorDate: Mon Sep 5 17:49:04 2022 +0800

    [FLINK-29101] ignore non canBePipelined consumedPartitionGroup in getAllSchedulableRegions.
    
    This closes #20739
---
 .../scheduler/strategy/PipelinedRegionSchedulingStrategy.java        | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
index a5b0482dbb4..3ccce83a5fd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/PipelinedRegionSchedulingStrategy.java
@@ -248,6 +248,11 @@ public class PipelinedRegionSchedulingStrategy implements SchedulingStrategy {
                         .getOrDefault(currentRegion, Collections.emptySet())
                         .forEach(
                                 (producedPartitionGroup) -> {
+                                    if (!producedPartitionGroup
+                                            .getResultPartitionType()
+                                            .canBePipelinedConsumed()) {
+                                        return;
+                                    }
                                     // If this group has been visited, there is no need
                                     // to repeat the determination.
                                     if (visitedConsumedPartitionGroups.contains(
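
The hunk above adds an early return at the top of the forEach lambda, so a group whose result partition type reports canBePipelinedConsumed() as false is skipped before the visited check runs. The following is a minimal, self-contained sketch of that guard pattern only, not Flink code: the class PipelinedGuardSketch, the two-value PartitionType enum, the PartitionGroup holder and the method visitPipelinedGroups are hypothetical stand-ins. Only the accessor name canBePipelinedConsumed() is taken from the diff.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PipelinedGuardSketch {

    // Simplified stand-in for Flink's ResultPartitionType (assumption:
    // two values are enough to illustrate the guard).
    enum PartitionType {
        PIPELINED(true),
        BLOCKING(false);

        private final boolean pipelinedConsumable;

        PartitionType(boolean pipelinedConsumable) {
            this.pipelinedConsumable = pipelinedConsumable;
        }

        boolean canBePipelinedConsumed() {
            return pipelinedConsumable;
        }
    }

    // Hypothetical minimal partition group: a name plus its partition type.
    static final class PartitionGroup {
        final String name;
        final PartitionType type;

        PartitionGroup(String name, PartitionType type) {
            this.name = name;
            this.type = type;
        }
    }

    // Collects the names of the groups that pass the guard, in input order.
    static List<String> visitPipelinedGroups(List<PartitionGroup> groups) {
        List<String> visited = new ArrayList<>();
        groups.forEach(
                group -> {
                    if (!group.type.canBePipelinedConsumed()) {
                        // Leaves only this lambda invocation; forEach
                        // continues with the next group.
                        return;
                    }
                    visited.add(group.name);
                });
        return visited;
    }

    public static void main(String[] args) {
        List<PartitionGroup> groups =
                Arrays.asList(
                        new PartitionGroup("a", PartitionType.PIPELINED),
                        new PartitionGroup("b", PartitionType.BLOCKING),
                        new PartitionGroup("c", PartitionType.PIPELINED));
        System.out.println(visitPipelinedGroups(groups)); // prints [a, c]
    }
}
```

Inside a forEach lambda, return behaves like continue in a plain loop, which is why the guard can be placed ahead of the existing logic without restructuring it.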