You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/12/20 05:05:42 UTC
[6/7] flink git commit: [FLINK-4391] Add timeout parameter for
asynchronous I/O
[FLINK-4391] Add timeout parameter for asynchronous I/O
The timeout defines how long an asynchronous I/O operation can take. If the operation
takes longer than the timeout, it fails with a TimeoutException.
Annotate the internal async operator classes with the @Internal annotation.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6c5a8711
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6c5a8711
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6c5a8711
Branch: refs/heads/master
Commit: 6c5a8711d80dfcea20967aea009bac51122d5094
Parents: ad603d5
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Dec 20 03:42:25 2016 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Dec 20 05:04:51 2016 +0100
----------------------------------------------------------------------
.../examples/async/AsyncIOExample.java | 19 ++-
.../api/datastream/AsyncDataStream.java | 67 +++++++---
.../api/operators/async/AsyncWaitOperator.java | 5 +-
.../streaming/api/operators/async/Emitter.java | 2 +
.../api/operators/async/OperatorActions.java | 2 +
.../async/queue/AsyncCollectionResult.java | 3 +
.../api/operators/async/queue/AsyncResult.java | 2 +
.../async/queue/AsyncWatermarkResult.java | 2 +
.../async/queue/OrderedStreamElementQueue.java | 2 +
.../async/queue/StreamElementQueue.java | 2 +
.../async/queue/StreamElementQueueEntry.java | 2 +
.../async/queue/StreamRecordQueueEntry.java | 2 +
.../queue/UnorderedStreamElementQueue.java | 2 +
.../async/queue/WatermarkQueueEntry.java | 2 +
.../operators/async/AsyncWaitOperatorTest.java | 131 +++++++++++++++++--
.../util/AbstractStreamOperatorTestHarness.java | 42 ++++--
.../util/OneInputStreamOperatorTestHarness.java | 21 +++
.../streaming/api/StreamingOperatorsITCase.java | 17 ++-
18 files changed, 277 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6c5a8711/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java
index 6dde537..2b05983 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java
@@ -206,7 +206,8 @@ public class AsyncIOExample {
"[--maxCount <max number of input from source, -1 for infinite input>] " +
"[--sleepFactor <interval to sleep for each stream element>] [--failRatio <possibility to throw exception>] " +
"[--waitMode <ordered or unordered>] [--waitOperatorParallelism <parallelism for async wait operator>] " +
- "[--eventType <EventTime or IngestionTime>] [--shutdownWaitTS <milli sec to wait for thread pool>]");
+ "[--eventType <EventTime or IngestionTime>] [--shutdownWaitTS <milli sec to wait for thread pool>]" +
+ "[--timeout <Timeout for the asynchronous operations>]");
}
public static void main(String[] args) throws Exception {
@@ -226,6 +227,7 @@ public class AsyncIOExample {
final int taskNum;
final String timeType;
final long shutdownWaitTS;
+ final long timeout;
try {
// check the configuration for the job
@@ -238,6 +240,7 @@ public class AsyncIOExample {
taskNum = params.getInt("waitOperatorParallelism", 1);
timeType = params.get("eventType", "EventTime");
shutdownWaitTS = params.getLong("shutdownWaitTS", 20000);
+ timeout = params.getLong("timeout", 10000L);
} catch (Exception e) {
printUsage();
@@ -292,10 +295,20 @@ public class AsyncIOExample {
// add async operator to streaming job
DataStream<String> result;
if (ORDERED.equals(mode)) {
- result = AsyncDataStream.orderedWait(inputStream, function, 20).setParallelism(taskNum);
+ result = AsyncDataStream.orderedWait(
+ inputStream,
+ function,
+ timeout,
+ TimeUnit.MILLISECONDS,
+ 20).setParallelism(taskNum);
}
else {
- result = AsyncDataStream.unorderedWait(inputStream, function, 20).setParallelism(taskNum);
+ result = AsyncDataStream.unorderedWait(
+ inputStream,
+ function,
+ timeout,
+ TimeUnit.MILLISECONDS,
+ 20).setParallelism(taskNum);
}
// add a reduce to get the sum of each keys.
http://git-wip-us.apache.org/repos/asf/flink/blob/6c5a8711/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java
index 4fefde0..8132d28 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java
@@ -24,6 +24,8 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator;
+import java.util.concurrent.TimeUnit;
+
/**
* A helper class to apply {@link AsyncFunction} to a data stream.
* <p>
@@ -31,7 +33,7 @@ import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator;
* DataStream<String> input = ...
* AsyncFunction<String, Tuple<String, String>> asyncFunc = ...
*
- * AsyncDataStream.orderedWait(input, asyncFunc, 100);
+ * AsyncDataStream.orderedWait(input, asyncFunc, timeout, TimeUnit.MILLISECONDS, 100);
* }
* </pre>
*/
@@ -40,13 +42,14 @@ import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator;
public class AsyncDataStream {
public enum OutputMode { ORDERED, UNORDERED }
- private static final int DEFAULT_BUFFER_SIZE = 100;
+ private static final int DEFAULT_QUEUE_CAPACITY = 100;
/**
* Add an AsyncWaitOperator.
*
* @param in The {@link DataStream} where the {@link AsyncWaitOperator} will be added.
* @param func {@link AsyncFunction} wrapped inside {@link AsyncWaitOperator}.
+ * @param timeout for the asynchronous operation to complete
* @param bufSize The max number of inputs the {@link AsyncWaitOperator} can hold inside.
* @param mode Processing mode for {@link AsyncWaitOperator}.
* @param <IN> Input type.
@@ -56,6 +59,7 @@ public class AsyncDataStream {
private static <IN, OUT> SingleOutputStreamOperator<OUT> addOperator(
DataStream<IN> in,
AsyncFunction<IN, OUT> func,
+ long timeout,
int bufSize,
OutputMode mode) {
@@ -64,8 +68,11 @@ public class AsyncDataStream {
true, in.getType(), Utils.getCallLocationName(), true);
// create transform
- AsyncWaitOperator<IN, OUT> operator =
- new AsyncWaitOperator<>(in.getExecutionEnvironment().clean(func), bufSize, mode);
+ AsyncWaitOperator<IN, OUT> operator = new AsyncWaitOperator<>(
+ in.getExecutionEnvironment().clean(func),
+ timeout,
+ bufSize,
+ mode);
return in.transform("async wait operator", outTypeInfo, operator);
}
@@ -75,7 +82,9 @@ public class AsyncDataStream {
*
* @param in Input {@link DataStream}
* @param func {@link AsyncFunction}
- * @param bufSize The max number of async i/o operation that can be triggered
+ * @param timeout for the asynchronous operation to complete
+ * @param timeUnit of the given timeout
+ * @param capacity The max number of async i/o operation that can be triggered
* @param <IN> Type of input record
* @param <OUT> Type of output record
* @return A new {@link SingleOutputStreamOperator}.
@@ -83,30 +92,44 @@ public class AsyncDataStream {
public static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWait(
DataStream<IN> in,
AsyncFunction<IN, OUT> func,
- int bufSize) {
- return addOperator(in, func, bufSize, OutputMode.UNORDERED);
+ long timeout,
+ TimeUnit timeUnit,
+ int capacity) {
+ return addOperator(in, func, timeUnit.toMillis(timeout), capacity, OutputMode.UNORDERED);
}
/**
* Add an AsyncWaitOperator. The order of output stream records may be reordered.
* @param in Input {@link DataStream}
* @param func {@link AsyncFunction}
+ * @param timeout for the asynchronous operation to complete
+ * @param timeUnit of the given timeout
* @param <IN> Type of input record
* @param <OUT> Type of output record
* @return A new {@link SingleOutputStreamOperator}.
*/
public static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWait(
DataStream<IN> in,
- AsyncFunction<IN, OUT> func) {
- return addOperator(in, func, DEFAULT_BUFFER_SIZE, OutputMode.UNORDERED);
+ AsyncFunction<IN, OUT> func,
+ long timeout,
+ TimeUnit timeUnit) {
+ return addOperator(
+ in,
+ func,
+ timeUnit.toMillis(timeout),
+ DEFAULT_QUEUE_CAPACITY,
+ OutputMode.UNORDERED);
}
/**
- * Add an AsyncWaitOperator. The order to process input records is guaranteed to be the same as input ones.
+ * Add an AsyncWaitOperator. The order to process input records is guaranteed to be the same as
+ * input ones.
*
* @param in Input {@link DataStream}
* @param func {@link AsyncFunction}
- * @param bufSize The max number of async i/o operation that can be triggered
+ * @param timeout for the asynchronous operation to complete
+ * @param timeUnit of the given timeout
+ * @param capacity The max number of async i/o operation that can be triggered
* @param <IN> Type of input record
* @param <OUT> Type of output record
* @return A new {@link SingleOutputStreamOperator}.
@@ -114,22 +137,34 @@ public class AsyncDataStream {
public static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait(
DataStream<IN> in,
AsyncFunction<IN, OUT> func,
- int bufSize) {
- return addOperator(in, func, bufSize, OutputMode.ORDERED);
+ long timeout,
+ TimeUnit timeUnit,
+ int capacity) {
+ return addOperator(in, func, timeUnit.toMillis(timeout), capacity, OutputMode.ORDERED);
}
/**
- * Add an AsyncWaitOperator. The order to process input records is guaranteed to be the same as input ones.
+ * Add an AsyncWaitOperator. The order to process input records is guaranteed to be the same as
+ * input ones.
*
* @param in Input {@link DataStream}
* @param func {@link AsyncFunction}
+ * @param timeout for the asynchronous operation to complete
+ * @param timeUnit of the given timeout
* @param <IN> Type of input record
* @param <OUT> Type of output record
* @return A new {@link SingleOutputStreamOperator}.
*/
public static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait(
DataStream<IN> in,
- AsyncFunction<IN, OUT> func) {
- return addOperator(in, func, DEFAULT_BUFFER_SIZE, OutputMode.ORDERED);
+ AsyncFunction<IN, OUT> func,
+ long timeout,
+ TimeUnit timeUnit) {
+ return addOperator(
+ in,
+ func,
+ timeUnit.toMillis(timeout),
+ DEFAULT_QUEUE_CAPACITY,
+ OutputMode.ORDERED);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6c5a8711/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
index 88fc833..754b754 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
@@ -114,6 +114,7 @@ public class AsyncWaitOperator<IN, OUT>
public AsyncWaitOperator(
AsyncFunction<IN, OUT> asyncFunction,
+ long timeout,
int capacity,
AsyncDataStream.OutputMode outputMode) {
super(asyncFunction);
@@ -124,7 +125,7 @@ public class AsyncWaitOperator<IN, OUT>
this.outputMode = Preconditions.checkNotNull(outputMode, "outputMode");
- this.timeout = -1L;
+ this.timeout = timeout;
}
@Override
@@ -200,7 +201,7 @@ public class AsyncWaitOperator<IN, OUT>
if (timeout > 0L) {
// register a timeout for this AsyncStreamRecordBufferEntry
- long timeoutTimestamp = timeout + System.currentTimeMillis();
+ long timeoutTimestamp = timeout + getProcessingTimeService().getCurrentProcessingTime();
getProcessingTimeService().registerTimer(
timeoutTimestamp,
http://git-wip-us.apache.org/repos/asf/flink/blob/6c5a8711/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/Emitter.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/Emitter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/Emitter.java
index 4b22aaa..c122d23 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/Emitter.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/Emitter.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.api.operators.async;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.operators.async.queue.AsyncCollectionResult;
@@ -37,6 +38,7 @@ import java.util.Collection;
*
* @param <OUT> Type of the output elements
*/
+@Internal
public class Emitter<OUT> implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(Emitter.class);
http://git-wip-us.apache.org/repos/asf/flink/blob/6c5a8711/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/OperatorActions.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/OperatorActions.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/OperatorActions.java
index 5a2e43c..916b412 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/OperatorActions.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/OperatorActions.java
@@ -18,11 +18,13 @@
package org.apache.flink.streaming.api.operators.async;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.api.operators.StreamOperator;
/**
* Interface for {@link StreamOperator} actions.
*/
+@Internal
public interface OperatorActions {
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/6c5a8711/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncCollectionResult.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncCollectionResult.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncCollectionResult.java
index 8088bf0..6226ae6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncCollectionResult.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncCollectionResult.java
@@ -18,6 +18,8 @@
package org.apache.flink.streaming.api.operators.async.queue;
+import org.apache.flink.annotation.Internal;
+
import java.util.Collection;
/**
@@ -25,6 +27,7 @@ import java.util.Collection;
*
* @param <T> Type of the collection elements.
*/
+@Internal
public interface AsyncCollectionResult<T> extends AsyncResult {
boolean hasTimestamp();
http://git-wip-us.apache.org/repos/asf/flink/blob/6c5a8711/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncResult.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncResult.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncResult.java
index 1a99928..751de76 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncResult.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncResult.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.api.operators.async.queue;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
@@ -26,6 +27,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
* either be a {@link Watermark} or a collection of new output elements produced by the
* {@link AsyncFunction}.
*/
+@Internal
public interface AsyncResult {
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/6c5a8711/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncWatermarkResult.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncWatermarkResult.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncWatermarkResult.java
index c19b520..68bde3a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncWatermarkResult.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncWatermarkResult.java
@@ -18,11 +18,13 @@
package org.apache.flink.streaming.api.operators.async.queue;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.api.watermark.Watermark;
/**
* {@link AsyncResult} subclass for asynchronous result {@link Watermark}.
*/
+@Internal
public interface AsyncWatermarkResult extends AsyncResult {
/**
* Get the resulting watermark.
http://git-wip-us.apache.org/repos/asf/flink/blob/6c5a8711/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java
index 2bbcb6c..414b3c0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.api.operators.async.queue;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.concurrent.AcceptFunction;
import org.apache.flink.streaming.api.operators.async.OperatorActions;
import org.apache.flink.util.Preconditions;
@@ -37,6 +38,7 @@ import java.util.concurrent.locks.ReentrantLock;
* to the queue. Thus, even if the completion order can be arbitrary, the output order strictly
* follows the insertion order (element cannot overtake each other).
*/
+@Internal
public class OrderedStreamElementQueue implements StreamElementQueue {
private static final Logger LOG = LoggerFactory.getLogger(OrderedStreamElementQueue.class);
http://git-wip-us.apache.org/repos/asf/flink/blob/6c5a8711/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueue.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueue.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueue.java
index 1a2c4a8..e02b8b0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueue.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueue.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.api.operators.async.queue;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator;
import java.util.Collection;
@@ -25,6 +26,7 @@ import java.util.Collection;
/**
* Interface for blocking stream element queues for the {@link AsyncWaitOperator}.
*/
+@Internal
public interface StreamElementQueue {
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/6c5a8711/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueEntry.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueEntry.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueEntry.java
index 06ebf3c..7db9d4f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueEntry.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueEntry.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.api.operators.async.queue;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.concurrent.AcceptFunction;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
@@ -32,6 +33,7 @@ import java.util.concurrent.Executor;
*
* @param <T> Type of the result
*/
+@Internal
public abstract class StreamElementQueueEntry<T> implements AsyncResult {
/** Stream element */
http://git-wip-us.apache.org/repos/asf/flink/blob/6c5a8711/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java
index f0e707e..c654702 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.api.operators.async.queue;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.concurrent.CompletableFuture;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
@@ -34,6 +35,7 @@ import java.util.Collection;
*
* @param <OUT> Type of the asynchronous collection result
*/
+@Internal
public class StreamRecordQueueEntry<OUT> extends StreamElementQueueEntry<Collection<OUT>>
implements AsyncCollectionResult<OUT>, AsyncCollector<OUT> {
http://git-wip-us.apache.org/repos/asf/flink/blob/6c5a8711/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
index 603d8cc..f244008 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.api.operators.async.queue;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.concurrent.AcceptFunction;
import org.apache.flink.streaming.api.operators.async.OperatorActions;
import org.apache.flink.util.Preconditions;
@@ -41,6 +42,7 @@ import java.util.concurrent.locks.ReentrantLock;
* and no watermark can overtake a stream record. However, stream records falling in the same
* segment between two watermarks can overtake each other (their emission order is not guaranteed).
*/
+@Internal
public class UnorderedStreamElementQueue implements StreamElementQueue {
private static final Logger LOG = LoggerFactory.getLogger(UnorderedStreamElementQueue.class);
http://git-wip-us.apache.org/repos/asf/flink/blob/6c5a8711/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/WatermarkQueueEntry.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/WatermarkQueueEntry.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/WatermarkQueueEntry.java
index 6fe4f44..1f48303 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/WatermarkQueueEntry.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/WatermarkQueueEntry.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.api.operators.async.queue;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.streaming.api.watermark.Watermark;
@@ -25,6 +26,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
/**
* {@link StreamElementQueueEntry} implementation for the {@link Watermark}.
*/
+@Internal
public class WatermarkQueueEntry extends StreamElementQueueEntry<Watermark> implements AsyncWatermarkResult {
private final Future<Watermark> future;
http://git-wip-us.apache.org/repos/asf/flink/blob/6c5a8711/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
index 10ee14f..d9e885e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.streaming.api.operators.async;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
@@ -27,11 +28,15 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.SubtaskState;
+import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
import org.apache.flink.runtime.state.TaskStateHandles;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -51,6 +56,7 @@ import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
+import org.mockito.ArgumentCaptor;
import java.util.ArrayDeque;
import java.util.Collections;
@@ -59,11 +65,16 @@ import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
/**
* Tests for {@link AsyncWaitOperator}. These test that:
@@ -77,10 +88,12 @@ import static org.junit.Assert.assertEquals;
*/
public class AsyncWaitOperatorTest extends TestLogger {
+ private static final long TIMEOUT = 1000L;
+
private static class MyAsyncFunction extends RichAsyncFunction<Integer, Integer> {
private static final long serialVersionUID = 8522411971886428444L;
- private static final long TIMEOUT = 5000L;
+ private static final long TERMINATION_TIMEOUT = 5000L;
private static final int THREAD_POOL_SIZE = 10;
static ExecutorService executorService;
@@ -114,7 +127,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
executorService.shutdown();
try {
- if (!executorService.awaitTermination(TIMEOUT, TimeUnit.MILLISECONDS)) {
+ if (!executorService.awaitTermination(TERMINATION_TIMEOUT, TimeUnit.MILLISECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException interrupted) {
@@ -219,7 +232,11 @@ public class AsyncWaitOperatorTest extends TestLogger {
}
private void testEventTime(AsyncDataStream.OutputMode mode) throws Exception {
- final AsyncWaitOperator<Integer, Integer> operator = new AsyncWaitOperator<>(new MyAsyncFunction(), 2, mode);
+ final AsyncWaitOperator<Integer, Integer> operator = new AsyncWaitOperator<>(
+ new MyAsyncFunction(),
+ TIMEOUT,
+ 2,
+ mode);
final OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
new OneInputStreamOperatorTestHarness<>(operator, IntSerializer.INSTANCE);
@@ -280,7 +297,8 @@ public class AsyncWaitOperatorTest extends TestLogger {
}
private void testProcessingTime(AsyncDataStream.OutputMode mode) throws Exception {
- final AsyncWaitOperator<Integer, Integer> operator = new AsyncWaitOperator<>(new MyAsyncFunction(), 6, mode);
+ final AsyncWaitOperator<Integer, Integer> operator = new AsyncWaitOperator<>(
+ new MyAsyncFunction(), TIMEOUT, 6, mode);
final OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = new OneInputStreamOperatorTestHarness<>(operator, IntSerializer.INSTANCE);
@@ -380,10 +398,20 @@ public class AsyncWaitOperatorTest extends TestLogger {
DataStream<Integer> input = chainEnv.fromElements(1, 2, 3);
if (withLazyFunction) {
- input = AsyncDataStream.orderedWait(input, new LazyAsyncFunction(), 6);
+ input = AsyncDataStream.orderedWait(
+ input,
+ new LazyAsyncFunction(),
+ TIMEOUT,
+ TimeUnit.MILLISECONDS,
+ 6);
}
else {
- input = AsyncDataStream.orderedWait(input, new MyAsyncFunction(), 6);
+ input = AsyncDataStream.orderedWait(
+ input,
+ new MyAsyncFunction(),
+ TIMEOUT,
+ TimeUnit.MILLISECONDS,
+ 6);
}
// the map function is designed to chain after async function. we place an Integer object in it and
@@ -407,7 +435,12 @@ public class AsyncWaitOperatorTest extends TestLogger {
}
});
- input = AsyncDataStream.unorderedWait(input, new MyAsyncFunction(), 3);
+ input = AsyncDataStream.unorderedWait(
+ input,
+ new MyAsyncFunction(),
+ TIMEOUT,
+ TimeUnit.MILLISECONDS,
+ 3);
input.map(new MapFunction<Integer, Integer>() {
private static final long serialVersionUID = 5162085254238405527L;
@@ -432,8 +465,11 @@ public class AsyncWaitOperatorTest extends TestLogger {
final OneInputStreamTaskTestHarness<Integer, Integer> testHarness =
new OneInputStreamTaskTestHarness<>(task, 1, 1, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
- AsyncWaitOperator<Integer, Integer> operator =
- new AsyncWaitOperator<>(new LazyAsyncFunction(), 3, AsyncDataStream.OutputMode.ORDERED);
+ AsyncWaitOperator<Integer, Integer> operator = new AsyncWaitOperator<>(
+ new LazyAsyncFunction(),
+ TIMEOUT,
+ 3,
+ AsyncDataStream.OutputMode.ORDERED);
final StreamConfig streamConfig = testHarness.getStreamConfig();
streamConfig.setStreamOperator(operator);
@@ -481,8 +517,11 @@ public class AsyncWaitOperatorTest extends TestLogger {
final OneInputStreamTaskTestHarness<Integer, Integer> restoredTaskHarness =
new OneInputStreamTaskTestHarness<>(restoredTask, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
- AsyncWaitOperator<Integer, Integer> restoredOperator =
- new AsyncWaitOperator<>(new MyAsyncFunction(), 6, AsyncDataStream.OutputMode.ORDERED);
+ AsyncWaitOperator<Integer, Integer> restoredOperator = new AsyncWaitOperator<>(
+ new MyAsyncFunction(),
+ TIMEOUT,
+ 6,
+ AsyncDataStream.OutputMode.ORDERED);
restoredTaskHarness.getStreamConfig().setStreamOperator(restoredOperator);
@@ -561,4 +600,74 @@ public class AsyncWaitOperatorTest extends TestLogger {
return checkpointStateHandles;
}
}
+
+ @Test
+ public void testAsyncTimeout() throws Exception {
+ final long timeout = 10L;
+
+ final AsyncWaitOperator<Integer, Integer> operator = new AsyncWaitOperator<>(
+ new LazyAsyncFunction(),
+ timeout,
+ 2,
+ AsyncDataStream.OutputMode.ORDERED);
+
+ final Environment mockEnvironment = mock(Environment.class);
+
+ final Configuration taskConfiguration = new Configuration();
+ final ExecutionConfig executionConfig = new ExecutionConfig();
+ final TaskMetricGroup metricGroup = new UnregisteredTaskMetricsGroup();
+ final Configuration taskManagerConfiguration = new Configuration();
+ final TaskManagerRuntimeInfo taskManagerRuntimeInfo = new TaskManagerRuntimeInfo("localhost", taskManagerConfiguration, "/tmp");
+ final TaskInfo taskInfo = new TaskInfo("foobarTask", 1, 0, 1, 1);
+
+ when(mockEnvironment.getTaskConfiguration()).thenReturn(taskConfiguration);
+ when(mockEnvironment.getExecutionConfig()).thenReturn(executionConfig);
+ when(mockEnvironment.getMetricGroup()).thenReturn(metricGroup);
+ when(mockEnvironment.getTaskManagerInfo()).thenReturn(taskManagerRuntimeInfo);
+ when(mockEnvironment.getTaskInfo()).thenReturn(taskInfo);
+ when(mockEnvironment.getUserClassLoader()).thenReturn(AsyncWaitOperatorTest.class.getClassLoader());
+
+ final OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
+ new OneInputStreamOperatorTestHarness<>(operator, IntSerializer.INSTANCE, mockEnvironment);
+
+ final long initialTime = 0L;
+ final ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+ testHarness.open();
+
+ testHarness.setProcessingTime(initialTime);
+
+ synchronized (testHarness.getCheckpointLock()) {
+ testHarness.processElement(new StreamRecord<>(1, initialTime));
+ testHarness.setProcessingTime(initialTime + 5L);
+ testHarness.processElement(new StreamRecord<>(2, initialTime + 5L));
+ }
+
+ // trigger the timeout of the first stream record
+ testHarness.setProcessingTime(initialTime + timeout + 1L);
+
+ // allow the second async stream record to be processed
+ LazyAsyncFunction.countDown();
+
+ // wait until all async collectors in the buffer have been emitted
+ synchronized (testHarness.getCheckpointLock()) {
+ testHarness.close();
+ }
+
+ expectedOutput.add(new StreamRecord<>(2, initialTime + 5L));
+
+ TestHarnessUtil.assertOutputEquals("Output with watermark was not correct.", expectedOutput, testHarness.getOutput());
+
+ ArgumentCaptor<Throwable> argumentCaptor = ArgumentCaptor.forClass(Throwable.class);
+
+ verify(mockEnvironment).failExternally(argumentCaptor.capture());
+
+ Throwable failureCause = argumentCaptor.getValue();
+
+ Assert.assertNotNull(failureCause.getCause());
+ Assert.assertTrue(failureCause.getCause() instanceof ExecutionException);
+
+ Assert.assertNotNull(failureCause.getCause().getCause());
+ Assert.assertTrue(failureCause.getCause().getCause() instanceof TimeoutException);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6c5a8711/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 830cd6f..b623fa1 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -54,6 +54,7 @@ import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.util.FutureUtil;
+import org.apache.flink.util.Preconditions;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -107,29 +108,44 @@ public class AbstractStreamOperatorTestHarness<OUT> {
private volatile boolean wasFailedExternally = false;
public AbstractStreamOperatorTestHarness(
+ StreamOperator<OUT> operator,
+ int maxParallelism,
+ int numSubtasks,
+ int subtaskIndex) throws Exception {
+
+ this(
+ operator,
+ maxParallelism,
+ numSubtasks,
+ subtaskIndex,
+ new MockEnvironment(
+ "MockTask",
+ 3 * 1024 * 1024,
+ new MockInputSplitProvider(),
+ 1024,
+ new Configuration(),
+ new ExecutionConfig(),
+ maxParallelism,
+ numSubtasks,
+ subtaskIndex));
+ }
+
+ public AbstractStreamOperatorTestHarness(
StreamOperator<OUT> operator,
int maxParallelism,
int numSubtasks,
- int subtaskIndex) throws Exception {
+ int subtaskIndex,
+ final Environment environment) throws Exception {
this.operator = operator;
this.outputList = new ConcurrentLinkedQueue<>();
- Configuration underlyingConfig = new Configuration();
+ Configuration underlyingConfig = environment.getTaskConfiguration();
this.config = new StreamConfig(underlyingConfig);
this.config.setCheckpointingEnabled(true);
- this.executionConfig = new ExecutionConfig();
+ this.executionConfig = environment.getExecutionConfig();
this.closableRegistry = new CloseableRegistry();
this.checkpointLock = new Object();
- environment = new MockEnvironment(
- "MockTask",
- 3 * 1024 * 1024,
- new MockInputSplitProvider(),
- 1024,
- underlyingConfig,
- executionConfig,
- maxParallelism,
- numSubtasks,
- subtaskIndex);
+ this.environment = Preconditions.checkNotNull(environment);
mockTask = mock(StreamTask.class);
processingTimeService = new TestProcessingTimeService();
http://git-wip-us.apache.org/repos/asf/flink/blob/6c5a8711/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index 86fbaa0..9b02378 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.util;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -45,6 +46,15 @@ public class OneInputStreamOperatorTestHarness<IN, OUT>
config.setTypeSerializerIn1(Preconditions.checkNotNull(typeSerializerIn));
}
+ public OneInputStreamOperatorTestHarness(
+ OneInputStreamOperator<IN, OUT> operator,
+ TypeSerializer<IN> typeSerializerIn,
+ Environment environment) throws Exception {
+ this(operator, 1, 1, 0, environment);
+
+ config.setTypeSerializerIn1(Preconditions.checkNotNull(typeSerializerIn));
+ }
+
public OneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator) throws Exception {
this(operator, 1, 1, 0);
}
@@ -59,6 +69,17 @@ public class OneInputStreamOperatorTestHarness<IN, OUT>
this.oneInputOperator = operator;
}
+ public OneInputStreamOperatorTestHarness(
+ OneInputStreamOperator<IN, OUT> operator,
+ int maxParallelism,
+ int numSubtasks,
+ int subtaskIndex,
+ Environment environment) throws Exception {
+ super(operator, maxParallelism, numSubtasks, subtaskIndex, environment);
+
+ this.oneInputOperator = operator;
+ }
+
public void processElement(IN value, long timestamp) throws Exception {
processElement(new StreamRecord<>(value, timestamp));
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6c5a8711/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
index 3631965..8ea1bd8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
@@ -41,6 +41,7 @@ import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase {
@@ -206,6 +207,7 @@ public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase
@Test
public void testAsyncWaitOperator() throws Exception {
final int numElements = 5;
+ final long timeout = 1000L;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -240,7 +242,12 @@ public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase
}
};
- DataStream<Integer> orderedResult = AsyncDataStream.orderedWait(input, function, 2).setParallelism(1);
+ DataStream<Integer> orderedResult = AsyncDataStream.orderedWait(
+ input,
+ function,
+ timeout,
+ TimeUnit.MILLISECONDS,
+ 2).setParallelism(1);
// save result from ordered process
final MemorySinkFunction sinkFunction1 = new MemorySinkFunction(0);
@@ -249,8 +256,12 @@ public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase
orderedResult.addSink(sinkFunction1).setParallelism(1);
-
- DataStream<Integer> unorderedResult = AsyncDataStream.unorderedWait(input, function, 2);
+ DataStream<Integer> unorderedResult = AsyncDataStream.unorderedWait(
+ input,
+ function,
+ timeout,
+ TimeUnit.MILLISECONDS,
+ 2);
// save result from unordered process
final MemorySinkFunction sinkFunction2 = new MemorySinkFunction(1);