You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/05/17 19:26:50 UTC
flink git commit: [FLINK-9397] [DataStream API] Correctly propagate operator buffer timeout of 0
Repository: flink
Updated Branches:
refs/heads/release-1.5 27061d35a -> 884c2e39b
[FLINK-9397] [DataStream API] Correctly propagate operator buffer timeout of 0
This also improves some JavaDocs.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/884c2e39
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/884c2e39
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/884c2e39
Branch: refs/heads/release-1.5
Commit: 884c2e39b401fc4f1e0623e856008af53ed5f98e
Parents: 27061d3
Author: Stephan Ewen <se...@apache.org>
Authored: Thu May 17 19:49:16 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu May 17 20:33:03 2018 +0200
----------------------------------------------------------------------
.../datastream/SingleOutputStreamOperator.java | 14 ++++++-
.../api/graph/StreamGraphGenerator.java | 2 +-
.../transformations/StreamTransformation.java | 16 ++++---
.../api/graph/StreamGraphGeneratorTest.java | 44 ++++++++++++++++++++
4 files changed, 68 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/884c2e39/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index 7885934..1ca3ece 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -36,6 +36,7 @@ import java.util.HashMap;
import java.util.Map;
import static java.util.Objects.requireNonNull;
+import static org.apache.flink.util.Preconditions.checkArgument;
/**
* {@code SingleOutputStreamOperator} represents a user defined transformation
@@ -226,14 +227,23 @@ public class SingleOutputStreamOperator<T> extends DataStream<T> {
}
/**
- * Sets the maximum time frequency (ms) for the flushing of the output
- * buffer. By default the output buffers flush only when they are full.
+ * Sets the buffering timeout for data produced by this operation.
+ * The timeout defines how long data may linger in a partially full buffer
+ * before being sent over the network.
+ *
+ * <p>Lower timeouts lead to lower tail latencies, but may affect throughput.
+ * Timeouts of 1 ms still sustain high throughput, even for jobs with high parallelism.
+ *
+ * <p>A value of '-1' means that the default buffer timeout should be used. A value
+ * of '0' indicates that no buffering should happen, and all records/events should be
+ * immediately sent through the network, without additional buffering.
*
* @param timeoutMillis
* The maximum time between two output flushes.
* @return The operator with buffer timeout set.
*/
public SingleOutputStreamOperator<T> setBufferTimeout(long timeoutMillis) {
+ checkArgument(timeoutMillis >= -1, "timeout must be >= -1");
transformation.setBufferTimeout(timeoutMillis);
return this;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/884c2e39/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index 7d0333f..11a7002 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -194,7 +194,7 @@ public class StreamGraphGenerator {
alreadyTransformed.put(transform, transformedIds);
}
- if (transform.getBufferTimeout() > 0) {
+ if (transform.getBufferTimeout() >= 0) {
streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
}
if (transform.getUid() != null) {
http://git-wip-us.apache.org/repos/asf/flink/blob/884c2e39/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
index 8cc4db9..1f763bb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
@@ -31,6 +31,7 @@ import org.apache.flink.util.Preconditions;
import java.util.Collection;
+import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
@@ -394,13 +395,18 @@ public abstract class StreamTransformation<T> {
public abstract void setChainingStrategy(ChainingStrategy strategy);
/**
- * Set the buffer timeout of this {@code StreamTransformation}. The timeout is used when
- * sending elements over the network. The timeout specifies how long a network buffer
- * should be kept waiting before sending. A higher timeout means that more elements will
- * be sent in one buffer, this increases throughput. The latency, however, is negatively
- * affected by a higher timeout.
+ * Set the buffer timeout of this {@code StreamTransformation}. The timeout defines how long data
+ * may linger in a partially full buffer before being sent over the network.
+ *
+ * <p>Lower timeouts lead to lower tail latencies, but may affect throughput.
+ * For Flink 1.5+, timeouts of 1ms are feasible for jobs with high parallelism.
+ *
+ * <p>A value of -1 means that the default buffer timeout should be used. A value
+ * of zero indicates that no buffering should happen, and all records/events should be
+ * immediately sent through the network, without additional buffering.
*/
public void setBufferTimeout(long bufferTimeout) {
+ checkArgument(bufferTimeout >= -1);
this.bufferTimeout = bufferTimeout;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/884c2e39/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
index d10fb3c..f2a268a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
+import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
@@ -58,6 +59,49 @@ import static org.junit.Assert.assertTrue;
*/
public class StreamGraphGeneratorTest {
+ @Test
+ public void testBufferTimeout() {
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ env.setBufferTimeout(77); // set timeout to some recognizable number
+
+ env
+ .fromElements(1, 2, 3, 4, 5)
+
+ .map(value -> value)
+ .setBufferTimeout(-1)
+ .name("A")
+ .map(value -> value)
+ .setBufferTimeout(0)
+ .name("B")
+ .map(value -> value)
+ .setBufferTimeout(12)
+ .name("C")
+ .map(value -> value)
+ .name("D");
+
+ final StreamGraph sg = env.getStreamGraph();
+ for (StreamNode node : sg.getStreamNodes()) {
+ switch (node.getOperatorName()) {
+
+ case "A":
+ assertEquals(77L, node.getBufferTimeout().longValue());
+ break;
+ case "B":
+ assertEquals(0L, node.getBufferTimeout().longValue());
+ break;
+ case "C":
+ assertEquals(12L, node.getBufferTimeout().longValue());
+ break;
+ case "D":
+ assertEquals(77L, node.getBufferTimeout().longValue());
+ break;
+ default:
+ assertTrue(node.getOperator() instanceof StreamSource);
+ }
+ }
+ }
+
/**
* This tests whether virtual Transformations behave correctly.
*