You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/12/20 05:05:42 UTC

[6/7] flink git commit: [FLINK-4391] Add timeout parameter for asynchronous I/O

[FLINK-4391] Add timeout parameter for asynchronous I/O

The timeout defines how long an asynchronous I/O operation can take. If the operation
takes longer than the timeout, then it fails with a TimeoutException.

Annotate internal classes with the @Internal annotation


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6c5a8711
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6c5a8711
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6c5a8711

Branch: refs/heads/master
Commit: 6c5a8711d80dfcea20967aea009bac51122d5094
Parents: ad603d5
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Dec 20 03:42:25 2016 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Dec 20 05:04:51 2016 +0100

----------------------------------------------------------------------
 .../examples/async/AsyncIOExample.java          |  19 ++-
 .../api/datastream/AsyncDataStream.java         |  67 +++++++---
 .../api/operators/async/AsyncWaitOperator.java  |   5 +-
 .../streaming/api/operators/async/Emitter.java  |   2 +
 .../api/operators/async/OperatorActions.java    |   2 +
 .../async/queue/AsyncCollectionResult.java      |   3 +
 .../api/operators/async/queue/AsyncResult.java  |   2 +
 .../async/queue/AsyncWatermarkResult.java       |   2 +
 .../async/queue/OrderedStreamElementQueue.java  |   2 +
 .../async/queue/StreamElementQueue.java         |   2 +
 .../async/queue/StreamElementQueueEntry.java    |   2 +
 .../async/queue/StreamRecordQueueEntry.java     |   2 +
 .../queue/UnorderedStreamElementQueue.java      |   2 +
 .../async/queue/WatermarkQueueEntry.java        |   2 +
 .../operators/async/AsyncWaitOperatorTest.java  | 131 +++++++++++++++++--
 .../util/AbstractStreamOperatorTestHarness.java |  42 ++++--
 .../util/OneInputStreamOperatorTestHarness.java |  21 +++
 .../streaming/api/StreamingOperatorsITCase.java |  17 ++-
 18 files changed, 277 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6c5a8711/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java
index 6dde537..2b05983 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java
@@ -206,7 +206,8 @@ public class AsyncIOExample {
 				"[--maxCount <max number of input from source, -1 for infinite input>] " +
 				"[--sleepFactor <interval to sleep for each stream element>] [--failRatio <possibility to throw exception>] " +
 				"[--waitMode <ordered or unordered>] [--waitOperatorParallelism <parallelism for async wait operator>] " +
-				"[--eventType <EventTime or IngestionTime>] [--shutdownWaitTS <milli sec to wait for thread pool>]");
+				"[--eventType <EventTime or IngestionTime>] [--shutdownWaitTS <milli sec to wait for thread pool>]" +
+				"[--timeout <Timeout for the asynchronous operations>]");
 	}
 
 	public static void main(String[] args) throws Exception {
@@ -226,6 +227,7 @@ public class AsyncIOExample {
 		final int taskNum;
 		final String timeType;
 		final long shutdownWaitTS;
+		final long timeout;
 
 		try {
 			// check the configuration for the job
@@ -238,6 +240,7 @@ public class AsyncIOExample {
 			taskNum = params.getInt("waitOperatorParallelism", 1);
 			timeType = params.get("eventType", "EventTime");
 			shutdownWaitTS = params.getLong("shutdownWaitTS", 20000);
+			timeout = params.getLong("timeout", 10000L);
 		} catch (Exception e) {
 			printUsage();
 
@@ -292,10 +295,20 @@ public class AsyncIOExample {
 		// add async operator to streaming job
 		DataStream<String> result;
 		if (ORDERED.equals(mode)) {
-			result = AsyncDataStream.orderedWait(inputStream, function, 20).setParallelism(taskNum);
+			result = AsyncDataStream.orderedWait(
+				inputStream,
+				function,
+				timeout,
+				TimeUnit.MILLISECONDS,
+				20).setParallelism(taskNum);
 		}
 		else {
-			result = AsyncDataStream.unorderedWait(inputStream, function, 20).setParallelism(taskNum);
+			result = AsyncDataStream.unorderedWait(
+				inputStream,
+				function,
+				timeout,
+				TimeUnit.MILLISECONDS,
+				20).setParallelism(taskNum);
 		}
 
 		// add a reduce to get the sum of each keys.

http://git-wip-us.apache.org/repos/asf/flink/blob/6c5a8711/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java
index 4fefde0..8132d28 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java
@@ -24,6 +24,8 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.functions.async.AsyncFunction;
 import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator;
 
+import java.util.concurrent.TimeUnit;
+
 /**
  * A helper class to apply {@link AsyncFunction} to a data stream.
  * <p>
@@ -31,7 +33,7 @@ import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator;
  * DataStream<String> input = ...
  * AsyncFunction<String, Tuple<String, String>> asyncFunc = ...
  *
- * AsyncDataStream.orderedWait(input, asyncFunc, 100);
+ * AsyncDataStream.orderedWait(input, asyncFunc, timeout, TimeUnit.MILLISECONDS, 100);
  * }
  * </pre>
  */
@@ -40,13 +42,14 @@ import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator;
 public class AsyncDataStream {
 	public enum OutputMode { ORDERED, UNORDERED }
 
-	private static final int DEFAULT_BUFFER_SIZE = 100;
+	private static final int DEFAULT_QUEUE_CAPACITY = 100;
 
 	/**
 	 * Add an AsyncWaitOperator.
 	 *
 	 * @param in The {@link DataStream} where the {@link AsyncWaitOperator} will be added.
 	 * @param func {@link AsyncFunction} wrapped inside {@link AsyncWaitOperator}.
+	 * @param timeout for the asynchronous operation to complete
 	 * @param bufSize The max number of inputs the {@link AsyncWaitOperator} can hold inside.
 	 * @param mode Processing mode for {@link AsyncWaitOperator}.
 	 * @param <IN> Input type.
@@ -56,6 +59,7 @@ public class AsyncDataStream {
 	private static <IN, OUT> SingleOutputStreamOperator<OUT> addOperator(
 			DataStream<IN> in,
 			AsyncFunction<IN, OUT> func,
+			long timeout,
 			int bufSize,
 			OutputMode mode) {
 
@@ -64,8 +68,11 @@ public class AsyncDataStream {
 				true, in.getType(), Utils.getCallLocationName(), true);
 
 		// create transform
-		AsyncWaitOperator<IN, OUT> operator =
-				new AsyncWaitOperator<>(in.getExecutionEnvironment().clean(func), bufSize, mode);
+		AsyncWaitOperator<IN, OUT> operator = new AsyncWaitOperator<>(
+			in.getExecutionEnvironment().clean(func),
+			timeout,
+			bufSize,
+			mode);
 
 		return in.transform("async wait operator", outTypeInfo, operator);
 	}
@@ -75,7 +82,9 @@ public class AsyncDataStream {
 	 *
 	 * @param in Input {@link DataStream}
 	 * @param func {@link AsyncFunction}
-	 * @param bufSize The max number of async i/o operation that can be triggered
+	 * @param timeout for the asynchronous operation to complete
+	 * @param timeUnit of the given timeout
+	 * @param capacity The max number of async i/o operation that can be triggered
 	 * @param <IN> Type of input record
 	 * @param <OUT> Type of output record
 	 * @return A new {@link SingleOutputStreamOperator}.
@@ -83,30 +92,44 @@ public class AsyncDataStream {
 	public static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWait(
 			DataStream<IN> in,
 			AsyncFunction<IN, OUT> func,
-			int bufSize) {
-		return addOperator(in, func, bufSize, OutputMode.UNORDERED);
+			long timeout,
+			TimeUnit timeUnit,
+			int capacity) {
+		return addOperator(in, func, timeUnit.toMillis(timeout), capacity, OutputMode.UNORDERED);
 	}
 
 	/**
 	 * Add an AsyncWaitOperator. The order of output stream records may be reordered.
 	 * @param in Input {@link DataStream}
 	 * @param func {@link AsyncFunction}
+	 * @param timeout for the asynchronous operation to complete
+	 * @param timeUnit of the given timeout
 	 * @param <IN> Type of input record
 	 * @param <OUT> Type of output record
 	 * @return A new {@link SingleOutputStreamOperator}.
 	 */
 	public static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWait(
 			DataStream<IN> in,
-			AsyncFunction<IN, OUT> func) {
-		return addOperator(in, func, DEFAULT_BUFFER_SIZE, OutputMode.UNORDERED);
+			AsyncFunction<IN, OUT> func,
+			long timeout,
+			TimeUnit timeUnit) {
+		return addOperator(
+			in,
+			func,
+			timeUnit.toMillis(timeout),
+			DEFAULT_QUEUE_CAPACITY,
+			OutputMode.UNORDERED);
 	}
 
 	/**
-	 * Add an AsyncWaitOperator. The order to process input records is guaranteed to be the same as input ones.
+	 * Add an AsyncWaitOperator. The order to process input records is guaranteed to be the same as
+	 * input ones.
 	 *
 	 * @param in Input {@link DataStream}
 	 * @param func {@link AsyncFunction}
-	 * @param bufSize The max number of async i/o operation that can be triggered
+	 * @param timeout for the asynchronous operation to complete
+	 * @param timeUnit of the given timeout
+	 * @param capacity The max number of async i/o operation that can be triggered
 	 * @param <IN> Type of input record
 	 * @param <OUT> Type of output record
 	 * @return A new {@link SingleOutputStreamOperator}.
@@ -114,22 +137,34 @@ public class AsyncDataStream {
 	public static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait(
 			DataStream<IN> in,
 			AsyncFunction<IN, OUT> func,
-			int bufSize) {
-		return addOperator(in, func, bufSize, OutputMode.ORDERED);
+			long timeout,
+			TimeUnit timeUnit,
+			int capacity) {
+		return addOperator(in, func, timeUnit.toMillis(timeout), capacity, OutputMode.ORDERED);
 	}
 
 	/**
-	 * Add an AsyncWaitOperator. The order to process input records is guaranteed to be the same as input ones.
+	 * Add an AsyncWaitOperator. The order to process input records is guaranteed to be the same as
+	 * input ones.
 	 *
 	 * @param in Input {@link DataStream}
 	 * @param func {@link AsyncFunction}
+	 * @param timeout for the asynchronous operation to complete
+	 * @param timeUnit of the given timeout
 	 * @param <IN> Type of input record
 	 * @param <OUT> Type of output record
 	 * @return A new {@link SingleOutputStreamOperator}.
 	 */
 	public static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait(
 			DataStream<IN> in,
-			AsyncFunction<IN, OUT> func) {
-		return addOperator(in, func, DEFAULT_BUFFER_SIZE, OutputMode.ORDERED);
+			AsyncFunction<IN, OUT> func,
+			long timeout,
+			TimeUnit timeUnit) {
+		return addOperator(
+			in,
+			func,
+			timeUnit.toMillis(timeout),
+			DEFAULT_QUEUE_CAPACITY,
+			OutputMode.ORDERED);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6c5a8711/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
index 88fc833..754b754 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
@@ -114,6 +114,7 @@ public class AsyncWaitOperator<IN, OUT>
 
 	public AsyncWaitOperator(
 			AsyncFunction<IN, OUT> asyncFunction,
+			long timeout,
 			int capacity,
 			AsyncDataStream.OutputMode outputMode) {
 		super(asyncFunction);
@@ -124,7 +125,7 @@ public class AsyncWaitOperator<IN, OUT>
 
 		this.outputMode = Preconditions.checkNotNull(outputMode, "outputMode");
 
-		this.timeout = -1L;
+		this.timeout = timeout;
 	}
 
 	@Override
@@ -200,7 +201,7 @@ public class AsyncWaitOperator<IN, OUT>
 
 		if (timeout > 0L) {
 			// register a timeout for this AsyncStreamRecordBufferEntry
-			long timeoutTimestamp = timeout + System.currentTimeMillis();
+			long timeoutTimestamp = timeout + getProcessingTimeService().getCurrentProcessingTime();
 
 			getProcessingTimeService().registerTimer(
 				timeoutTimestamp,

http://git-wip-us.apache.org/repos/asf/flink/blob/6c5a8711/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/Emitter.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/Emitter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/Emitter.java
index 4b22aaa..c122d23 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/Emitter.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/Emitter.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.api.operators.async;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.streaming.api.operators.async.queue.AsyncCollectionResult;
@@ -37,6 +38,7 @@ import java.util.Collection;
  *
  * @param <OUT> Type of the output elements
  */
+@Internal
 public class Emitter<OUT> implements Runnable {
 
 	private static final Logger LOG = LoggerFactory.getLogger(Emitter.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/6c5a8711/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/OperatorActions.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/OperatorActions.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/OperatorActions.java
index 5a2e43c..916b412 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/OperatorActions.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/OperatorActions.java
@@ -18,11 +18,13 @@
 
 package org.apache.flink.streaming.api.operators.async;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 
 /**
  * Interface for {@link StreamOperator} actions.
  */
+@Internal
 public interface OperatorActions {
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/6c5a8711/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncCollectionResult.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncCollectionResult.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncCollectionResult.java
index 8088bf0..6226ae6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncCollectionResult.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncCollectionResult.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.streaming.api.operators.async.queue;
 
+import org.apache.flink.annotation.Internal;
+
 import java.util.Collection;
 
 /**
@@ -25,6 +27,7 @@ import java.util.Collection;
  *
  * @param <T> Type of the collection elements.
  */
+@Internal
 public interface AsyncCollectionResult<T> extends AsyncResult {
 
 	boolean hasTimestamp();

http://git-wip-us.apache.org/repos/asf/flink/blob/6c5a8711/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncResult.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncResult.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncResult.java
index 1a99928..751de76 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncResult.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncResult.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.api.operators.async.queue;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.streaming.api.functions.async.AsyncFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 
@@ -26,6 +27,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
  * either be a {@link Watermark} or a collection of new output elements produced by the
  * {@link AsyncFunction}.
  */
+@Internal
 public interface AsyncResult {
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/6c5a8711/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncWatermarkResult.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncWatermarkResult.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncWatermarkResult.java
index c19b520..68bde3a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncWatermarkResult.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncWatermarkResult.java
@@ -18,11 +18,13 @@
 
 package org.apache.flink.streaming.api.operators.async.queue;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.streaming.api.watermark.Watermark;
 
 /**
  * {@link AsyncResult} subclass for asynchronous result {@link Watermark}.
  */
+@Internal
 public interface AsyncWatermarkResult extends AsyncResult {
 	/**
 	 * Get the resulting watermark.

http://git-wip-us.apache.org/repos/asf/flink/blob/6c5a8711/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java
index 2bbcb6c..414b3c0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.api.operators.async.queue;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.runtime.concurrent.AcceptFunction;
 import org.apache.flink.streaming.api.operators.async.OperatorActions;
 import org.apache.flink.util.Preconditions;
@@ -37,6 +38,7 @@ import java.util.concurrent.locks.ReentrantLock;
  * to the queue. Thus, even if the completion order can be arbitrary, the output order strictly
  * follows the insertion order (element cannot overtake each other).
  */
+@Internal
 public class OrderedStreamElementQueue implements StreamElementQueue {
 
 	private static final Logger LOG = LoggerFactory.getLogger(OrderedStreamElementQueue.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/6c5a8711/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueue.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueue.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueue.java
index 1a2c4a8..e02b8b0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueue.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueue.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.api.operators.async.queue;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator;
 
 import java.util.Collection;
@@ -25,6 +26,7 @@ import java.util.Collection;
 /**
  * Interface for blocking stream element queues for the {@link AsyncWaitOperator}.
  */
+@Internal
 public interface StreamElementQueue {
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/6c5a8711/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueEntry.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueEntry.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueEntry.java
index 06ebf3c..7db9d4f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueEntry.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueEntry.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.api.operators.async.queue;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.runtime.concurrent.AcceptFunction;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
@@ -32,6 +33,7 @@ import java.util.concurrent.Executor;
  *
  * @param <T> Type of the result
  */
+@Internal
 public abstract class StreamElementQueueEntry<T> implements AsyncResult {
 
 	/** Stream element */

http://git-wip-us.apache.org/repos/asf/flink/blob/6c5a8711/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java
index f0e707e..c654702 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.api.operators.async.queue;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.runtime.concurrent.CompletableFuture;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
@@ -34,6 +35,7 @@ import java.util.Collection;
  *
  * @param <OUT> Type of the asynchronous collection result
  */
+@Internal
 public class StreamRecordQueueEntry<OUT> extends StreamElementQueueEntry<Collection<OUT>>
 	implements AsyncCollectionResult<OUT>, AsyncCollector<OUT> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6c5a8711/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
index 603d8cc..f244008 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.api.operators.async.queue;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.runtime.concurrent.AcceptFunction;
 import org.apache.flink.streaming.api.operators.async.OperatorActions;
 import org.apache.flink.util.Preconditions;
@@ -41,6 +42,7 @@ import java.util.concurrent.locks.ReentrantLock;
  * and no watermark can overtake a stream record. However, stream records falling in the same
  * segment between two watermarks can overtake each other (their emission order is not guaranteed).
  */
+@Internal
 public class UnorderedStreamElementQueue implements StreamElementQueue {
 
 	private static final Logger LOG = LoggerFactory.getLogger(UnorderedStreamElementQueue.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/6c5a8711/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/WatermarkQueueEntry.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/WatermarkQueueEntry.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/WatermarkQueueEntry.java
index 6fe4f44..1f48303 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/WatermarkQueueEntry.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/WatermarkQueueEntry.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.api.operators.async.queue;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -25,6 +26,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 /**
  * {@link StreamElementQueueEntry} implementation for the {@link Watermark}.
  */
+@Internal
 public class WatermarkQueueEntry extends StreamElementQueueEntry<Watermark> implements AsyncWatermarkResult {
 
 	private final Future<Watermark> future;

http://git-wip-us.apache.org/repos/asf/flink/blob/6c5a8711/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
index 10ee14f..d9e885e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.api.operators.async;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
@@ -27,11 +28,15 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
+import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
 import org.apache.flink.runtime.state.TaskStateHandles;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.streaming.api.datastream.AsyncDataStream;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -51,6 +56,7 @@ import org.apache.flink.streaming.util.TestHarnessUtil;
 import org.apache.flink.util.TestLogger;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 
 import java.util.ArrayDeque;
 import java.util.Collections;
@@ -59,11 +65,16 @@ import java.util.Iterator;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 /**
  * Tests for {@link AsyncWaitOperator}. These test that:
@@ -77,10 +88,12 @@ import static org.junit.Assert.assertEquals;
  */
 public class AsyncWaitOperatorTest extends TestLogger {
 
+	private static final long TIMEOUT = 1000L;
+
 	private static class MyAsyncFunction extends RichAsyncFunction<Integer, Integer> {
 		private static final long serialVersionUID = 8522411971886428444L;
 
-		private static final long TIMEOUT = 5000L;
+		private static final long TERMINATION_TIMEOUT = 5000L;
 		private static final int THREAD_POOL_SIZE = 10;
 
 		static ExecutorService executorService;
@@ -114,7 +127,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
 					executorService.shutdown();
 
 					try {
-						if (!executorService.awaitTermination(TIMEOUT, TimeUnit.MILLISECONDS)) {
+						if (!executorService.awaitTermination(TERMINATION_TIMEOUT, TimeUnit.MILLISECONDS)) {
 							executorService.shutdownNow();
 						}
 					} catch (InterruptedException interrupted) {
@@ -219,7 +232,11 @@ public class AsyncWaitOperatorTest extends TestLogger {
 	}
 
 	private void testEventTime(AsyncDataStream.OutputMode mode) throws Exception {
-		final AsyncWaitOperator<Integer, Integer> operator = new AsyncWaitOperator<>(new MyAsyncFunction(), 2, mode);
+		final AsyncWaitOperator<Integer, Integer> operator = new AsyncWaitOperator<>(
+			new MyAsyncFunction(),
+			TIMEOUT,
+			2,
+			mode);
 
 		final OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
 				new OneInputStreamOperatorTestHarness<>(operator, IntSerializer.INSTANCE);
@@ -280,7 +297,8 @@ public class AsyncWaitOperatorTest extends TestLogger {
 	}
 
 	private void testProcessingTime(AsyncDataStream.OutputMode mode) throws Exception {
-		final AsyncWaitOperator<Integer, Integer> operator = new AsyncWaitOperator<>(new MyAsyncFunction(), 6, mode);
+		final AsyncWaitOperator<Integer, Integer> operator = new AsyncWaitOperator<>(
+			new MyAsyncFunction(), TIMEOUT, 6, mode);
 
 		final OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = new OneInputStreamOperatorTestHarness<>(operator, IntSerializer.INSTANCE);
 
@@ -380,10 +398,20 @@ public class AsyncWaitOperatorTest extends TestLogger {
 		DataStream<Integer> input = chainEnv.fromElements(1, 2, 3);
 
 		if (withLazyFunction) {
-			input = AsyncDataStream.orderedWait(input, new LazyAsyncFunction(), 6);
+			input = AsyncDataStream.orderedWait(
+				input,
+				new LazyAsyncFunction(),
+				TIMEOUT,
+				TimeUnit.MILLISECONDS,
+				6);
 		}
 		else {
-			input = AsyncDataStream.orderedWait(input, new MyAsyncFunction(), 6);
+			input = AsyncDataStream.orderedWait(
+				input,
+				new MyAsyncFunction(),
+				TIMEOUT,
+				TimeUnit.MILLISECONDS,
+				6);
 		}
 
 		// the map function is designed to chain after async function. we place an Integer object in it and
@@ -407,7 +435,12 @@ public class AsyncWaitOperatorTest extends TestLogger {
 			}
 		});
 
-		input = AsyncDataStream.unorderedWait(input, new MyAsyncFunction(), 3);
+		input = AsyncDataStream.unorderedWait(
+			input,
+			new MyAsyncFunction(),
+			TIMEOUT,
+			TimeUnit.MILLISECONDS,
+			3);
 
 		input.map(new MapFunction<Integer, Integer>() {
 			private static final long serialVersionUID = 5162085254238405527L;
@@ -432,8 +465,11 @@ public class AsyncWaitOperatorTest extends TestLogger {
 		final OneInputStreamTaskTestHarness<Integer, Integer> testHarness =
 				new OneInputStreamTaskTestHarness<>(task, 1, 1, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
 
-		AsyncWaitOperator<Integer, Integer> operator =
-				new AsyncWaitOperator<>(new LazyAsyncFunction(), 3, AsyncDataStream.OutputMode.ORDERED);
+		AsyncWaitOperator<Integer, Integer> operator = new AsyncWaitOperator<>(
+			new LazyAsyncFunction(),
+			TIMEOUT,
+			3,
+			AsyncDataStream.OutputMode.ORDERED);
 
 		final StreamConfig streamConfig = testHarness.getStreamConfig();
 		streamConfig.setStreamOperator(operator);
@@ -481,8 +517,11 @@ public class AsyncWaitOperatorTest extends TestLogger {
 		final OneInputStreamTaskTestHarness<Integer, Integer> restoredTaskHarness =
 				new OneInputStreamTaskTestHarness<>(restoredTask, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
 
-		AsyncWaitOperator<Integer, Integer> restoredOperator =
-				new AsyncWaitOperator<>(new MyAsyncFunction(), 6, AsyncDataStream.OutputMode.ORDERED);
+		AsyncWaitOperator<Integer, Integer> restoredOperator = new AsyncWaitOperator<>(
+			new MyAsyncFunction(),
+			TIMEOUT,
+			6,
+			AsyncDataStream.OutputMode.ORDERED);
 
 		restoredTaskHarness.getStreamConfig().setStreamOperator(restoredOperator);
 
@@ -561,4 +600,74 @@ public class AsyncWaitOperatorTest extends TestLogger {
 			return checkpointStateHandles;
 		}
 	}
+
+	@Test
+	public void testAsyncTimeout() throws Exception {
+		final long timeout = 10L;
+		// scenario: record 1 exceeds the timeout and must fail the task externally, record 2 must still be emitted
+		final AsyncWaitOperator<Integer, Integer> operator = new AsyncWaitOperator<>(
+			new LazyAsyncFunction(),
+			timeout,
+			2,
+			AsyncDataStream.OutputMode.ORDERED);
+
+		final Environment mockEnvironment = mock(Environment.class);
+
+		final Configuration taskConfiguration = new Configuration();
+		final ExecutionConfig executionConfig = new ExecutionConfig();
+		final TaskMetricGroup metricGroup = new UnregisteredTaskMetricsGroup();
+		final Configuration taskManagerConfiguration = new Configuration();
+		final TaskManagerRuntimeInfo taskManagerRuntimeInfo = new TaskManagerRuntimeInfo("localhost", taskManagerConfiguration, "/tmp");
+		final TaskInfo taskInfo = new TaskInfo("foobarTask", 1, 0, 1, 1);
+
+		when(mockEnvironment.getTaskConfiguration()).thenReturn(taskConfiguration);
+		when(mockEnvironment.getExecutionConfig()).thenReturn(executionConfig);
+		when(mockEnvironment.getMetricGroup()).thenReturn(metricGroup);
+		when(mockEnvironment.getTaskManagerInfo()).thenReturn(taskManagerRuntimeInfo);
+		when(mockEnvironment.getTaskInfo()).thenReturn(taskInfo);
+		when(mockEnvironment.getUserClassLoader()).thenReturn(AsyncWaitOperatorTest.class.getClassLoader());
+
+		final OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
+			new OneInputStreamOperatorTestHarness<>(operator, IntSerializer.INSTANCE, mockEnvironment);
+
+		final long initialTime = 0L;
+		final ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		testHarness.open();
+
+		testHarness.setProcessingTime(initialTime);
+
+		synchronized (testHarness.getCheckpointLock()) {
+			testHarness.processElement(new StreamRecord<>(1, initialTime));
+			testHarness.setProcessingTime(initialTime + 5L);
+			testHarness.processElement(new StreamRecord<>(2, initialTime + 5L));
+		}
+
+		// advance processing time just past initialTime + timeout to trigger the timeout of the first stream record
+		testHarness.setProcessingTime(initialTime + timeout + 1L);
+
+		// allow the second async stream record to be processed (presumably releases the latch LazyAsyncFunction waits on -- confirm)
+		LazyAsyncFunction.countDown();
+
+		// close the operator while holding the checkpoint lock; waits until all async collectors in the buffer have been emitted.
+		synchronized (testHarness.getCheckpointLock()) {
+			testHarness.close();
+		}
+		// only the second record is expected in the output; the first one timed out
+		expectedOutput.add(new StreamRecord<>(2, initialTime + 5L));
+
+		TestHarnessUtil.assertOutputEquals("Output with watermark was not correct.", expectedOutput, testHarness.getOutput());
+
+		ArgumentCaptor<Throwable> argumentCaptor = ArgumentCaptor.forClass(Throwable.class);
+		// the failure must have been reported through Environment#failExternally on the mocked environment
+		verify(mockEnvironment).failExternally(argumentCaptor.capture());
+
+		Throwable failureCause = argumentCaptor.getValue();
+		// expected cause chain: reported failure -> ExecutionException -> TimeoutException
+		Assert.assertNotNull(failureCause.getCause());
+		Assert.assertTrue(failureCause.getCause() instanceof ExecutionException);
+
+		Assert.assertNotNull(failureCause.getCause().getCause());
+		Assert.assertTrue(failureCause.getCause().getCause() instanceof TimeoutException);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6c5a8711/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 830cd6f..b623fa1 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -54,6 +54,7 @@ import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.apache.flink.util.FutureUtil;
+import org.apache.flink.util.Preconditions;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -107,29 +108,44 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 	private volatile boolean wasFailedExternally = false;
 
 	public AbstractStreamOperatorTestHarness(
+		StreamOperator<OUT> operator,
+		int maxParallelism,
+		int numSubtasks,
+		int subtaskIndex) throws Exception {
+
+		this(
+			operator,
+			maxParallelism,
+			numSubtasks,
+			subtaskIndex,
+			new MockEnvironment(
+				"MockTask",
+				3 * 1024 * 1024,
+				new MockInputSplitProvider(),
+				1024,
+				new Configuration(),
+				new ExecutionConfig(),
+				maxParallelism,
+				numSubtasks,
+				subtaskIndex));
+	}
+
+	public AbstractStreamOperatorTestHarness(
 			StreamOperator<OUT> operator,
 			int maxParallelism,
 			int numSubtasks,
-			int subtaskIndex) throws Exception {
+			int subtaskIndex,
+			final Environment environment) throws Exception {
 		this.operator = operator;
 		this.outputList = new ConcurrentLinkedQueue<>();
-		Configuration underlyingConfig = new Configuration();
+		Configuration underlyingConfig = environment.getTaskConfiguration();
 		this.config = new StreamConfig(underlyingConfig);
 		this.config.setCheckpointingEnabled(true);
-		this.executionConfig = new ExecutionConfig();
+		this.executionConfig = environment.getExecutionConfig();
 		this.closableRegistry = new CloseableRegistry();
 		this.checkpointLock = new Object();
 
-		environment = new MockEnvironment(
-				"MockTask",
-				3 * 1024 * 1024,
-				new MockInputSplitProvider(),
-				1024,
-				underlyingConfig,
-				executionConfig,
-				maxParallelism,
-				numSubtasks,
-				subtaskIndex);
+		this.environment = Preconditions.checkNotNull(environment);
 
 		mockTask = mock(StreamTask.class);
 		processingTimeService = new TestProcessingTimeService();

http://git-wip-us.apache.org/repos/asf/flink/blob/6c5a8711/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index 86fbaa0..9b02378 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.util;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -45,6 +46,15 @@ public class OneInputStreamOperatorTestHarness<IN, OUT>
 		config.setTypeSerializerIn1(Preconditions.checkNotNull(typeSerializerIn));
 	}
 
+	public OneInputStreamOperatorTestHarness(
+		OneInputStreamOperator<IN, OUT> operator,
+		TypeSerializer<IN> typeSerializerIn,
+		Environment environment) throws Exception {
+		this(operator, 1, 1, 0, environment);
+
+		config.setTypeSerializerIn1(Preconditions.checkNotNull(typeSerializerIn));
+	}
+
 	public OneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator) throws Exception {
 		this(operator, 1, 1, 0);
 	}
@@ -59,6 +69,17 @@ public class OneInputStreamOperatorTestHarness<IN, OUT>
 		this.oneInputOperator = operator;
 	}
 
+	public OneInputStreamOperatorTestHarness(
+		OneInputStreamOperator<IN, OUT> operator,
+		int maxParallelism,
+		int numSubtasks,
+		int subtaskIndex,
+		Environment environment) throws Exception {
+		super(operator, maxParallelism, numSubtasks, subtaskIndex, environment);
+
+		this.oneInputOperator = operator;
+	}
+
 	public void processElement(IN value, long timestamp) throws Exception {
 		processElement(new StreamRecord<>(value, timestamp));
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/6c5a8711/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
index 3631965..8ea1bd8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
@@ -41,6 +41,7 @@ import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase {
 
@@ -206,6 +207,7 @@ public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase
 	@Test
 	public void testAsyncWaitOperator() throws Exception {
 		final int numElements = 5;
+		final long timeout = 1000L;
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
@@ -240,7 +242,12 @@ public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase
 			}
 		};
 
-		DataStream<Integer> orderedResult = AsyncDataStream.orderedWait(input, function, 2).setParallelism(1);
+		DataStream<Integer> orderedResult = AsyncDataStream.orderedWait(
+			input,
+			function,
+			timeout,
+			TimeUnit.MILLISECONDS,
+			2).setParallelism(1);
 
 		// save result from ordered process
 		final MemorySinkFunction sinkFunction1 = new MemorySinkFunction(0);
@@ -249,8 +256,12 @@ public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase
 
 		orderedResult.addSink(sinkFunction1).setParallelism(1);
 
-
-		DataStream<Integer> unorderedResult = AsyncDataStream.unorderedWait(input, function, 2);
+		DataStream<Integer> unorderedResult = AsyncDataStream.unorderedWait(
+			input,
+			function,
+			timeout,
+			TimeUnit.MILLISECONDS,
+			2);
 
 		// save result from unordered process
 		final MemorySinkFunction sinkFunction2 = new MemorySinkFunction(1);