You are viewing a plain-text version of this content. The canonical link for it was a hyperlink on the word "here", which is not preserved in this copy.
Posted to commits@flink.apache.org by al...@apache.org on 2019/07/09 08:09:58 UTC

[flink] 02/02: [FLINK-13101][datastream] Introduce blockingConnectionsBetweenChains property of StreamGraph

This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5e27744c94c59f2bcc661b3f94f311221c021773
Author: ifndef-SleePy <mm...@gmail.com>
AuthorDate: Thu Jul 4 20:18:04 2019 +0800

    [FLINK-13101][datastream] Introduce blockingConnectionsBetweenChains property of StreamGraph
---
 .../flink/streaming/api/graph/StreamGraph.java     | 22 +++++++
 .../streaming/api/graph/StreamGraphGenerator.java  | 12 ++++
 .../api/graph/StreamingJobGraphGenerator.java      |  5 +-
 .../api/graph/StreamingJobGraphGeneratorTest.java  | 73 ++++++++++++++++++++++
 4 files changed, 111 insertions(+), 1 deletion(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index ffacc94..16f04aa 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -97,6 +97,12 @@ public class StreamGraph extends StreamingPlan {
 
 	private TimeCharacteristic timeCharacteristic;
 
+	/**
+	 * Whether stream edges that cannot be chained and whose shuffle mode is {@code UNDEFINED} are
+	 * translated into {@code BLOCKING} instead of {@code PIPELINED_BOUNDED} result partitions.
+	 */
+	private boolean blockingConnectionsBetweenChains;
+
 	private Map<Integer, StreamNode> streamNodes;
 	private Set<Integer> sources;
 	private Set<Integer> sinks;
@@ -184,6 +190,22 @@ public class StreamGraph extends StreamingPlan {
 		this.timeCharacteristic = timeCharacteristic;
 	}
 
+	/**
+	 * Returns whether stream edges that cannot be chained and whose shuffle mode is {@code UNDEFINED}
+	 * are translated into {@code BLOCKING} rather than {@code PIPELINED_BOUNDED} result partitions.
+	 */
+	public boolean isBlockingConnectionsBetweenChains() {
+		return this.blockingConnectionsBetweenChains;
+	}
+
+	/**
+	 * Sets whether stream edges that cannot be chained and whose shuffle mode is {@code UNDEFINED}
+	 * are translated into {@code BLOCKING} rather than {@code PIPELINED_BOUNDED} result partitions.
+	 */
+	public void setBlockingConnectionsBetweenChains(boolean blocking) {
+		this.blockingConnectionsBetweenChains = blocking;
+	}
+
 	// Checkpointing
 
 	public boolean isChainingEnabled() {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index c289fac..bd6a4b5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -123,6 +123,12 @@ public class StreamGraphGenerator {
 
 	private String jobName = DEFAULT_JOB_NAME;
 
+	/**
+	 * Copied onto the generated {@link StreamGraph}: when {@code true}, non-chainable stream edges with
+	 * an {@code UNDEFINED} shuffle mode become {@code BLOCKING} result partitions. Off unless set.
+	 */
+	private boolean blockingConnectionsBetweenChains;
+
 	// This is used to assign a unique ID to iteration source/sink
 	protected static Integer iterationIdCounter = 0;
 	public static int getNewIterationNodeId() {
@@ -182,6 +188,11 @@ public class StreamGraphGenerator {
 		return this;
 	}
 
+	public StreamGraphGenerator setBlockingConnectionsBetweenChains(boolean blocking) {
+		this.blockingConnectionsBetweenChains = blocking;
+		return this;
+	}
+
 	public StreamGraph generate() {
 		streamGraph = new StreamGraph(executionConfig, checkpointConfig);
 		streamGraph.setStateBackend(stateBackend);
@@ -190,6 +201,7 @@ public class StreamGraphGenerator {
 		streamGraph.setUserArtifacts(userArtifacts);
 		streamGraph.setTimeCharacteristic(timeCharacteristic);
 		streamGraph.setJobName(jobName);
+		streamGraph.setBlockingConnectionsBetweenChains(blockingConnectionsBetweenChains);
 
 		alreadyTransformed = new HashMap<>();
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index c7f36b9..8306da6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -509,12 +509,15 @@ public class StreamingJobGraphGenerator {
 		ResultPartitionType resultPartitionType;
 		switch (edge.getShuffleMode()) {
 			case PIPELINED:
-			case UNDEFINED:
 				resultPartitionType = ResultPartitionType.PIPELINED_BOUNDED;
 				break;
 			case BATCH:
 				resultPartitionType = ResultPartitionType.BLOCKING;
 				break;
+			case UNDEFINED:
+				resultPartitionType = streamGraph.isBlockingConnectionsBetweenChains() ?
+						ResultPartitionType.BLOCKING : ResultPartitionType.PIPELINED_BOUNDED;
+				break;
 			default:
 				throw new UnsupportedOperationException("Data exchange mode " +
 					edge.getShuffleMode() + " is not supported yet.");
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index 728096b..0134d1c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -600,4 +600,77 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 		JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
 		assertEquals(ScheduleMode.LAZY_FROM_SOURCES, jobGraph.getScheduleMode());
 	}
+
+	/**
+	 * Verify that "blockingConnectionsBetweenChains" is off by default.
+	 */
+	@Test
+	public void testBlockingAfterChainingOffDisabled() {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		// fromElements -> Filter -> Print
+		DataStream<Integer> sourceDataStream = env.fromElements(1, 2, 3);
+
+		// partition transformation with an undefined shuffle mode between source and filter
+		DataStream<Integer> partitionAfterSourceDataStream = new DataStream<>(env, new PartitionTransformation<>(
+			sourceDataStream.getTransformation(), new RescalePartitioner<>(), ShuffleMode.UNDEFINED));
+		DataStream<Integer> filterDataStream = partitionAfterSourceDataStream.filter(value -> true).setParallelism(2);
+
+		DataStream<Integer> partitionAfterFilterDataStream = new DataStream<>(env, new PartitionTransformation<>(
+			filterDataStream.getTransformation(), new ForwardPartitioner<>(), ShuffleMode.UNDEFINED));
+
+		partitionAfterFilterDataStream.print().setParallelism(2);
+
+		JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
+
+		List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
+		assertEquals(2, verticesSorted.size());
+
+		JobVertex sourceVertex = verticesSorted.get(0);
+		JobVertex filterAndPrintVertex = verticesSorted.get(1);
+
+		assertEquals(ResultPartitionType.PIPELINED_BOUNDED, sourceVertex.getProducedDataSets().get(0).getResultType());
+		assertEquals(ResultPartitionType.PIPELINED_BOUNDED,
+				filterAndPrintVertex.getInputs().get(0).getSource().getResultType());
+	}
+
+	/**
+	 * Verify that enabling "blockingConnectionsBetweenChains" turns only UNDEFINED inter-chain edges into BLOCKING.
+	 */
+	@Test
+	public void testBlockingConnectionsBetweenChainsEnabled() {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		// fromElements -> Filter -> Map -> Print
+		DataStream<Integer> sourceDataStream = env.fromElements(1, 2, 3);
+
+		// partition transformation with an undefined shuffle mode between source and filter
+		DataStream<Integer> partitionAfterSourceDataStream = new DataStream<>(env, new PartitionTransformation<>(
+			sourceDataStream.getTransformation(), new RescalePartitioner<>(), ShuffleMode.UNDEFINED));
+		DataStream<Integer> filterDataStream = partitionAfterSourceDataStream.filter(value -> true).setParallelism(2);
+
+		DataStream<Integer> partitionAfterFilterDataStream = new DataStream<>(env, new PartitionTransformation<>(
+			filterDataStream.getTransformation(), new ForwardPartitioner<>(), ShuffleMode.UNDEFINED));
+		DataStream<Integer> mapDataStream = partitionAfterFilterDataStream.map(value -> value).setParallelism(2);
+		// partition after the map (not the filter) so the topology really is source -> filter -> map -> print
+		DataStream<Integer> partitionAfterMapDataStream = new DataStream<>(env, new PartitionTransformation<>(
+			mapDataStream.getTransformation(), new RescalePartitioner<>(), ShuffleMode.PIPELINED));
+		partitionAfterMapDataStream.print().setParallelism(1);
+
+		StreamGraph streamGraph = env.getStreamGraph();
+		streamGraph.setBlockingConnectionsBetweenChains(true);
+		JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
+
+		List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
+		assertEquals(3, verticesSorted.size());
+
+		JobVertex sourceVertex = verticesSorted.get(0);
+		// filter and map share a forward edge and parallelism 2, so they are still chained together
+		JobVertex filterAndMapVertex = verticesSorted.get(1);
+		JobVertex printVertex = verticesSorted.get(2);
+
+		// the edge with undefined shuffle mode is translated into BLOCKING
+		assertEquals(ResultPartitionType.BLOCKING, sourceVertex.getProducedDataSets().get(0).getResultType());
+		// the edge with PIPELINED shuffle mode is translated into PIPELINED_BOUNDED
+		assertEquals(ResultPartitionType.PIPELINED_BOUNDED, filterAndMapVertex.getProducedDataSets().get(0).getResultType());
+		assertEquals(ResultPartitionType.PIPELINED_BOUNDED, printVertex.getInputs().get(0).getSource().getResultType());
+	}
 }