You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/12/20 05:05:41 UTC

[5/7] flink git commit: [FLINK-4391] Polish asynchronous I/O operations

[FLINK-4391] Polish asynchronous I/O operations

Polish AsyncFunction

Move AsyncCollectorBuffer to operators package

Rework AsyncWaitOperator and AsyncStreamElementQueue implementation

Rename AsyncCollectorQueue into StreamElementQueue

Reworked StreamingOperatorsITCase and RichAsyncFunctionTest

Refactor AsyncWaitOperatorTest

Add StreamElementQueueTests

Add EmitterTest case

Add comments


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ad603d59
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ad603d59
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ad603d59

Branch: refs/heads/master
Commit: ad603d59ec17a07adef995c2f1fd58fb8571a3d8
Parents: f528307
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Dec 14 17:37:39 2016 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Dec 20 05:04:51 2016 +0100

----------------------------------------------------------------------
 .../org/apache/flink/util/ExceptionUtils.java   |  54 +-
 .../examples/async/AsyncIOExample.java          | 128 ++--
 .../api/functions/async/AsyncFunction.java      |  22 +-
 .../api/functions/async/RichAsyncFunction.java  | 156 ++---
 .../async/buffer/AbstractBufferEntry.java       |  78 ---
 .../async/buffer/AsyncCollectorBuffer.java      | 633 ------------------
 .../async/buffer/LatencyMarkerEntry.java        |  36 -
 .../async/buffer/StreamElementEntry.java        |  82 ---
 .../async/buffer/StreamRecordEntry.java         |  75 ---
 .../functions/async/buffer/WatermarkEntry.java  |  36 -
 .../async/collector/AsyncCollector.java         |  11 +-
 .../api/operators/TimestampedCollector.java     |   4 +
 .../api/operators/async/AsyncWaitOperator.java  | 324 ++++++---
 .../streaming/api/operators/async/Emitter.java  | 152 +++++
 .../api/operators/async/OperatorActions.java    |  34 +
 .../async/queue/AsyncCollectionResult.java      |  41 ++
 .../api/operators/async/queue/AsyncResult.java  |  59 ++
 .../async/queue/AsyncWatermarkResult.java       |  33 +
 .../async/queue/OrderedStreamElementQueue.java  | 229 +++++++
 .../async/queue/StreamElementQueue.java         |  96 +++
 .../async/queue/StreamElementQueueEntry.java    |  97 +++
 .../async/queue/StreamRecordQueueEntry.java     |  85 +++
 .../queue/UnorderedStreamElementQueue.java      | 304 +++++++++
 .../async/queue/WatermarkQueueEntry.java        |  47 ++
 .../streaming/runtime/tasks/OperatorChain.java  |   1 -
 .../streaming/runtime/tasks/StreamTask.java     |  10 +-
 .../functions/async/RichAsyncFunctionTest.java  | 283 +++++---
 .../async/AsyncCollectorBufferTest.java         | 656 -------------------
 .../operators/async/AsyncWaitOperatorTest.java  | 315 ++++-----
 .../api/operators/async/EmitterTest.java        | 193 ++++++
 .../queue/OrderedStreamElementQueueTest.java    | 124 ++++
 .../async/queue/StreamElementQueueTest.java     | 263 ++++++++
 .../queue/UnorderedStreamElementQueueTest.java  | 182 +++++
 .../operators/StreamSourceOperatorTest.java     |  30 +-
 .../flink/streaming/util/CollectorOutput.java   |  57 ++
 .../streaming/api/StreamingOperatorsITCase.java |  96 ++-
 36 files changed, 2852 insertions(+), 2174 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ad603d59/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
index 32bc1d2..d1357a8 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@@ -26,10 +26,13 @@ package org.apache.flink.util;
 
 import org.apache.flink.annotation.Internal;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.StringWriter;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 @Internal
 public final class ExceptionUtils {
 	public static final String STRINGIFIED_NULL_EXCEPTION = "(null)";
@@ -59,7 +62,56 @@ public final class ExceptionUtils {
 			return e.getClass().getName() + " (error while printing stack trace)";
 		}
 	}
-	
+
+	/**
+	 * Adds a new exception as a {@link Throwable#addSuppressed(Throwable) suppressed exception}
+	 * to a prior exception, or returns the new exception, if no prior exception exists.
+	 *
+	 * <pre>{@code
+	 *
+	 * public void closeAllThings() throws Exception {
+	 *     Exception ex = null;
+	 *     try {
+	 *         component.shutdown();
+	 *     } catch (Exception e) {
+	 *         ex = firstOrSuppressed(e, ex);
+	 *     }
+	 *     try {
+	 *         anotherComponent.stop();
+	 *     } catch (Exception e) {
+	 *         ex = firstOrSuppressed(e, ex);
+	 *     }
+	 *     try {
+	 *         lastComponent.shutdown();
+	 *     } catch (Exception e) {
+	 *         ex = firstOrSuppressed(e, ex);
+	 *     }
+	 *
+	 *     if (ex != null) {
+	 *         throw ex;
+	 *     }
+	 * }
+	 * }</pre>
+	 *
+	 * @param newException The newly occurred exception
+	 * @param previous     The previously occurred exception, possibly null.
+	 *
+	 * @return The new exception, if no previous exception exists, or the previous exception with the
+	 *         new exception in the list of suppressed exceptions.
+	 */
+	public static <T extends Throwable> T firstOrSuppressed(T newException, @Nullable T previous) {
+		checkNotNull(newException, "newException");
+
+		if (previous == null) {
+			return newException;
+		} else {
+			previous.addSuppressed(newException);
+			return previous;
+		}
+	}
+
+
+
 	/**
 	 * Throws the given {@code Throwable} in scenarios where the signatures do not allow you to
 	 * throw an arbitrary Throwable. Errors and RuntimeExceptions are thrown directly, other exceptions

http://git-wip-us.apache.org/repos/asf/flink/blob/ad603d59/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java
index 96c7658..6dde537 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java
@@ -33,6 +33,8 @@ import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
 import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -43,10 +45,17 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 /**
- * Example to illustrates how to use {@link org.apache.flink.streaming.api.functions.async.AsyncFunction}
+ * Example to illustrate how to use {@link AsyncFunction}
  */
 public class AsyncIOExample {
 
+	private static final Logger LOG = LoggerFactory.getLogger(AsyncIOExample.class);
+
+	private static final String EXACTLY_ONCE_MODE = "exactly_once";
+	private static final String EVENT_TIME = "EventTime";
+	private static final String INGESTION_TIME = "IngestionTime";
+	private static final String ORDERED = "ordered";
+
 	/**
 	 * A checkpointed source.
 	 */
@@ -103,8 +112,10 @@ public class AsyncIOExample {
 	 * async client.
 	 */
 	private static class SampleAsyncFunction extends RichAsyncFunction<Integer, String> {
-		transient static ExecutorService executorService;
-		transient static Random random;
+		private static final long serialVersionUID = 2098635244857937717L;
+
+		private static ExecutorService executorService;
+		private static Random random;
 
 		private int counter;
 
@@ -112,17 +123,17 @@ public class AsyncIOExample {
 		 * The result of multiplying sleepFactor with a random float is used to pause
 		 * the working thread in the thread pool, simulating a time consuming async operation.
 		 */
-		final long sleepFactor;
+		private final long sleepFactor;
 
 		/**
 		 * The ratio to generate an exception to simulate an async error. For example, the error
 		 * may be a TimeoutException while visiting HBase.
 		 */
-		final float failRatio;
+		private final float failRatio;
 
-		final long shutdownWaitTS;
+		private final long shutdownWaitTS;
 
-		public SampleAsyncFunction(long sleepFactor, float failRatio, long shutdownWaitTS) {
+		SampleAsyncFunction(long sleepFactor, float failRatio, long shutdownWaitTS) {
 			this.sleepFactor = sleepFactor;
 			this.failRatio = failRatio;
 			this.shutdownWaitTS = shutdownWaitTS;
@@ -155,7 +166,9 @@ public class AsyncIOExample {
 					executorService.shutdown();
 
 					try {
-						executorService.awaitTermination(shutdownWaitTS, TimeUnit.MILLISECONDS);
+						if (!executorService.awaitTermination(shutdownWaitTS, TimeUnit.MILLISECONDS)) {
+							executorService.shutdownNow();
+						}
 					} catch (InterruptedException e) {
 						executorService.shutdownNow();
 					}
@@ -169,14 +182,15 @@ public class AsyncIOExample {
 				@Override
 				public void run() {
 					// wait for while to simulate async operation here
-					int sleep = (int) (random.nextFloat() * sleepFactor);
+					long sleep = (long) (random.nextFloat() * sleepFactor);
 					try {
 						Thread.sleep(sleep);
-						List<String> ret = Collections.singletonList("key-" + (input % 10));
+
 						if (random.nextFloat() < failRatio) {
 							collector.collect(new Exception("wahahahaha..."));
 						} else {
-							collector.collect(ret);
+							collector.collect(
+								Collections.singletonList("key-" + (input % 10)));
 						}
 					} catch (InterruptedException e) {
 						collector.collect(new ArrayList<String>(0));
@@ -200,47 +214,71 @@ public class AsyncIOExample {
 		// obtain execution environment
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
-		printUsage();
-
 		// parse parameters
 		final ParameterTool params = ParameterTool.fromArgs(args);
 
-		// check the configuration for the job
-		final String statePath = params.getRequired("fsStatePath");
-		final String cpMode = params.get("checkpointMode", "exactly_once");
-		final int maxCount = params.getInt("maxCount", 100000);
-		final int sleepFactor = params.getInt("sleepFactor", 100);
-		final float failRatio = params.getFloat("failRatio", 0.001f);
-		final String mode = params.get("waitMode", "ordered");
-		final int taskNum =  params.getInt("waitOperatorParallelism", 1);
-		final String timeType = params.get("eventType", "EventTime");
-		final int shutdownWaitTS = params.getInt("shutdownWaitTS", 20000);
-
-		System.out.println("Job configuration\n"
-			+"\tFS state path="+statePath+"\n"
-			+"\tCheckpoint mode="+cpMode+"\n"
-			+"\tMax count of input from source="+maxCount+"\n"
-			+"\tSleep factor="+sleepFactor+"\n"
-			+"\tFail ratio="+failRatio+"\n"
-			+"\tWaiting mode="+mode+"\n"
-			+"\tParallelism for async wait operator="+taskNum+"\n"
-			+"\tEvent type="+timeType+"\n"
-			+"\tShutdown wait timestamp="+shutdownWaitTS);
-
-		// setup state and checkpoint mode
-		env.setStateBackend(new FsStateBackend(statePath));
-		if (cpMode.equals("exactly_once")) {
-			env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
+		final String statePath;
+		final String cpMode;
+		final int maxCount;
+		final long sleepFactor;
+		final float failRatio;
+		final String mode;
+		final int taskNum;
+		final String timeType;
+		final long shutdownWaitTS;
+
+		try {
+			// check the configuration for the job
+			statePath = params.get("fsStatePath", null);
+			cpMode = params.get("checkpointMode", "exactly_once");
+			maxCount = params.getInt("maxCount", 100000);
+			sleepFactor = params.getLong("sleepFactor", 100);
+			failRatio = params.getFloat("failRatio", 0.001f);
+			mode = params.get("waitMode", "ordered");
+			taskNum = params.getInt("waitOperatorParallelism", 1);
+			timeType = params.get("eventType", "EventTime");
+			shutdownWaitTS = params.getLong("shutdownWaitTS", 20000);
+		} catch (Exception e) {
+			printUsage();
+
+			throw e;
+		}
+
+		StringBuilder configStringBuilder = new StringBuilder();
+
+		final String lineSeparator = System.getProperty("line.separator");
+
+		configStringBuilder
+			.append("Job configuration").append(lineSeparator)
+			.append("FS state path=").append(statePath).append(lineSeparator)
+			.append("Checkpoint mode=").append(cpMode).append(lineSeparator)
+			.append("Max count of input from source=").append(maxCount).append(lineSeparator)
+			.append("Sleep factor=").append(sleepFactor).append(lineSeparator)
+			.append("Fail ratio=").append(failRatio).append(lineSeparator)
+			.append("Waiting mode=").append(mode).append(lineSeparator)
+			.append("Parallelism for async wait operator=").append(taskNum).append(lineSeparator)
+			.append("Event type=").append(timeType).append(lineSeparator)
+			.append("Shutdown wait timestamp=").append(shutdownWaitTS);
+
+		LOG.info(configStringBuilder.toString());
+
+		if (statePath != null) {
+			// setup state and checkpoint mode
+			env.setStateBackend(new FsStateBackend(statePath));
+		}
+
+		if (EXACTLY_ONCE_MODE.equals(cpMode)) {
+			env.enableCheckpointing(1000L, CheckpointingMode.EXACTLY_ONCE);
 		}
 		else {
-			env.enableCheckpointing(1000, CheckpointingMode.AT_LEAST_ONCE);
+			env.enableCheckpointing(1000L, CheckpointingMode.AT_LEAST_ONCE);
 		}
 
 		// enable watermark or not
-		if (timeType.equals("EventTime")) {
+		if (EVENT_TIME.equals(timeType)) {
 			env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 		}
-		else if (timeType.equals("IngestionTime")) {
+		else if (INGESTION_TIME.equals(timeType)) {
 			env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
 		}
 
@@ -253,7 +291,7 @@ public class AsyncIOExample {
 
 		// add async operator to streaming job
 		DataStream<String> result;
-		if (mode.equals("ordered")) {
+		if (ORDERED.equals(mode)) {
 			result = AsyncDataStream.orderedWait(inputStream, function, 20).setParallelism(taskNum);
 		}
 		else {
@@ -262,6 +300,8 @@ public class AsyncIOExample {
 
 		// add a reduce to get the sum of each keys.
 		result.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
+			private static final long serialVersionUID = -938116068682344455L;
+
 			@Override
 			public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
 				out.collect(new Tuple2<>(value, 1));
@@ -269,7 +309,7 @@ public class AsyncIOExample {
 		}).keyBy(0).sum(1).print();
 
 		// execute the program
-		env.execute("Async I/O Example");
+		env.execute("Async IO Example");
 	}
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ad603d59/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncFunction.java
index b5b7d6f..4de2db1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncFunction.java
@@ -29,14 +29,15 @@ import java.io.Serializable;
  * <p>
  * For each #asyncInvoke, an async io operation can be triggered, and once it has been done,
  * the result can be collected by calling {@link AsyncCollector#collect}. For each async
- * operations, their contexts are buffered in the operator immediately after invoking
- * #asyncInvoke, leading to no blocking for each stream input as long as internal buffer is not full.
+ * operation, its context is stored in the operator immediately after invoking
+ * #asyncInvoke, avoiding blocking for each stream input as long as the internal buffer is not full.
  * <p>
- * {@link AsyncCollector} can be passed into callbacks or futures provided by async client to
- * fetch result data. Any error can also be propagate to the operator by {@link AsyncCollector#collect(Throwable)}.
+ * {@link AsyncCollector} can be passed into callbacks or futures to collect the result data.
+ * An error can also be propagated to the async IO operator by
+ * {@link AsyncCollector#collect(Throwable)}.
  *
  * <p>
- * Typical usage for callback:
+ * Callback example usage:
  * <pre>{@code
  * public class HBaseAsyncFunc implements AsyncFunction<String, String> {
  *   @Override
@@ -46,11 +47,10 @@ import java.io.Serializable;
  *     hbase.asyncGet(get, cb);
  *   }
  * }
- * }
  * </pre>
  *
  * <p>
- * Typical usage for {@link com.google.common.util.concurrent.ListenableFuture}
+ * Future example usage:
  * <pre>{@code
  * public class HBaseAsyncFunc implements AsyncFunction<String, String> {
  *   @Override
@@ -68,7 +68,6 @@ import java.io.Serializable;
  *     });
  *   }
  * }
- * }
  * </pre>
  *
  * @param <IN> The type of the input elements.
@@ -80,9 +79,10 @@ public interface AsyncFunction<IN, OUT> extends Function, Serializable {
 	/**
 	 * Trigger async operation for each stream input.
 	 *
-	 * @param input Stream Input
-	 * @param collector AsyncCollector
-	 * @exception Exception will make task fail and trigger fail-over process.
+	 * @param input element coming from an upstream task
+	 * @param collector to collect the result data
+	 * @exception Exception in case of a user code error. An exception will make the task fail and
+	 * trigger fail-over process.
 	 */
 	void asyncInvoke(IN input, AsyncCollector<OUT> collector) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ad603d59/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java
index f6d3d31..232206c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java
@@ -41,6 +41,7 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
 import org.apache.flink.types.Value;
+import org.apache.flink.util.Preconditions;
 
 import java.io.Serializable;
 import java.util.List;
@@ -48,88 +49,55 @@ import java.util.Map;
 
 /**
  * Rich variant of the {@link AsyncFunction}. As a {@link RichFunction}, it gives access to the
- * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
+ * {@link RuntimeContext} and provides setup and teardown methods:
  * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
  * {@link RichFunction#close()}.
  *
  * <p>
- * State related apis in {@link RuntimeContext} are not supported yet because the key may get changed
- * while accessing states in the working thread.
+ * State related apis in {@link RuntimeContext} are not supported yet because the key may get
+ * changed while accessing states in the working thread.
  * <p>
- * {@link IterationRuntimeContext#getIterationAggregator(String)} is not supported since the aggregator
- * may be modified by multiple threads.
+ * {@link IterationRuntimeContext#getIterationAggregator(String)} is not supported since the
+ * aggregator may be modified by multiple threads.
  *
  * @param <IN> The type of the input elements.
  * @param <OUT> The type of the returned elements.
  */
-
 @PublicEvolving
-public abstract class RichAsyncFunction<IN, OUT> extends AbstractRichFunction
-	implements AsyncFunction<IN, OUT> {
+public abstract class RichAsyncFunction<IN, OUT> extends AbstractRichFunction implements AsyncFunction<IN, OUT> {
 
-	private transient RuntimeContext runtimeContext;
+	private static final long serialVersionUID = 3858030061138121840L;
 
 	@Override
-	public void setRuntimeContext(RuntimeContext t) {
-		super.setRuntimeContext(t);
+	public void setRuntimeContext(RuntimeContext runtimeContext) {
+		Preconditions.checkNotNull(runtimeContext);
 
-		if (t != null) {
-			runtimeContext = new RichAsyncFunctionRuntimeContext(t);
+		if (runtimeContext instanceof IterationRuntimeContext) {
+			super.setRuntimeContext(
+				new RichAsyncFunctionIterationRuntimeContext(
+					(IterationRuntimeContext) runtimeContext));
+		} else {
+			super.setRuntimeContext(new RichAsyncFunctionRuntimeContext(runtimeContext));
 		}
 	}
 
 	@Override
 	public abstract void asyncInvoke(IN input, AsyncCollector<OUT> collector) throws Exception;
 
-	@Override
-	public RuntimeContext getRuntimeContext() {
-		if (this.runtimeContext != null) {
-			return runtimeContext;
-		} else {
-			throw new IllegalStateException("The runtime context has not been initialized.");
-		}
-	}
-
-	@Override
-	public IterationRuntimeContext getIterationRuntimeContext() {
-		if (this.runtimeContext != null) {
-			return (IterationRuntimeContext) runtimeContext;
-		} else {
-			throw new IllegalStateException("The runtime context has not been initialized.");
-		}
-	}
+	// -----------------------------------------------------------------------------------------
+	// Wrapper classes
+	// -----------------------------------------------------------------------------------------
 
 	/**
-	 * A wrapper class to delegate {@link RuntimeContext}. State related apis are disabled.
+	 * A wrapper class for async function's {@link RuntimeContext}. The async function runtime
+	 * context only supports basic operations which are thread safe. Consequently, state access,
+	 * accumulators, broadcast variables and the distributed cache are disabled.
 	 */
-	private class RichAsyncFunctionRuntimeContext implements IterationRuntimeContext {
-		private RuntimeContext runtimeContext;
-
-		public RichAsyncFunctionRuntimeContext(RuntimeContext context) {
-			runtimeContext = context;
-		}
-
-		private IterationRuntimeContext getIterationRuntineContext() {
-			if (this.runtimeContext instanceof IterationRuntimeContext) {
-				return (IterationRuntimeContext) this.runtimeContext;
-			} else {
-				throw new IllegalStateException("This stub is not part of an iteration step function.");
-			}
-		}
-
-		@Override
-		public int getSuperstepNumber() {
-			return getIterationRuntineContext().getSuperstepNumber();
-		}
-
-		@Override
-		public <T extends Aggregator<?>> T getIterationAggregator(String name) {
-			throw new UnsupportedOperationException("Get iteration aggregator is not supported in rich async function");
-		}
+	private static class RichAsyncFunctionRuntimeContext implements RuntimeContext {
+		private final RuntimeContext runtimeContext;
 
-		@Override
-		public <T extends Value> T getPreviousIterationAggregate(String name) {
-			return getIterationRuntineContext().getPreviousIterationAggregate(name);
+		RichAsyncFunctionRuntimeContext(RuntimeContext context) {
+			runtimeContext = Preconditions.checkNotNull(context);
 		}
 
 		@Override
@@ -172,74 +140,108 @@ public abstract class RichAsyncFunction<IN, OUT> extends AbstractRichFunction
 			return runtimeContext.getUserCodeClassLoader();
 		}
 
+		// -----------------------------------------------------------------------------------
+		// Unsupported operations
+		// -----------------------------------------------------------------------------------
+
+		@Override
+		public DistributedCache getDistributedCache() {
+			throw new UnsupportedOperationException("Distributed cache is not supported in rich async functions.");
+		}
+
+		@Override
+		public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
+			throw new UnsupportedOperationException("State is not supported in rich async functions.");
+		}
+
+		@Override
+		public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) {
+			throw new UnsupportedOperationException("State is not supported in rich async functions.");
+		}
+
+		@Override
+		public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) {
+			throw new UnsupportedOperationException("State is not supported in rich async functions.");
+		}
+
 		@Override
 		public <V, A extends Serializable> void addAccumulator(String name, Accumulator<V, A> accumulator) {
-			runtimeContext.addAccumulator(name, accumulator);
+			throw new UnsupportedOperationException("Accumulators are not supported in rich async functions.");
 		}
 
 		@Override
 		public <V, A extends Serializable> Accumulator<V, A> getAccumulator(String name) {
-			return runtimeContext.getAccumulator(name);
+			throw new UnsupportedOperationException("Accumulators are not supported in rich async functions.");
 		}
 
 		@Override
 		public Map<String, Accumulator<?, ?>> getAllAccumulators() {
-			return runtimeContext.getAllAccumulators();
+			throw new UnsupportedOperationException("Accumulators are not supported in rich async functions.");
 		}
 
 		@Override
 		public IntCounter getIntCounter(String name) {
-			return runtimeContext.getIntCounter(name);
+			throw new UnsupportedOperationException("Int counters are not supported in rich async functions.");
 		}
 
 		@Override
 		public LongCounter getLongCounter(String name) {
-			return runtimeContext.getLongCounter(name);
+			throw new UnsupportedOperationException("Long counters are not supported in rich async functions.");
 		}
 
 		@Override
 		public DoubleCounter getDoubleCounter(String name) {
-			return runtimeContext.getDoubleCounter(name);
+			throw new UnsupportedOperationException("Long counters are not supported in rich async functions.");
 		}
 
 		@Override
 		public Histogram getHistogram(String name) {
-			return runtimeContext.getHistogram(name);
+			throw new UnsupportedOperationException("Histograms are not supported in rich async functions.");
 		}
 
 		@Override
 		public boolean hasBroadcastVariable(String name) {
-			return runtimeContext.hasBroadcastVariable(name);
+			throw new UnsupportedOperationException("Broadcast variables are not supported in rich async functions.");
 		}
 
 		@Override
 		public <RT> List<RT> getBroadcastVariable(String name) {
-			return runtimeContext.getBroadcastVariable(name);
+			throw new UnsupportedOperationException("Broadcast variables are not supported in rich async functions.");
 		}
 
 		@Override
 		public <T, C> C getBroadcastVariableWithInitializer(String name, BroadcastVariableInitializer<T, C> initializer) {
-			return runtimeContext.getBroadcastVariableWithInitializer(name, initializer);
+			throw new UnsupportedOperationException("Broadcast variables are not supported in rich async functions.");
 		}
+	}
 
-		@Override
-		public DistributedCache getDistributedCache() {
-			return runtimeContext.getDistributedCache();
+	private static class RichAsyncFunctionIterationRuntimeContext extends RichAsyncFunctionRuntimeContext implements IterationRuntimeContext {
+
+		private final IterationRuntimeContext iterationRuntimeContext;
+
+		RichAsyncFunctionIterationRuntimeContext(IterationRuntimeContext iterationRuntimeContext) {
+			super(iterationRuntimeContext);
+
+			this.iterationRuntimeContext = Preconditions.checkNotNull(iterationRuntimeContext);
 		}
 
 		@Override
-		public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
-			throw new UnsupportedOperationException("State is not supported in rich async function");
+		public int getSuperstepNumber() {
+			return iterationRuntimeContext.getSuperstepNumber();
 		}
 
+		// -----------------------------------------------------------------------------------
+		// Unsupported operations
+		// -----------------------------------------------------------------------------------
+
 		@Override
-		public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) {
-			throw new UnsupportedOperationException("State is not supported in rich async function");
+		public <T extends Aggregator<?>> T getIterationAggregator(String name) {
+			throw new UnsupportedOperationException("Iteration aggregators are not supported in rich async functions.");
 		}
 
 		@Override
-		public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) {
-			throw new UnsupportedOperationException("State is not supported in rich async function");
+		public <T extends Value> T getPreviousIterationAggregate(String name) {
+			throw new UnsupportedOperationException("Iteration aggregators are not supported in rich async functions.");
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ad603d59/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/buffer/AbstractBufferEntry.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/buffer/AbstractBufferEntry.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/buffer/AbstractBufferEntry.java
deleted file mode 100644
index 29643fd..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/buffer/AbstractBufferEntry.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.async.buffer;
-
-import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * Abstract implementation for {@link StreamElementEntry}
- *
- * @param <OUT> Output type.
- */
-public abstract class AbstractBufferEntry<OUT> implements StreamElementEntry<OUT> {
-	private final StreamElement streamElement;
-
-	protected AbstractBufferEntry(StreamElement element) {
-		this.streamElement = Preconditions.checkNotNull(element, "Reference to StreamElement should not be null");
-	}
-
-	@Override
-	public List<OUT> getResult() throws IOException {
-		throw new UnsupportedOperationException("It is only available for StreamRecordEntry");
-	}
-
-	@Override
-	public void markDone() {
-		throw new UnsupportedOperationException("It is only available for StreamRecordEntry");
-	}
-
-	@Override
-	public boolean isDone() {
-		throw new UnsupportedOperationException("It must be overriden by the concrete entry");
-	}
-
-	@Override
-	public boolean isStreamRecord() {
-		return streamElement.isRecord();
-	}
-
-	@Override
-	public boolean isWatermark() {
-		return streamElement.isWatermark();
-	}
-
-	@Override
-	public boolean isLatencyMarker() {
-		return streamElement.isLatencyMarker();
-	}
-
-	@Override
-	public StreamElement getStreamElement() {
-		return streamElement;
-	}
-
-	@Override
-	public String toString() {
-		return "StreamElementEntry for @" + streamElement;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ad603d59/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/buffer/AsyncCollectorBuffer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/buffer/AsyncCollectorBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/buffer/AsyncCollectorBuffer.java
deleted file mode 100644
index ee176d9..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/buffer/AsyncCollectorBuffer.java
+++ /dev/null
@@ -1,633 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.async.buffer;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.streaming.api.datastream.AsyncDataStream;
-import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.operators.TimestampedCollector;
-import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
-import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Set;
-
-/**
- * AsyncCollectorBuffer will hold all {@link AsyncCollector} in its internal buffer,
- * and emit results from {@link AsyncCollector} to the next operators following it by
- * calling {@link Output#collect(Object)}
- */
-@Internal
-public class AsyncCollectorBuffer<IN, OUT> {
-
-	/**
-	 * The logger.
-	 */
-	private static final Logger LOG = LoggerFactory.getLogger(AsyncCollectorBuffer.class);
-
-	/**
-	 * Max number of {@link AsyncCollector} in the buffer.
-	 */
-	private final int bufferSize;
-
-	private final AsyncDataStream.OutputMode mode;
-
-	private final AsyncWaitOperator<IN, OUT> operator;
-
-	/**
-	 * Keep all {@link StreamElementEntry}
-	 */
-	private final Set<StreamElementEntry<OUT>> queue = new LinkedHashSet<>();
-
-	/**
-	 * Keep all {@link StreamElementEntry} to their corresponding {@link Watermark} or {@link LatencyMarker}
-	 * If the inputs are: SR1, SR2, WM1, SR3, SR4. Then SR1 and SR2 belong to WM1, and
-	 * SR3 and SR4 will be kept in {@link #lonelyEntries}
-	 */
-	private final Map<StreamElementEntry<OUT>, StreamElement> entriesToMarkers = new HashMap<>();
-
-	private final List<StreamElementEntry<OUT>> lonelyEntries = new LinkedList<>();
-
-	/**
-	 * Keep finished AsyncCollector belonging to the oldest Watermark or LatencyMarker in UNORDERED mode.
-	 */
-	private final Map<StreamElement, Set<StreamElementEntry<OUT>>> markerToFinishedEntries = new LinkedHashMap<>();
-	private Set<StreamElementEntry<OUT>>lonelyFinishedEntries = new HashSet<>();
-
-	/**
-	 * For the AsyncWaitOperator chained with StreamSource, the checkpoint thread may get the
-	 * {@link org.apache.flink.streaming.runtime.tasks.StreamTask#lock} while {@link AsyncCollectorBuffer#queue}
-	 * is full since main thread waits on this lock. The StreamElement in
-	 * {@link AsyncWaitOperator#processElement(StreamRecord)} should be treated as a part of all StreamElements
-	 * in its queue. It will be kept in the operator state while snapshotting.
-	 */
-	private StreamElement extraStreamElement;
-
-	/**
-	 * {@link TimestampedCollector} and {@link Output} to collect results and watermarks.
-	 */
-	private final Output<StreamRecord<OUT>> output;
-	private final TimestampedCollector<OUT> timestampedCollector;
-
-	/**
-	 * Checkpoint lock from {@link org.apache.flink.streaming.runtime.tasks.StreamTask#lock}
-	 */
-	private final Object lock;
-
-	private final Emitter emitter;
-	private final Thread emitThread;
-
-	/**
-	 * Exception from async operation or internal error
-	 */
-	private Exception error;
-
-	/**
-	 * Flag telling Emitter thread to work or not.
-	 */
-	private volatile boolean workwork = false;
-
-	public AsyncCollectorBuffer(
-			int bufferSize,
-			AsyncDataStream.OutputMode mode,
-			Output<StreamRecord<OUT>> output,
-			TimestampedCollector<OUT> collector,
-			Object lock,
-			AsyncWaitOperator operator) {
-		Preconditions.checkArgument(bufferSize > 0, "Future buffer size should be greater than 0.");
-
-		this.bufferSize = bufferSize;
-
-		this.mode = Preconditions.checkNotNull(mode, "Processing mode should not be NULL.");
-		this.output = Preconditions.checkNotNull(output, "Output should not be NULL.");
-		this.timestampedCollector = Preconditions.checkNotNull(collector, "TimestampedCollector should not be NULL.");
-		this.operator = Preconditions.checkNotNull(operator, "Reference to AsyncWaitOperator should not be NULL.");
-		this.lock = Preconditions.checkNotNull(lock, "Checkpoint lock should not be NULL.");
-
-		this.emitter = new Emitter();
-		this.emitThread = new Thread(emitter);
-		this.emitThread.setDaemon(true);
-	}
-
-	/**
-	 * Add an {@link StreamRecord} into the buffer. A new {@link AsyncCollector} will be created and returned
-	 * corresponding to the input StreamRecord.
-	 * <p>
-	 * If buffer is full, caller will wait until a new space is available.
-	 *
-	 * @param record StreamRecord
-	 * @return An AsyncCollector
-	 * @throws Exception Exception from AsyncCollector.
-	 */
-	public AsyncCollector<OUT> addStreamRecord(StreamRecord<IN> record) throws Exception {
-		assert(Thread.holdsLock(lock));
-
-		while (queue.size() >= bufferSize) {
-			// hold the input StreamRecord until it is placed in the buffer
-			extraStreamElement = record;
-
-			lock.wait();
-		}
-
-		if (error != null) {
-			throw error;
-		}
-
-		StreamElementEntry<OUT> entry = new StreamRecordEntry<>(record, this);
-
-		queue.add(entry);
-
-		if (mode == AsyncDataStream.OutputMode.UNORDERED) {
-			lonelyEntries.add(entry);
-		}
-
-		extraStreamElement = null;
-
-		return (AsyncCollector<OUT>)entry;
-	}
-
-	/**
-	 * Add a {@link Watermark} into buffer.
-	 * <p>
-	 * If queue is full, caller will wait here.
-	 *
-	 * @param watermark Watermark
-	 * @throws Exception Exception from AsyncCollector.
-	 */
-	public void addWatermark(Watermark watermark) throws Exception {
-		processMark(new WatermarkEntry<OUT>(watermark));
-	}
-
-	/**
-	 * Add a {@link LatencyMarker} into buffer.
-	 * <p>
-	 * If queue is full, caller will wait here.
-	 *
-	 * @param latencyMarker LatencyMarker
-	 * @throws Exception Exception from AsyncCollector.
-	 */
-	public void addLatencyMarker(LatencyMarker latencyMarker) throws Exception {
-		processMark(new LatencyMarkerEntry<OUT>(latencyMarker));
-	}
-
-	/**
-	 * Notify the emitter thread and main thread that an AsyncCollector has completed.
-	 *
-	 * @param entry Completed AsyncCollector
-	 */
-	public void markCollectorCompleted(StreamElementEntry<OUT> entry) {
-		synchronized (lock) {
-			entry.markDone();
-
-			if (mode == AsyncDataStream.OutputMode.UNORDERED) {
-				StreamElement marker = entriesToMarkers.get(entry);
-
-				if (marker != null) {
-					markerToFinishedEntries.get(marker).add(entry);
-				}
-				else {
-					lonelyFinishedEntries.add(entry);
-				}
-			}
-
-			// if workwork is true, it is not necessary to check it again
-			if (!workwork && shouldNotifyEmitterThread(entry)) {
-				workwork = true;
-
-				lock.notifyAll();
-			}
-		}
-	}
-
-	/**
-	 * Caller will wait here if buffer is not empty, meaning that not all async i/o tasks have returned yet.
-	 *
-	 * @throws Exception IOException from AsyncCollector.
-	 */
-	public void waitEmpty() throws Exception {
-		assert(Thread.holdsLock(lock));
-
-		while (queue.size() != 0) {
-			if (error != null) {
-				throw error;
-			}
-
-			lock.wait();
-		}
-	}
-
-	public void startEmitterThread() {
-		emitThread.start();
-	}
-
-	public void stopEmitterThread() {
-		emitter.stop();
-
-		emitThread.interrupt();
-
-		while (emitThread.isAlive()) {
-			// temporarily release the lock first, since caller of this method may also hold the lock.
-			if (Thread.holdsLock(lock)) {
-				try {
-					lock.wait(1000);
-				}
-				catch (InterruptedException e) {
-					// do nothing
-				}
-			}
-
-			try {
-				emitThread.join(10000);
-			} catch (InterruptedException e) {
-				// do nothing
-			}
-
-			// get the stack trace
-			StringBuilder sb = new StringBuilder();
-			StackTraceElement[] stack = emitThread.getStackTrace();
-
-			for (StackTraceElement e : stack) {
-				sb.append(e).append('\n');
-			}
-
-			LOG.warn("Emitter thread blocks due to {}", sb.toString());
-
-			emitThread.interrupt();
-		}
-	}
-
-	/**
-	 * Get all StreamElements in the AsyncCollector queue.
-	 * <p>
-	 * The emitter thread cannot output records while the checkpointing procedure
-	 * holds the checkpoint lock, so it will wait for a while.
-	 *
-	 * @return An {@link Iterator} to the StreamElements in the buffer, including the extra one.
-	 */
-	public Iterator<StreamElement> getStreamElementsInBuffer() {
-		final Iterator<StreamElementEntry<OUT>> iterator = queue.iterator();
-		final StreamElement extra = extraStreamElement;
-
-		return new Iterator<StreamElement>() {
-			boolean shouldSendExtraElement = (extra != null);
-
-			@Override
-			public boolean hasNext() {
-				return iterator.hasNext() || shouldSendExtraElement;
-			}
-
-			@Override
-			public StreamElement next() {
-				if (!hasNext()) {
-					throw new NoSuchElementException();
-				}
-
-				if (iterator.hasNext()) {
-					return iterator.next().getStreamElement();
-				}
-				else {
-					shouldSendExtraElement = false;
-
-					return extra;
-				}
-			}
-
-			@Override
-			public void remove() {
-				throw new UnsupportedOperationException("remove");
-			}
-		};
-	}
-
-	private void processMark(StreamElementEntry<OUT> entry) throws Exception {
-		assert(Thread.holdsLock(lock));
-
-		StreamElement mark = entry.getStreamElement();
-
-		while (queue.size() >= bufferSize) {
-			// hold the input Watermark / LatencyMarker until it is placed in the buffer
-			extraStreamElement = mark;
-
-			lock.wait();
-		}
-
-		if (error != null) {
-			throw error;
-		}
-
-		queue.add(entry);
-
-		if (mode == AsyncDataStream.OutputMode.UNORDERED) {
-			// update AsyncCollector to Watermark / LatencyMarker map
-			for (StreamElementEntry<OUT> e : lonelyEntries) {
-				entriesToMarkers.put(e, mark);
-			}
-
-			lonelyEntries.clear();
-
-			// update Watermark / LatencyMarker to finished AsyncCollector map
-			markerToFinishedEntries.put(mark, lonelyFinishedEntries);
-
-			lonelyFinishedEntries = new HashSet<>();
-		}
-
-		extraStreamElement = null;
-
-		// notify Emitter thread if the head of buffer is Watermark or LatencyMarker
-		// this is for the case when LatencyMarkers keep coming but there is no StreamRecords.
-		StreamElementEntry<OUT> element = queue.iterator().next();
-
-		if (element.isLatencyMarker() || element.isWatermark()) {
-			workwork = true;
-
-			lock.notifyAll();
-		}
-	}
-
-	private boolean shouldNotifyEmitterThread(StreamElementEntry<OUT> entry) {
-
-		switch (mode) {
-
-			case ORDERED:
-				Iterator<StreamElementEntry<OUT>> queueIterator = queue.iterator();
-
-				// get to work as long as the first AsyncCollector is done.
-				return queueIterator.hasNext() && (queueIterator.next().isDone());
-
-			case UNORDERED:
-				Iterator<Map.Entry<StreamElement, Set<StreamElementEntry<OUT>>>> iteratorMarker =
-						markerToFinishedEntries.entrySet().iterator();
-
-				// get to work only if the finished AsyncCollector belongs to the oldest Watermark or LatencyMarker
-				// or if no Watermark / LatencyMarker is in the buffer yet.
-				return iteratorMarker.hasNext() ? iteratorMarker.next().getValue().contains(entry)
-						: lonelyFinishedEntries.contains(entry);
-
-			default:
-				// this case should never happen
-				return false;
-		}
-	}
-
-	@VisibleForTesting
-	public Set<StreamElementEntry<OUT>> getQueue() {
-		return queue;
-	}
-
-	@VisibleForTesting
-	public void setExtraStreamElement(StreamElement element) {
-		extraStreamElement = element;
-	}
-
-	/**
-	 * A working thread to output results from {@link AsyncCollector} to the next operator.
-	 */
-	private class Emitter implements Runnable {
-		private volatile boolean running = true;
-
-		private void output(StreamElementEntry<OUT> entry) throws Exception {
-
-			StreamElement element = entry.getStreamElement();
-
-			if (element == null) {
-				throw new Exception("StreamElement in the buffer entry should not be null");
-			}
-
-			if (entry.isStreamRecord()) {
-				List<OUT> result = entry.getResult();
-
-				if (result == null) {
-					throw new Exception("Result for stream record " + element + " is null");
-				}
-
-				// update the timestamp for the collector
-				timestampedCollector.setTimestamp(element.asRecord());
-
-				for (OUT val : result) {
-					timestampedCollector.collect(val);
-				}
-			}
-			else if (entry.isWatermark()) {
-				output.emitWatermark(element.asWatermark());
-			}
-			else if (entry.isLatencyMarker()) {
-				operator.sendLatencyMarker(element.asLatencyMarker());
-			}
-			else {
-				throw new IOException("Unknown input record: " + element);
-			}
-		}
-
-		/**
-		 * Emit results from the finished head collector and its following finished ones.
-		 *
-		 * <p>NOTE: Since {@link #output(StreamElementEntry)} may be blocked if operator chain chained with
-		 * another {@link AsyncWaitOperator} and its buffer is full, we can not use an {@link Iterator} to
-		 * go through {@link #queue} because ConcurrentModificationException may be thrown while we remove
-		 * element in the queue by calling {@link Iterator#remove()}.
-		 *
-		 * <p>Example: Assume operator chain like this: async-wait-operator1(awo1) -> async-wait-operator2(awo2).
-		 * The buffer for awo1 is full so the main thread is blocked there.
-		 * The {@link Emitter} thread, named emitter1, in awo1 is outputting
-		 * data to awo2. Assume that 2 elements have been emitted and the buffer in awo1 has two vacancies. While
-		 * outputting the third one, the buffer in awo2 is full, so emitter1 will wait for a moment. If we use
-		 * {@link Iterator}, it is just before calling {@link Iterator#remove()}. Once the {@link #lock} is released
-		 * and luckily enough, the main thread gets the lock. It will modify {@link #queue}, causing
-		 * ConcurrentModificationException once emitter1 runs to {@link Iterator#remove()}.
-		 *
-		 */
-		private void orderedProcess() throws Exception {
-			StreamElementEntry<OUT> entry;
-
-			while (queue.size() > 0 && (entry = queue.iterator().next()).isDone()) {
-				output(entry);
-
-				queue.remove(entry);
-			}
-		}
-
-		/**
-		 * Emit results for each finished collector. Try to emit results prior to the oldest watermark
-		 * in the buffer.
-		 * <p>
-		 * For example, assume the sequence of input StreamElements is:
-		 * Entry(ac1, record1) -> Entry(ac2, record2) -> Entry(ac3, watermark1) -> Entry(ac4, record3).
-		 * and both ac2 and ac3 have finished. In unordered mode, ac1 and ac2 are prior to watermark1,
-		 * so ac2 will be emitted. Since ac1 is not ready yet, ac3 has to wait until ac1 is done.
-		 */
-		private void unorderedProcess() throws Exception {
-			// try to emit finished AsyncCollectors in markerToFinishedEntries
-			if (markerToFinishedEntries.size() != 0) {
-				while (markerToFinishedEntries.size() != 0) {
-					Map.Entry<StreamElement, Set<StreamElementEntry<OUT>>> finishedStreamElementEntry =
-							markerToFinishedEntries.entrySet().iterator().next();
-
-					Set<StreamElementEntry<OUT>> finishedElementSet = finishedStreamElementEntry.getValue();
-
-					// While outputting results to the next operator, output may release lock if the following operator
-					// in the chain is another AsyncWaitOperator. During this period, there may be some
-					// finished StreamElementEntry coming into the finishedElementSet, and we should
-					// output all finished elements after re-acquiring the lock.
-					while (finishedElementSet.size() != 0) {
-						StreamElementEntry<OUT> finishedEntry = finishedElementSet.iterator().next();
-
-						output(finishedEntry);
-
-						queue.remove(finishedEntry);
-
-						entriesToMarkers.remove(finishedEntry);
-
-						finishedElementSet.remove(finishedEntry);
-					}
-
-					finishedStreamElementEntry.getValue().clear();
-
-
-					// if all StreamElements belonging to current Watermark / LatencyMarker have been emitted,
-					// emit current Watermark / LatencyMarker
-
-					if (queue.size() == 0) {
-						if (markerToFinishedEntries.size() != 0 || entriesToMarkers.size() != 0
-								|| lonelyEntries.size() != 0 || lonelyFinishedEntries.size() != 0) {
-							throw new IOException("Inner data info is not consistent.");
-						}
-					}
-					else {
-						// check the head AsyncCollector whether it is a Watermark or LatencyMarker.
-						StreamElementEntry<OUT> queueEntry = queue.iterator().next();
-
-						if (!queueEntry.isStreamRecord()) {
-							if (finishedStreamElementEntry.getKey() != queueEntry.getStreamElement()) {
-								throw new IOException("Watermark / LatencyMarker from finished collector map "
-									+ "and input buffer are not the same.");
-							}
-
-							output(queueEntry);
-
-							queue.remove(queueEntry);
-
-							// remove useless data in markerToFinishedEntries
-							markerToFinishedEntries.remove(finishedStreamElementEntry.getKey());
-						}
-						else {
-							break;
-						}
-					}
-				}
-			}
-
-			if (markerToFinishedEntries.size() == 0) {
-				// health check
-				if (entriesToMarkers.size() != 0) {
-					throw new IOException("Entries to marker map should be zero");
-				}
-
-				// no Watermark or LatencyMarker in the buffer yet, emit results in lonelyFinishedEntries
-				while (lonelyFinishedEntries.size() != 0) {
-					StreamElementEntry<OUT> entry = lonelyFinishedEntries.iterator().next();
-
-					output(entry);
-
-					queue.remove(entry);
-
-					lonelyEntries.remove(entry);
-
-					lonelyFinishedEntries.remove(entry);
-				}
-			}
-		}
-
-		private void processFinishedAsyncCollector() throws Exception {
-			if (mode == AsyncDataStream.OutputMode.ORDERED) {
-				orderedProcess();
-			} else {
-				unorderedProcess();
-			}
-		}
-
-		private void clearAndNotify() {
-			// clear all data
-			queue.clear();
-			entriesToMarkers.clear();
-			markerToFinishedEntries.clear();
-			lonelyEntries.clear();
-
-			running = false;
-
-			lock.notifyAll();
-		}
-
-		@Override
-		public void run() {
-			while (running) {
-				synchronized (lock) {
-
-					try {
-						while (!workwork) {
-							lock.wait();
-						}
-
-						processFinishedAsyncCollector();
-
-						lock.notifyAll();
-
-						workwork = false;
-					}
-					catch (InterruptedException e) {
-						// The source of InterruptedException is from:
-						//   1. lock.wait() statement in Emit
-						//   2. collector waiting for vacant buffer
-						// The action for this exception should try to clear all held data and
-						// exit Emit thread.
-
-						clearAndNotify();
-					}
-					catch (Exception e) {
-						// For exceptions, not InterruptedException, it should be propagated
-						// to main thread.
-						error = e;
-
-						clearAndNotify();
-					}
-				}
-			}
-		}
-
-		public void stop() {
-			running = false;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ad603d59/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/buffer/LatencyMarkerEntry.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/buffer/LatencyMarkerEntry.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/buffer/LatencyMarkerEntry.java
deleted file mode 100644
index 1705c2d..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/buffer/LatencyMarkerEntry.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.async.buffer;
-
-import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
-
-/**
- * {@link AsyncCollectorBuffer} entry for {@link LatencyMarker}
- *
- */
-public class LatencyMarkerEntry<OUT> extends AbstractBufferEntry<OUT> {
-	public LatencyMarkerEntry(LatencyMarker marker) {
-		super(marker);
-	}
-
-	@Override
-	public boolean isDone() {
-		return true;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ad603d59/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/buffer/StreamElementEntry.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/buffer/StreamElementEntry.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/buffer/StreamElementEntry.java
deleted file mode 100644
index de7f606..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/buffer/StreamElementEntry.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.async.buffer;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * The base class for entries in the {@link AsyncCollectorBuffer}
- *
- * @param <OUT> Output data type
- */
-
-@Internal
-public interface StreamElementEntry<OUT>  {
-	/**
-	 * Get result. Throw IOException while encountering an error.
-	 *
-	 * @return A List of result.
-	 * @throws IOException IOException wrapping errors from user codes.
-	 */
-	List<OUT> getResult() throws IOException;
-
-	/**
-	 * Set the internal flag, marking the async operator has finished.
-	 */
-	void markDone();
-
-	/**
-	 * Get the flag indicating the async operator has finished or not.
-	 *
-	 * @return True for finished async operator.
-	 */
-	boolean isDone();
-
-	/**
-	 * Check inner element is StreamRecord or not.
-	 *
-	 * @return True if element is StreamRecord.
-	 */
-	boolean isStreamRecord();
-
-	/**
-	 * Check inner element is Watermark or not.
-	 *
-	 * @return True if element is Watermark.
-	 */
-	boolean isWatermark();
-
-	/**
-	 * Check inner element is LatencyMarker or not.
-	 *
-	 * @return True if element is LatencyMarker.
-	 */
-	boolean isLatencyMarker();
-
-	/**
-	 * Get inner stream element.
-	 *
-	 * @return Inner {@link StreamElement}.
-	 */
-	StreamElement getStreamElement();
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ad603d59/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/buffer/StreamRecordEntry.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/buffer/StreamRecordEntry.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/buffer/StreamRecordEntry.java
deleted file mode 100644
index fb0dc3b..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/buffer/StreamRecordEntry.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.async.buffer;
-
-import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * {@link AsyncCollectorBuffer} entry for {@link StreamRecord}
- *
- * @param <IN> Input data type
- * @param <OUT> Output data type
- */
-public class StreamRecordEntry<IN, OUT> extends AbstractBufferEntry<OUT> implements AsyncCollector<OUT> {
-	private List<OUT> result;
-	private Throwable error;
-
-	private boolean isDone = false;
-
-	private final AsyncCollectorBuffer<IN, OUT> buffer;
-
-	public StreamRecordEntry(StreamRecord<IN> element, AsyncCollectorBuffer<IN, OUT> buffer) {
-		super(element);
-		this.buffer = Preconditions.checkNotNull(buffer, "Reference to AsyncCollectorBuffer should not be null");
-	}
-
-	@Override
-	public void collect(List<OUT> result)  {
-		this.result = result;
-
-		this.buffer.markCollectorCompleted(this);
-	}
-
-	@Override
-	public void collect(Throwable error)  {
-		this.error = error;
-
-		this.buffer.markCollectorCompleted(this);
-	}
-
-	public List<OUT> getResult() throws IOException {
-		if (error != null) {
-			throw new IOException(error.getMessage());
-		}
-		return result;
-	}
-
-	public void markDone() {
-		isDone = true;
-	}
-
-	public boolean isDone() {
-		return isDone;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ad603d59/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/buffer/WatermarkEntry.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/buffer/WatermarkEntry.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/buffer/WatermarkEntry.java
deleted file mode 100644
index 8883a2d..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/buffer/WatermarkEntry.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.async.buffer;
-
-import org.apache.flink.streaming.api.watermark.Watermark;
-
-/**
- * {@link AsyncCollectorBuffer} entry for {@link Watermark}
- *
- */
-public class WatermarkEntry<OUT> extends AbstractBufferEntry<OUT> {
-	public WatermarkEntry(Watermark watermark) {
-		super(watermark);
-	}
-
-	@Override
-	public boolean isDone() {
-		return true;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ad603d59/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/collector/AsyncCollector.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/collector/AsyncCollector.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/collector/AsyncCollector.java
index b2a58d2..a072aca 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/collector/AsyncCollector.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/collector/AsyncCollector.java
@@ -18,16 +18,16 @@
 
 package org.apache.flink.streaming.api.functions.async.collector;
 
-import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
 
-import java.util.List;
+import java.util.Collection;
 
 /**
  * {@link AsyncCollector} collects data / error in user codes while processing async i/o.
  *
  * @param <OUT> Output type
  */
-@Internal
+@PublicEvolving
 public interface AsyncCollector<OUT> {
 	/**
 	 * Set result.
@@ -35,14 +35,15 @@ public interface AsyncCollector<OUT> {
 	 * Note that it should be called exactly once in the user code.
 	 * Calling this function multiple times will cause data loss.
 	 * <p>
-	 * Put all results in a {@link List} and then issue {@link AsyncCollector#collect(List)}.
+	 * Put all results in a {@link Collection} and then issue
+	 * {@link AsyncCollector#collect(Collection)}.
 	 * <p>
 	 * If the result is NULL, it will cause the task to fail. If collecting an empty result set is allowed and
 	 * should not cause a task fail-over, then collect an empty collection instead.
 	 *
 	 * @param result A list of results.
 	 */
-	void collect(List<OUT> result);
+	void collect(Collection<OUT> result);
 
 	/**
 	 * Set error

http://git-wip-us.apache.org/repos/asf/flink/blob/ad603d59/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimestampedCollector.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimestampedCollector.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimestampedCollector.java
index 56fa14d..dc80e81 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimestampedCollector.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimestampedCollector.java
@@ -63,6 +63,10 @@ public class TimestampedCollector<T> implements Collector<T> {
 		reuse.setTimestamp(timestamp);
 	}
 
+	public void eraseTimestamp() {
+		reuse.eraseTimestamp();
+	}
+
 	@Override
 	public void close() {
 		output.close();

http://git-wip-us.apache.org/repos/asf/flink/blob/ad603d59/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
index 9166865..88fc833 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
@@ -19,110 +19,154 @@
 package org.apache.flink.streaming.api.operators.async;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.datastream.AsyncDataStream;
-import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
-import org.apache.flink.streaming.api.functions.async.buffer.AsyncCollectorBuffer;
 import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.async.queue.StreamElementQueue;
+import org.apache.flink.streaming.api.operators.async.queue.StreamElementQueueEntry;
+import org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry;
+import org.apache.flink.streaming.api.operators.async.queue.WatermarkQueueEntry;
+import org.apache.flink.streaming.api.operators.async.queue.OrderedStreamElementQueue;
+import org.apache.flink.streaming.api.operators.async.queue.UnorderedStreamElementQueue;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 
-import java.util.Iterator;
+import java.util.Collection;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 /**
- * The {@link AsyncWaitOperator} will accept input {@link StreamElement} from previous operators,
- * pass them into {@link AsyncFunction}, make a snapshot for the inputs in the {@link AsyncCollectorBuffer}
- * while checkpointing, and restore the {@link AsyncCollectorBuffer} from previous state.
+ * The {@link AsyncWaitOperator} processes incoming stream records asynchronously. For that
+ * the operator creates an {@link AsyncCollector} which is passed to an {@link AsyncFunction}.
+ * Within the async function, the user can complete the async collector arbitrarily. Once the async
+ * collector has been completed, the result is emitted by the operator's emitter to downstream
+ * operators.
  * <p>
- * Note that due to newly added working thread, named {@link AsyncCollectorBuffer.Emitter},
- * if {@link AsyncWaitOperator} is chained with other operators, {@link StreamTask} has to make sure that
- * the the order to open operators in the operator chain should be from the tail operator to the head operator,
- * and order to close operators in the operator chain should be from the head operator to the tail operator.
+ * The operator offers different output modes depending on the chosen
+ * {@link AsyncDataStream.OutputMode}. In order to give exactly once processing guarantees, the
+ * operator stores all currently in-flight {@link StreamElement} in its operator state. Upon
+ * recovery the recorded set of stream elements is replayed.
+ * <p>
+ * In case of chaining of this operator, it has to be made sure that the operators in the chain are
+ * opened tail to head. The reason for this is that an opened {@link AsyncWaitOperator}
+ * immediately starts emitting recovered {@link StreamElement} to downstream operators.
  *
  * @param <IN> Input type for the operator.
  * @param <OUT> Output type for the operator.
  */
 @Internal
 public class AsyncWaitOperator<IN, OUT>
-	extends AbstractUdfStreamOperator<OUT, AsyncFunction<IN, OUT>>
-	implements OneInputStreamOperator<IN, OUT>
-{
+		extends AbstractUdfStreamOperator<OUT, AsyncFunction<IN, OUT>>
+		implements OneInputStreamOperator<IN, OUT>, OperatorActions {
 	private static final long serialVersionUID = 1L;
 
-	private final static String STATE_NAME = "_async_wait_operator_state_";
+	private static final String STATE_NAME = "_async_wait_operator_state_";
 
-	/**
-	 * {@link TypeSerializer} for inputs while making snapshots.
-	 */
+	/** Capacity of the stream element queue */
+	private final int capacity;
+
+	/** Output mode for this operator */
+	private final AsyncDataStream.OutputMode outputMode;
+
+	/** Timeout for the async collectors */
+	private final long timeout;
+
+	private transient Object checkpointingLock;
+
+	/** {@link TypeSerializer} for inputs while making snapshots. */
 	private transient StreamElementSerializer<IN> inStreamElementSerializer;
 
-	/**
-	 * input stream elements from the state
-	 */
+	/** Recovered input stream elements */
 	private transient ListState<StreamElement> recoveredStreamElements;
 
-	private transient TimestampedCollector<OUT> collector;
+	/** Queue to store the currently in-flight stream elements into */
+	private transient StreamElementQueue queue;
 
-	private transient AsyncCollectorBuffer<IN, OUT> buffer;
+	/** Pending stream element which could not yet be added to the queue */
+	private transient StreamElementQueueEntry<?> pendingStreamElementQueueEntry;
 
-	/**
-	 * Checkpoint lock from {@link StreamTask#lock}
-	 */
-	private transient Object checkpointLock;
+	private transient ExecutorService executor;
+
+	/** Emitter for the completed stream element queue entries */
+	private transient Emitter<OUT> emitter;
 
-	private final int bufferSize;
-	private final AsyncDataStream.OutputMode mode;
+	/** Thread running the emitter */
+	private transient Thread emitterThread;
 
 
-	public AsyncWaitOperator(AsyncFunction<IN, OUT> asyncFunction, int bufferSize, AsyncDataStream.OutputMode mode) {
+	public AsyncWaitOperator(
+			AsyncFunction<IN, OUT> asyncFunction,
+			int capacity,
+			AsyncDataStream.OutputMode outputMode) {
 		super(asyncFunction);
 		chainingStrategy = ChainingStrategy.ALWAYS;
 
-		Preconditions.checkArgument(bufferSize > 0, "The number of concurrent async operation should be greater than 0.");
-		this.bufferSize = bufferSize;
+		Preconditions.checkArgument(capacity > 0, "The number of concurrent async operation should be greater than 0.");
+		this.capacity = capacity;
 
-		this.mode = mode;
+		this.outputMode = Preconditions.checkNotNull(outputMode, "outputMode");
+
+		this.timeout = -1L;
 	}
 
 	@Override
 	public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
 		super.setup(containingTask, config, output);
 
-		this.inStreamElementSerializer =
-				new StreamElementSerializer(this.getOperatorConfig().<IN>getTypeSerializerIn1(getUserCodeClassloader()));
-
-		this.collector = new TimestampedCollector<>(output);
-
-		this.checkpointLock = containingTask.getCheckpointLock();
-
-		this.buffer = new AsyncCollectorBuffer<>(bufferSize, mode, output, collector, this.checkpointLock, this);
+		this.checkpointingLock = getContainingTask().getCheckpointLock();
+
+		this.inStreamElementSerializer = new StreamElementSerializer<>(
+			getOperatorConfig().<IN>getTypeSerializerIn1(getUserCodeClassloader()));
+
+		// create the operator's executor for the complete operations of the queue entries
+		this.executor = Executors.newSingleThreadExecutor();
+
+		switch (outputMode) {
+			case ORDERED:
+				queue = new OrderedStreamElementQueue(
+					capacity,
+					executor,
+					this);
+				break;
+			case UNORDERED:
+				queue = new UnorderedStreamElementQueue(
+					capacity,
+					executor,
+					this);
+				break;
+			default:
+				throw new IllegalStateException("Unknown async mode: " + outputMode + '.');
+		}
 	}
 
 	@Override
 	public void open() throws Exception {
 		super.open();
 
-		// process stream elements from state, since the Emit thread will start soon as all elements from
-		// previous state are in the AsyncCollectorBuffer, we have to make sure that the order to open all
-		// operators in the operator chain should be from the tail operator to the head operator.
-		if (this.recoveredStreamElements != null) {
-			for (StreamElement element : this.recoveredStreamElements.get()) {
+		// process stream elements from state, since the Emit thread will start as soon as all
+		// elements from previous state are in the StreamElementQueue, we have to make sure that the
+		// order to open all operators in the operator chain proceeds from the tail operator to the
+		// head operator.
+		if (recoveredStreamElements != null) {
+			for (StreamElement element : recoveredStreamElements.get()) {
 				if (element.isRecord()) {
 					processElement(element.<IN>asRecord());
 				}
@@ -133,30 +177,52 @@ public class AsyncWaitOperator<IN, OUT>
 					processLatencyMarker(element.asLatencyMarker());
 				}
 				else {
-					throw new Exception("Unknown record type: "+element.getClass());
+					throw new IllegalStateException("Unknown record type " + element.getClass() +
+						" encountered while opening the operator.");
 				}
 			}
-			this.recoveredStreamElements = null;
+			recoveredStreamElements = null;
 		}
 
-		buffer.startEmitterThread();
+		// create the emitter
+		this.emitter = new Emitter<>(checkpointingLock, output, queue, this);
+
+		// start the emitter thread
+		this.emitterThread = new Thread(emitter);
+		emitterThread.setDaemon(true);
+		emitterThread.start();
+
 	}
 
 	@Override
 	public void processElement(StreamRecord<IN> element) throws Exception {
-		AsyncCollector<OUT> collector = buffer.addStreamRecord(element);
+		final StreamRecordQueueEntry<OUT> streamRecordBufferEntry = new StreamRecordQueueEntry<>(element);
+
+		if (timeout > 0L) {
+			// register a timeout for this StreamRecordQueueEntry
+			long timeoutTimestamp = timeout + System.currentTimeMillis();
+
+			getProcessingTimeService().registerTimer(
+				timeoutTimestamp,
+				new ProcessingTimeCallback() {
+					@Override
+					public void onProcessingTime(long timestamp) throws Exception {
+						streamRecordBufferEntry.collect(
+							new TimeoutException("Async function call has timed out."));
+					}
+				});
+		}
 
-		userFunction.asyncInvoke(element.getValue(), collector);
+		addAsyncBufferEntry(streamRecordBufferEntry);
+
+		userFunction.asyncInvoke(element.getValue(), streamRecordBufferEntry);
 	}
 
 	@Override
 	public void processWatermark(Watermark mark) throws Exception {
-		buffer.addWatermark(mark);
-	}
+		WatermarkQueueEntry watermarkBufferEntry = new WatermarkQueueEntry(mark);
 
-	@Override
-	public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception {
-		buffer.addLatencyMarker(latencyMarker);
+		addAsyncBufferEntry(watermarkBufferEntry);
 	}
 
 	@Override
@@ -167,45 +233,155 @@ public class AsyncWaitOperator<IN, OUT>
 				getOperatorStateBackend().getOperatorState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));
 		partitionableState.clear();
 
-		Iterator<StreamElement> iterator = buffer.getStreamElementsInBuffer();
-		while (iterator.hasNext()) {
-			partitionableState.add(iterator.next());
+		Collection<StreamElementQueueEntry<?>> values = queue.values();
+
+		for (StreamElementQueueEntry<?> value : values) {
+			partitionableState.add(value.getStreamElement());
+		}
+
+		// add the pending stream element queue entry if the stream element queue is currently full
+		if (pendingStreamElementQueueEntry != null) {
+			partitionableState.add(pendingStreamElementQueueEntry.getStreamElement());
 		}
 	}
 
 	@Override
 	public void initializeState(StateInitializationContext context) throws Exception {
-		recoveredStreamElements =
-				context.getOperatorStateStore().getOperatorState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));
+		recoveredStreamElements = context
+			.getOperatorStateStore()
+			.getOperatorState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));
 
 	}
 
 	@Override
 	public void close() throws Exception {
 		try {
-			buffer.waitEmpty();
+			assert(Thread.holdsLock(checkpointingLock));
+
+			while (!queue.isEmpty()) {
+				// wait for the emitter thread to output the remaining elements;
+				// it needs the checkpointing lock for that, so we release it by waiting on it
+				checkpointingLock.wait();
+			}
 		}
 		finally {
-			// make sure Emitter thread exits and close user function
-			buffer.stopEmitterThread();
+			Exception exception = null;
+
+			try {
+				super.close();
+			} catch (InterruptedException interrupted) {
+				exception = interrupted;
+
+				Thread.currentThread().interrupt();
+			} catch (Exception e) {
+				exception = e;
+			}
+
+			try {
+				// terminate the emitter, the emitter thread and the executor
+				stopResources(true);
+			} catch (InterruptedException interrupted) {
+				exception = ExceptionUtils.firstOrSuppressed(interrupted, exception);
+
+				Thread.currentThread().interrupt();
+			} catch (Exception e) {
+				exception = ExceptionUtils.firstOrSuppressed(e, exception);
+			}
 
-			super.close();
+			if (exception != null) {
+				LOG.warn("Errors occurred while closing the AsyncWaitOperator.", exception);
+			}
 		}
 	}
 
 	@Override
 	public void dispose() throws Exception {
-		super.dispose();
+		Exception exception = null;
+
+		try {
+			super.dispose();
+		} catch (InterruptedException interrupted) {
+			exception = interrupted;
+
+			Thread.currentThread().interrupt();
+		} catch (Exception e) {
+			exception = e;
+		}
+
+		try {
+			stopResources(false);
+		} catch (InterruptedException interrupted) {
+			exception = ExceptionUtils.firstOrSuppressed(interrupted, exception);
+
+			Thread.currentThread().interrupt();
+		} catch (Exception e) {
+			exception = ExceptionUtils.firstOrSuppressed(e, exception);
+		}
+
+		if (exception != null) {
+			throw exception;
+		}
+	}
 
-		buffer.stopEmitterThread();
+	/**
+	 * Close the operator's resources. They include the emitter thread and the executor to run
+	 * the queue's complete operation.
+	 *
+	 * @param waitForShutdown is true if the method should wait for the resources to be freed;
+	 *                           otherwise false.
+	 * @throws InterruptedException if current thread has been interrupted
+	 */
+	private void stopResources(boolean waitForShutdown) throws InterruptedException {
+		emitter.stop();
+		emitterThread.interrupt();
+
+		executor.shutdown();
+
+		if (waitForShutdown) {
+			try {
+				if (!executor.awaitTermination(365L, TimeUnit.DAYS)) {
+					executor.shutdownNow();
+				}
+			} catch (InterruptedException e) {
+				executor.shutdownNow();
+
+				Thread.currentThread().interrupt();
+			}
+
+			emitterThread.join();
+		} else {
+			executor.shutdownNow();
+		}
 	}
 
-	public void sendLatencyMarker(LatencyMarker marker) throws Exception {
-		super.processLatencyMarker(marker);
+	/**
+	 * Add the given stream element queue entry to the operator's stream element queue. This
+	 * operation blocks until the element has been added.
+	 * <p>
+	 * For that it tries to put the element into the queue and if not successful then it waits on
+	 * the checkpointing lock. The checkpointing lock is also used by the {@link Emitter} to output
+	 * elements. The emitter is also responsible for notifying this method if the queue has capacity
+	 * left again, by calling notifyAll on the checkpointing lock.
+	 *
+	 * @param streamElementQueueEntry to add to the operator's queue
+	 * @param <T> Type of the stream element queue entry's result
+	 * @throws InterruptedException if the current thread has been interrupted
+	 */
+	private <T> void addAsyncBufferEntry(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
+		assert(Thread.holdsLock(checkpointingLock));
+
+		pendingStreamElementQueueEntry = streamElementQueueEntry;
+
+		while (!queue.tryPut(streamElementQueueEntry)) {
+			// we wait for the emitter to notify us if the queue has space left again
+			checkpointingLock.wait();
+		}
+
+		pendingStreamElementQueueEntry = null;
 	}
 
-	@VisibleForTesting
-	public AsyncCollectorBuffer<IN, OUT> getBuffer() {
-		return buffer;
+	@Override
+	public void failOperator(Throwable throwable) {
+		getContainingTask().getEnvironment().failExternally(throwable);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ad603d59/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/Emitter.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/Emitter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/Emitter.java
new file mode 100644
index 0000000..4b22aaa
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/Emitter.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async;
+
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.async.queue.AsyncCollectionResult;
+import org.apache.flink.streaming.api.operators.async.queue.StreamElementQueue;
+import org.apache.flink.streaming.api.operators.async.queue.AsyncResult;
+import org.apache.flink.streaming.api.operators.async.queue.AsyncWatermarkResult;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+
+/**
+ * Runnable responsible for consuming elements from the given queue and outputting them to the
+ * given output/timestampedCollector.
+ *
+ * @param <OUT> Type of the output elements
+ */
+public class Emitter<OUT> implements Runnable {
+
+	private static final Logger LOG = LoggerFactory.getLogger(Emitter.class);
+
+	/** Lock to hold before outputting */
+	private final Object checkpointLock;
+
+	/** Output for the watermark elements */
+	private final Output<StreamRecord<OUT>> output;
+
+	/** Queue to consume the async results from */
+	private final StreamElementQueue streamElementQueue;
+
+	private final OperatorActions operatorActions;
+
+	/** Output for stream records */
+	private final TimestampedCollector<OUT> timestampedCollector;
+
+	private volatile boolean running;
+
+	public Emitter(
+			final Object checkpointLock,
+			final Output<StreamRecord<OUT>> output,
+			final StreamElementQueue streamElementQueue,
+			final OperatorActions operatorActions) {
+
+		this.checkpointLock = Preconditions.checkNotNull(checkpointLock, "checkpointLock");
+		this.output = Preconditions.checkNotNull(output, "output");
+		this.streamElementQueue = Preconditions.checkNotNull(streamElementQueue, "asyncCollectorBuffer");
+		this.operatorActions = Preconditions.checkNotNull(operatorActions, "operatorActions");
+
+		this.timestampedCollector = new TimestampedCollector<>(this.output);
+		this.running = true;
+	}
+
+	@Override
+	public void run() {
+		try {
+			while (running) {
+				LOG.debug("Wait for next completed async stream element result.");
+				AsyncResult streamElementEntry = streamElementQueue.peekBlockingly();
+
+				output(streamElementEntry);
+			}
+		} catch (InterruptedException e) {
+			if (running) {
+				operatorActions.failOperator(e);
+			} else {
+				// Thread got interrupted which means that it should shut down
+				LOG.debug("Emitter thread got interrupted. This indicates that the emitter should " +
+					"shut down.");
+			}
+		} catch (Throwable t) {
+			operatorActions.failOperator(new Exception("AsyncWaitOperator's emitter caught an " +
+				"unexpected throwable.", t));
+		}
+	}
+
+	private void output(AsyncResult asyncResult) throws InterruptedException {
+		if (asyncResult.isWatermark()) {
+			synchronized (checkpointLock) {
+				// remove the peeked element from the stream element queue so that it is no longer
+				// checkpointed
+				streamElementQueue.poll();
+
+				// notify the main thread that there is again space left in the stream element
+				// queue
+				checkpointLock.notifyAll();
+
+				AsyncWatermarkResult asyncWatermarkResult = asyncResult.asWatermark();
+
+				LOG.debug("Output async watermark.");
+				output.emitWatermark(asyncWatermarkResult.getWatermark());
+			}
+		} else {
+			AsyncCollectionResult<OUT> streamRecordResult = asyncResult.asResultCollection();
+
+			if (streamRecordResult.hasTimestamp()) {
+				timestampedCollector.setAbsoluteTimestamp(streamRecordResult.getTimestamp());
+			} else {
+				timestampedCollector.eraseTimestamp();
+			}
+
+			synchronized (checkpointLock) {
+				// remove the peeked element from the stream element queue so that it is no longer
+				// checkpointed
+				streamElementQueue.poll();
+
+				// notify the main thread that there is again space left in the stream element
+				// queue
+				checkpointLock.notifyAll();
+
+				LOG.debug("Output async stream element collection result.");
+
+				try {
+					Collection<OUT> resultCollection = streamRecordResult.get();
+
+					for (OUT result : resultCollection) {
+						timestampedCollector.collect(result);
+					}
+				} catch (Exception e) {
+					operatorActions.failOperator(
+						new Exception("An async function call terminated with an exception. " +
+							"Failing the AsyncWaitOperator.", e));
+				}
+			}
+		}
+	}
+
+	public void stop() {
+		running = false;
+	}
+}