You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2022/02/17 05:16:36 UTC

[flink] branch master updated: [FLINK-26168][runtime] The chainability check ignores StreamExchangeMode when the partitioner is ForwardForConsecutiveHashPartitioner

This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 8f3a025  [FLINK-26168][runtime] The chainability check ignores StreamExchangeMode when the partitioner is ForwardForConsecutiveHashPartitioner
8f3a025 is described below

commit 8f3a0251d195ed2532abc31e602029dfcbf7bc77
Author: Lijie Wang <wa...@gmail.com>
AuthorDate: Wed Feb 16 08:36:22 2022 +0800

    [FLINK-26168][runtime] The chainability check ignores StreamExchangeMode when the partitioner is ForwardForConsecutiveHashPartitioner
    
    This closes #18789.
---
 .../api/graph/StreamingJobGraphGenerator.java      | 22 ++++++++++++++++++++--
 .../ForwardForConsecutiveHashPartitionerTest.java  | 13 +++++++++++++
 .../partitioner/StreamPartitionerTestUtils.java    | 22 +++++++++++++++++++---
 3 files changed, 52 insertions(+), 5 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index e056048..4801ec7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -1064,8 +1064,10 @@ public class StreamingJobGraphGenerator {
 
         if (!(upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
                 && areOperatorsChainable(upStreamVertex, downStreamVertex, streamGraph)
-                && (edge.getPartitioner() instanceof ForwardPartitioner)
-                && edge.getExchangeMode() != StreamExchangeMode.BATCH
+                && arePartitionerAndExchangeModeChainable(
+                        edge.getPartitioner(),
+                        edge.getExchangeMode(),
+                        streamGraph.getExecutionConfig().isDynamicGraph())
                 && upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
                 && streamGraph.isChainingEnabled())) {
 
@@ -1084,6 +1086,22 @@ public class StreamingJobGraphGenerator {
     }
 
     @VisibleForTesting
+    static boolean arePartitionerAndExchangeModeChainable(
+            StreamPartitioner<?> partitioner,
+            StreamExchangeMode exchangeMode,
+            boolean isDynamicGraph) {
+        if (partitioner instanceof ForwardForConsecutiveHashPartitioner) {
+            checkState(isDynamicGraph);
+            return true;
+        } else if ((partitioner instanceof ForwardPartitioner)
+                && exchangeMode != StreamExchangeMode.BATCH) {
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    @VisibleForTesting
     static boolean areOperatorsChainable(
             StreamNode upStreamVertex, StreamNode downStreamVertex, StreamGraph streamGraph) {
         StreamOperatorFactory<?> upStreamOperator = upStreamVertex.getOperatorFactory();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardForConsecutiveHashPartitionerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardForConsecutiveHashPartitionerTest.java
index 8e1863b..9e0fd6c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardForConsecutiveHashPartitionerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardForConsecutiveHashPartitionerTest.java
@@ -21,6 +21,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.graph.StreamEdge;
+import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
@@ -36,6 +37,12 @@ public class ForwardForConsecutiveHashPartitionerTest extends TestLogger {
 
     @Test
     public void testConvertToForwardPartitioner() {
+        testConvertToForwardPartitioner(StreamExchangeMode.BATCH);
+        testConvertToForwardPartitioner(StreamExchangeMode.PIPELINED);
+        testConvertToForwardPartitioner(StreamExchangeMode.UNDEFINED);
+    }
+
+    private void testConvertToForwardPartitioner(StreamExchangeMode streamExchangeMode) {
         JobGraph jobGraph =
                 StreamPartitionerTestUtils.createJobGraph(
                         "group1",
@@ -53,6 +60,12 @@ public class ForwardForConsecutiveHashPartitionerTest extends TestLogger {
 
     @Test
     public void testConvertToHashPartitioner() {
+        testConvertToHashPartitioner(StreamExchangeMode.BATCH);
+        testConvertToHashPartitioner(StreamExchangeMode.PIPELINED);
+        testConvertToHashPartitioner(StreamExchangeMode.UNDEFINED);
+    }
+
+    private void testConvertToHashPartitioner(StreamExchangeMode streamExchangeMode) {
         JobGraph jobGraph =
                 StreamPartitionerTestUtils.createJobGraph(
                         "group1",
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitionerTestUtils.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitionerTestUtils.java
index 2059aaa..12e9d54 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitionerTestUtils.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitionerTestUtils.java
@@ -23,6 +23,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
+import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
 
 /** Utility class to test {@link StreamPartitioner}. */
 public class StreamPartitionerTestUtils {
@@ -31,6 +32,18 @@ public class StreamPartitionerTestUtils {
             String sourceSlotSharingGroup,
             String sinkSlotSharingGroup,
             StreamPartitioner<Long> streamPartitioner) {
+        return createJobGraph(
+                sourceSlotSharingGroup,
+                sinkSlotSharingGroup,
+                streamPartitioner,
+                StreamExchangeMode.UNDEFINED);
+    }
+
+    public static JobGraph createJobGraph(
+            String sourceSlotSharingGroup,
+            String sinkSlotSharingGroup,
+            StreamPartitioner<Long> streamPartitioner,
+            StreamExchangeMode exchangeMode) {
 
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
@@ -39,7 +52,7 @@ public class StreamPartitionerTestUtils {
         final DataStream<Long> source =
                 env.fromSequence(0, 99).slotSharingGroup(sourceSlotSharingGroup).name("source");
 
-        setPartitioner(source, streamPartitioner)
+        setPartitioner(source, streamPartitioner, exchangeMode)
                 .addSink(new DiscardingSink<>())
                 .slotSharingGroup(sinkSlotSharingGroup)
                 .name("sink");
@@ -48,10 +61,13 @@ public class StreamPartitionerTestUtils {
     }
 
     private static <T> DataStream<T> setPartitioner(
-            DataStream<T> dataStream, StreamPartitioner<T> partitioner) {
+            DataStream<T> dataStream,
+            StreamPartitioner<T> partitioner,
+            StreamExchangeMode exchangeMode) {
         return new DataStream<T>(
                 dataStream.getExecutionEnvironment(),
-                new PartitionTransformation<T>(dataStream.getTransformation(), partitioner));
+                new PartitionTransformation<T>(
+                        dataStream.getTransformation(), partitioner, exchangeMode));
     }
 
     /** Utility class, should not be instantiated. */