You are viewing a plain-text version of this content. The canonical link for it was "here" in the original page; the hyperlink itself was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by al...@apache.org on 2019/07/09 08:09:58 UTC
[flink] 02/02: [FLINK-13101][datastream] Introduce
blockingConnectionsBetweenChains property of StreamGraph
This is an automated email from the ASF dual-hosted git repository.
aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 5e27744c94c59f2bcc661b3f94f311221c021773
Author: ifndef-SleePy <mm...@gmail.com>
AuthorDate: Thu Jul 4 20:18:04 2019 +0800
[FLINK-13101][datastream] Introduce blockingConnectionsBetweenChains property of StreamGraph
---
.../flink/streaming/api/graph/StreamGraph.java | 22 +++++++
.../streaming/api/graph/StreamGraphGenerator.java | 12 ++++
.../api/graph/StreamingJobGraphGenerator.java | 5 +-
.../api/graph/StreamingJobGraphGeneratorTest.java | 73 ++++++++++++++++++++++
4 files changed, 111 insertions(+), 1 deletion(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index ffacc94..16f04aa 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -97,6 +97,12 @@ public class StreamGraph extends StreamingPlan {
private TimeCharacteristic timeCharacteristic;
+ /**
+ * If set, stream edges that cannot be chained and whose shuffle mode is not specified
+ * ({@code UNDEFINED}) are translated into {@code BLOCKING} result partitions instead of
+ * {@code PIPELINED_BOUNDED} ones.
+ */
+ private boolean blockingConnectionsBetweenChains;
+
private Map<Integer, StreamNode> streamNodes;
private Set<Integer> sources;
private Set<Integer> sinks;
@@ -184,6 +190,22 @@ public class StreamGraph extends StreamingPlan {
this.timeCharacteristic = timeCharacteristic;
}
+ /**
+ * Returns whether non-chainable stream edges with an {@code UNDEFINED} shuffle mode are
+ * translated into {@code BLOCKING} result partitions.
+ *
+ * @return {@code true} for {@code BLOCKING}, {@code false} for {@code PIPELINED_BOUNDED}
+ */
+ public boolean isBlockingConnectionsBetweenChains() {
+ return this.blockingConnectionsBetweenChains;
+ }
+
+ /**
+ * Sets whether non-chainable stream edges with an {@code UNDEFINED} shuffle mode are
+ * translated into {@code BLOCKING} result partitions rather than {@code PIPELINED_BOUNDED}.
+ *
+ * @param blockingConnectionsBetweenChains {@code true} to request {@code BLOCKING} partitions
+ */
+ public void setBlockingConnectionsBetweenChains(boolean blockingConnectionsBetweenChains) {
+ this.blockingConnectionsBetweenChains = blockingConnectionsBetweenChains;
+ }
+
// Checkpointing
public boolean isChainingEnabled() {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index c289fac..bd6a4b5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -123,6 +123,12 @@ public class StreamGraphGenerator {
private String jobName = DEFAULT_JOB_NAME;
+ /**
+ * Whether non-chainable stream edges with an {@code UNDEFINED} shuffle mode should become
+ * {@code BLOCKING} result partitions; copied onto the {@link StreamGraph} by {@code generate()}.
+ * Disabled (the Java default {@code false}) unless the setter enables it.
+ */
+ private boolean blockingConnectionsBetweenChains;
+
// This is used to assign a unique ID to iteration source/sink
protected static Integer iterationIdCounter = 0;
public static int getNewIterationNodeId() {
@@ -182,6 +188,11 @@ public class StreamGraphGenerator {
return this;
}
+ /**
+ * Sets the value that {@code generate()} passes to
+ * {@link StreamGraph#setBlockingConnectionsBetweenChains(boolean)} on the graph it builds.
+ *
+ * @param blockingConnectionsBetweenChains {@code true} to request {@code BLOCKING} partitions
+ * for non-chainable edges whose shuffle mode is {@code UNDEFINED}
+ * @return this generator, for call chaining
+ */
+ public StreamGraphGenerator setBlockingConnectionsBetweenChains(boolean blockingConnectionsBetweenChains) {
+ this.blockingConnectionsBetweenChains = blockingConnectionsBetweenChains;
+ return this;
+ }
+
public StreamGraph generate() {
streamGraph = new StreamGraph(executionConfig, checkpointConfig);
streamGraph.setStateBackend(stateBackend);
@@ -190,6 +201,7 @@ public class StreamGraphGenerator {
streamGraph.setUserArtifacts(userArtifacts);
streamGraph.setTimeCharacteristic(timeCharacteristic);
streamGraph.setJobName(jobName);
+ streamGraph.setBlockingConnectionsBetweenChains(blockingConnectionsBetweenChains);
alreadyTransformed = new HashMap<>();
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index c7f36b9..8306da6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -509,12 +509,15 @@ public class StreamingJobGraphGenerator {
ResultPartitionType resultPartitionType;
switch (edge.getShuffleMode()) {
case PIPELINED:
- case UNDEFINED:
resultPartitionType = ResultPartitionType.PIPELINED_BOUNDED;
break;
case BATCH:
resultPartitionType = ResultPartitionType.BLOCKING;
break;
+ case UNDEFINED:
+ resultPartitionType = streamGraph.isBlockingConnectionsBetweenChains() ?
+ ResultPartitionType.BLOCKING : ResultPartitionType.PIPELINED_BOUNDED;
+ break;
default:
throw new UnsupportedOperationException("Data exchange mode " +
edge.getShuffleMode() + " is not supported yet.");
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index 728096b..0134d1c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -600,4 +600,77 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
assertEquals(ScheduleMode.LAZY_FROM_SOURCES, jobGraph.getScheduleMode());
}
+
+ /**
+ * Verifies that "blockingConnectionsBetweenChains" is off by default: a non-chainable edge with an
+ * {@code UNDEFINED} shuffle mode is still translated into a {@code PIPELINED_BOUNDED} partition.
+ */
+ @Test
+ public void testBlockingAfterChainingOffDisabled() {
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ // topology: fromElements -(rescale, UNDEFINED)-> filter -(forward, UNDEFINED)-> print
+ final DataStream<Integer> source = env.fromElements(1, 2, 3);
+ final DataStream<Integer> rescaledSource = new DataStream<>(env, new PartitionTransformation<>(
+ source.getTransformation(), new RescalePartitioner<>(), ShuffleMode.UNDEFINED));
+ final DataStream<Integer> filtered = rescaledSource.filter(value -> true).setParallelism(2);
+ final DataStream<Integer> forwardedFilter = new DataStream<>(env, new PartitionTransformation<>(
+ filtered.getTransformation(), new ForwardPartitioner<>(), ShuffleMode.UNDEFINED));
+ forwardedFilter.print().setParallelism(2);
+
+ // the flag is never set here, so the StreamGraph keeps its default
+ final List<JobVertex> vertices = StreamingJobGraphGenerator
+ .createJobGraph(env.getStreamGraph())
+ .getVerticesSortedTopologicallyFromSources();
+ // filter and print end up in one vertex; only the source edge crosses a chain boundary
+ assertEquals(2, vertices.size());
+
+ final JobVertex sourceVertex = vertices.get(0);
+ final JobVertex filterAndPrintVertex = vertices.get(1);
+ // check both ends of the single cross-chain edge
+ assertEquals(ResultPartitionType.PIPELINED_BOUNDED,
+ sourceVertex.getProducedDataSets().get(0).getResultType());
+ assertEquals(ResultPartitionType.PIPELINED_BOUNDED,
+ filterAndPrintVertex.getInputs().get(0).getSource().getResultType());
+ }
+
+ /**
+ * Tests enabling the property "blockingConnectionsBetweenChains": a non-chainable edge whose
+ * shuffle mode is {@code UNDEFINED} becomes {@code BLOCKING}, an explicitly {@code PIPELINED}
+ * edge stays {@code PIPELINED_BOUNDED}, and chainable edges are still chained.
+ */
+ @Test
+ public void testBlockingConnectionsBetweenChainsEnabled() {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ // fromElements -> Filter -> Map -> Print
+ DataStream<Integer> sourceDataStream = env.fromElements(1, 2, 3);
+
+ // partition transformation with an undefined shuffle mode between source and filter
+ DataStream<Integer> partitionAfterSourceDataStream = new DataStream<>(env, new PartitionTransformation<>(
+ sourceDataStream.getTransformation(), new RescalePartitioner<>(), ShuffleMode.UNDEFINED));
+ DataStream<Integer> filterDataStream = partitionAfterSourceDataStream.filter(value -> true).setParallelism(2);
+
+ // chainable forward edge with an undefined shuffle mode between filter and map
+ DataStream<Integer> partitionAfterFilterDataStream = new DataStream<>(env, new PartitionTransformation<>(
+ filterDataStream.getTransformation(), new ForwardPartitioner<>(), ShuffleMode.UNDEFINED));
+ DataStream<Integer> mapDataStream = partitionAfterFilterDataStream.map(value -> value).setParallelism(2);
+
+ // non-chainable edge with an explicit PIPELINED shuffle mode between map and print; it must
+ // consume the map output, otherwise map is a dangling branch and print reads from filter
+ DataStream<Integer> partitionAfterMapDataStream = new DataStream<>(env, new PartitionTransformation<>(
+ mapDataStream.getTransformation(), new RescalePartitioner<>(), ShuffleMode.PIPELINED));
+ partitionAfterMapDataStream.print().setParallelism(1);
+
+ StreamGraph streamGraph = env.getStreamGraph();
+ streamGraph.setBlockingConnectionsBetweenChains(true);
+ JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
+
+ List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
+ assertEquals(3, verticesSorted.size());
+
+ JobVertex sourceVertex = verticesSorted.get(0);
+ // filter and map are still chained into one vertex
+ JobVertex filterAndMapVertex = verticesSorted.get(1);
+ JobVertex printVertex = verticesSorted.get(2);
+
+ // the edge with undefined shuffle mode is translated into BLOCKING
+ assertEquals(ResultPartitionType.BLOCKING, sourceVertex.getProducedDataSets().get(0).getResultType());
+ // the edge with PIPELINED shuffle mode is translated into PIPELINED_BOUNDED
+ assertEquals(ResultPartitionType.PIPELINED_BOUNDED, filterAndMapVertex.getProducedDataSets().get(0).getResultType());
+ assertEquals(ResultPartitionType.PIPELINED_BOUNDED, printVertex.getInputs().get(0).getSource().getResultType());
+ }
}