Posted to commits@flink.apache.org by tr...@apache.org on 2016/12/20 05:05:38 UTC

[2/7] flink git commit: [FLINK-4391] Add asynchronous I/O operations

[FLINK-4391] Add asynchronous I/O operations

1. Add an example job; 2. fix a bug in state serialization in the async wait operator; 3. move the broadcast barrier after snapshotting the operator states

Update IT case

Adjust the whitespace in the IT case

1. Use final for member variables; 2. initialize resources in open(); 3. use IOException instead of RuntimeException to propagate errors

Make sure the head operator comes first when snapshotting chained operators

[FLINK-4391] 1. Adjust the snapshot order for operators in a chain so that the head operator takes its snapshot first. This matters for chains containing the async wait operator, which keeps emitting data from its internal buffer to its children; if the stream task checkpointed from tail to head, the result would be incorrect. 2. Support LatencyMarker in the async wait operator

[FLINK-4391] use checkpoint lock in async wait operator; remove emitter thread

[FLINK-4391] use checkpoint lock in async wait operator; remove emitter thread

[FLINK-4391] 1. Re-add the emitter thread. Without it, if no input arrives and only the main thread emits results, finished async collectors may wait an indeterminate period of time before being emitted; the emitter thread outputs them as soon as possible. 2. In UNORDERED mode, only emit results prior to the oldest Watermark in the buffer. 3. Use the latest OperatorStateStore to keep partitionable operator state.

[FLINK-4391] remove change to StreamTask.java

[FLINK-4391] Optimize inner data structure for AsyncWaitOperator, add extra test cases.

[FLINK-4391] Fix UT failure

[FLINK-4391] Fix format problem

[FLINK-4391] Add a RuntimeContext wrapper for RichAsyncFunction to disable getting state from RuntimeContext.

This closes #2629.
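
For orientation, here is a minimal, self-contained sketch of how the API added by this commit is used. It condenses the AsyncIOExample below to the essentials; the class names (AsyncSketch, SimpleAsyncFunction) and the thread-pool "client" are illustrative only and not part of the commit.

import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;

public class AsyncSketch {

	/** Simulates an async client with a thread pool, like the commit's SampleAsyncFunction. */
	private static class SimpleAsyncFunction extends RichAsyncFunction<Integer, String> {
		private transient ExecutorService pool;

		@Override
		public void open(Configuration parameters) throws Exception {
			super.open(parameters);
			pool = Executors.newFixedThreadPool(4);
		}

		@Override
		public void close() throws Exception {
			super.close();
			pool.shutdown();
		}

		@Override
		public void asyncInvoke(final Integer value, final AsyncCollector<String> collector) {
			pool.submit(new Runnable() {
				@Override
				public void run() {
					// complete the collector from a worker thread once the "async call" returns
					collector.collect(Collections.singletonList("key-" + (value % 10)));
				}
			});
		}
	}

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		DataStream<Integer> input = env.fromElements(1, 2, 3, 4, 5);

		// at most 20 in-flight elements; orderedWait preserves the input order,
		// unorderedWait would emit each result as soon as it completes
		DataStream<String> result = AsyncDataStream.orderedWait(input, new SimpleAsyncFunction(), 20);

		result.print();
		env.execute("Async I/O sketch");
	}
}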


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f5283076
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f5283076
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f5283076

Branch: refs/heads/master
Commit: f52830763d8f95a955c10265e2c3543a5890e719
Parents: 4a27d21
Author: yushi.wxg <yu...@taobao.com>
Authored: Wed Oct 12 14:31:43 2016 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Dec 20 05:04:07 2016 +0100

----------------------------------------------------------------------
 flink-examples/flink-examples-streaming/pom.xml |  22 +
 .../examples/async/AsyncIOExample.java          | 277 ++++++++
 .../api/datastream/AsyncDataStream.java         | 135 ++++
 .../api/functions/async/AsyncFunction.java      |  88 +++
 .../api/functions/async/RichAsyncFunction.java  | 245 +++++++
 .../async/buffer/AbstractBufferEntry.java       |  78 +++
 .../async/buffer/AsyncCollectorBuffer.java      | 633 ++++++++++++++++++
 .../async/buffer/LatencyMarkerEntry.java        |  36 +
 .../async/buffer/StreamElementEntry.java        |  82 +++
 .../async/buffer/StreamRecordEntry.java         |  75 +++
 .../functions/async/buffer/WatermarkEntry.java  |  36 +
 .../async/collector/AsyncCollector.java         |  53 ++
 .../api/operators/AbstractStreamOperator.java   |   6 +-
 .../api/operators/async/AsyncWaitOperator.java  | 211 ++++++
 .../streaming/runtime/tasks/OperatorChain.java  |  25 +-
 .../streaming/runtime/tasks/StreamTask.java     |   3 +-
 .../functions/async/RichAsyncFunctionTest.java  | 164 +++++
 .../async/AsyncCollectorBufferTest.java         | 656 +++++++++++++++++++
 .../operators/async/AsyncWaitOperatorTest.java  | 629 ++++++++++++++++++
 .../operators/StreamOperatorChainingTest.java   |   2 +
 .../runtime/tasks/StreamTaskTestHarness.java    |   4 +
 .../util/OneInputStreamOperatorTestHarness.java |  10 +
 .../streaming/api/StreamingOperatorsITCase.java | 208 ++++--
 23 files changed, 3615 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f5283076/flink-examples/flink-examples-streaming/pom.xml
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/pom.xml b/flink-examples/flink-examples-streaming/pom.xml
index c418ce7..f2891c9 100644
--- a/flink-examples/flink-examples-streaming/pom.xml
+++ b/flink-examples/flink-examples-streaming/pom.xml
@@ -221,6 +221,28 @@ under the License.
 						</configuration>
 					</execution>
 
+					<!-- Async I/O -->
+					<execution>
+						<id>AsyncIO</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<classifier>AsyncIO</classifier>
+
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.streaming.examples.async.AsyncIOExample</program-class>
+								</manifestEntries>
+							</archive>
+
+							<includes>
+								<include>org/apache/flink/streaming/examples/async/*.class</include>
+							</includes>
+						</configuration>
+					</execution>
+
 					<!-- WordCountPOJO -->
 					<execution>
 						<id>WordCountPOJO</id>

http://git-wip-us.apache.org/repos/asf/flink/blob/f5283076/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java
new file mode 100644
index 0000000..96c7658
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.async;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Example illustrating how to use {@link org.apache.flink.streaming.api.functions.async.AsyncFunction}.
+ */
+public class AsyncIOExample {
+
+	/**
+	 * A checkpointed source.
+	 */
+	private static class SimpleSource implements SourceFunction<Integer>, ListCheckpointed<Integer> {
+		private static final long serialVersionUID = 1L;
+
+		private volatile boolean isRunning = true;
+		private int counter = 0;
+		private int start = 0;
+
+		@Override
+		public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
+			return Collections.singletonList(start);
+		}
+
+		@Override
+		public void restoreState(List<Integer> state) throws Exception {
+			for (Integer i : state)
+				this.start = i;
+		}
+
+		public SimpleSource(int maxNum) {
+			this.counter = maxNum;
+		}
+
+		@Override
+		public void run(SourceContext<Integer> ctx) throws Exception {
+			while ((start < counter || counter == -1) && isRunning) {
+				synchronized (ctx.getCheckpointLock()) {
+					ctx.collect(start);
+					++start;
+
+					// loop back to 0
+					if (start == Integer.MAX_VALUE) {
+						start = 0;
+					}
+				}
+				Thread.sleep(10);
+			}
+		}
+
+		@Override
+		public void cancel() {
+			isRunning = false;
+		}
+	}
+
+
+	/**
+	 * A sample {@link AsyncFunction} that uses a thread pool whose worker threads
+	 * simulate multiple async operations.
+	 * <p>
+	 * In a real production use case, the thread pool would typically live inside the
+	 * async client.
+	 */
+	private static class SampleAsyncFunction extends RichAsyncFunction<Integer, String> {
+		transient static ExecutorService executorService;
+		transient static Random random;
+
+		private int counter;
+
+		/**
+		 * The result of multiplying sleepFactor with a random float is used to pause
+		 * the working thread in the thread pool, simulating a time consuming async operation.
+		 */
+		final long sleepFactor;
+
+		/**
+		 * The ratio to generate an exception to simulate an async error. For example, the error
+		 * may be a TimeoutException while visiting HBase.
+		 */
+		final float failRatio;
+
+		final long shutdownWaitTS;
+
+		public SampleAsyncFunction(long sleepFactor, float failRatio, long shutdownWaitTS) {
+			this.sleepFactor = sleepFactor;
+			this.failRatio = failRatio;
+			this.shutdownWaitTS = shutdownWaitTS;
+		}
+
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
+
+			synchronized (SampleAsyncFunction.class) {
+				if (counter == 0) {
+					executorService = Executors.newFixedThreadPool(30);
+
+					random = new Random();
+				}
+
+				++counter;
+			}
+		}
+
+		@Override
+		public void close() throws Exception {
+			super.close();
+
+			synchronized (SampleAsyncFunction.class) {
+				--counter;
+
+				if (counter == 0) {
+					executorService.shutdown();
+
+					try {
+						executorService.awaitTermination(shutdownWaitTS, TimeUnit.MILLISECONDS);
+					} catch (InterruptedException e) {
+						executorService.shutdownNow();
+					}
+				}
+			}
+		}
+
+		@Override
+		public void asyncInvoke(final Integer input, final AsyncCollector<String> collector) throws Exception {
+			this.executorService.submit(new Runnable() {
+				@Override
+				public void run() {
+					// wait for a while to simulate an async operation here
+					int sleep = (int) (random.nextFloat() * sleepFactor);
+					try {
+						Thread.sleep(sleep);
+						List<String> ret = Collections.singletonList("key-" + (input % 10));
+						if (random.nextFloat() < failRatio) {
+							collector.collect(new Exception("wahahahaha..."));
+						} else {
+							collector.collect(ret);
+						}
+					} catch (InterruptedException e) {
+						collector.collect(new ArrayList<String>(0));
+					}
+				}
+			});
+		}
+	}
+
+	private static void printUsage() {
+		System.out.println("To customize example, use: AsyncIOExample [--fsStatePath <path to fs state>] " +
+				"[--checkpointMode <exactly_once or at_least_once>] " +
+				"[--maxCount <max number of input from source, -1 for infinite input>] " +
+				"[--sleepFactor <interval to sleep for each stream element>] [--failRatio <possibility to throw exception>] " +
+				"[--waitMode <ordered or unordered>] [--waitOperatorParallelism <parallelism for async wait operator>] " +
+				"[--eventType <EventTime or IngestionTime>] [--shutdownWaitTS <milli sec to wait for thread pool>]");
+	}
+
+	public static void main(String[] args) throws Exception {
+
+		// obtain execution environment
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		printUsage();
+
+		// parse parameters
+		final ParameterTool params = ParameterTool.fromArgs(args);
+
+		// check the configuration for the job
+		final String statePath = params.getRequired("fsStatePath");
+		final String cpMode = params.get("checkpointMode", "exactly_once");
+		final int maxCount = params.getInt("maxCount", 100000);
+		final int sleepFactor = params.getInt("sleepFactor", 100);
+		final float failRatio = params.getFloat("failRatio", 0.001f);
+		final String mode = params.get("waitMode", "ordered");
+		final int taskNum =  params.getInt("waitOperatorParallelism", 1);
+		final String timeType = params.get("eventType", "EventTime");
+		final int shutdownWaitTS = params.getInt("shutdownWaitTS", 20000);
+
+		System.out.println("Job configuration\n"
+			+"\tFS state path="+statePath+"\n"
+			+"\tCheckpoint mode="+cpMode+"\n"
+			+"\tMax count of input from source="+maxCount+"\n"
+			+"\tSleep factor="+sleepFactor+"\n"
+			+"\tFail ratio="+failRatio+"\n"
+			+"\tWaiting mode="+mode+"\n"
+			+"\tParallelism for async wait operator="+taskNum+"\n"
+			+"\tEvent type="+timeType+"\n"
+			+"\tShutdown wait timestamp="+shutdownWaitTS);
+
+		// setup state and checkpoint mode
+		env.setStateBackend(new FsStateBackend(statePath));
+		if (cpMode.equals("exactly_once")) {
+			env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
+		}
+		else {
+			env.enableCheckpointing(1000, CheckpointingMode.AT_LEAST_ONCE);
+		}
+
+		// enable watermark or not
+		if (timeType.equals("EventTime")) {
+			env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+		}
+		else if (timeType.equals("IngestionTime")) {
+			env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+		}
+
+		// create an input stream of integers
+		DataStream<Integer> inputStream = env.addSource(new SimpleSource(maxCount));
+
+		// create async function, which will *wait* for a while to simulate the process of async i/o
+		AsyncFunction<Integer, String> function =
+				new SampleAsyncFunction(sleepFactor, failRatio, shutdownWaitTS);
+
+		// add async operator to streaming job
+		DataStream<String> result;
+		if (mode.equals("ordered")) {
+			result = AsyncDataStream.orderedWait(inputStream, function, 20).setParallelism(taskNum);
+		}
+		else {
+			result = AsyncDataStream.unorderedWait(inputStream, function, 20).setParallelism(taskNum);
+		}
+
+		// add a reduce to get the sum for each key.
+		result.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
+			@Override
+			public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
+				out.collect(new Tuple2<>(value, 1));
+			}
+		}).keyBy(0).sum(1).print();
+
+		// execute the program
+		env.execute("Async I/O Example");
+	}
+}
+
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/f5283076/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java
new file mode 100644
index 0000000..4fefde0
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.Utils;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator;
+
+/**
+ * A helper class to apply {@link AsyncFunction} to a data stream.
+ * <p>
+ * <pre>{@code
+ * DataStream<String> input = ...
+ * AsyncFunction<String, Tuple<String, String>> asyncFunc = ...
+ *
+ * AsyncDataStream.orderedWait(input, asyncFunc, 100);
+ * }
+ * </pre>
+ */
+
+@PublicEvolving
+public class AsyncDataStream {
+	public enum OutputMode { ORDERED, UNORDERED }
+
+	private static final int DEFAULT_BUFFER_SIZE = 100;
+
+	/**
+	 * Add an AsyncWaitOperator.
+	 *
+	 * @param in The {@link DataStream} where the {@link AsyncWaitOperator} will be added.
+	 * @param func {@link AsyncFunction} wrapped inside {@link AsyncWaitOperator}.
+	 * @param bufSize The max number of inputs the {@link AsyncWaitOperator} can hold inside.
+	 * @param mode Processing mode for {@link AsyncWaitOperator}.
+	 * @param <IN> Input type.
+	 * @param <OUT> Output type.
+	 * @return A new {@link SingleOutputStreamOperator}
+	 */
+	private static <IN, OUT> SingleOutputStreamOperator<OUT> addOperator(
+			DataStream<IN> in,
+			AsyncFunction<IN, OUT> func,
+			int bufSize,
+			OutputMode mode) {
+
+		TypeInformation<OUT> outTypeInfo =
+			TypeExtractor.getUnaryOperatorReturnType(func, AsyncFunction.class, false,
+				true, in.getType(), Utils.getCallLocationName(), true);
+
+		// create transform
+		AsyncWaitOperator<IN, OUT> operator =
+				new AsyncWaitOperator<>(in.getExecutionEnvironment().clean(func), bufSize, mode);
+
+		return in.transform("async wait operator", outTypeInfo, operator);
+	}
+
+	/**
+	 * Add an AsyncWaitOperator. The output stream records may be reordered relative to the input.
+	 *
+	 * @param in Input {@link DataStream}
+	 * @param func {@link AsyncFunction}
+	 * @param bufSize The max number of async i/o operations that can be triggered
+	 * @param <IN> Type of input record
+	 * @param <OUT> Type of output record
+	 * @return A new {@link SingleOutputStreamOperator}.
+	 */
+	public static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWait(
+			DataStream<IN> in,
+			AsyncFunction<IN, OUT> func,
+			int bufSize) {
+		return addOperator(in, func, bufSize, OutputMode.UNORDERED);
+	}
+
+	/**
+	 * Add an AsyncWaitOperator. The output stream records may be reordered relative to the input.
+	 * @param in Input {@link DataStream}
+	 * @param func {@link AsyncFunction}
+	 * @param <IN> Type of input record
+	 * @param <OUT> Type of output record
+	 * @return A new {@link SingleOutputStreamOperator}.
+	 */
+	public static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWait(
+			DataStream<IN> in,
+			AsyncFunction<IN, OUT> func) {
+		return addOperator(in, func, DEFAULT_BUFFER_SIZE, OutputMode.UNORDERED);
+	}
+
+	/**
+	 * Add an AsyncWaitOperator. The output records are emitted in the same order as the input records.
+	 *
+	 * @param in Input {@link DataStream}
+	 * @param func {@link AsyncFunction}
+	 * @param bufSize The max number of async i/o operations that can be triggered
+	 * @param <IN> Type of input record
+	 * @param <OUT> Type of output record
+	 * @return A new {@link SingleOutputStreamOperator}.
+	 */
+	public static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait(
+			DataStream<IN> in,
+			AsyncFunction<IN, OUT> func,
+			int bufSize) {
+		return addOperator(in, func, bufSize, OutputMode.ORDERED);
+	}
+
+	/**
+	 * Add an AsyncWaitOperator. The output records are emitted in the same order as the input records.
+	 *
+	 * @param in Input {@link DataStream}
+	 * @param func {@link AsyncFunction}
+	 * @param <IN> Type of input record
+	 * @param <OUT> Type of output record
+	 * @return A new {@link SingleOutputStreamOperator}.
+	 */
+	public static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait(
+			DataStream<IN> in,
+			AsyncFunction<IN, OUT> func) {
+		return addOperator(in, func, DEFAULT_BUFFER_SIZE, OutputMode.ORDERED);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f5283076/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncFunction.java
new file mode 100644
index 0000000..b5b7d6f
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncFunction.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.async;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
+
+import java.io.Serializable;
+
+/**
+ * A function to trigger async I/O operations.
+ * <p>
+ * For each #asyncInvoke an async I/O operation can be triggered, and once it has completed,
+ * the result can be collected by calling {@link AsyncCollector#collect}. The context of each
+ * async operation is buffered in the operator immediately after invoking #asyncInvoke, so a
+ * stream input does not block as long as the internal buffer is not full.
+ * <p>
+ * The {@link AsyncCollector} can be passed into callbacks or futures provided by the async client
+ * to fetch result data. Any error can also be propagated to the operator via {@link AsyncCollector#collect(Throwable)}.
+ *
+ * <p>
+ * Typical usage for callback:
+ * <pre>{@code
+ * public class HBaseAsyncFunc implements AsyncFunction<String, String> {
+ *   @Override
+ *   public void asyncInvoke(String row, AsyncCollector<String> collector) throws Exception {
+ *     HBaseCallback cb = new HBaseCallback(collector);
+ *     Get get = new Get(Bytes.toBytes(row));
+ *     hbase.asyncGet(get, cb);
+ *   }
+ * }
+ * }
+ * </pre>
+ *
+ * <p>
+ * Typical usage for {@link com.google.common.util.concurrent.ListenableFuture}
+ * <pre>{@code
+ * public class HBaseAsyncFunc implements AsyncFunction<String, String> {
+ *   @Override
+ *   public void asyncInvoke(String row, final AsyncCollector<String> collector) throws Exception {
+ *     Get get = new Get(Bytes.toBytes(row));
+ *     ListenableFuture<Result> future = hbase.asyncGet(get);
+ *     Futures.addCallback(future, new FutureCallback<Result>() {
+ *       public void onSuccess(Result result) {
+ *         List<String> ret = process(result);
+ *         collector.collect(ret);
+ *       }
+ *       public void onFailure(Throwable thrown) {
+ *         collector.collect(thrown);
+ *       }
+ *     });
+ *   }
+ * }
+ * }
+ * </pre>
+ *
+ * @param <IN> The type of the input elements.
+ * @param <OUT> The type of the returned elements.
+ */
+
+@PublicEvolving
+public interface AsyncFunction<IN, OUT> extends Function, Serializable {
+	/**
+	 * Trigger an async operation for each stream input.
+	 *
+	 * @param input Stream input
+	 * @param collector AsyncCollector
+	 * @exception Exception An exception will make the task fail and trigger the fail-over process.
+	 */
+	void asyncInvoke(IN input, AsyncCollector<OUT> collector) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f5283076/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java
new file mode 100644
index 0000000..f6d3d31
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.async;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.DoubleCounter;
+import org.apache.flink.api.common.accumulators.Histogram;
+import org.apache.flink.api.common.accumulators.IntCounter;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.aggregators.Aggregator;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
+import org.apache.flink.types.Value;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Rich variant of the {@link AsyncFunction}. As a {@link RichFunction}, it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
+ *
+ * <p>
+ * State-related APIs in {@link RuntimeContext} are not supported yet, because the key may change
+ * while state is accessed from a working thread.
+ * <p>
+ * {@link IterationRuntimeContext#getIterationAggregator(String)} is not supported since the aggregator
+ * may be modified by multiple threads.
+ *
+ * @param <IN> The type of the input elements.
+ * @param <OUT> The type of the returned elements.
+ */
+
+@PublicEvolving
+public abstract class RichAsyncFunction<IN, OUT> extends AbstractRichFunction
+	implements AsyncFunction<IN, OUT> {
+
+	private transient RuntimeContext runtimeContext;
+
+	@Override
+	public void setRuntimeContext(RuntimeContext t) {
+		super.setRuntimeContext(t);
+
+		if (t != null) {
+			runtimeContext = new RichAsyncFunctionRuntimeContext(t);
+		}
+	}
+
+	@Override
+	public abstract void asyncInvoke(IN input, AsyncCollector<OUT> collector) throws Exception;
+
+	@Override
+	public RuntimeContext getRuntimeContext() {
+		if (this.runtimeContext != null) {
+			return runtimeContext;
+		} else {
+			throw new IllegalStateException("The runtime context has not been initialized.");
+		}
+	}
+
+	@Override
+	public IterationRuntimeContext getIterationRuntimeContext() {
+		if (this.runtimeContext != null) {
+			return (IterationRuntimeContext) runtimeContext;
+		} else {
+			throw new IllegalStateException("The runtime context has not been initialized.");
+		}
+	}
+
+	/**
+	 * A wrapper class that delegates to a {@link RuntimeContext}. State-related APIs are disabled.
+	 */
+	private class RichAsyncFunctionRuntimeContext implements IterationRuntimeContext {
+		private RuntimeContext runtimeContext;
+
+		public RichAsyncFunctionRuntimeContext(RuntimeContext context) {
+			runtimeContext = context;
+		}
+
+		private IterationRuntimeContext getIterationRuntimeContext() {
+			if (this.runtimeContext instanceof IterationRuntimeContext) {
+				return (IterationRuntimeContext) this.runtimeContext;
+			} else {
+				throw new IllegalStateException("This stub is not part of an iteration step function.");
+			}
+		}
+
+		@Override
+		public int getSuperstepNumber() {
+			return getIterationRuntimeContext().getSuperstepNumber();
+		}
+
+		@Override
+		public <T extends Aggregator<?>> T getIterationAggregator(String name) {
+			throw new UnsupportedOperationException("Get iteration aggregator is not supported in rich async function");
+		}
+
+		@Override
+		public <T extends Value> T getPreviousIterationAggregate(String name) {
+			return getIterationRuntimeContext().getPreviousIterationAggregate(name);
+		}
+
+		@Override
+		public String getTaskName() {
+			return runtimeContext.getTaskName();
+		}
+
+		@Override
+		public MetricGroup getMetricGroup() {
+			return runtimeContext.getMetricGroup();
+		}
+
+		@Override
+		public int getNumberOfParallelSubtasks() {
+			return runtimeContext.getNumberOfParallelSubtasks();
+		}
+
+		@Override
+		public int getIndexOfThisSubtask() {
+			return runtimeContext.getIndexOfThisSubtask();
+		}
+
+		@Override
+		public int getAttemptNumber() {
+			return runtimeContext.getAttemptNumber();
+		}
+
+		@Override
+		public String getTaskNameWithSubtasks() {
+			return runtimeContext.getTaskNameWithSubtasks();
+		}
+
+		@Override
+		public ExecutionConfig getExecutionConfig() {
+			return runtimeContext.getExecutionConfig();
+		}
+
+		@Override
+		public ClassLoader getUserCodeClassLoader() {
+			return runtimeContext.getUserCodeClassLoader();
+		}
+
+		@Override
+		public <V, A extends Serializable> void addAccumulator(String name, Accumulator<V, A> accumulator) {
+			runtimeContext.addAccumulator(name, accumulator);
+		}
+
+		@Override
+		public <V, A extends Serializable> Accumulator<V, A> getAccumulator(String name) {
+			return runtimeContext.getAccumulator(name);
+		}
+
+		@Override
+		public Map<String, Accumulator<?, ?>> getAllAccumulators() {
+			return runtimeContext.getAllAccumulators();
+		}
+
+		@Override
+		public IntCounter getIntCounter(String name) {
+			return runtimeContext.getIntCounter(name);
+		}
+
+		@Override
+		public LongCounter getLongCounter(String name) {
+			return runtimeContext.getLongCounter(name);
+		}
+
+		@Override
+		public DoubleCounter getDoubleCounter(String name) {
+			return runtimeContext.getDoubleCounter(name);
+		}
+
+		@Override
+		public Histogram getHistogram(String name) {
+			return runtimeContext.getHistogram(name);
+		}
+
+		@Override
+		public boolean hasBroadcastVariable(String name) {
+			return runtimeContext.hasBroadcastVariable(name);
+		}
+
+		@Override
+		public <RT> List<RT> getBroadcastVariable(String name) {
+			return runtimeContext.getBroadcastVariable(name);
+		}
+
+		@Override
+		public <T, C> C getBroadcastVariableWithInitializer(String name, BroadcastVariableInitializer<T, C> initializer) {
+			return runtimeContext.getBroadcastVariableWithInitializer(name, initializer);
+		}
+
+		@Override
+		public DistributedCache getDistributedCache() {
+			return runtimeContext.getDistributedCache();
+		}
+
+		@Override
+		public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
+			throw new UnsupportedOperationException("State is not supported in rich async function");
+		}
+
+		@Override
+		public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) {
+			throw new UnsupportedOperationException("State is not supported in rich async function");
+		}
+
+		@Override
+		public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) {
+			throw new UnsupportedOperationException("State is not supported in rich async function");
+		}
+	}
+}
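
To make the effect of this wrapper concrete, here is a short, hedged sketch of what user code may and may not do inside a RichAsyncFunction; the class name, the counter name and the commented-out state call are illustrative only.

import java.util.Collections;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;

public class StatelessAsyncFunction extends RichAsyncFunction<String, String> {

	@Override
	public void open(Configuration parameters) throws Exception {
		super.open(parameters);

		// delegated by the wrapper: metrics, accumulators, task info, broadcast variables, distributed cache
		getRuntimeContext().getIntCounter("invocations");

		// rejected by the wrapper: getState(...), getListState(...) and getReducingState(...)
		// all throw UnsupportedOperationException, since the key may change while the
		// working thread is still accessing state.
		// getRuntimeContext().getState(...);
	}

	@Override
	public void asyncInvoke(String input, AsyncCollector<String> collector) throws Exception {
		// a real implementation would hand the collector to an async client here
		collector.collect(Collections.singletonList(input));
	}
}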

http://git-wip-us.apache.org/repos/asf/flink/blob/f5283076/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/buffer/AbstractBufferEntry.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/buffer/AbstractBufferEntry.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/buffer/AbstractBufferEntry.java
new file mode 100644
index 0000000..29643fd
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/buffer/AbstractBufferEntry.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.async.buffer;
+
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Abstract implementation for {@link StreamElementEntry}
+ *
+ * @param <OUT> Output type.
+ */
+public abstract class AbstractBufferEntry<OUT> implements StreamElementEntry<OUT> {
+	private final StreamElement streamElement;
+
+	protected AbstractBufferEntry(StreamElement element) {
+		this.streamElement = Preconditions.checkNotNull(element, "Reference to StreamElement should not be null");
+	}
+
+	@Override
+	public List<OUT> getResult() throws IOException {
+		throw new UnsupportedOperationException("It is only available for StreamRecordEntry");
+	}
+
+	@Override
+	public void markDone() {
+		throw new UnsupportedOperationException("It is only available for StreamRecordEntry");
+	}
+
+	@Override
+	public boolean isDone() {
+		throw new UnsupportedOperationException("It must be overridden by the concrete entry");
+	}
+
+	@Override
+	public boolean isStreamRecord() {
+		return streamElement.isRecord();
+	}
+
+	@Override
+	public boolean isWatermark() {
+		return streamElement.isWatermark();
+	}
+
+	@Override
+	public boolean isLatencyMarker() {
+		return streamElement.isLatencyMarker();
+	}
+
+	@Override
+	public StreamElement getStreamElement() {
+		return streamElement;
+	}
+
+	@Override
+	public String toString() {
+		return "StreamElementEntry for @" + streamElement;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f5283076/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/buffer/AsyncCollectorBuffer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/buffer/AsyncCollectorBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/buffer/AsyncCollectorBuffer.java
new file mode 100644
index 0000000..ee176d9
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/buffer/AsyncCollectorBuffer.java
@@ -0,0 +1,633 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.async.buffer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+/**
+ * The AsyncCollectorBuffer holds all {@link AsyncCollector}s in its internal buffer and emits
+ * results from the {@link AsyncCollector}s to the operators following it by calling
+ * {@link Output#collect(Object)}.
+ */
+@Internal
+public class AsyncCollectorBuffer<IN, OUT> {
+
+	/**
+	 * The logger.
+	 */
+	private static final Logger LOG = LoggerFactory.getLogger(AsyncCollectorBuffer.class);
+
+	/**
+	 * Max number of {@link AsyncCollector} in the buffer.
+	 */
+	private final int bufferSize;
+
+	private final AsyncDataStream.OutputMode mode;
+
+	private final AsyncWaitOperator<IN, OUT> operator;
+
+	/**
+	 * Keep all {@link StreamElementEntry}
+	 */
+	private final Set<StreamElementEntry<OUT>> queue = new LinkedHashSet<>();
+
+	/**
+	 * Map each {@link StreamElementEntry} to its corresponding {@link Watermark} or {@link LatencyMarker}.
+	 * If the inputs are SR1, SR2, WM1, SR3, SR4, then SR1 and SR2 belong to WM1, while
+	 * SR3 and SR4 are kept in {@link #lonelyEntries}.
+	 */
+	private final Map<StreamElementEntry<OUT>, StreamElement> entriesToMarkers = new HashMap<>();
+
+	private final List<StreamElementEntry<OUT>> lonelyEntries = new LinkedList<>();
+
+	/**
+	 * Keep finished AsyncCollector belonging to the oldest Watermark or LatencyMarker in UNORDERED mode.
+	 */
+	private final Map<StreamElement, Set<StreamElementEntry<OUT>>> markerToFinishedEntries = new LinkedHashMap<>();
+	private Set<StreamElementEntry<OUT>> lonelyFinishedEntries = new HashSet<>();
+
+	/**
+	 * For the AsyncWaitOperator chained with StreamSource, the checkpoint thread may get the
+	 * {@link org.apache.flink.streaming.runtime.tasks.StreamTask#lock} while {@link AsyncCollectorBuffer#queue}
+	 * is full since main thread waits on this lock. The StreamElement in
+	 * {@link AsyncWaitOperator#processElement(StreamRecord)} should be treated as a part of all StreamElements
+	 * in its queue. It will be kept in the operator state while snapshotting.
+	 */
+	private StreamElement extraStreamElement;
+
+	/**
+	 * {@link TimestampedCollector} and {@link Output} to collect results and watermarks.
+	 */
+	private final Output<StreamRecord<OUT>> output;
+	private final TimestampedCollector<OUT> timestampedCollector;
+
+	/**
+	 * Checkpoint lock from {@link org.apache.flink.streaming.runtime.tasks.StreamTask#lock}
+	 */
+	private final Object lock;
+
+	private final Emitter emitter;
+	private final Thread emitThread;
+
+	/**
+	 * Exception from async operation or internal error
+	 */
+	private Exception error;
+
+	/**
+	 * Flag telling Emitter thread to work or not.
+	 */
+	private volatile boolean workwork = false;
+
+	public AsyncCollectorBuffer(
+			int bufferSize,
+			AsyncDataStream.OutputMode mode,
+			Output<StreamRecord<OUT>> output,
+			TimestampedCollector<OUT> collector,
+			Object lock,
+			AsyncWaitOperator operator) {
+		Preconditions.checkArgument(bufferSize > 0, "Future buffer size should be greater than 0.");
+
+		this.bufferSize = bufferSize;
+
+		this.mode = Preconditions.checkNotNull(mode, "Processing mode should not be NULL.");
+		this.output = Preconditions.checkNotNull(output, "Output should not be NULL.");
+		this.timestampedCollector = Preconditions.checkNotNull(collector, "TimestampedCollector should not be NULL.");
+		this.operator = Preconditions.checkNotNull(operator, "Reference to AsyncWaitOperator should not be NULL.");
+		this.lock = Preconditions.checkNotNull(lock, "Checkpoint lock should not be NULL.");
+
+		this.emitter = new Emitter();
+		this.emitThread = new Thread(emitter);
+		this.emitThread.setDaemon(true);
+	}
+
+	/**
+	 * Add an {@link StreamRecord} into the buffer. A new {@link AsyncCollector} will be created and returned
+	 * corresponding to the input StreamRecord.
+	 * <p>
+	 * If buffer is full, caller will wait until a new space is available.
+	 *
+	 * @param record StreamRecord
+	 * @return An AsyncCollector
+	 * @throws Exception Exception from AsyncCollector.
+	 */
+	public AsyncCollector<OUT> addStreamRecord(StreamRecord<IN> record) throws Exception {
+		assert(Thread.holdsLock(lock));
+
+		while (queue.size() >= bufferSize) {
+			// hold the input StreamRecord until it is placed in the buffer
+			extraStreamElement = record;
+
+			lock.wait();
+		}
+
+		if (error != null) {
+			throw error;
+		}
+
+		StreamElementEntry<OUT> entry = new StreamRecordEntry<>(record, this);
+
+		queue.add(entry);
+
+		if (mode == AsyncDataStream.OutputMode.UNORDERED) {
+			lonelyEntries.add(entry);
+		}
+
+		extraStreamElement = null;
+
+		return (AsyncCollector<OUT>)entry;
+	}
+
+	/**
+	 * Add a {@link Watermark} into buffer.
+	 * <p>
+	 * If queue is full, caller will wait here.
+	 *
+	 * @param watermark Watermark
+	 * @throws Exception Exception from AsyncCollector.
+	 */
+	public void addWatermark(Watermark watermark) throws Exception {
+		processMark(new WatermarkEntry<OUT>(watermark));
+	}
+
+	/**
+	 * Add a {@link LatencyMarker} into buffer.
+	 * <p>
+	 * If queue is full, caller will wait here.
+	 *
+	 * @param latencyMarker LatencyMarker
+	 * @throws Exception Exception from AsyncCollector.
+	 */
+	public void addLatencyMarker(LatencyMarker latencyMarker) throws Exception {
+		processMark(new LatencyMarkerEntry<OUT>(latencyMarker));
+	}
+
+	/**
+	 * Notify the emitter thread and main thread that an AsyncCollector has completed.
+	 *
+	 * @param entry Completed AsyncCollector
+	 */
+	public void markCollectorCompleted(StreamElementEntry<OUT> entry) {
+		synchronized (lock) {
+			entry.markDone();
+
+			if (mode == AsyncDataStream.OutputMode.UNORDERED) {
+				StreamElement marker = entriesToMarkers.get(entry);
+
+				if (marker != null) {
+					markerToFinishedEntries.get(marker).add(entry);
+				}
+				else {
+					lonelyFinishedEntries.add(entry);
+				}
+			}
+
+			// if workwork is true, it is not necessary to check it again
+			if (!workwork && shouldNotifyEmitterThread(entry)) {
+				workwork = true;
+
+				lock.notifyAll();
+			}
+		}
+	}
+
+	/**
+	 * Caller will wait here if buffer is not empty, meaning that not all async i/o tasks have returned yet.
+	 *
+	 * @throws Exception IOException from AsyncCollector.
+	 */
+	public void waitEmpty() throws Exception {
+		assert(Thread.holdsLock(lock));
+
+		while (queue.size() != 0) {
+			if (error != null) {
+				throw error;
+			}
+
+			lock.wait();
+		}
+	}
+
+	public void startEmitterThread() {
+		emitThread.start();
+	}
+
+	public void stopEmitterThread() {
+		emitter.stop();
+
+		emitThread.interrupt();
+
+		while (emitThread.isAlive()) {
+			// temporarily release the lock first, since caller of this method may also hold the lock.
+			if (Thread.holdsLock(lock)) {
+				try {
+					lock.wait(1000);
+				}
+				catch (InterruptedException e) {
+					// do nothing
+				}
+			}
+
+			try {
+				emitThread.join(10000);
+			} catch (InterruptedException e) {
+				// do nothing
+			}
+
+			// get the stack trace
+			StringBuilder sb = new StringBuilder();
+			StackTraceElement[] stack = emitThread.getStackTrace();
+
+			for (StackTraceElement e : stack) {
+				sb.append(e).append('\n');
+			}
+
+			LOG.warn("Emitter thread blocks due to {}", sb.toString());
+
+			emitThread.interrupt();
+		}
+	}
+
+	/**
+	 * Get all StreamElements in the AsyncCollector queue.
+	 * <p>
+	 * The Emitter thread cannot output records and will wait for a while because the checkpointing
+	 * procedure holds the checkpoint lock.
+	 *
+	 * @return An {@link Iterator} to the StreamElements in the buffer, including the extra one.
+	 */
+	public Iterator<StreamElement> getStreamElementsInBuffer() {
+		final Iterator<StreamElementEntry<OUT>> iterator = queue.iterator();
+		final StreamElement extra = extraStreamElement;
+
+		return new Iterator<StreamElement>() {
+			boolean shouldSendExtraElement = (extra != null);
+
+			@Override
+			public boolean hasNext() {
+				return iterator.hasNext() || shouldSendExtraElement;
+			}
+
+			@Override
+			public StreamElement next() {
+				if (!hasNext()) {
+					throw new NoSuchElementException();
+				}
+
+				if (iterator.hasNext()) {
+					return iterator.next().getStreamElement();
+				}
+				else {
+					shouldSendExtraElement = false;
+
+					return extra;
+				}
+			}
+
+			@Override
+			public void remove() {
+				throw new UnsupportedOperationException("remove");
+			}
+		};
+	}
+
+	private void processMark(StreamElementEntry<OUT> entry) throws Exception {
+		assert(Thread.holdsLock(lock));
+
+		StreamElement mark = entry.getStreamElement();
+
+		while (queue.size() >= bufferSize) {
+			// hold the input Watermark / LatencyMarker until it is placed in the buffer
+			extraStreamElement = mark;
+
+			lock.wait();
+		}
+
+		if (error != null) {
+			throw error;
+		}
+
+		queue.add(entry);
+
+		if (mode == AsyncDataStream.OutputMode.UNORDERED) {
+			// update AsyncCollector to Watermark / LatencyMarker map
+			for (StreamElementEntry<OUT> e : lonelyEntries) {
+				entriesToMarkers.put(e, mark);
+			}
+
+			lonelyEntries.clear();
+
+			// update Watermark / LatencyMarker to finished AsyncCollector map
+			markerToFinishedEntries.put(mark, lonelyFinishedEntries);
+
+			lonelyFinishedEntries = new HashSet<>();
+		}
+
+		extraStreamElement = null;
+
+		// notify Emitter thread if the head of buffer is Watermark or LatencyMarker
+		// this is for the case when LatencyMarkers keep coming but there are no StreamRecords.
+		StreamElementEntry<OUT> element = queue.iterator().next();
+
+		if (element.isLatencyMarker() || element.isWatermark()) {
+			workwork = true;
+
+			lock.notifyAll();
+		}
+	}
+
+	private boolean shouldNotifyEmitterThread(StreamElementEntry<OUT> entry) {
+
+		switch (mode) {
+
+			case ORDERED:
+				Iterator<StreamElementEntry<OUT>> queueIterator = queue.iterator();
+
+				// get to work as long as the first AsyncCollector is done.
+				return queueIterator.hasNext() && (queueIterator.next().isDone());
+
+			case UNORDERED:
+				Iterator<Map.Entry<StreamElement, Set<StreamElementEntry<OUT>>>> iteratorMarker =
+						markerToFinishedEntries.entrySet().iterator();
+
+				// get to work only if the finished AsyncCollector belongs to the oldest Watermark or LatencyMarker
+				// or no Watermark / LatencyMarker is in the buffer yet.
+				return iteratorMarker.hasNext() ? iteratorMarker.next().getValue().contains(entry)
+						: lonelyFinishedEntries.contains(entry);
+
+			default:
+				// this case should never happen
+				return false;
+		}
+	}
+
+	@VisibleForTesting
+	public Set<StreamElementEntry<OUT>> getQueue() {
+		return queue;
+	}
+
+	@VisibleForTesting
+	public void setExtraStreamElement(StreamElement element) {
+		extraStreamElement = element;
+	}
+
+	/**
+	 * A working thread to output results from {@link AsyncCollector} to the next operator.
+	 */
+	private class Emitter implements Runnable {
+		private volatile boolean running = true;
+
+		private void output(StreamElementEntry<OUT> entry) throws Exception {
+
+			StreamElement element = entry.getStreamElement();
+
+			if (element == null) {
+				throw new Exception("StreamElement in the buffer entry should not be null");
+			}
+
+			if (entry.isStreamRecord()) {
+				List<OUT> result = entry.getResult();
+
+				if (result == null) {
+					throw new Exception("Result for stream record " + element + " is null");
+				}
+
+				// update the timestamp for the collector
+				timestampedCollector.setTimestamp(element.asRecord());
+
+				for (OUT val : result) {
+					timestampedCollector.collect(val);
+				}
+			}
+			else if (entry.isWatermark()) {
+				output.emitWatermark(element.asWatermark());
+			}
+			else if (entry.isLatencyMarker()) {
+				operator.sendLatencyMarker(element.asLatencyMarker());
+			}
+			else {
+				throw new IOException("Unknown input record: " + element);
+			}
+		}
+
+		/**
+		 * Emit results from the finished head collector and its following finished ones.
+		 *
+		 * <p>NOTE: Since {@link #output(StreamElementEntry)} may block if the operator is chained with
+		 * another {@link AsyncWaitOperator} whose buffer is full, we cannot use an {@link Iterator} to
+		 * go through {@link #queue}: a ConcurrentModificationException may be thrown when we remove an
+		 * element from the queue by calling {@link Iterator#remove()}.
+		 *
+		 * <p>Example: Assume an operator chain like this: async-wait-operator1(awo1) -> async-wait-operator2(awo2).
+		 * The buffer of awo1 is full, so the main thread is blocked there.
+		 * The {@link Emitter} thread of awo1, call it emitter1, is outputting
+		 * data to awo2. Assume that 2 elements have been emitted and the buffer in awo1 has two vacancies. While
+		 * outputting the third one, the buffer in awo2 is full, so emitter1 has to wait for a moment; with an
+		 * {@link Iterator} this happens just before calling {@link Iterator#remove()}. Once the {@link #lock} is
+		 * released, the main thread may acquire it and modify {@link #queue}, causing a
+		 * ConcurrentModificationException once emitter1 reaches {@link Iterator#remove()}.
+		 *
+		 */
+		private void orderedProcess() throws Exception {
+			StreamElementEntry<OUT> entry;
+
+			while (queue.size() > 0 && (entry = queue.iterator().next()).isDone()) {
+				output(entry);
+
+				queue.remove(entry);
+			}
+		}
+
+		/**
+		 * Emit results for each finished collector. Try to emit results prior to the oldest watermark
+		 * in the buffer.
+		 * <p>
+		 * For example, assume the sequence of input StreamElements is:
+		 * Entry(ac1, record1) -> Entry(ac2, record2) -> Entry(ac3, watermark1) -> Entry(ac4, record3),
+		 * and both ac2 and ac3 have finished. In unordered mode, ac1 and ac2 are prior to watermark1,
+		 * so ac2 will be emitted. Since ac1 is not ready yet, ac3 has to wait until ac1 is done.
+		 */
+		private void unorderedProcess() throws Exception {
+			// try to emit finished AsyncCollectors in markerToFinishedEntries
+			if (markerToFinishedEntries.size() != 0) {
+				while (markerToFinishedEntries.size() != 0) {
+					Map.Entry<StreamElement, Set<StreamElementEntry<OUT>>> finishedStreamElementEntry =
+							markerToFinishedEntries.entrySet().iterator().next();
+
+					Set<StreamElementEntry<OUT>> finishedElementSet = finishedStreamElementEntry.getValue();
+
+					// While outputting results to the next operator, output may release lock if the following operator
+					// in the chain is another AsyncWaitOperator. During this period, there may be some
+					// finished StreamElementEntry coming into the finishedElementSet, and we should
+					// output all finished elements after re-acquiring the lock.
+					while (finishedElementSet.size() != 0) {
+						StreamElementEntry<OUT> finishedEntry = finishedElementSet.iterator().next();
+
+						output(finishedEntry);
+
+						queue.remove(finishedEntry);
+
+						entriesToMarkers.remove(finishedEntry);
+
+						finishedElementSet.remove(finishedEntry);
+					}
+
+					finishedStreamElementEntry.getValue().clear();
+
+
+					// if all StreamElements belonging to current Watermark / LatencyMarker have been emitted,
+					// emit current Watermark / LatencyMarker
+
+					if (queue.size() == 0) {
+						if (markerToFinishedEntries.size() != 0 || entriesToMarkers.size() != 0
+								|| lonelyEntries.size() != 0 || lonelyFinishedEntries.size() != 0) {
+							throw new IOException("Inner data info is not consistent.");
+						}
+					}
+					else {
+						// check the head AsyncCollector whether it is a Watermark or LatencyMarker.
+						StreamElementEntry<OUT> queueEntry = queue.iterator().next();
+
+						if (!queueEntry.isStreamRecord()) {
+							if (finishedStreamElementEntry.getKey() != queueEntry.getStreamElement()) {
+								throw new IOException("Watermark / LatencyMarker from finished collector map "
+									+ "and input buffer are not the same.");
+							}
+
+							output(queueEntry);
+
+							queue.remove(queueEntry);
+
+							// remove useless data in markerToFinishedEntries
+							markerToFinishedEntries.remove(finishedStreamElementEntry.getKey());
+						}
+						else {
+							break;
+						}
+					}
+				}
+			}
+
+			if (markerToFinishedEntries.size() == 0) {
+				// health check
+				if (entriesToMarkers.size() != 0) {
+					throw new IOException("Entries to marker map should be zero");
+				}
+
+				// no Watermark or LatencyMarker in the buffer yet, emit results in lonelyFinishedEntries
+				while (lonelyFinishedEntries.size() != 0) {
+					StreamElementEntry<OUT> entry = lonelyFinishedEntries.iterator().next();
+
+					output(entry);
+
+					queue.remove(entry);
+
+					lonelyEntries.remove(entry);
+
+					lonelyFinishedEntries.remove(entry);
+				}
+			}
+		}
+
+		private void processFinishedAsyncCollector() throws Exception {
+			if (mode == AsyncDataStream.OutputMode.ORDERED) {
+				orderedProcess();
+			} else {
+				unorderedProcess();
+			}
+		}
+
+		private void clearAndNotify() {
+			// clear all data
+			queue.clear();
+			entriesToMarkers.clear();
+			markerToFinishedEntries.clear();
+			lonelyEntries.clear();
+
+			running = false;
+
+			lock.notifyAll();
+		}
+
+		@Override
+		public void run() {
+			while (running) {
+				synchronized (lock) {
+
+					try {
+						while (!workwork) {
+							lock.wait();
+						}
+
+						processFinishedAsyncCollector();
+
+						lock.notifyAll();
+
+						workwork = false;
+					}
+					catch (InterruptedException e) {
+						// An InterruptedException can originate from:
+						//   1. the lock.wait() call in the Emitter
+						//   2. a collector waiting for a vacant slot in the buffer
+						// In either case, clear all held data and exit the Emitter thread.
+
+						clearAndNotify();
+					}
+					catch (Exception e) {
+						// Any exception other than InterruptedException is propagated
+						// to the main thread.
+						error = e;
+
+						clearAndNotify();
+					}
+				}
+			}
+		}
+
+		public void stop() {
+			running = false;
+		}
+	}
+}
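
A toy, standalone sketch (not part of this patch) of the UNORDERED emission rule that unorderedProcess() above implements: finished records sitting in front of the oldest Watermark may be emitted in any order as soon as they complete, while the Watermark itself, and everything queued behind it, is held back until every record preceding it has finished. The class and all names below are illustrative only, and no Flink types are used.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class UnorderedEmissionSketch {

	static final class Entry {
		final String name;
		final boolean isWatermark;
		boolean done;

		Entry(String name, boolean isWatermark) {
			this.name = name;
			this.isWatermark = isWatermark;
			this.done = isWatermark; // watermarks are considered done on arrival
		}
	}

	public static void main(String[] args) {
		// same sequence as the javadoc example: record1, record2, watermark1, record3
		List<Entry> queue = new ArrayList<>(Arrays.asList(
				new Entry("record1", false),
				new Entry("record2", false),
				new Entry("watermark1", true),
				new Entry("record3", false)));

		queue.get(1).done = true; // record2 finished out of order; record1 is still pending

		for (int i = 0; i < queue.size(); i++) {
			Entry entry = queue.get(i);
			if (entry.isWatermark) {
				// the watermark (and everything behind it) waits for all earlier records
				boolean allEarlierDone = queue.subList(0, i).stream().allMatch(e -> e.done);
				System.out.println(entry.name + (allEarlierDone ? " emitted" : " held back"));
				break;
			} else if (entry.done) {
				System.out.println(entry.name + " emitted");
			}
		}
		// prints: "record2 emitted" and then "watermark1 held back"
	}
}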

http://git-wip-us.apache.org/repos/asf/flink/blob/f5283076/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/buffer/LatencyMarkerEntry.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/buffer/LatencyMarkerEntry.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/buffer/LatencyMarkerEntry.java
new file mode 100644
index 0000000..1705c2d
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/buffer/LatencyMarkerEntry.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.async.buffer;
+
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+
+/**
+ * {@link AsyncCollectorBuffer} entry for {@link LatencyMarker}
+ *
+ */
+public class LatencyMarkerEntry<OUT> extends AbstractBufferEntry<OUT> {
+	public LatencyMarkerEntry(LatencyMarker marker) {
+		super(marker);
+	}
+
+	@Override
+	public boolean isDone() {
+		return true;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f5283076/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/buffer/StreamElementEntry.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/buffer/StreamElementEntry.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/buffer/StreamElementEntry.java
new file mode 100644
index 0000000..de7f606
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/buffer/StreamElementEntry.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.async.buffer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * The base interface for entries in the {@link AsyncCollectorBuffer}.
+ *
+ * @param <OUT> Output data type
+ */
+@Internal
+public interface StreamElementEntry<OUT> {
+	/**
+	 * Get the result, throwing an IOException if an error was encountered.
+	 *
+	 * @return A list of results.
+	 * @throws IOException An IOException wrapping errors from user code.
+	 */
+	List<OUT> getResult() throws IOException;
+
+	/**
+	 * Set the internal flag marking that the async operation has finished.
+	 */
+	void markDone();
+
+	/**
+	 * Get the flag indicating whether the async operation has finished.
+	 *
+	 * @return True if the async operation has finished.
+	 */
+	boolean isDone();
+
+	/**
+	 * Check whether the inner element is a StreamRecord.
+	 *
+	 * @return True if the element is a StreamRecord.
+	 */
+	boolean isStreamRecord();
+
+	/**
+	 * Check whether the inner element is a Watermark.
+	 *
+	 * @return True if the element is a Watermark.
+	 */
+	boolean isWatermark();
+
+	/**
+	 * Check whether the inner element is a LatencyMarker.
+	 *
+	 * @return True if the element is a LatencyMarker.
+	 */
+	boolean isLatencyMarker();
+
+	/**
+	 * Get the inner stream element.
+	 *
+	 * @return The inner {@link StreamElement}.
+	 */
+	StreamElement getStreamElement();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f5283076/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/buffer/StreamRecordEntry.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/buffer/StreamRecordEntry.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/buffer/StreamRecordEntry.java
new file mode 100644
index 0000000..fb0dc3b
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/buffer/StreamRecordEntry.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.async.buffer;
+
+import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * {@link AsyncCollectorBuffer} entry for {@link StreamRecord}
+ *
+ * @param <IN> Input data type
+ * @param <OUT> Output data type
+ */
+public class StreamRecordEntry<IN, OUT> extends AbstractBufferEntry<OUT> implements AsyncCollector<OUT> {
+	private List<OUT> result;
+	private Throwable error;
+
+	private boolean isDone = false;
+
+	private final AsyncCollectorBuffer<IN, OUT> buffer;
+
+	public StreamRecordEntry(StreamRecord<IN> element, AsyncCollectorBuffer<IN, OUT> buffer) {
+		super(element);
+		this.buffer = Preconditions.checkNotNull(buffer, "Reference to AsyncCollectorBuffer should not be null");
+	}
+
+	@Override
+	public void collect(List<OUT> result)  {
+		this.result = result;
+
+		this.buffer.markCollectorCompleted(this);
+	}
+
+	@Override
+	public void collect(Throwable error)  {
+		this.error = error;
+
+		this.buffer.markCollectorCompleted(this);
+	}
+
+	public List<OUT> getResult() throws IOException {
+		if (error != null) {
+			throw new IOException(error.getMessage(), error);
+		}
+		return result;
+	}
+
+	public void markDone() {
+		isDone = true;
+	}
+
+	public boolean isDone() {
+		return isDone;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f5283076/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/buffer/WatermarkEntry.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/buffer/WatermarkEntry.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/buffer/WatermarkEntry.java
new file mode 100644
index 0000000..8883a2d
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/buffer/WatermarkEntry.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.async.buffer;
+
+import org.apache.flink.streaming.api.watermark.Watermark;
+
+/**
+ * {@link AsyncCollectorBuffer} entry for {@link Watermark}
+ *
+ */
+public class WatermarkEntry<OUT> extends AbstractBufferEntry<OUT> {
+	public WatermarkEntry(Watermark watermark) {
+		super(watermark);
+	}
+
+	@Override
+	public boolean isDone() {
+		return true;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f5283076/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/collector/AsyncCollector.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/collector/AsyncCollector.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/collector/AsyncCollector.java
new file mode 100644
index 0000000..b2a58d2
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/collector/AsyncCollector.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.async.collector;
+
+import org.apache.flink.annotation.Internal;
+
+import java.util.List;
+
+/**
+ * {@link AsyncCollector} collects data or errors produced in user code while processing async I/O.
+ *
+ * @param <OUT> Output type
+ */
+@Internal
+public interface AsyncCollector<OUT> {
+	/**
+	 * Set the result.
+	 * <p>
+	 * Note that it should be called exactly once in the user code.
+	 * Calling this function multiple times will cause data loss.
+	 * <p>
+	 * Put all results in a {@link List} and then issue {@link AsyncCollector#collect(List)}.
+	 * <p>
+	 * If the result is NULL, the task will fail. If collecting an empty result set is allowed and
+	 * should not cause a task failover, collect an empty list instead.
+	 *
+	 * @param result A list of results.
+	 */
+	void collect(List<OUT> result);
+
+	/**
+	 * Set the error.
+	 *
+	 * @param error A Throwable object.
+	 */
+	void collect(Throwable error);
+}
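
To make the collect() contract concrete, here is a minimal sketch of a user function that drives this collector. It assumes the AsyncFunction shape used elsewhere in this patch (asyncInvoke taking the input value and an AsyncCollector, as invoked by AsyncWaitOperator.processElement below); the class name, the lookup method, and the use of CompletableFuture are illustrative placeholders rather than part of the patch.

import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;

public class LookupAsyncFunction implements AsyncFunction<String, String> {

	@Override
	public void asyncInvoke(String key, AsyncCollector<String> collector) {
		CompletableFuture
				.supplyAsync(() -> externalLookup(key)) // runs on ForkJoinPool.commonPool()
				.whenComplete((value, failure) -> {
					if (failure != null) {
						// hand the error to the framework; the task will then fail over
						collector.collect(failure);
					} else {
						// exactly one successful collect() call, with all results in a list
						collector.collect(Collections.singletonList(value));
					}
				});
	}

	// placeholder for a call to a real external service
	private static String externalLookup(String key) {
		return key + "-value";
	}
}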

http://git-wip-us.apache.org/repos/asf/flink/blob/f5283076/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index f9b711e..1c27293 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -596,12 +596,12 @@ public abstract class AbstractStreamOperator<OUT>
 	}
 
 
-	protected void reportOrForwardLatencyMarker(LatencyMarker maker) {
+	protected void reportOrForwardLatencyMarker(LatencyMarker marker) {
 		// all operators are tracking latencies
-		this.latencyGauge.reportLatency(maker, false);
+		this.latencyGauge.reportLatency(marker, false);
 
 		// everything except sinks forwards latency markers
-		this.output.emitLatencyMarker(maker);
+		this.output.emitLatencyMarker(marker);
 	}
 
 	// ----------------------- Helper classes -----------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/f5283076/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
new file mode 100644
index 0000000..9166865
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
+import org.apache.flink.streaming.api.functions.async.buffer.AsyncCollectorBuffer;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Iterator;
+
+/**
+ * The {@link AsyncWaitOperator} accepts input {@link StreamElement}s from previous operators,
+ * passes them into the {@link AsyncFunction}, snapshots the inputs held in the {@link AsyncCollectorBuffer}
+ * while checkpointing, and restores the {@link AsyncCollectorBuffer} from previous state.
+ * <p>
+ * Note that, due to the newly added worker thread {@link AsyncCollectorBuffer.Emitter},
+ * if the {@link AsyncWaitOperator} is chained with other operators, the {@link StreamTask} has to make sure
+ * that the operators in the chain are opened from the tail operator to the head operator,
+ * and closed from the head operator to the tail operator.
+ *
+ * @param <IN> Input type for the operator.
+ * @param <OUT> Output type for the operator.
+ */
+@Internal
+public class AsyncWaitOperator<IN, OUT>
+	extends AbstractUdfStreamOperator<OUT, AsyncFunction<IN, OUT>>
+	implements OneInputStreamOperator<IN, OUT>
+{
+	private static final long serialVersionUID = 1L;
+
+	private static final String STATE_NAME = "_async_wait_operator_state_";
+
+	/**
+	 * {@link TypeSerializer} for inputs while making snapshots.
+	 */
+	private transient StreamElementSerializer<IN> inStreamElementSerializer;
+
+	/**
+	 * Input stream elements from the state.
+	 */
+	private transient ListState<StreamElement> recoveredStreamElements;
+
+	private transient TimestampedCollector<OUT> collector;
+
+	private transient AsyncCollectorBuffer<IN, OUT> buffer;
+
+	/**
+	 * Checkpoint lock from {@link StreamTask#lock}
+	 */
+	private transient Object checkpointLock;
+
+	private final int bufferSize;
+	private final AsyncDataStream.OutputMode mode;
+
+
+	public AsyncWaitOperator(AsyncFunction<IN, OUT> asyncFunction, int bufferSize, AsyncDataStream.OutputMode mode) {
+		super(asyncFunction);
+		chainingStrategy = ChainingStrategy.ALWAYS;
+
+		Preconditions.checkArgument(bufferSize > 0, "The number of concurrent async operation should be greater than 0.");
+		this.bufferSize = bufferSize;
+
+		this.mode = mode;
+	}
+
+	@Override
+	public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
+		super.setup(containingTask, config, output);
+
+		this.inStreamElementSerializer =
+				new StreamElementSerializer<>(this.getOperatorConfig().<IN>getTypeSerializerIn1(getUserCodeClassloader()));
+
+		this.collector = new TimestampedCollector<>(output);
+
+		this.checkpointLock = containingTask.getCheckpointLock();
+
+		this.buffer = new AsyncCollectorBuffer<>(bufferSize, mode, output, collector, this.checkpointLock, this);
+	}
+
+	@Override
+	public void open() throws Exception {
+		super.open();
+
+		// Process stream elements recovered from state. Since the Emitter thread starts as soon as all
+		// elements from the previous state are in the AsyncCollectorBuffer, we have to make sure that the
+		// operators in the chain are opened from the tail operator to the head operator.
+		if (this.recoveredStreamElements != null) {
+			for (StreamElement element : this.recoveredStreamElements.get()) {
+				if (element.isRecord()) {
+					processElement(element.<IN>asRecord());
+				}
+				else if (element.isWatermark()) {
+					processWatermark(element.asWatermark());
+				}
+				else if (element.isLatencyMarker()) {
+					processLatencyMarker(element.asLatencyMarker());
+				}
+				else {
+					throw new Exception("Unknown record type: " + element.getClass());
+				}
+			}
+			this.recoveredStreamElements = null;
+		}
+
+		buffer.startEmitterThread();
+	}
+
+	@Override
+	public void processElement(StreamRecord<IN> element) throws Exception {
+		AsyncCollector<OUT> collector = buffer.addStreamRecord(element);
+
+		userFunction.asyncInvoke(element.getValue(), collector);
+	}
+
+	@Override
+	public void processWatermark(Watermark mark) throws Exception {
+		buffer.addWatermark(mark);
+	}
+
+	@Override
+	public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception {
+		buffer.addLatencyMarker(latencyMarker);
+	}
+
+	@Override
+	public void snapshotState(StateSnapshotContext context) throws Exception {
+		super.snapshotState(context);
+
+		ListState<StreamElement> partitionableState =
+				getOperatorStateBackend().getOperatorState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));
+		partitionableState.clear();
+
+		Iterator<StreamElement> iterator = buffer.getStreamElementsInBuffer();
+		while (iterator.hasNext()) {
+			partitionableState.add(iterator.next());
+		}
+	}
+
+	@Override
+	public void initializeState(StateInitializationContext context) throws Exception {
+		recoveredStreamElements =
+				context.getOperatorStateStore().getOperatorState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));
+
+	}
+
+	@Override
+	public void close() throws Exception {
+		try {
+			buffer.waitEmpty();
+		}
+		finally {
+			// make sure Emitter thread exits and close user function
+			buffer.stopEmitterThread();
+
+			super.close();
+		}
+	}
+
+	@Override
+	public void dispose() throws Exception {
+		super.dispose();
+
+		buffer.stopEmitterThread();
+	}
+
+	public void sendLatencyMarker(LatencyMarker marker) throws Exception {
+		super.processLatencyMarker(marker);
+	}
+
+	@VisibleForTesting
+	public AsyncCollectorBuffer<IN, OUT> getBuffer() {
+		return buffer;
+	}
+}
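
Finally, a minimal wiring sketch for the operator above. It assumes that AsyncDataStream.OutputMode also offers an UNORDERED value next to the ORDERED one used in the buffer, and it reuses the hypothetical LookupAsyncFunction from the earlier sketch. In the patch itself, the AsyncDataStream helper is the intended user-facing entry point; this only shows what such a helper boils down to via the generic DataStream.transform(...) hook.

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator;

public class AsyncWaitWiringSketch {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		DataStream<String> input = env.fromElements("key-1", "key-2", "key-3");

		// at most 20 in-flight async requests; results may be reordered,
		// subject to the watermark rule enforced by the AsyncCollectorBuffer
		AsyncWaitOperator<String, String> operator = new AsyncWaitOperator<>(
				new LookupAsyncFunction(), 20, AsyncDataStream.OutputMode.UNORDERED);

		input.transform("async-lookup", BasicTypeInfo.STRING_TYPE_INFO, operator)
				.print();

		env.execute("async wiring sketch");
	}
}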

http://git-wip-us.apache.org/repos/asf/flink/blob/f5283076/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 5ea84fb..680cc29 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -17,13 +17,6 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -37,22 +30,28 @@ import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.collector.selector.CopyingDirectedOutput;
 import org.apache.flink.streaming.api.collector.selector.DirectedOutput;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.io.StreamRecordWriter;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
 import org.apache.flink.util.XORShiftRandom;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
 
 /**
  * The {@code OperatorChain} contains all operators that are executed as one chain within a single
@@ -67,7 +66,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> {
 	private static final Logger LOG = LoggerFactory.getLogger(OperatorChain.class);
 	
 	private final StreamOperator<?>[] allOperators;
-	
+
 	private final RecordWriterOutput<?>[] streamOutputs;
 	
 	private final Output<StreamRecord<OUT>> chainEntryPoint;
@@ -96,7 +95,6 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> {
 		try {
 			for (int i = 0; i < outEdgesInOrder.size(); i++) {
 				StreamEdge outEdge = outEdgesInOrder.get(i);
-				
 				RecordWriterOutput<?> streamOutput = createStreamOutput(
 						outEdge, chainedConfigs.get(outEdge.getSourceId()), i,
 						containingTask.getEnvironment(), containingTask.getName());
@@ -116,7 +114,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> {
 
 			// add head operator to end of chain
 			allOps.add(headOperator);
-			
+
 			this.allOperators = allOps.toArray(new StreamOperator<?>[allOps.size()]);
 			
 			success = true;
@@ -245,7 +243,6 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> {
 
 			Output<StreamRecord<T>> output = createChainedOperator(
 					containingTask, chainedOpConfig, chainedConfigs, userCodeClassloader, streamOutputs, allOperators);
-			
 			allOutputs.add(new Tuple2<>(output, outputEdge));
 		}
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/f5283076/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 88a29ab..0fb22b8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -773,8 +773,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 					keyGroupRange,
 					restoreStateHandles.getManagedKeyedState(),
 					getEnvironment().getTaskKvStateRegistry());
-
-			restoreStateHandles = null; // GC friendliness
 		} else {
 			keyedStateBackend = stateBackend.createKeyedStateBackend(
 					getEnvironment(),
@@ -1076,6 +1074,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 					outStream.close();
 				}
 			}
+
 			nonPartitionedStates.add(stateHandle);
 		}