You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2020/09/07 15:02:14 UTC

[flink] branch master updated (82f6f86 -> 13e0b35)

This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 82f6f86  [FLINK-19148][docs] Fix crashed table in Flink Table API & SQL docs
     new 1f2d26f  [hotfix][datastream] Fix the formatting of StreamEdge class
     new 13e0b35  [FLINK-18832][datastream] Add compatible check for blocking partition with buffer timeout

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../environment/StreamExecutionEnvironment.java    |  6 +-
 .../flink/streaming/api/graph/StreamConfig.java    | 29 --------
 .../flink/streaming/api/graph/StreamEdge.java      | 80 +++++++++++++++++-----
 .../streaming/api/graph/StreamGraphGenerator.java  |  5 +-
 .../api/graph/StreamingJobGraphGenerator.java      | 23 ++++++-
 .../api/operators/StreamingRuntimeContext.java     | 10 ---
 .../flink/streaming/runtime/tasks/StreamTask.java  |  4 +-
 .../api/graph/StreamingJobGraphGeneratorTest.java  | 26 +++++++
 .../runtime/tasks/StreamConfigChainer.java         |  4 --
 .../tasks/StreamTaskMailboxTestHarnessBuilder.java |  2 -
 .../runtime/tasks/StreamTaskTestHarness.java       |  1 -
 11 files changed, 114 insertions(+), 76 deletions(-)


[flink] 01/02: [hotfix][datastream] Fix the formatting of StreamEdge class

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1f2d26fab78fc9f3512a54237cc3b7a072af6358
Author: Zhijiang <wa...@aliyun.com>
AuthorDate: Thu Aug 20 09:28:52 2020 +0200

    [hotfix][datastream] Fix the formatting of StreamEdge class
---
 .../flink/streaming/api/graph/StreamEdge.java      | 41 ++++++++++++++--------
 1 file changed, 26 insertions(+), 15 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
index 2ee4a7c..fa26dc5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
@@ -76,20 +76,33 @@ public class StreamEdge implements Serializable {
 
 	private final ShuffleMode shuffleMode;
 
-	public StreamEdge(StreamNode sourceVertex, StreamNode targetVertex, int typeNumber,
-			List<String> selectedNames, StreamPartitioner<?> outputPartitioner, OutputTag outputTag) {
-		this(sourceVertex,
-				targetVertex,
-				typeNumber,
-				selectedNames,
-				outputPartitioner,
-				outputTag,
-				ShuffleMode.UNDEFINED);
+	public StreamEdge(
+		StreamNode sourceVertex,
+		StreamNode targetVertex,
+		int typeNumber,
+		List<String> selectedNames,
+		StreamPartitioner<?> outputPartitioner,
+		OutputTag outputTag) {
+
+		this(
+			sourceVertex,
+			targetVertex,
+			typeNumber,
+			selectedNames,
+			outputPartitioner,
+			outputTag,
+			ShuffleMode.UNDEFINED);
 	}
 
-	public StreamEdge(StreamNode sourceVertex, StreamNode targetVertex, int typeNumber,
-			List<String> selectedNames, StreamPartitioner<?> outputPartitioner, OutputTag outputTag,
-			ShuffleMode shuffleMode) {
+	public StreamEdge(
+		StreamNode sourceVertex,
+		StreamNode targetVertex,
+		int typeNumber,
+		List<String> selectedNames,
+		StreamPartitioner<?> outputPartitioner,
+		OutputTag outputTag,
+		ShuffleMode shuffleMode) {
+
 		this.sourceId = sourceVertex.getId();
 		this.targetId = targetVertex.getId();
 		this.typeNumber = typeNumber;
@@ -99,9 +112,7 @@ public class StreamEdge implements Serializable {
 		this.sourceOperatorName = sourceVertex.getOperatorName();
 		this.targetOperatorName = targetVertex.getOperatorName();
 		this.shuffleMode = checkNotNull(shuffleMode);
-
-		this.edgeId = sourceVertex + "_" + targetVertex + "_" + typeNumber + "_" + selectedNames
-				+ "_" + outputPartitioner;
+		this.edgeId = sourceVertex + "_" + targetVertex + "_" + typeNumber + "_" + selectedNames + "_" + outputPartitioner;
 	}
 
 	public int getSourceId() {


[flink] 02/02: [FLINK-18832][datastream] Add compatible check for blocking partition with buffer timeout

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 13e0b355d1d9ba513671de1638d3c35edb6b96a3
Author: Zhijiang <wa...@aliyun.com>
AuthorDate: Mon Sep 7 05:55:48 2020 +0200

    [FLINK-18832][datastream] Add compatible check for blocking partition with buffer timeout
    
    There is no need to enable the buffer timeout for batch jobs, since the downstream can only consume data after the upstream finishes.
    Furthermore, the current implementation of BoundedBlockingSubpartition does not handle the concurrency issue caused by the flusher thread when
    the buffer timeout is enabled. So it is better to check this compatibility in advance during job graph generation and give users a friendly hint.
    
    This closes #13209.
---
 .../environment/StreamExecutionEnvironment.java    |  6 ++--
 .../flink/streaming/api/graph/StreamConfig.java    | 29 ----------------
 .../flink/streaming/api/graph/StreamEdge.java      | 39 +++++++++++++++++++++-
 .../streaming/api/graph/StreamGraphGenerator.java  |  5 +--
 .../api/graph/StreamingJobGraphGenerator.java      | 23 +++++++++++--
 .../api/operators/StreamingRuntimeContext.java     | 10 ------
 .../flink/streaming/runtime/tasks/StreamTask.java  |  4 +--
 .../api/graph/StreamingJobGraphGeneratorTest.java  | 26 +++++++++++++++
 .../runtime/tasks/StreamConfigChainer.java         |  4 ---
 .../tasks/StreamTaskMailboxTestHarnessBuilder.java |  2 --
 .../runtime/tasks/StreamTaskTestHarness.java       |  1 -
 11 files changed, 88 insertions(+), 61 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 9746f13..78b3e54 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -83,6 +83,7 @@ import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
 import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
+import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.util.DynamicCodeLoadingException;
 import org.apache.flink.util.ExceptionUtils;
@@ -128,9 +129,6 @@ public class StreamExecutionEnvironment {
 	/** The time characteristic that is used if none other is set. */
 	private static final TimeCharacteristic DEFAULT_TIME_CHARACTERISTIC = TimeCharacteristic.ProcessingTime;
 
-	/** The default buffer timeout (max delay of records in the network stack). */
-	private static final long DEFAULT_NETWORK_BUFFER_TIMEOUT = 100L;
-
 	/**
 	 * The environment of the context (local by default, cluster if invoked through command line).
 	 */
@@ -152,7 +150,7 @@ public class StreamExecutionEnvironment {
 
 	protected final List<Transformation<?>> transformations = new ArrayList<>();
 
-	private long bufferTimeout = DEFAULT_NETWORK_BUFFER_TIMEOUT;
+	private long bufferTimeout = StreamingJobGraphGenerator.UNDEFINED_NETWORK_BUFFER_TIMEOUT;
 
 	protected boolean isChainingEnabled = true;
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index b5f407b..0a23523 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -75,14 +75,12 @@ public class StreamConfig implements Serializable {
 	private static final String VERTEX_NAME = "vertexID";
 	private static final String ITERATION_ID = "iterationId";
 	private static final String OUTPUT_SELECTOR_WRAPPER = "outputSelectorWrapper";
-	private static final String BUFFER_TIMEOUT = "bufferTimeout";
 	private static final String INPUTS = "inputs";
 	private static final String TYPE_SERIALIZER_OUT_1 = "typeSerializer_out";
 	private static final String TYPE_SERIALIZER_SIDEOUT_PREFIX = "typeSerializer_sideout_";
 	private static final String ITERATON_WAIT = "iterationWait";
 	private static final String NONCHAINED_OUTPUTS = "nonChainedOutputs";
 	private static final String EDGES_IN_ORDER = "edgesInOrder";
-	private static final String OUT_STREAM_EDGES = "outStreamEdges";
 	private static final String IN_STREAM_EDGES = "inStreamEdges";
 	private static final String OPERATOR_NAME = "operatorName";
 	private static final String OPERATOR_ID = "operatorID";
@@ -104,7 +102,6 @@ public class StreamConfig implements Serializable {
 	//  Default Values
 	// ------------------------------------------------------------------------
 
-	private static final long DEFAULT_TIMEOUT = 100;
 	private static final CheckpointingMode DEFAULT_CHECKPOINTING_MODE = CheckpointingMode.EXACTLY_ONCE;
 
 	private static final double DEFAULT_MANAGED_MEMORY_FRACTION = 0.0;
@@ -238,14 +235,6 @@ public class StreamConfig implements Serializable {
 		return (TypeSerializer<T>) ((NetworkInputConfig) inputs[index]).typeSerializer;
 	}
 
-	public void setBufferTimeout(long timeout) {
-		config.setLong(BUFFER_TIMEOUT, timeout);
-	}
-
-	public long getBufferTimeout() {
-		return config.getLong(BUFFER_TIMEOUT, DEFAULT_TIMEOUT);
-	}
-
 	@VisibleForTesting
 	public void setStreamOperator(StreamOperator<?> operator) {
 		setStreamOperatorFactory(SimpleOperatorFactory.of(operator));
@@ -374,23 +363,6 @@ public class StreamConfig implements Serializable {
 		}
 	}
 
-	public void setOutEdges(List<StreamEdge> outEdges) {
-		try {
-			InstantiationUtil.writeObjectToConfig(outEdges, this.config, OUT_STREAM_EDGES);
-		} catch (IOException e) {
-			throw new StreamTaskException("Cannot serialize outward edges.", e);
-		}
-	}
-
-	public List<StreamEdge> getOutEdges(ClassLoader cl) {
-		try {
-			List<StreamEdge> outEdges = InstantiationUtil.readObjectFromConfig(this.config, OUT_STREAM_EDGES, cl);
-			return outEdges == null ? new ArrayList<StreamEdge>() : outEdges;
-		} catch (Exception e) {
-			throw new StreamTaskException("Could not instantiate outputs.", e);
-		}
-	}
-
 	public void setInPhysicalEdges(List<StreamEdge> inEdges) {
 		try {
 			InstantiationUtil.writeObjectToConfig(inEdges, this.config, IN_STREAM_EDGES);
@@ -612,7 +584,6 @@ public class StreamConfig implements Serializable {
 		catch (Exception e) {
 			builder.append("\nOperator: Missing");
 		}
-		builder.append("\nBuffer timeout: ").append(getBufferTimeout());
 		builder.append("\nState Monitoring: ").append(isCheckpointingEnabled());
 		if (isChainStart() && getChainedOutputs(cl).size() > 0) {
 			builder.append("\n\n\n---------------------\nChained task configs\n---------------------\n");
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
index fa26dc5..70232ce 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
@@ -26,6 +26,7 @@ import java.io.Serializable;
 import java.util.List;
 import java.util.Objects;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -38,6 +39,8 @@ public class StreamEdge implements Serializable {
 
 	private static final long serialVersionUID = 1L;
 
+	private static final long ALWAYS_FLUSH_BUFFER_TIMEOUT = 0L;
+
 	private final String edgeId;
 
 	private final int sourceId;
@@ -76,6 +79,8 @@ public class StreamEdge implements Serializable {
 
 	private final ShuffleMode shuffleMode;
 
+	private long bufferTimeout;
+
 	public StreamEdge(
 		StreamNode sourceVertex,
 		StreamNode targetVertex,
@@ -88,6 +93,7 @@ public class StreamEdge implements Serializable {
 			sourceVertex,
 			targetVertex,
 			typeNumber,
+			ALWAYS_FLUSH_BUFFER_TIMEOUT,
 			selectedNames,
 			outputPartitioner,
 			outputTag,
@@ -103,9 +109,31 @@ public class StreamEdge implements Serializable {
 		OutputTag outputTag,
 		ShuffleMode shuffleMode) {
 
+		this(
+			sourceVertex,
+			targetVertex,
+			typeNumber,
+			sourceVertex.getBufferTimeout(),
+			selectedNames,
+			outputPartitioner,
+			outputTag,
+			shuffleMode);
+	}
+
+	public StreamEdge(
+		StreamNode sourceVertex,
+		StreamNode targetVertex,
+		int typeNumber,
+		long bufferTimeout,
+		List<String> selectedNames,
+		StreamPartitioner<?> outputPartitioner,
+		OutputTag outputTag,
+		ShuffleMode shuffleMode) {
+
 		this.sourceId = sourceVertex.getId();
 		this.targetId = targetVertex.getId();
 		this.typeNumber = typeNumber;
+		this.bufferTimeout = bufferTimeout;
 		this.selectedNames = selectedNames;
 		this.outputPartitioner = outputPartitioner;
 		this.outputTag = outputTag;
@@ -147,6 +175,15 @@ public class StreamEdge implements Serializable {
 		this.outputPartitioner = partitioner;
 	}
 
+	public void setBufferTimeout(long bufferTimeout) {
+		checkArgument(bufferTimeout >= -1);
+		this.bufferTimeout = bufferTimeout;
+	}
+
+	public long getBufferTimeout() {
+		return bufferTimeout;
+	}
+
 	@Override
 	public int hashCode() {
 		return Objects.hash(edgeId, outputTag);
@@ -170,6 +207,6 @@ public class StreamEdge implements Serializable {
 	public String toString() {
 		return "(" + (sourceOperatorName + "-" + sourceId) + " -> " + (targetOperatorName + "-" + targetId)
 			+ ", typeNumber=" + typeNumber + ", selectedNames=" + selectedNames + ", outputPartitioner=" + outputPartitioner
-			+ ", outputTag=" + outputTag + ')';
+			+ ", bufferTimeout=" + bufferTimeout + ", outputTag=" + outputTag + ')';
 	}
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index c9060b9..0f6c0e7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -104,9 +104,6 @@ public class StreamGraphGenerator {
 
 	public static final String DEFAULT_JOB_NAME = "Flink Streaming Job";
 
-	/** The default buffer timeout (max delay of records in the network stack). */
-	public static final long DEFAULT_NETWORK_BUFFER_TIMEOUT = 100L;
-
 	public static final String DEFAULT_SLOT_SHARING_GROUP = "default";
 
 	private final List<Transformation<?>> transformations;
@@ -127,7 +124,7 @@ public class StreamGraphGenerator {
 
 	private TimeCharacteristic timeCharacteristic = DEFAULT_TIME_CHARACTERISTIC;
 
-	private long defaultBufferTimeout = DEFAULT_NETWORK_BUFFER_TIMEOUT;
+	private long defaultBufferTimeout = StreamingJobGraphGenerator.UNDEFINED_NETWORK_BUFFER_TIMEOUT;
 
 	private String jobName = DEFAULT_JOB_NAME;
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index cda60fc..bf3d4b7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -99,6 +99,10 @@ public class StreamingJobGraphGenerator {
 
 	private static final int MANAGED_MEMORY_FRACTION_SCALE = 16;
 
+	private static final long DEFAULT_NETWORK_BUFFER_TIMEOUT = 100L;
+
+	public static final long UNDEFINED_NETWORK_BUFFER_TIMEOUT = -1L;
+
 	// ------------------------------------------------------------------------
 
 	public static JobGraph createJobGraph(StreamGraph streamGraph) {
@@ -318,13 +322,12 @@ public class StreamingJobGraphGenerator {
 				config.setChainStart();
 				config.setChainIndex(0);
 				config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
-				config.setOutEdgesInOrder(transitiveOutEdges);
-				config.setOutEdges(streamGraph.getStreamNode(currentNodeId).getOutEdges());
 
 				for (StreamEdge edge : transitiveOutEdges) {
 					connect(startNodeId, edge);
 				}
 
+				config.setOutEdgesInOrder(transitiveOutEdges);
 				config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));
 
 			} else {
@@ -469,7 +472,6 @@ public class StreamingJobGraphGenerator {
 		StreamNode vertex = streamGraph.getStreamNode(vertexID);
 
 		config.setVertexID(vertexID);
-		config.setBufferTimeout(vertex.getBufferTimeout());
 
 		config.setTypeSerializersIn(vertex.getTypeSerializersIn());
 		config.setTypeSerializerOut(vertex.getTypeSerializerOut());
@@ -570,6 +572,8 @@ public class StreamingJobGraphGenerator {
 					edge.getShuffleMode() + " is not supported yet.");
 		}
 
+		checkAndResetBufferTimeout(resultPartitionType, edge);
+
 		JobEdge jobEdge;
 		if (isPointwisePartitioner(partitioner)) {
 			jobEdge = downStreamVertex.connectNewDataSetAsInput(
@@ -591,6 +595,19 @@ public class StreamingJobGraphGenerator {
 		}
 	}
 
+	private void checkAndResetBufferTimeout(ResultPartitionType type, StreamEdge edge) {
+		long bufferTimeout = edge.getBufferTimeout();
+		if (type.isBlocking() && bufferTimeout != UNDEFINED_NETWORK_BUFFER_TIMEOUT) {
+			throw new UnsupportedOperationException(
+				"Blocking partition does not support buffer timeout " + bufferTimeout + " for src operator in edge "
+					+ edge.toString() + ". \nPlease either reset buffer timeout as -1 or use the non-blocking partition.");
+		}
+
+		if (type.isPipelined() && bufferTimeout == UNDEFINED_NETWORK_BUFFER_TIMEOUT) {
+			edge.setBufferTimeout(DEFAULT_NETWORK_BUFFER_TIMEOUT);
+		}
+	}
+
 	private static boolean isPointwisePartitioner(StreamPartitioner<?> partitioner) {
 		return partitioner instanceof ForwardPartitioner || partitioner instanceof RescalePartitioner;
 	}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
index 8f76a75..ba88570 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
@@ -243,14 +243,4 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {
 	public CheckpointingMode getCheckpointMode() {
 		return streamConfig.getCheckpointMode();
 	}
-
-	/**
-	 * Returns the buffer timeout of the job.
-	 *
-	 * @return buffer timeout (in milliseconds)
-	 */
-	public long getBufferTimeout() {
-		return streamConfig.getBufferTimeout();
-	}
-
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 3667027..80224b9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -89,7 +89,6 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.OptionalLong;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
@@ -1145,7 +1144,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 			Environment environment) {
 		List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> recordWriters = new ArrayList<>();
 		List<StreamEdge> outEdgesInOrder = configuration.getOutEdgesInOrder(environment.getUserClassLoader());
-		Map<Integer, StreamConfig> chainedConfigs = configuration.getTransitiveChainedTaskConfigsWithSelf(environment.getUserClassLoader());
 
 		for (int i = 0; i < outEdgesInOrder.size(); i++) {
 			StreamEdge edge = outEdgesInOrder.get(i);
@@ -1155,7 +1153,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 					i,
 					environment,
 					environment.getTaskInfo().getTaskName(),
-					chainedConfigs.get(edge.getSourceId()).getBufferTimeout()));
+					edge.getBufferTimeout()));
 		}
 		return recordWriters;
 	}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index c947325..a389d77 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -609,6 +609,32 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 			sourceAndMapVertex.getProducedDataSets().get(0).getResultType());
 	}
 
+	@Test(expected = UnsupportedOperationException.class)
+	public void testConflictShuffleModeWithBufferTimeout() {
+		testCompatibleShuffleModeWithBufferTimeout(ShuffleMode.BATCH);
+	}
+
+	@Test
+	public void testNormalShuffleModeWithBufferTimeout() {
+		testCompatibleShuffleModeWithBufferTimeout(ShuffleMode.PIPELINED);
+	}
+
+	private void testCompatibleShuffleModeWithBufferTimeout(ShuffleMode shuffleMode) {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setBufferTimeout(100);
+
+		DataStream<Integer> sourceDataStream = env.fromElements(1, 2, 3);
+		PartitionTransformation<Integer> transformation = new PartitionTransformation<>(
+			sourceDataStream.getTransformation(),
+			new RebalancePartitioner<>(),
+			shuffleMode);
+
+		DataStream<Integer> partitionStream = new DataStream<>(env, transformation);
+		partitionStream.map(value -> value).print();
+
+		StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
+	}
+
 	/**
 	 * Test iteration job, check slot sharing group and co-location group.
 	 */
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
index 4309d67..6164163 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
@@ -49,7 +49,6 @@ public class StreamConfigChainer<OWNER> {
 	private final OWNER owner;
 	private final StreamConfig headConfig;
 	private final Map<Integer, StreamConfig> chainedConfigs = new HashMap<>();
-	private final long bufferTimeout;
 
 	private StreamConfig tailConfig;
 	private int chainIndex = MAIN_NODE_ID;
@@ -58,7 +57,6 @@ public class StreamConfigChainer<OWNER> {
 		this.owner = checkNotNull(owner);
 		this.headConfig = checkNotNull(headConfig);
 		this.tailConfig = checkNotNull(headConfig);
-		this.bufferTimeout = headConfig.getBufferTimeout();
 
 		head(headOperatorID);
 	}
@@ -67,7 +65,6 @@ public class StreamConfigChainer<OWNER> {
 		headConfig.setOperatorID(headOperatorID);
 		headConfig.setChainStart();
 		headConfig.setChainIndex(chainIndex);
-		headConfig.setBufferTimeout(bufferTimeout);
 	}
 
 	public <T> StreamConfigChainer<OWNER> chain(
@@ -145,7 +142,6 @@ public class StreamConfigChainer<OWNER> {
 			tailConfig.setStateKeySerializer(inputSerializer);
 		}
 		tailConfig.setChainIndex(chainIndex);
-		tailConfig.setBufferTimeout(bufferTimeout);
 
 		chainedConfigs.put(chainIndex, tailConfig);
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java
index 1a88be8..0ed1a41 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java
@@ -73,7 +73,6 @@ public class StreamTaskMailboxTestHarnessBuilder<OUT> {
 
 	protected long memorySize = 1024 * 1024;
 	protected int bufferSize = 1024;
-	protected long bufferTimeout = 0;
 	protected Configuration jobConfig = new Configuration();
 	protected Configuration taskConfig = new Configuration();
 	protected StreamConfig streamConfig = new StreamConfig(taskConfig);
@@ -125,7 +124,6 @@ public class StreamTaskMailboxTestHarnessBuilder<OUT> {
 	}
 
 	public StreamTaskMailboxTestHarness<OUT> build() throws Exception {
-		streamConfig.setBufferTimeout(bufferTimeout);
 
 		TestTaskStateManager taskStateManager = new TestTaskStateManager(localRecoveryConfig);
 		if (taskStateSnapshots != null) {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index 6ad42bcb..e0ed885 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -149,7 +149,6 @@ public class StreamTaskTestHarness<OUT> {
 		this.executionConfig = new ExecutionConfig();
 
 		streamConfig = new StreamConfig(taskConfig);
-		streamConfig.setBufferTimeout(0);
 
 		outputSerializer = outputType.createSerializer(executionConfig);
 		outputStreamRecordSerializer = new StreamElementSerializer<>(outputSerializer);