You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2022/02/17 05:16:36 UTC
[flink] branch master updated: [FLINK-26168][runtime] The judgment of chainable ignores StreamExchangeMode when partitioner is ForwardForConsecutiveHashPartitioner
This is an automated email from the ASF dual-hosted git repository.
zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 8f3a025 [FLINK-26168][runtime] The judgment of chainable ignores StreamExchangeMode when partitioner is ForwardForConsecutiveHashPartitioner
8f3a025 is described below
commit 8f3a0251d195ed2532abc31e602029dfcbf7bc77
Author: Lijie Wang <wa...@gmail.com>
AuthorDate: Wed Feb 16 08:36:22 2022 +0800
[FLINK-26168][runtime] The judgment of chainable ignores StreamExchangeMode when partitioner is ForwardForConsecutiveHashPartitioner
This closes #18789.
---
.../api/graph/StreamingJobGraphGenerator.java | 22 ++++++++++++++++++++--
.../ForwardForConsecutiveHashPartitionerTest.java | 13 +++++++++++++
.../partitioner/StreamPartitionerTestUtils.java | 22 +++++++++++++++++++---
3 files changed, 52 insertions(+), 5 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index e056048..4801ec7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -1064,8 +1064,10 @@ public class StreamingJobGraphGenerator {
if (!(upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
&& areOperatorsChainable(upStreamVertex, downStreamVertex, streamGraph)
- && (edge.getPartitioner() instanceof ForwardPartitioner)
- && edge.getExchangeMode() != StreamExchangeMode.BATCH
+ && arePartitionerAndExchangeModeChainable(
+ edge.getPartitioner(),
+ edge.getExchangeMode(),
+ streamGraph.getExecutionConfig().isDynamicGraph())
&& upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
&& streamGraph.isChainingEnabled())) {
@@ -1084,6 +1086,22 @@ public class StreamingJobGraphGenerator {
}
@VisibleForTesting
+ static boolean arePartitionerAndExchangeModeChainable(
+ StreamPartitioner<?> partitioner,
+ StreamExchangeMode exchangeMode,
+ boolean isDynamicGraph) {
+ if (partitioner instanceof ForwardForConsecutiveHashPartitioner) {
+ checkState(isDynamicGraph);
+ return true;
+ } else if ((partitioner instanceof ForwardPartitioner)
+ && exchangeMode != StreamExchangeMode.BATCH) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ @VisibleForTesting
static boolean areOperatorsChainable(
StreamNode upStreamVertex, StreamNode downStreamVertex, StreamGraph streamGraph) {
StreamOperatorFactory<?> upStreamOperator = upStreamVertex.getOperatorFactory();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardForConsecutiveHashPartitionerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardForConsecutiveHashPartitionerTest.java
index 8e1863b..9e0fd6c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardForConsecutiveHashPartitionerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardForConsecutiveHashPartitionerTest.java
@@ -21,6 +21,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamEdge;
+import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
@@ -36,6 +37,12 @@ public class ForwardForConsecutiveHashPartitionerTest extends TestLogger {
@Test
public void testConvertToForwardPartitioner() {
+ testConvertToForwardPartitioner(StreamExchangeMode.BATCH);
+ testConvertToForwardPartitioner(StreamExchangeMode.PIPELINED);
+ testConvertToForwardPartitioner(StreamExchangeMode.UNDEFINED);
+ }
+
+ private void testConvertToForwardPartitioner(StreamExchangeMode streamExchangeMode) {
JobGraph jobGraph =
StreamPartitionerTestUtils.createJobGraph(
"group1",
@@ -53,6 +60,12 @@ public class ForwardForConsecutiveHashPartitionerTest extends TestLogger {
@Test
public void testConvertToHashPartitioner() {
+ testConvertToHashPartitioner(StreamExchangeMode.BATCH);
+ testConvertToHashPartitioner(StreamExchangeMode.PIPELINED);
+ testConvertToHashPartitioner(StreamExchangeMode.UNDEFINED);
+ }
+
+ private void testConvertToHashPartitioner(StreamExchangeMode streamExchangeMode) {
JobGraph jobGraph =
StreamPartitionerTestUtils.createJobGraph(
"group1",
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitionerTestUtils.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitionerTestUtils.java
index 2059aaa..12e9d54 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitionerTestUtils.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitionerTestUtils.java
@@ -23,6 +23,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
+import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
/** Utility class to test {@link StreamPartitioner}. */
public class StreamPartitionerTestUtils {
@@ -31,6 +32,18 @@ public class StreamPartitionerTestUtils {
String sourceSlotSharingGroup,
String sinkSlotSharingGroup,
StreamPartitioner<Long> streamPartitioner) {
+ return createJobGraph(
+ sourceSlotSharingGroup,
+ sinkSlotSharingGroup,
+ streamPartitioner,
+ StreamExchangeMode.UNDEFINED);
+ }
+
+ public static JobGraph createJobGraph(
+ String sourceSlotSharingGroup,
+ String sinkSlotSharingGroup,
+ StreamPartitioner<Long> streamPartitioner,
+ StreamExchangeMode exchangeMode) {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
@@ -39,7 +52,7 @@ public class StreamPartitionerTestUtils {
final DataStream<Long> source =
env.fromSequence(0, 99).slotSharingGroup(sourceSlotSharingGroup).name("source");
- setPartitioner(source, streamPartitioner)
+ setPartitioner(source, streamPartitioner, exchangeMode)
.addSink(new DiscardingSink<>())
.slotSharingGroup(sinkSlotSharingGroup)
.name("sink");
@@ -48,10 +61,13 @@ public class StreamPartitionerTestUtils {
}
private static <T> DataStream<T> setPartitioner(
- DataStream<T> dataStream, StreamPartitioner<T> partitioner) {
+ DataStream<T> dataStream,
+ StreamPartitioner<T> partitioner,
+ StreamExchangeMode exchangeMode) {
return new DataStream<T>(
dataStream.getExecutionEnvironment(),
- new PartitionTransformation<T>(dataStream.getTransformation(), partitioner));
+ new PartitionTransformation<T>(
+ dataStream.getTransformation(), partitioner, exchangeMode));
}
/** Utility class, should not be instantiated. */