You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/05/17 19:26:50 UTC

flink git commit: [FLINK-9397] [DataStream API] Correctly propagate operator buffer timeout of 0

Repository: flink
Updated Branches:
  refs/heads/release-1.5 27061d35a -> 884c2e39b


[FLINK-9397] [DataStream API] Correctly propagate operator buffer timeout of 0

This also improves some JavaDocs.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/884c2e39
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/884c2e39
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/884c2e39

Branch: refs/heads/release-1.5
Commit: 884c2e39b401fc4f1e0623e856008af53ed5f98e
Parents: 27061d3
Author: Stephan Ewen <se...@apache.org>
Authored: Thu May 17 19:49:16 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu May 17 20:33:03 2018 +0200

----------------------------------------------------------------------
 .../datastream/SingleOutputStreamOperator.java  | 14 ++++++-
 .../api/graph/StreamGraphGenerator.java         |  2 +-
 .../transformations/StreamTransformation.java   | 16 ++++---
 .../api/graph/StreamGraphGeneratorTest.java     | 44 ++++++++++++++++++++
 4 files changed, 68 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/884c2e39/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index 7885934..1ca3ece 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -36,6 +36,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import static java.util.Objects.requireNonNull;
+import static org.apache.flink.util.Preconditions.checkArgument;
 
 /**
  * {@code SingleOutputStreamOperator} represents a user defined transformation
@@ -226,14 +227,23 @@ public class SingleOutputStreamOperator<T> extends DataStream<T> {
 	}
 
 	/**
-	 * Sets the maximum time frequency (ms) for the flushing of the output
-	 * buffer. By default the output buffers flush only when they are full.
+	 * Sets the buffering timeout for data produced by this operation.
+	 * The timeout defines how long data may linger in a partially full buffer
+	 * before being sent over the network.
+	 *
+	 * <p>Lower timeouts lead to lower tail latencies, but may affect throughput.
+	 * Timeouts of 1 ms still sustain high throughput, even for jobs with high parallelism.
+	 *
+	 * <p>A value of '-1' means that the default buffer timeout should be used. A value
+	 * of '0' indicates that no buffering should happen, and all records/events should be
+	 * immediately sent through the network, without additional buffering.
 	 *
 	 * @param timeoutMillis
 	 *            The maximum time between two output flushes.
 	 * @return The operator with buffer timeout set.
 	 */
 	public SingleOutputStreamOperator<T> setBufferTimeout(long timeoutMillis) {
+		checkArgument(timeoutMillis >= -1, "timeout must be >= -1"); // -1 is the "use default" marker, so it is a legal value
 		transformation.setBufferTimeout(timeoutMillis);
 		return this;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/884c2e39/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index 7d0333f..11a7002 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -194,7 +194,7 @@ public class StreamGraphGenerator {
 			alreadyTransformed.put(transform, transformedIds);
 		}
 
-		if (transform.getBufferTimeout() > 0) {
+		if (transform.getBufferTimeout() >= 0) {
 			streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
 		}
 		if (transform.getUid() != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/884c2e39/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
index 8cc4db9..1f763bb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
@@ -31,6 +31,7 @@ import org.apache.flink.util.Preconditions;
 
 import java.util.Collection;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -394,13 +395,18 @@ public abstract class StreamTransformation<T> {
 	public abstract void setChainingStrategy(ChainingStrategy strategy);
 
 	/**
-	 * Set the buffer timeout of this {@code StreamTransformation}. The timeout is used when
-	 * sending elements over the network. The timeout specifies how long a network buffer
-	 * should be kept waiting before sending. A higher timeout means that more elements will
-	 * be sent in one buffer, this increases throughput. The latency, however, is negatively
-	 * affected by a higher timeout.
+	 * Set the buffer timeout of this {@code StreamTransformation}. The timeout defines how long data
+	 * may linger in a partially full buffer before being sent over the network.
+	 *
+	 * <p>Lower timeouts lead to lower tail latencies, but may affect throughput.
+	 * For Flink 1.5+, timeouts of 1ms are feasible for jobs with high parallelism.
+	 *
+	 * <p>A value of -1 means that the default buffer timeout should be used. A value
+	 * of zero indicates that no buffering should happen, and all records/events should be
+	 * immediately sent through the network, without additional buffering.
 	 */
 	public void setBufferTimeout(long bufferTimeout) {
+		checkArgument(bufferTimeout >= -1, "timeout must be >= -1");
 		this.bufferTimeout = bufferTimeout;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/884c2e39/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
index d10fb3c..f2a268a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
+import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
@@ -58,6 +59,49 @@ import static org.junit.Assert.assertTrue;
  */
 public class StreamGraphGeneratorTest {
 
+	@Test
+	public void testBufferTimeout() {
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		env.setBufferTimeout(77); // set timeout to some recognizable number
+
+		env
+			.fromElements(1, 2, 3, 4, 5)
+
+			.map(value -> value)
+				.setBufferTimeout(-1) // -1: expected to resolve to the environment timeout (77)
+				.name("A")
+			.map(value -> value)
+				.setBufferTimeout(0) // 0: explicit "no buffering"; must be kept, not replaced by 77
+				.name("B")
+			.map(value -> value)
+				.setBufferTimeout(12) // explicit positive per-operator timeout
+				.name("C")
+			.map(value -> value)
+				.name("D"); // no per-operator timeout set; expected to resolve to 77
+
+		final StreamGraph sg = env.getStreamGraph();
+		for (StreamNode node : sg.getStreamNodes()) {
+			switch (node.getOperatorName()) { // dispatch on the names assigned via name(...) above
+
+				case "A":
+					assertEquals(77L, node.getBufferTimeout().longValue());
+					break;
+				case "B":
+					assertEquals(0L, node.getBufferTimeout().longValue());
+					break;
+				case "C":
+					assertEquals(12L, node.getBufferTimeout().longValue());
+					break;
+				case "D":
+					assertEquals(77L, node.getBufferTimeout().longValue());
+					break;
+				default: // any node not named A-D is expected to be the source
+					assertTrue(node.getOperator() instanceof StreamSource);
+			}
+		}
+	}
+
 	/**
 	 * This tests whether virtual Transformations behave correctly.
 	 *