You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2019/07/09 08:09:56 UTC

[flink] branch master updated (8f47b38 -> 5e27744)

This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 8f47b38  [FLINK-12693][state] Store state per key-group in CopyOnWriteStateTable
     new 47d7eaa  [FLINK-13098][datastream] Add a new type UNDEFINED of shuffle mode
     new 5e27744  [FLINK-13101][datastream] Introduce blockingConnectionsBetweenChains property of StreamGraph

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/streaming/api/graph/StreamEdge.java      |   2 +-
 .../flink/streaming/api/graph/StreamGraph.java     |  24 ++-
 .../streaming/api/graph/StreamGraphGenerator.java  |  12 ++
 .../api/graph/StreamingJobGraphGenerator.java      |   4 +
 .../transformations/PartitionTransformation.java   |   2 +-
 .../streaming/api/transformations/ShuffleMode.java |   9 +-
 .../api/graph/StreamingJobGraphGeneratorTest.java  | 163 +++++++++++++++++++--
 7 files changed, 200 insertions(+), 16 deletions(-)


[flink] 02/02: [FLINK-13101][datastream] Introduce blockingConnectionsBetweenChains property of StreamGraph

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5e27744c94c59f2bcc661b3f94f311221c021773
Author: ifndef-SleePy <mm...@gmail.com>
AuthorDate: Thu Jul 4 20:18:04 2019 +0800

    [FLINK-13101][datastream] Introduce blockingConnectionsBetweenChains property of StreamGraph
---
 .../flink/streaming/api/graph/StreamGraph.java     | 22 +++++++
 .../streaming/api/graph/StreamGraphGenerator.java  | 12 ++++
 .../api/graph/StreamingJobGraphGenerator.java      |  5 +-
 .../api/graph/StreamingJobGraphGeneratorTest.java  | 73 ++++++++++++++++++++++
 4 files changed, 111 insertions(+), 1 deletion(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index ffacc94..16f04aa 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -97,6 +97,12 @@ public class StreamGraph extends StreamingPlan {
 
 	private TimeCharacteristic timeCharacteristic;
 
+	/**
+	 * Whether stream edges that cannot be chained and whose shuffle mode is not specified
+	 * ({@code UNDEFINED}) are translated into the {@code BLOCKING} result partition type.
+	 */
+	private boolean blockingConnectionsBetweenChains;
+
 	private Map<Integer, StreamNode> streamNodes;
 	private Set<Integer> sources;
 	private Set<Integer> sinks;
@@ -184,6 +190,22 @@ public class StreamGraph extends StreamingPlan {
 		this.timeCharacteristic = timeCharacteristic;
 	}
 
+	/**
+	 * Returns whether stream edges that cannot be chained and whose shuffle mode is not specified
+	 * ({@code UNDEFINED}) are translated into the {@code BLOCKING} result partition type.
+	 */
+	public boolean isBlockingConnectionsBetweenChains() {
+		return blockingConnectionsBetweenChains;
+	}
+
+	/**
+	 * Sets whether stream edges that cannot be chained and whose shuffle mode is not specified
+	 * ({@code UNDEFINED}) are translated into the {@code BLOCKING} result partition type.
+	 */
+	public void setBlockingConnectionsBetweenChains(boolean blockingConnectionsBetweenChains) {
+		this.blockingConnectionsBetweenChains = blockingConnectionsBetweenChains;
+	}
+
 	// Checkpointing
 
 	public boolean isChainingEnabled() {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index c289fac..bd6a4b5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -123,6 +123,12 @@ public class StreamGraphGenerator {
 
 	private String jobName = DEFAULT_JOB_NAME;
 
+	/**
+	 * Whether stream edges that cannot be chained and whose shuffle mode is not specified
+	 * ({@code UNDEFINED}) are translated into the {@code BLOCKING} result partition type.
+	 */
+	private boolean blockingConnectionsBetweenChains = false;
+
 	// This is used to assign a unique ID to iteration source/sink
 	protected static Integer iterationIdCounter = 0;
 	public static int getNewIterationNodeId() {
@@ -182,6 +188,11 @@ public class StreamGraphGenerator {
 		return this;
 	}
 
+	public StreamGraphGenerator setBlockingConnectionsBetweenChains(boolean blockingConnectionsBetweenChains) {
+		this.blockingConnectionsBetweenChains = blockingConnectionsBetweenChains;
+		return this;
+	}
+
 	public StreamGraph generate() {
 		streamGraph = new StreamGraph(executionConfig, checkpointConfig);
 		streamGraph.setStateBackend(stateBackend);
@@ -190,6 +201,7 @@ public class StreamGraphGenerator {
 		streamGraph.setUserArtifacts(userArtifacts);
 		streamGraph.setTimeCharacteristic(timeCharacteristic);
 		streamGraph.setJobName(jobName);
+		streamGraph.setBlockingConnectionsBetweenChains(blockingConnectionsBetweenChains);
 
 		alreadyTransformed = new HashMap<>();
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index c7f36b9..8306da6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -509,12 +509,15 @@ public class StreamingJobGraphGenerator {
 		ResultPartitionType resultPartitionType;
 		switch (edge.getShuffleMode()) {
 			case PIPELINED:
-			case UNDEFINED:
 				resultPartitionType = ResultPartitionType.PIPELINED_BOUNDED;
 				break;
 			case BATCH:
 				resultPartitionType = ResultPartitionType.BLOCKING;
 				break;
+			case UNDEFINED:
+				resultPartitionType = streamGraph.isBlockingConnectionsBetweenChains() ?
+						ResultPartitionType.BLOCKING : ResultPartitionType.PIPELINED_BOUNDED;
+				break;
 			default:
 				throw new UnsupportedOperationException("Data exchange mode " +
 					edge.getShuffleMode() + " is not supported yet.");
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index 728096b..0134d1c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -600,4 +600,77 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 		JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
 		assertEquals(ScheduleMode.LAZY_FROM_SOURCES, jobGraph.getScheduleMode());
 	}
+
+	/**
+	 * Verify that "blockingConnectionsBetweenChains" is off by default.
+	 */
+	@Test
+	public void testBlockingAfterChainingOffDisabled() {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		// fromElements -> Filter -> Print
+		DataStream<Integer> sourceDataStream = env.fromElements(1, 2, 3);
+
+		// partition transformation with an undefined shuffle mode between source and filter
+		DataStream<Integer> partitionAfterSourceDataStream = new DataStream<>(env, new PartitionTransformation<>(
+			sourceDataStream.getTransformation(), new RescalePartitioner<>(), ShuffleMode.UNDEFINED));
+		DataStream<Integer> filterDataStream = partitionAfterSourceDataStream.filter(value -> true).setParallelism(2);
+
+		DataStream<Integer> partitionAfterFilterDataStream = new DataStream<>(env, new PartitionTransformation<>(
+			filterDataStream.getTransformation(), new ForwardPartitioner<>(), ShuffleMode.UNDEFINED));
+
+		partitionAfterFilterDataStream.print().setParallelism(2);
+
+		JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
+
+		List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
+		assertEquals(2, verticesSorted.size());
+
+		JobVertex sourceVertex = verticesSorted.get(0);
+		JobVertex filterAndPrintVertex = verticesSorted.get(1);
+
+		assertEquals(ResultPartitionType.PIPELINED_BOUNDED, sourceVertex.getProducedDataSets().get(0).getResultType());
+		assertEquals(ResultPartitionType.PIPELINED_BOUNDED,
+				filterAndPrintVertex.getInputs().get(0).getSource().getResultType());
+	}
+
+	/**
+	 * Test enabling the property "blockingConnectionsBetweenChains".
+	 */
+	@Test
+	public void testBlockingConnectionsBetweenChainsEnabled() {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		// fromElements -> Filter -> Map, with Print attached to the Filter output (not to the Map)
+		DataStream<Integer> sourceDataStream = env.fromElements(1, 2, 3);
+
+		// partition transformation with an undefined shuffle mode between source and filter
+		DataStream<Integer> partitionAfterSourceDataStream = new DataStream<>(env, new PartitionTransformation<>(
+			sourceDataStream.getTransformation(), new RescalePartitioner<>(), ShuffleMode.UNDEFINED));
+		DataStream<Integer> filterDataStream = partitionAfterSourceDataStream.filter(value -> true).setParallelism(2);
+
+		DataStream<Integer> partitionAfterFilterDataStream = new DataStream<>(env, new PartitionTransformation<>(
+			filterDataStream.getTransformation(), new ForwardPartitioner<>(), ShuffleMode.UNDEFINED));
+		partitionAfterFilterDataStream.map(value -> value).setParallelism(2);
+
+		DataStream<Integer> partitionAfterMapDataStream = new DataStream<>(env, new PartitionTransformation<>(
+			filterDataStream.getTransformation(), new RescalePartitioner<>(), ShuffleMode.PIPELINED));
+		partitionAfterMapDataStream.print().setParallelism(1);
+
+		StreamGraph streamGraph = env.getStreamGraph();
+		streamGraph.setBlockingConnectionsBetweenChains(true);
+		JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
+
+		List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
+		assertEquals(3, verticesSorted.size());
+
+		JobVertex sourceVertex = verticesSorted.get(0);
+		// filter and map can still be chained
+		JobVertex filterAndMapVertex = verticesSorted.get(1);
+		JobVertex printVertex = verticesSorted.get(2);
+
+		// the edge with undefined shuffle mode is translated into BLOCKING
+		assertEquals(ResultPartitionType.BLOCKING, sourceVertex.getProducedDataSets().get(0).getResultType());
+		// the edge with PIPELINED shuffle mode is translated into PIPELINED_BOUNDED
+		assertEquals(ResultPartitionType.PIPELINED_BOUNDED, filterAndMapVertex.getProducedDataSets().get(0).getResultType());
+		assertEquals(ResultPartitionType.PIPELINED_BOUNDED, printVertex.getInputs().get(0).getSource().getResultType());
+	}
 }


[flink] 01/02: [FLINK-13098][datastream] Add a new type UNDEFINED of shuffle mode

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 47d7eaa67f837d51bf0600b2d2d6d0f4202b6c0a
Author: ifndef-SleePy <mm...@gmail.com>
AuthorDate: Thu Jul 4 17:56:18 2019 +0800

    [FLINK-13098][datastream] Add a new type UNDEFINED of shuffle mode
---
 .../flink/streaming/api/graph/StreamEdge.java      |  2 +-
 .../flink/streaming/api/graph/StreamGraph.java     |  2 +-
 .../api/graph/StreamingJobGraphGenerator.java      |  1 +
 .../transformations/PartitionTransformation.java   |  2 +-
 .../streaming/api/transformations/ShuffleMode.java |  9 ++-
 .../api/graph/StreamingJobGraphGeneratorTest.java  | 90 +++++++++++++++++++---
 6 files changed, 90 insertions(+), 16 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
index 48c70d0..05a641a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
@@ -83,7 +83,7 @@ public class StreamEdge implements Serializable {
 				selectedNames,
 				outputPartitioner,
 				outputTag,
-				ShuffleMode.PIPELINED);
+				ShuffleMode.UNDEFINED);
 	}
 
 	public StreamEdge(StreamNode sourceVertex, StreamNode targetVertex, int typeNumber,
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 87e6d86..ffacc94 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -470,7 +470,7 @@ public class StreamGraph extends StreamingPlan {
 			}
 
 			if (shuffleMode == null) {
-				shuffleMode = ShuffleMode.PIPELINED;
+				shuffleMode = ShuffleMode.UNDEFINED;
 			}
 
 			StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner, outputTag, shuffleMode);
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 4c11fa3..c7f36b9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -509,6 +509,7 @@ public class StreamingJobGraphGenerator {
 		ResultPartitionType resultPartitionType;
 		switch (edge.getShuffleMode()) {
 			case PIPELINED:
+			case UNDEFINED:
 				resultPartitionType = ResultPartitionType.PIPELINED_BOUNDED;
 				break;
 			case BATCH:
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/PartitionTransformation.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/PartitionTransformation.java
index 51988e2..8fe0d8a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/PartitionTransformation.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/PartitionTransformation.java
@@ -54,7 +54,7 @@ public class PartitionTransformation<T> extends Transformation<T> {
 	 * @param partitioner The {@code StreamPartitioner}
 	 */
 	public PartitionTransformation(Transformation<T> input, StreamPartitioner<T> partitioner) {
-		this(input, partitioner, ShuffleMode.PIPELINED);
+		this(input, partitioner, ShuffleMode.UNDEFINED);
 	}
 
 	/**
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/ShuffleMode.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/ShuffleMode.java
index 583c57e..91ea643 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/ShuffleMode.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/ShuffleMode.java
@@ -35,5 +35,12 @@ public enum ShuffleMode {
 	 * The producer first produces its entire result and finishes.
 	 * After that, the consumer is started and may consume the data.
 	 */
-	BATCH
+	BATCH,
+
+	/**
+	 * The shuffle mode is undefined. It leaves it up to the framework to decide the shuffle mode.
+	 * The framework will pick one of {@link ShuffleMode#BATCH} or {@link ShuffleMode#PIPELINED} in
+	 * the end.
+	 */
+	UNDEFINED
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index 4e80520..728096b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -52,6 +52,7 @@ import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
 import org.apache.flink.streaming.api.transformations.ShuffleMode;
 import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 
@@ -380,31 +381,96 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 	}
 
 	/**
-	 * Test manually setting shuffle mode.
+	 * Test setting shuffle mode to {@link ShuffleMode#PIPELINED}.
 	 */
 	@Test
-	public void testShuffleMode() {
+	public void testShuffleModePipelined() {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		// fromElements -> Map -> Print, will not chain since the batch data exchange mode
-		DataStream<Integer> mapDataStream = env.fromElements(1, 2, 3)
-			.map((MapFunction<Integer, Integer>) value -> value).setParallelism(2);
+		// fromElements -> Map -> Print
+		DataStream<Integer> sourceDataStream = env.fromElements(1, 2, 3);
 
-		DataStream<Integer> partitionDataStream = new DataStream<>(env, new PartitionTransformation<>(
-				mapDataStream.getTransformation(), new ForwardPartitioner<>(), ShuffleMode.BATCH));
-		partitionDataStream.print().setParallelism(2);
+		DataStream<Integer> partitionAfterSourceDataStream = new DataStream<>(env, new PartitionTransformation<>(
+				sourceDataStream.getTransformation(), new ForwardPartitioner<>(), ShuffleMode.PIPELINED));
+		DataStream<Integer> mapDataStream = partitionAfterSourceDataStream.map(value -> value).setParallelism(1);
+
+		DataStream<Integer> partitionAfterMapDataStream = new DataStream<>(env, new PartitionTransformation<>(
+				mapDataStream.getTransformation(), new RescalePartitioner<>(), ShuffleMode.PIPELINED));
+		partitionAfterMapDataStream.print().setParallelism(2);
+
+		JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
+
+		List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
+		assertEquals(2, verticesSorted.size());
+
+		// it can be chained with PIPELINED shuffle mode
+		JobVertex sourceAndMapVertex = verticesSorted.get(0);
+
+		// PIPELINED shuffle mode is translated into PIPELINED_BOUNDED result partition
+		assertEquals(ResultPartitionType.PIPELINED_BOUNDED,
+				sourceAndMapVertex.getProducedDataSets().get(0).getResultType());
+	}
+
+	/**
+	 * Test setting shuffle mode to {@link ShuffleMode#BATCH}.
+	 */
+	@Test
+	public void testShuffleModeBatch() {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		// fromElements -> Map -> Print
+		DataStream<Integer> sourceDataStream = env.fromElements(1, 2, 3);
+
+		DataStream<Integer> partitionAfterSourceDataStream = new DataStream<>(env, new PartitionTransformation<>(
+				sourceDataStream.getTransformation(), new ForwardPartitioner<>(), ShuffleMode.BATCH));
+		DataStream<Integer> mapDataStream = partitionAfterSourceDataStream.map(value -> value).setParallelism(1);
+
+		DataStream<Integer> partitionAfterMapDataStream = new DataStream<>(env, new PartitionTransformation<>(
+				mapDataStream.getTransformation(), new RescalePartitioner<>(), ShuffleMode.BATCH));
+		partitionAfterMapDataStream.print().setParallelism(2);
 
 		JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
 
 		List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
 		assertEquals(3, verticesSorted.size());
 
+		// it can not be chained with BATCH shuffle mode
 		JobVertex sourceVertex = verticesSorted.get(0);
 		JobVertex mapVertex = verticesSorted.get(1);
-		JobVertex printVertex = verticesSorted.get(2);
 
-		assertEquals(ResultPartitionType.PIPELINED_BOUNDED, sourceVertex.getProducedDataSets().get(0).getResultType());
-		assertEquals(ResultPartitionType.PIPELINED_BOUNDED, mapVertex.getInputs().get(0).getSource().getResultType());
-		assertEquals(ResultPartitionType.BLOCKING, printVertex.getInputs().get(0).getSource().getResultType());
+		// BATCH shuffle mode is translated into BLOCKING result partition
+		assertEquals(ResultPartitionType.BLOCKING,
+			sourceVertex.getProducedDataSets().get(0).getResultType());
+		assertEquals(ResultPartitionType.BLOCKING,
+			mapVertex.getProducedDataSets().get(0).getResultType());
+	}
+
+	/**
+	 * Test setting shuffle mode to {@link ShuffleMode#UNDEFINED}.
+	 */
+	@Test
+	public void testShuffleModeUndefined() {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		// fromElements -> Map -> Print
+		DataStream<Integer> sourceDataStream = env.fromElements(1, 2, 3);
+
+		DataStream<Integer> partitionAfterSourceDataStream = new DataStream<>(env, new PartitionTransformation<>(
+				sourceDataStream.getTransformation(), new ForwardPartitioner<>(), ShuffleMode.UNDEFINED));
+		DataStream<Integer> mapDataStream = partitionAfterSourceDataStream.map(value -> value).setParallelism(1);
+
+		DataStream<Integer> partitionAfterMapDataStream = new DataStream<>(env, new PartitionTransformation<>(
+				mapDataStream.getTransformation(), new RescalePartitioner<>(), ShuffleMode.UNDEFINED));
+		partitionAfterMapDataStream.print().setParallelism(2);
+
+		JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
+
+		List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
+		assertEquals(2, verticesSorted.size());
+
+		// it can be chained with UNDEFINED shuffle mode
+		JobVertex sourceAndMapVertex = verticesSorted.get(0);
+
+		// UNDEFINED shuffle mode is translated into PIPELINED_BOUNDED result partition by default
+		assertEquals(ResultPartitionType.PIPELINED_BOUNDED,
+			sourceAndMapVertex.getProducedDataSets().get(0).getResultType());
 	}
 
 	/**