You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/12/20 05:05:40 UTC

[4/7] flink git commit: [FLINK-4391] Polish asynchronous I/O operations

http://git-wip-us.apache.org/repos/asf/flink/blob/ad603d59/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/OperatorActions.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/OperatorActions.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/OperatorActions.java
new file mode 100644
index 0000000..5a2e43c
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/OperatorActions.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async;
+
+import org.apache.flink.streaming.api.operators.StreamOperator;
+
+/**
+ * Interface for {@link StreamOperator} actions.
+ */
+public interface OperatorActions {
+
+	/**
+	 * Fail the respective stream operator with the given throwable.
+	 *
+	 * @param throwable to fail the stream operator with
+	 */
+	void failOperator(Throwable throwable);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ad603d59/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncCollectionResult.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncCollectionResult.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncCollectionResult.java
new file mode 100644
index 0000000..8088bf0
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncCollectionResult.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async.queue;
+
+import java.util.Collection;
+
+/**
+ * {@link AsyncResult} subclass for asynchronous result collections.
+ *
+ * @param <T> Type of the collection elements.
+ */
+public interface AsyncCollectionResult<T> extends AsyncResult {
+
+	boolean hasTimestamp();
+
+	long getTimestamp();
+
+	/**
+	 * Return the asynchronous result collection.
+	 *
+	 * @return the asynchronous result collection
+	 * @throws Exception if the asynchronous result collection could not be completed
+	 */
+	Collection<T> get() throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ad603d59/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncResult.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncResult.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncResult.java
new file mode 100644
index 0000000..1a99928
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncResult.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async.queue;
+
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+
+/**
+ * Asynchronous result returned by the {@link StreamElementQueue}. The asynchronous result can
+ * either be a {@link Watermark} or a collection of new output elements produced by the
+ * {@link AsyncFunction}.
+ */
+public interface AsyncResult {
+
+	/**
+	 * True if the async result is a {@link Watermark}; otherwise false.
+	 *
+	 * @return True if the async result is a {@link Watermark}; otherwise false.
+	 */
+	boolean isWatermark();
+
+	/**
+	 * True if the async result is a collection of output elements; otherwise false.
+	 *
+	 * @return True if the async result is a collection of output elements; otherwise false
+	 */
+	boolean isResultCollection();
+
+	/**
+	 * Return this async result as an async watermark result.
+	 *
+	 * @return this result as an {@link AsyncWatermarkResult}.
+	 */
+	AsyncWatermarkResult asWatermark();
+
+	/**
+	 * Return this async result as an async result collection.
+	 *
+	 * @param <T> Type of the result collection's elements
+	 * @return this result as an {@link AsyncCollectionResult}.
+	 */
+	<T> AsyncCollectionResult<T> asResultCollection();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ad603d59/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncWatermarkResult.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncWatermarkResult.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncWatermarkResult.java
new file mode 100644
index 0000000..c19b520
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/AsyncWatermarkResult.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async.queue;
+
+import org.apache.flink.streaming.api.watermark.Watermark;
+
+/**
+ * {@link AsyncResult} subclass for asynchronous result {@link Watermark}.
+ */
+public interface AsyncWatermarkResult extends AsyncResult {
+	/**
+	 * Get the resulting watermark.
+	 *
+	 * @return the asynchronous result watermark
+	 */
+	Watermark getWatermark();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ad603d59/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java
new file mode 100644
index 0000000..2bbcb6c
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async.queue;
+
+import org.apache.flink.runtime.concurrent.AcceptFunction;
+import org.apache.flink.streaming.api.operators.async.OperatorActions;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.Executor;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Ordered {@link StreamElementQueue} implementation. The ordered stream element queue emits
+ * asynchronous results in the order in which the {@link StreamElementQueueEntry} have been added
+ * to the queue. Thus, even if the completion order can be arbitrary, the output order strictly
+ * follows the insertion order (elements cannot overtake each other).
+ */
+public class OrderedStreamElementQueue implements StreamElementQueue {
+
+	private static final Logger LOG = LoggerFactory.getLogger(OrderedStreamElementQueue.class);
+
+	/** Capacity of this queue */
+	private final int capacity;
+
+	/** Executor to run the onCompletion callback */
+	private final Executor executor;
+
+	/** Operator actions to signal a failure to the operator */
+	private final OperatorActions operatorActions;
+
+	/** Lock and conditions for the blocking queue */
+	private final ReentrantLock lock;
+	private final Condition notFull;
+	private final Condition headIsCompleted;
+
+	/** Queue for the inserted StreamElementQueueEntries */
+	private final ArrayDeque<StreamElementQueueEntry<?>> queue;
+
+	public OrderedStreamElementQueue(
+			int capacity,
+			Executor executor,
+			OperatorActions operatorActions) {
+
+		Preconditions.checkArgument(capacity > 0, "The capacity must be larger than 0.");
+		this.capacity = capacity;
+
+		this.executor = Preconditions.checkNotNull(executor, "executor");
+
+		this.operatorActions = Preconditions.checkNotNull(operatorActions, "operatorActions");
+
+		this.lock = new ReentrantLock(false);
+		this.headIsCompleted = lock.newCondition();
+		this.notFull = lock.newCondition();
+
+		this.queue = new ArrayDeque<>(capacity);
+	}
+
+	@Override
+	public AsyncResult peekBlockingly() throws InterruptedException {
+		lock.lockInterruptibly();
+
+		try {
+			while (queue.isEmpty() || !queue.peek().isDone()) {
+				headIsCompleted.await();
+			}
+
+			LOG.debug("Peeked head element from ordered stream element queue with filling degree " +
+				"({}/{}).", queue.size(), capacity);
+
+			return queue.peek();
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	@Override
+	public AsyncResult poll() throws InterruptedException {
+		lock.lockInterruptibly();
+
+		try {
+			while (queue.isEmpty() || !queue.peek().isDone()) {
+				headIsCompleted.await();
+			}
+
+			notFull.signalAll();
+
+			LOG.debug("Polled head element from ordered stream element queue. New filling degree " +
+				"({}/{}).", queue.size() - 1, capacity);
+
+			return queue.poll();
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	@Override
+	public Collection<StreamElementQueueEntry<?>> values() throws InterruptedException {
+		lock.lockInterruptibly();
+
+		try {
+			StreamElementQueueEntry<?>[] array = new StreamElementQueueEntry[queue.size()];
+
+			array = queue.toArray(array);
+
+			return Arrays.asList(array);
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	@Override
+	public boolean isEmpty() {
+		return queue.isEmpty();
+	}
+
+	@Override
+	public int size() {
+		return queue.size();
+	}
+
+	@Override
+	public <T> void put(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
+		lock.lockInterruptibly();
+
+		try {
+			while (queue.size() >= capacity) {
+				notFull.await();
+			}
+
+			addEntry(streamElementQueueEntry);
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	@Override
+	public <T> boolean tryPut(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
+		lock.lockInterruptibly();
+
+		try {
+			if (queue.size() < capacity) {
+				addEntry(streamElementQueueEntry);
+
+				LOG.debug("Put element into ordered stream element queue. New filling degree " +
+					"({}/{}).", queue.size(), capacity);
+
+				return true;
+			} else {
+				LOG.debug("Failed to put element into ordered stream element queue because it " +
+					"was full ({}/{}).", queue.size(), capacity);
+
+				return false;
+			}
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	/**
+	 * Add the given {@link StreamElementQueueEntry} to the queue. Additionally, this method
+	 * registers an onComplete callback which is triggered once the given queue entry is completed.
+	 *
+	 * @param streamElementQueueEntry to be inserted
+	 * @param <T> Type of the stream element queue entry's result
+	 */
+	private <T> void addEntry(StreamElementQueueEntry<T> streamElementQueueEntry) {
+		assert(lock.isHeldByCurrentThread());
+
+		queue.addLast(streamElementQueueEntry);
+
+		streamElementQueueEntry.onComplete(new AcceptFunction<StreamElementQueueEntry<T>>() {
+			@Override
+			public void accept(StreamElementQueueEntry<T> value) {
+				try {
+					onCompleteHandler(value);
+				} catch (InterruptedException e) {
+					// we got interrupted. This indicates a shutdown of the executor
+					LOG.debug("AsyncBufferEntry could not be properly completed because the " +
+						"executor thread has been interrupted.", e);
+				} catch (Throwable t) {
+					operatorActions.failOperator(new Exception("Could not complete the " +
+						"stream element queue entry: " + value + '.', t));
+				}
+			}
+		}, executor);
+	}
+
+	/**
+	 * Check if the completed {@link StreamElementQueueEntry} is the current head. If this is the
+	 * case, then notify the consumer thread about a new consumable entry.
+	 *
+	 * @param streamElementQueueEntry which has been completed
+	 * @throws InterruptedException if the current thread is interrupted
+	 */
+	private void onCompleteHandler(StreamElementQueueEntry<?> streamElementQueueEntry) throws InterruptedException {
+		lock.lockInterruptibly();
+
+		try {
+			if (!queue.isEmpty() && queue.peek().isDone()) {
+				LOG.debug("Signal ordered stream element queue has completed head element.");
+				headIsCompleted.signalAll();
+			}
+		} finally {
+			lock.unlock();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ad603d59/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueue.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueue.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueue.java
new file mode 100644
index 0000000..1a2c4a8
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueue.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async.queue;
+
+import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator;
+
+import java.util.Collection;
+
+/**
+ * Interface for blocking stream element queues for the {@link AsyncWaitOperator}.
+ */
+public interface StreamElementQueue {
+
+	/**
+	 * Put the given element in the queue if capacity is left. If not, then block until this is
+	 * the case.
+	 *
+	 * @param streamElementQueueEntry to be put into the queue
+	 * @param <T> Type of the entry's future value
+	 * @throws InterruptedException if the calling thread has been interrupted while waiting to
+	 * 	insert the given element
+	 */
+	<T> void put(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException;
+
+	/**
+	 * Try to put the given element in the queue. This operation succeeds if the queue has capacity
+	 * left and fails if the queue is full.
+	 *
+	 * @param streamElementQueueEntry to be inserted
+	 * @param <T> Type of the entry's future value
+	 * @return True if the entry could be inserted; otherwise false
+	 * @throws InterruptedException if the calling thread has been interrupted while waiting to
+	 * 	insert the given element
+	 */
+	<T> boolean tryPut(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException;
+
+	/**
+	 * Peek at the head of the queue and return the first completed {@link AsyncResult}. This
+	 * operation is a blocking operation and only returns once a completed async result has been
+	 * found.
+	 *
+	 * @return Completed {@link AsyncResult}
+	 * @throws InterruptedException if the current thread has been interrupted while waiting for a
+	 * 	completed async result.
+	 */
+	AsyncResult peekBlockingly() throws InterruptedException;
+
+	/**
+	 * Poll the first completed {@link AsyncResult} from the head of this queue. This operation is
+	 * blocking and only returns once a completed async result has been found.
+	 *
+	 * @return Completed {@link AsyncResult} which has been removed from the queue
+	 * @throws InterruptedException if the current thread has been interrupted while waiting for a
+	 * 	completed async result.
+	 */
+	AsyncResult poll() throws InterruptedException;
+
+	/**
+	 * Return the collection of {@link StreamElementQueueEntry} currently contained in this queue.
+	 *
+	 * @return Collection of currently contained {@link StreamElementQueueEntry}.
+	 * @throws InterruptedException if the current thread has been interrupted while retrieving the
+	 * 	stream element queue entries of this queue.
+	 */
+	Collection<StreamElementQueueEntry<?>> values() throws InterruptedException;
+
+	/**
+	 * True if the queue is empty; otherwise false.
+	 *
+	 * @return True if the queue is empty; otherwise false.
+	 */
+	boolean isEmpty();
+
+	/**
+	 * Return the size of the queue.
+	 *
+	 * @return The number of elements contained in this queue.
+	 */
+	int size();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ad603d59/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueEntry.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueEntry.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueEntry.java
new file mode 100644
index 0000000..06ebf3c
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueEntry.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async.queue;
+
+import org.apache.flink.runtime.concurrent.AcceptFunction;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.util.Preconditions;
+
+import java.util.concurrent.Executor;
+
+/**
+ * Entry class for the {@link StreamElementQueue}. The stream element queue entry stores the
+ * {@link StreamElement} for which the stream element queue entry has been instantiated.
+ * Furthermore, it allows registering callbacks for when the queue entry is completed.
+ *
+ * @param <T> Type of the result
+ */
+public abstract class StreamElementQueueEntry<T> implements AsyncResult {
+
+	/** Stream element */
+	private final StreamElement streamElement;
+
+	public StreamElementQueueEntry(StreamElement streamElement) {
+		this.streamElement = Preconditions.checkNotNull(streamElement);
+	}
+
+	public StreamElement getStreamElement() {
+		return streamElement;
+	}
+
+	/**
+	 * True if the stream element queue entry has been completed; otherwise false.
+	 *
+	 * @return True if the stream element queue entry has been completed; otherwise false.
+	 */
+	public boolean isDone() {
+		return getFuture().isDone();
+	}
+
+	/**
+	 * Register the given complete function to be called once this queue entry has been completed.
+	 *
+	 * @param completeFunction to call when the queue entry has been completed
+	 * @param executor to run the complete function
+	 */
+	public void onComplete(
+			final AcceptFunction<StreamElementQueueEntry<T>> completeFunction,
+			Executor executor) {
+		final StreamElementQueueEntry<T> thisReference = this;
+
+		getFuture().thenAcceptAsync(new AcceptFunction<T>() {
+			@Override
+			public void accept(T value) {
+				completeFunction.accept(thisReference);
+			}
+		}, executor);
+	}
+
+	protected abstract Future<T> getFuture();
+
+	@Override
+	public final boolean isWatermark() {
+		return AsyncWatermarkResult.class.isAssignableFrom(getClass());
+	}
+
+	@Override
+	public final boolean isResultCollection() {
+		return AsyncCollectionResult.class.isAssignableFrom(getClass());
+	}
+
+	@Override
+	public final AsyncWatermarkResult asWatermark() {
+		return (AsyncWatermarkResult) this;
+	}
+
+	@Override
+	public final <T> AsyncCollectionResult<T> asResultCollection() {
+		return (AsyncCollectionResult<T>) this;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ad603d59/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java
new file mode 100644
index 0000000..f0e707e
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async.queue;
+
+import org.apache.flink.runtime.concurrent.CompletableFuture;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.Collection;
+
+/**
+ * {@link StreamElementQueueEntry} implementation for {@link StreamRecord}. This class also acts
+ * as the {@link AsyncCollector} implementation which is given to the {@link AsyncFunction}. The
+ * async function completes this class with a collection of results.
+ *
+ * @param <OUT> Type of the asynchronous collection result
+ */
+public class StreamRecordQueueEntry<OUT> extends StreamElementQueueEntry<Collection<OUT>>
+	implements AsyncCollectionResult<OUT>, AsyncCollector<OUT> {
+
+	/** Timestamp information */
+	private final boolean hasTimestamp;
+	private final long timestamp;
+
+	/** Future containing the collection result */
+	private final CompletableFuture<Collection<OUT>> resultFuture;
+
+	public StreamRecordQueueEntry(StreamRecord<?> streamRecord) {
+		super(streamRecord);
+
+		hasTimestamp = streamRecord.hasTimestamp();
+		timestamp = streamRecord.getTimestamp();
+
+		resultFuture = new FlinkCompletableFuture<>();
+	}
+
+	@Override
+	public boolean hasTimestamp() {
+		return hasTimestamp;
+	}
+
+	@Override
+	public long getTimestamp() {
+		return timestamp;
+	}
+
+	@Override
+	public Collection<OUT> get() throws Exception {
+		return resultFuture.get();
+	}
+
+	@Override
+	protected Future<Collection<OUT>> getFuture() {
+		return resultFuture;
+	}
+
+	@Override
+	public void collect(Collection<OUT> result) {
+		resultFuture.complete(result);
+	}
+
+	@Override
+	public void collect(Throwable error) {
+		resultFuture.completeExceptionally(error);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ad603d59/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
new file mode 100644
index 0000000..603d8cc
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async.queue;
+
+import org.apache.flink.runtime.concurrent.AcceptFunction;
+import org.apache.flink.streaming.api.operators.async.OperatorActions;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Unordered implementation of the {@link StreamElementQueue}. The unordered stream element queue
+ * emits asynchronous results as soon as they are completed. Additionally it maintains the
+ * watermark-stream record order. This means that no stream record can be overtaken by a watermark
+ * and no watermark can overtake a stream record. However, stream records falling in the same
+ * segment between two watermarks can overtake each other (their emission order is not guaranteed).
+ */
+public class UnorderedStreamElementQueue implements StreamElementQueue {
+
+	private static final Logger LOG = LoggerFactory.getLogger(UnorderedStreamElementQueue.class);
+
+	/** Capacity of this queue */
+	private final int capacity;
+
+	/** Executor to run the onComplete callbacks */
+	private final Executor executor;
+
+	/** OperatorActions to signal the owning operator a failure */
+	private final OperatorActions operatorActions;
+
+	/** Queue of uncompleted stream element queue entries segmented by watermarks */
+	private final ArrayDeque<Set<StreamElementQueueEntry<?>>> uncompletedQueue;
+
+	/** Queue of completed stream element queue entries */
+	private final ArrayDeque<StreamElementQueueEntry<?>> completedQueue;
+
+	/** First (chronologically oldest) uncompleted set of stream element queue entries */
+	private Set<StreamElementQueueEntry<?>> firstSet;
+
+	// Last (chronologically youngest) uncompleted set of stream element queue entries. New
+	// stream element queue entries are inserted into this set.
+	private Set<StreamElementQueueEntry<?>> lastSet;
+	private volatile int numberEntries;
+
+	/** Locks and conditions for the blocking queue */
+	private final ReentrantLock lock;
+	private final Condition notFull;
+	private final Condition hasCompletedEntries;
+
+	/**
+	 * Creates an empty unordered stream element queue.
+	 *
+	 * @param capacity number of entries from which on {@code put} blocks and {@code tryPut}
+	 *                 fails; must be larger than 0
+	 * @param executor to run the onComplete callbacks; must not be null
+	 * @param operatorActions to signal a failure to the owning operator; must not be null
+	 */
+	public UnorderedStreamElementQueue(
+			int capacity,
+			Executor executor,
+			OperatorActions operatorActions) {
+
+		// validate all arguments first, in declaration order
+		Preconditions.checkArgument(capacity > 0, "The capacity must be larger than 0.");
+		Preconditions.checkNotNull(executor, "executor");
+		Preconditions.checkNotNull(operatorActions, "operatorActions");
+
+		this.capacity = capacity;
+		this.executor = executor;
+		this.operatorActions = operatorActions;
+
+		// lock and the two conditions guarding the blocking operations
+		this.lock = new ReentrantLock();
+		this.notFull = this.lock.newCondition();
+		this.hasCompletedEntries = this.lock.newCondition();
+
+		this.completedQueue = new ArrayDeque<>(capacity);
+		this.uncompletedQueue = new ArrayDeque<>(capacity);
+
+		// initially there is a single segment which is both the first and the last set
+		Set<StreamElementQueueEntry<?>> initialSet = new HashSet<>(capacity);
+		this.firstSet = initialSet;
+		this.lastSet = initialSet;
+
+		this.numberEntries = 0;
+	}
+
+	@Override
+	public <T> void put(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
+		lock.lockInterruptibly();
+
+		try {
+			while (numberEntries >= capacity) {
+				notFull.await();
+			}
+
+			addEntry(streamElementQueueEntry);
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	/**
+	 * Tries to add the given entry to the queue without waiting for free capacity.
+	 *
+	 * @param streamElementQueueEntry to be inserted
+	 * @param <T> Type of the stream element queue entry's result
+	 * @return true if the entry has been added; false if the queue had already reached its
+	 * 	capacity
+	 * @throws InterruptedException if the calling thread is interrupted while acquiring the lock
+	 */
+	@Override
+	public <T> boolean tryPut(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
+		lock.lockInterruptibly();
+
+		try {
+			if (numberEntries >= capacity) {
+				LOG.debug("Failed to put element into unordered stream element queue because it " +
+					"was full ({}/{}).", numberEntries, capacity);
+
+				return false;
+			}
+
+			addEntry(streamElementQueueEntry);
+
+			// logged after addEntry so that the message shows the new filling degree
+			LOG.debug("Put element into unordered stream element queue. New filling degree " +
+				"({}/{}).", numberEntries, capacity);
+
+			return true;
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	/**
+	 * Returns the head of the completed entries queue without removing it. While there is no
+	 * completed entry, the calling thread waits on the has-completed-entries condition, which is
+	 * signalled by {@code onCompleteHandler}.
+	 *
+	 * @return the head of the completed entries queue
+	 * @throws InterruptedException if the calling thread is interrupted while acquiring the lock
+	 * 	or while waiting for a completed entry
+	 */
+	@Override
+	public AsyncResult peekBlockingly() throws InterruptedException {
+		lock.lockInterruptibly();
+
+		try {
+			while (completedQueue.isEmpty()) {
+				hasCompletedEntries.await();
+			}
+
+			LOG.debug("Peeked head element from unordered stream element queue with filling degree " +
+				"({}/{}).", numberEntries, capacity);
+
+			return completedQueue.peek();
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	@Override
+	public AsyncResult poll() throws InterruptedException {
+		lock.lockInterruptibly();
+
+		try {
+			while (completedQueue.isEmpty()) {
+				hasCompletedEntries.await();
+			}
+
+			numberEntries--;
+			notFull.signalAll();
+
+			LOG.debug("Polled element from unordered stream element queue. New filling degree " +
+				"({}/{}).", numberEntries, capacity);
+
+			return completedQueue.poll();
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	@Override
+	public Collection<StreamElementQueueEntry<?>> values() throws InterruptedException {
+		lock.lockInterruptibly();
+
+		try {
+			StreamElementQueueEntry<?>[] array = new StreamElementQueueEntry[numberEntries];
+
+			array = completedQueue.toArray(array);
+
+			int counter = completedQueue.size();
+
+			for (StreamElementQueueEntry<?> entry: firstSet) {
+				array[counter] = entry;
+				counter++;
+			}
+
+			for (Set<StreamElementQueueEntry<?>> asyncBufferEntries : uncompletedQueue) {
+
+				for (StreamElementQueueEntry<?> streamElementQueueEntry : asyncBufferEntries) {
+					array[counter] = streamElementQueueEntry;
+					counter++;
+				}
+			}
+
+			return Arrays.asList(array);
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	@Override
+	public boolean isEmpty() {
+		return numberEntries == 0; // reads the volatile entry counter without acquiring the lock
+	}
+
+	@Override
+	public int size() {
+		return numberEntries; // counts completed and uncompleted entries; read without the lock
+	}
+
+	/**
+	 * Callback for onComplete events for the given stream element queue entry. Whenever a queue
+	 * entry is completed, it is checked whether this entry belongs to the first set. If this is the
+	 * case, then the element is added to the completed entries queue from where it can be consumed.
+	 * If the first set becomes empty, then the next set is polled from the uncompleted entries
+	 * queue. Completed entries from this new set are then added to the completed entries queue.
+	 * This is repeated until the first set is not empty or until it is the last set.
+	 *
+	 * <p>If the entry does not belong to the first set, this call does not change the queue. Such
+	 * an entry is moved to the completed entries queue once its set becomes the first set, because
+	 * entries which are already done are drained from a set at that point.
+	 *
+	 * @param streamElementQueueEntry which has been completed
+	 * @throws InterruptedException if the current thread has been interrupted while performing the
+	 * 	on complete callback.
+	 */
+	public void onCompleteHandler(StreamElementQueueEntry<?> streamElementQueueEntry) throws InterruptedException {
+		lock.lockInterruptibly();
+
+		try {
+			if (firstSet.remove(streamElementQueueEntry)) { // only entries of the first set may be emitted
+				completedQueue.offer(streamElementQueueEntry);
+
+				while (firstSet.isEmpty() && firstSet != lastSet) { // advance to the next set
+					firstSet = uncompletedQueue.poll();
+
+					Iterator<StreamElementQueueEntry<?>> it = firstSet.iterator();
+
+					while (it.hasNext()) {
+						StreamElementQueueEntry<?> bufferEntry = it.next();
+
+						if (bufferEntry.isDone()) { // completed earlier while its set was not yet the first set
+							completedQueue.offer(bufferEntry);
+							it.remove();
+						}
+					}
+				}
+
+				LOG.debug("Signal unordered stream element queue has completed entries.");
+				hasCompletedEntries.signalAll();
+			}
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	/**
+	 * Add the given stream element queue entry to the current last set if it is not a watermark.
+	 * If it is a watermark, then stop adding to the current last set and create a new last set.
+	 * The watermark is inserted into the first set if that set is currently empty; otherwise it is
+	 * inserted into its own set which is appended to the uncompleted queue. Afterwards the new
+	 * last set is appended to the uncompleted queue.
+	 *
+	 * <p>In both cases an onComplete callback, which calls
+	 * {@link #onCompleteHandler(StreamElementQueueEntry)} on the executor, is registered for the
+	 * entry and the number of entries is incremented. The caller must hold the lock.
+	 *
+	 * @param streamElementQueueEntry to be inserted
+	 * @param <T> Type of the stream element queue entry's result
+	 */
+	private <T> void addEntry(StreamElementQueueEntry<T> streamElementQueueEntry) {
+		assert(lock.isHeldByCurrentThread());
+
+		if (streamElementQueueEntry.isWatermark()) {
+			lastSet = new HashSet<>(capacity); // subsequent stream records go into this new set
+
+			if (firstSet.isEmpty()) {
+				firstSet.add(streamElementQueueEntry);
+			} else {
+				Set<StreamElementQueueEntry<?>> watermarkSet = new HashSet<>(1);
+				watermarkSet.add(streamElementQueueEntry);
+				uncompletedQueue.offer(watermarkSet);
+			}
+			uncompletedQueue.offer(lastSet);
+		} else {
+			lastSet.add(streamElementQueueEntry);
+		}
+
+		streamElementQueueEntry.onComplete(new AcceptFunction<StreamElementQueueEntry<T>>() {
+			@Override
+			public void accept(StreamElementQueueEntry<T> value) {
+				try {
+					onCompleteHandler(value);
+				} catch (InterruptedException e) {
+					// The accept executor thread got interrupted. This is probably caused by
+					// the shutdown of the executor.
+					LOG.debug("AsyncBufferEntry could not be properly completed because the " +
+						"executor thread has been interrupted.", e);
+				} catch (Throwable t) {
+					operatorActions.failOperator(new Exception("Could not complete the " +
+						"stream element queue entry: " + value + '.', t));
+				}
+			}
+		}, executor);
+
+		numberEntries++;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ad603d59/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/WatermarkQueueEntry.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/WatermarkQueueEntry.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/WatermarkQueueEntry.java
new file mode 100644
index 0000000..6fe4f44
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/WatermarkQueueEntry.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async.queue;
+
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.streaming.api.watermark.Watermark;
+
+/**
+ * {@link StreamElementQueueEntry} implementation for the {@link Watermark}.
+ */
+public class WatermarkQueueEntry extends StreamElementQueueEntry<Watermark> implements AsyncWatermarkResult {
+
+	/** Future obtained from {@code FlinkCompletableFuture.completed} for the watermark */
+	private final Future<Watermark> future;
+
+	/**
+	 * Creates a queue entry for the given watermark.
+	 *
+	 * @param watermark to be wrapped by this queue entry
+	 */
+	public WatermarkQueueEntry(Watermark watermark) {
+		super(watermark);
+
+		future = FlinkCompletableFuture.completed(watermark);
+	}
+
+	/**
+	 * @return the stream element of this entry, cast to a {@link Watermark}
+	 */
+	@Override
+	public Watermark getWatermark() {
+		final Object element = getStreamElement();
+
+		return (Watermark) element;
+	}
+
+	/**
+	 * @return the future created for the watermark at construction time
+	 */
+	@Override
+	protected Future<Watermark> getFuture() {
+		return this.future;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ad603d59/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 680cc29..7771064 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -52,7 +52,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 
-
 /**
  * The {@code OperatorChain} contains all operators that are executed as one chain within a single
  * {@link StreamTask}.

http://git-wip-us.apache.org/repos/asf/flink/blob/ad603d59/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 0fb22b8..bd9215a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -326,11 +326,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 				LOG.error("Could not shut down async checkpoint threads", t);
 			}
 
-			// release the output resources. this method should never fail.
-			if (operatorChain != null) {
-				operatorChain.releaseOutputs();
-			}
-
 			// we must! perform this cleanup
 			try {
 				cleanup();
@@ -344,6 +339,11 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 			if (!disposed) {
 				disposeAllOperators();
 			}
+
+			// release the output resources. this method should never fail.
+			if (operatorChain != null) {
+				operatorChain.releaseOutputs();
+			}
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ad603d59/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java
index b8788c6..12ac693 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java
@@ -18,147 +18,252 @@
 
 package org.apache.flink.streaming.api.functions.async;
 
-import org.apache.flink.api.common.accumulators.IntCounter;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
 import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
-import org.junit.Assert;
 import org.junit.Test;
 
-import static org.mockito.Matchers.anyString;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 /**
- * Test case for {@link RichAsyncFunction}
+ * Test cases for {@link RichAsyncFunction}
  */
 public class RichAsyncFunctionTest {
 
-	private RichAsyncFunction<String, String> initFunction() {
-		RichAsyncFunction<String, String> function = new RichAsyncFunction<String, String>() {
+	/**
+	 * Test the set of iteration runtime context methods in the context of a
+	 * {@link RichAsyncFunction}.
+	 */
+	@Test
+	public void testIterationRuntimeContext() throws Exception {
+		RichAsyncFunction<Integer, Integer> function = new RichAsyncFunction<Integer, Integer>() {
+			private static final long serialVersionUID = -2023923961609455894L;
+
 			@Override
-			public void asyncInvoke(String input, AsyncCollector<String> collector) throws Exception {
-				getRuntimeContext().getState(mock(ValueStateDescriptor.class));
+			public void asyncInvoke(Integer input, AsyncCollector<Integer> collector) throws Exception {
+				// no op
 			}
 		};
 
-		return function;
+		int superstepNumber = 42;
+
+		IterationRuntimeContext mockedIterationRuntimeContext = mock(IterationRuntimeContext.class);
+		when(mockedIterationRuntimeContext.getSuperstepNumber()).thenReturn(superstepNumber);
+		function.setRuntimeContext(mockedIterationRuntimeContext);
+
+		IterationRuntimeContext iterationRuntimeContext = function.getIterationRuntimeContext();
+
+		assertEquals(superstepNumber, iterationRuntimeContext.getSuperstepNumber());
+
+		try {
+			iterationRuntimeContext.getIterationAggregator("foobar");
+			fail("Expected getIterationAggregator to fail with unsupported operation exception");
+		} catch (UnsupportedOperationException e) {
+			// expected
+		}
+
+		try {
+			iterationRuntimeContext.getPreviousIterationAggregate("foobar");
+			fail("Expected getPreviousIterationAggregator to fail with unsupported operation exception");
+		} catch (UnsupportedOperationException e) {
+			// expected
+		}
 	}
 
+	/**
+	 * Test the set of runtime context methods in the context of a {@link RichAsyncFunction}.
+	 */
 	@Test
-	public void testIterationRuntimeContext() throws Exception {
-		// test runtime context is not set
-		RichAsyncFunction<String, String> function = new RichAsyncFunction<String, String>() {
+	public void testRuntimeContext() throws Exception {
+		RichAsyncFunction<Integer, Integer> function = new RichAsyncFunction<Integer, Integer>() {
+			private static final long serialVersionUID = 1707630162838967972L;
+
 			@Override
-			public void asyncInvoke(String input, AsyncCollector<String> collector) throws Exception {
-				getIterationRuntimeContext().getIterationAggregator("test");
+			public void asyncInvoke(Integer input, AsyncCollector<Integer> collector) throws Exception {
+				// no op
 			}
 		};
 
+		final String taskName = "foobarTask";
+		final MetricGroup metricGroup = mock(MetricGroup.class);
+		final int numberOfParallelSubtasks = 42;
+		final int indexOfSubtask = 43;
+		final int attemptNumber = 1337;
+		final String taskNameWithSubtask = "barfoo";
+		final ExecutionConfig executionConfig = mock(ExecutionConfig.class);
+		final ClassLoader userCodeClassLoader = mock(ClassLoader.class);
+
+		RuntimeContext mockedRuntimeContext = mock(RuntimeContext.class);
+
+		when(mockedRuntimeContext.getTaskName()).thenReturn(taskName);
+		when(mockedRuntimeContext.getMetricGroup()).thenReturn(metricGroup);
+		when(mockedRuntimeContext.getNumberOfParallelSubtasks()).thenReturn(numberOfParallelSubtasks);
+		when(mockedRuntimeContext.getIndexOfThisSubtask()).thenReturn(indexOfSubtask);
+		when(mockedRuntimeContext.getAttemptNumber()).thenReturn(attemptNumber);
+		when(mockedRuntimeContext.getTaskNameWithSubtasks()).thenReturn(taskNameWithSubtask);
+		when(mockedRuntimeContext.getExecutionConfig()).thenReturn(executionConfig);
+		when(mockedRuntimeContext.getUserCodeClassLoader()).thenReturn(userCodeClassLoader);
+
+		function.setRuntimeContext(mockedRuntimeContext);
+
+		RuntimeContext runtimeContext = function.getRuntimeContext();
+
+		assertEquals(taskName, runtimeContext.getTaskName());
+		assertEquals(metricGroup, runtimeContext.getMetricGroup());
+		assertEquals(numberOfParallelSubtasks, runtimeContext.getNumberOfParallelSubtasks());
+		assertEquals(indexOfSubtask, runtimeContext.getIndexOfThisSubtask());
+		assertEquals(attemptNumber, runtimeContext.getAttemptNumber());
+		assertEquals(taskNameWithSubtask, runtimeContext.getTaskNameWithSubtasks());
+		assertEquals(executionConfig, runtimeContext.getExecutionConfig());
+		assertEquals(userCodeClassLoader, runtimeContext.getUserCodeClassLoader());
+
 		try {
-			function.asyncInvoke("test", mock(AsyncCollector.class));
-		}
-		catch (Exception e) {
-			Assert.assertEquals("The runtime context has not been initialized.", e.getMessage());
+			runtimeContext.getDistributedCache();
+			fail("Expected getDistributedCached to fail with unsupported operation exception.");
+		} catch (UnsupportedOperationException e) {
+			// expected
 		}
 
-		// test get agg from iteration runtime context
-		function.setRuntimeContext(mock(IterationRuntimeContext.class));
-
 		try {
-			function.asyncInvoke("test", mock(AsyncCollector.class));
+			runtimeContext.getState(new ValueStateDescriptor<>("foobar", Integer.class, 42));
+			fail("Expected getState to fail with unsupported operation exception.");
+		} catch (UnsupportedOperationException e) {
+			// expected
 		}
-		catch (Exception e) {
-			Assert.assertEquals("Get iteration aggregator is not supported in rich async function", e.getMessage());
+
+		try {
+			runtimeContext.getListState(new ListStateDescriptor<>("foobar", Integer.class));
+			fail("Expected getListState to fail with unsupported operation exception.");
+		} catch (UnsupportedOperationException e) {
+			// expected
 		}
 
-		// get state from iteration runtime context
-		function = new RichAsyncFunction<String, String>() {
-			@Override
-			public void asyncInvoke(String input, AsyncCollector<String> collector) throws Exception {
-				getIterationRuntimeContext().getState(mock(ValueStateDescriptor.class));
-			}
-		};
+		try {
+			runtimeContext.getReducingState(new ReducingStateDescriptor<>("foobar", new ReduceFunction<Integer>() {
+				private static final long serialVersionUID = 2136425961884441050L;
 
-		function.setRuntimeContext(mock(RuntimeContext.class));
+				@Override
+				public Integer reduce(Integer value1, Integer value2) throws Exception {
+					return value1;
+				}
+			}, Integer.class));
+			fail("Expected getReducingState to fail with unsupported operation exception.");
+		} catch (UnsupportedOperationException e) {
+			// expected
+		}
 
 		try {
-			function.asyncInvoke("test", mock(AsyncCollector.class));
-		}
-		catch (Exception e) {
-			Assert.assertEquals("State is not supported in rich async function", e.getMessage());
-		}
+			runtimeContext.addAccumulator("foobar", new Accumulator<Integer, Integer>() {
+				private static final long serialVersionUID = -4673320336846482358L;
 
-		// test getting a counter from iteration runtime context
-		function = new RichAsyncFunction<String, String>() {
-			@Override
-			public void asyncInvoke(String input, AsyncCollector<String> collector) throws Exception {
-				getIterationRuntimeContext().getIntCounter("test").add(6);
-			}
-		};
+				@Override
+				public void add(Integer value) {
+					// no op
+				}
 
-		IterationRuntimeContext context = mock(IterationRuntimeContext.class);
-		IntCounter counter = new IntCounter(0);
-		when(context.getIntCounter(anyString())).thenReturn(counter);
+				@Override
+				public Integer getLocalValue() {
+					return null;
+				}
 
-		function.setRuntimeContext(context);
+				@Override
+				public void resetLocal() {
 
-		function.asyncInvoke("test", mock(AsyncCollector.class));
+				}
 
-		Assert.assertTrue(6 == counter.getLocalValue());
-	}
+				@Override
+				public void merge(Accumulator<Integer, Integer> other) {
 
-	@Test
-	public void testRuntimeContext() throws Exception {
-		// test run time context is not set
-		RichAsyncFunction<String, String> function = new RichAsyncFunction<String, String>() {
-			@Override
-			public void asyncInvoke(String input, AsyncCollector<String> collector) throws Exception {
-				getRuntimeContext().getState(mock(ValueStateDescriptor.class));
-			}
-		};
+				}
 
-		try {
-			function.asyncInvoke("test", mock(AsyncCollector.class));
-		}
-		catch (Exception e) {
-			Assert.assertEquals("The runtime context has not been initialized.", e.getMessage());
+				@Override
+				public Accumulator<Integer, Integer> clone() {
+					return null;
+				}
+			});
+			fail("Expected addAccumulator to fail with unsupported operation exception.");
+		} catch (UnsupportedOperationException e) {
+			// expected
 		}
 
-		// test get state
-		function = new RichAsyncFunction<String, String>() {
-			@Override
-			public void asyncInvoke(String input, AsyncCollector<String> collector) throws Exception {
-				getRuntimeContext().getState(mock(ValueStateDescriptor.class));
-			}
-		};
+		try {
+			runtimeContext.getAccumulator("foobar");
+			fail("Expected getAccumulator to fail with unsupported operation exception.");
+		} catch (UnsupportedOperationException e) {
+			// expected
+		}
 
-		function.setRuntimeContext(mock(RuntimeContext.class));
+		try {
+			runtimeContext.getAllAccumulators();
+			fail("Expected getAllAccumulators to fail with unsupported operation exception.");
+		} catch (UnsupportedOperationException e) {
+			// expected
+		}
 
 		try {
-			function.asyncInvoke("test", mock(AsyncCollector.class));
+			runtimeContext.getIntCounter("foobar");
+			fail("Expected getIntCounter to fail with unsupported operation exception.");
+		} catch (UnsupportedOperationException e) {
+			// expected
 		}
-		catch (Exception e) {
-			Assert.assertEquals("State is not supported in rich async function", e.getMessage());
+
+		try {
+			runtimeContext.getLongCounter("foobar");
+			fail("Expected getLongCounter to fail with unsupported operation exception.");
+		} catch (UnsupportedOperationException e) {
+			// expected
 		}
 
-		// test getting a counter from runtime context
-		function = new RichAsyncFunction<String, String>() {
-			@Override
-			public void asyncInvoke(String input, AsyncCollector<String> collector) throws Exception {
-				getIterationRuntimeContext().getIntCounter("test").add(6);
-			}
-		};
+		try {
+			runtimeContext.getDoubleCounter("foobar");
+			fail("Expected getDoubleCounter to fail with unsupported operation exception.");
+		} catch (UnsupportedOperationException e) {
+			// expected
+		}
 
-		IterationRuntimeContext context = mock(IterationRuntimeContext.class);
-		IntCounter counter = new IntCounter(0);
-		when(context.getIntCounter(anyString())).thenReturn(counter);
+		try {
+			runtimeContext.getHistogram("foobar");
+			fail("Expected getHistogram to fail with unsupported operation exception.");
+		} catch (UnsupportedOperationException e) {
+			// expected
+		}
 
-		function.setRuntimeContext(context);
+		try {
+			runtimeContext.hasBroadcastVariable("foobar");
+			fail("Expected hasBroadcastVariable to fail with unsupported operation exception.");
+		} catch (UnsupportedOperationException e) {
+			// expected
+		}
 
-		function.asyncInvoke("test", mock(AsyncCollector.class));
+		try {
+			runtimeContext.getBroadcastVariable("foobar");
+			fail("Expected getBroadcastVariable to fail with unsupported operation exception.");
+		} catch (UnsupportedOperationException e) {
+			// expected
+		}
 
-		Assert.assertTrue(6 == counter.getLocalValue());
+		try {
+			runtimeContext.getBroadcastVariableWithInitializer("foobar", new BroadcastVariableInitializer<Object, Object>() {
+				@Override
+				public Object initializeBroadcastVariable(Iterable<Object> data) {
+					return null;
+				}
+			});
+			fail("Expected getBroadcastVariableWithInitializer to fail with unsupported operation exception.");
+		} catch (UnsupportedOperationException e) {
+			// expected
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ad603d59/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBufferTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBufferTest.java
deleted file mode 100644
index d118d80..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBufferTest.java
+++ /dev/null
@@ -1,656 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.async;
-
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
-import org.apache.flink.streaming.api.datastream.AsyncDataStream;
-import org.apache.flink.streaming.api.functions.async.buffer.StreamElementEntry;
-import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
-import org.apache.flink.streaming.api.functions.async.buffer.AsyncCollectorBuffer;
-import org.apache.flink.streaming.api.functions.async.AsyncFunction;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
-import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/**
- * Tests for {@link AsyncCollectorBuffer}. The tests cover:
- *
- * <ul>
- *     <li>Adding new elements to the buffer</li>
- *     <li>Ordered mode processing</li>
- *     <li>Unordered mode processing</li>
- *     <li>Error handling</li>
- * </ul>
- */
-public class AsyncCollectorBufferTest {
-	private final static ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(10);
-
-	private final Random RANDOM = new Random();
-
-	private AsyncFunction<Integer, Integer> function;
-
-	private AsyncWaitOperator<Integer, Integer> operator;
-
-	private AsyncCollectorBuffer<Integer, Integer> buffer;
-
-	private Output<StreamRecord<Integer>> output;
-
-	private Object lock = new Object();
-
-	public AsyncCollectorBuffer<Integer, Integer> getBuffer(int bufferSize, AsyncDataStream.OutputMode mode) throws Exception {
-		function = new AsyncFunction<Integer, Integer>() {
-			@Override
-			public void asyncInvoke(Integer input, AsyncCollector<Integer> collector) throws Exception {
-
-			}
-		};
-
-		operator = new AsyncWaitOperator<>(function, bufferSize, mode);
-
-		StreamConfig cfg = new StreamConfig(new Configuration());
-		cfg.setTypeSerializerIn1(IntSerializer.INSTANCE);
-
-		StreamTask<?, ?> mockTask = mock(StreamTask.class);
-
-		when(mockTask.getCheckpointLock()).thenReturn(lock);
-
-		Environment env = new DummyEnvironment("DUMMY;-D", 1, 0);
-		when(mockTask.getEnvironment()).thenReturn(env);
-
-		output = new FakedOutput();
-
-		operator.setup(mockTask, cfg, output);
-
-		buffer = operator.getBuffer();
-
-		return buffer;
-	}
-
-	@Test
-	public void testAdd() throws Exception {
-		buffer = getBuffer(3, AsyncDataStream.OutputMode.ORDERED);
-
-		synchronized (lock) {
-			buffer.addWatermark(new Watermark(0l));
-			buffer.addLatencyMarker(new LatencyMarker(111L, 1, 1));
-		}
-
-		Assert.assertEquals(2, buffer.getQueue().size());
-
-		Iterator<StreamElementEntry<Integer>> iterator = buffer.getQueue().iterator();
-		Watermark watermark = iterator.next().getStreamElement().asWatermark();
-		Assert.assertEquals(0l, watermark.getTimestamp());
-
-		LatencyMarker latencyMarker = iterator.next().getStreamElement().asLatencyMarker();
-		Assert.assertEquals(111l, latencyMarker.getMarkedTime());
-
-		buffer.setExtraStreamElement(new Watermark(222l));
-
-		Iterator<StreamElement> elementIterator = buffer.getStreamElementsInBuffer();
-		Assert.assertEquals(0l, elementIterator.next().asWatermark().getTimestamp());
-		Assert.assertEquals(111l, elementIterator.next().asLatencyMarker().getMarkedTime());
-		Assert.assertEquals(222l, elementIterator.next().asWatermark().getTimestamp());
-		Assert.assertFalse(elementIterator.hasNext());
-	}
-
-	private void work(final boolean throwExcept) throws Exception {
-		final int ASYNC_COLLECTOR_NUM = 7;
-
-		Iterator<StreamElement> iterator = new Iterator<StreamElement>() {
-			private int idx = 0;
-
-			@Override
-			public boolean hasNext() {
-				return idx < ASYNC_COLLECTOR_NUM;
-			}
-
-			@Override
-			public StreamElement next() {
-				++idx;
-
-				if (idx == 4) {
-					return new Watermark(333l);
-				}
-				else if (idx == 7) {
-					return new LatencyMarker(111L, 0, 0);
-				}
-				else {
-					StreamRecord<Integer> ret = new StreamRecord<>(idx);
-					ret.setTimestamp(idx * idx);
-
-					return ret;
-				}
-			}
-
-			@Override
-			public void remove() {
-				// do nothing
-			}
-		};
-
-		while (iterator.hasNext()) {
-			final StreamElement record = iterator.next();
-
-			if (record.isRecord()) {
-				AsyncCollector tmp;
-
-				synchronized (lock) {
-					tmp = buffer.addStreamRecord(record.<Integer>asRecord());
-				}
-
-				final AsyncCollector collector = tmp;
-
-				EXECUTOR_SERVICE.submit(new Runnable() {
-					@Override
-					public void run() {
-						try {
-							Thread.sleep(RANDOM.nextInt(100));
-
-							if (throwExcept) {
-								collector.collect(new Exception("wahahahaha..."));
-							}
-							else {
-								collector.collect(Collections.singletonList(record.asRecord().getValue()));
-							}
-						} catch (InterruptedException e) {
-							// do nothing
-						}
-					}
-				});
-			}
-			else if (record.isWatermark()) {
-				synchronized (lock) {
-					buffer.addWatermark(record.asWatermark());
-				}
-			}
-			else {
-				synchronized (lock) {
-					buffer.addLatencyMarker(record.asLatencyMarker());
-				}
-			}
-		}
-	}
-
-	@Test
-	public void testOrderedBuffer() throws Exception {
-		buffer = getBuffer(3, AsyncDataStream.OutputMode.ORDERED);
-
-		buffer.startEmitterThread();
-
-		work(false);
-
-		synchronized (lock) {
-			buffer.waitEmpty();
-		}
-
-		buffer.stopEmitterThread();
-
-		Assert.assertEquals("1,2,3,5,6,", ((FakedOutput)output).getValue());
-		Assert.assertEquals("1,4,9,333,25,36,111,", ((FakedOutput)output).getTimestamp());
-	}
-
-	@Test
-	public void testUnorderedBuffer() throws Exception {
-		buffer = getBuffer(3, AsyncDataStream.OutputMode.UNORDERED);
-
-		buffer.startEmitterThread();
-
-		work(false);
-
-		synchronized (lock) {
-			buffer.waitEmpty();
-		}
-
-		buffer.stopEmitterThread();
-
-		Assert.assertEquals(333L, ((FakedOutput)output).getRawTimestamp().toArray()[3]);
-
-		List<Long> result = ((FakedOutput)output).getRawValue();
-		Collections.sort(result);
-		Assert.assertEquals("[1, 2, 3, 5, 6]", result.toString());
-
-		result = ((FakedOutput)output).getRawTimestamp();
-		Collections.sort(result);
-		Assert.assertEquals("[1, 4, 9, 25, 36, 111, 333]", result.toString());
-	}
-
-	@Test
-	public void testOrderedBufferWithManualTriggering() throws Exception {
-		// test AsyncCollectorBuffer with different combinations of StreamElements in the buffer.
-		// by triggering completion of each AsyncCollector one by one manually, we can verify
-		// the output one by one accurately.
-
-		FakedOutput fakedOutput;
-		AsyncCollector<Integer> collector1, collector2;
-
-		// 1. head element is a Watermark or LatencyMarker
-		buffer = getBuffer(3, AsyncDataStream.OutputMode.ORDERED);
-		fakedOutput = (FakedOutput)output;
-
-		fakedOutput.expect(1);
-
-		buffer.startEmitterThread();
-
-		synchronized (lock) {
-			buffer.addWatermark(new Watermark(1L));
-		}
-
-		fakedOutput.waitToFinish();
-
-		Assert.assertEquals("", fakedOutput.getValue());
-		Assert.assertEquals("1,", fakedOutput.getTimestamp());
-
-
-		fakedOutput.expect(1);
-
-		synchronized (lock) {
-			buffer.addLatencyMarker(new LatencyMarker(2L, 0, 0));
-		}
-
-		fakedOutput.waitToFinish();
-
-		Assert.assertEquals("", fakedOutput.getValue());
-		Assert.assertEquals("1,2,", fakedOutput.getTimestamp());
-
-		synchronized (lock) {
-			buffer.waitEmpty();
-			buffer.stopEmitterThread();
-		}
-
-
-		// 2. buffer layout: WM -> SR1 -> LM -> SR2, where SR2 finishes first, then SR1.
-		buffer = getBuffer(5, AsyncDataStream.OutputMode.ORDERED);
-		fakedOutput = (FakedOutput)output;
-
-		synchronized (lock) {
-			buffer.addWatermark(new Watermark(1L));
-			collector1 = buffer.addStreamRecord(new StreamRecord<>(111, 2L));
-			buffer.addLatencyMarker(new LatencyMarker(3L, 0, 0));
-			collector2 = buffer.addStreamRecord(new StreamRecord<>(222, 4L));
-		}
-
-		fakedOutput.expect(1);
-
-		buffer.startEmitterThread();
-
-		fakedOutput.waitToFinish();
-
-		// in ORDERED mode, the result of completed SR2 will not be emitted right now.
-		collector2.collect(Collections.singletonList(222));
-
-		Thread.sleep(1000);
-
-		Assert.assertEquals("", fakedOutput.getValue());
-		Assert.assertEquals("1,", fakedOutput.getTimestamp());
-
-		fakedOutput.expect(3);
-
-		collector1.collect(Collections.singletonList(111));
-
-		fakedOutput.waitToFinish();
-
-		Assert.assertEquals("111,222,", fakedOutput.getValue());
-		Assert.assertEquals("1,2,3,4,", fakedOutput.getTimestamp());
-
-		synchronized (lock) {
-			buffer.waitEmpty();
-			buffer.stopEmitterThread();
-		}
-
-		// 3. buffer layout: WM -> SR1 -> LM -> SR2, where SR1 completes first, then SR2.
-		buffer = getBuffer(5, AsyncDataStream.OutputMode.ORDERED);
-		fakedOutput = (FakedOutput)output;
-
-		synchronized (lock) {
-			buffer.addWatermark(new Watermark(1L));
-			collector1 = buffer.addStreamRecord(new StreamRecord<>(111, 2L));
-			buffer.addLatencyMarker(new LatencyMarker(3L, 0, 0));
-			collector2 = buffer.addStreamRecord(new StreamRecord<>(222, 4L));
-		}
-
-		fakedOutput.expect(1);
-
-		buffer.startEmitterThread();
-
-		fakedOutput.waitToFinish();
-
-		fakedOutput.expect(2);
-
-		// in ORDERED mode, the result of completed SR1 will be emitted asap.
-		collector1.collect(Collections.singletonList(111));
-
-		fakedOutput.waitToFinish();
-
-		Assert.assertEquals("111,", fakedOutput.getValue());
-		Assert.assertEquals("1,2,3,", fakedOutput.getTimestamp());
-
-		fakedOutput.expect(1);
-
-		collector2.collect(Collections.singletonList(222));
-
-		fakedOutput.waitToFinish();
-
-		Assert.assertEquals("111,222,", fakedOutput.getValue());
-		Assert.assertEquals("1,2,3,4,", fakedOutput.getTimestamp());
-
-		synchronized (lock) {
-			buffer.waitEmpty();
-			buffer.stopEmitterThread();
-		}
-
-		// 4. buffer layout: SR1 -> SR2 -> WM -> LM, where SR2 finishes first.
-		buffer = getBuffer(5, AsyncDataStream.OutputMode.ORDERED);
-		fakedOutput = (FakedOutput)output;
-
-		synchronized (lock) {
-			collector1 = buffer.addStreamRecord(new StreamRecord<>(111, 1L));
-			collector2 = buffer.addStreamRecord(new StreamRecord<>(222, 2L));
-			buffer.addWatermark(new Watermark(3L));
-			buffer.addLatencyMarker(new LatencyMarker(4L, 0, 0));
-		}
-
-		buffer.startEmitterThread();
-
-		// in ORDERED mode, the result of completed SR2 will not be emitted right now.
-		collector2.collect(Collections.singletonList(222));
-
-		Thread.sleep(1000);
-
-		Assert.assertEquals("", fakedOutput.getValue());
-		Assert.assertEquals("", fakedOutput.getTimestamp());
-
-		fakedOutput.expect(4);
-
-		collector1.collect(Collections.singletonList(111));
-
-		fakedOutput.waitToFinish();
-
-		Assert.assertEquals("111,222,", fakedOutput.getValue());
-		Assert.assertEquals("1,2,3,4,", fakedOutput.getTimestamp());
-
-		synchronized (lock) {
-			buffer.waitEmpty();
-			buffer.stopEmitterThread();
-		}
-	}
-
-	@Test
-	public void testUnorderedWithManualTriggering() throws Exception {
-		// verify the output in UNORDERED mode by manual triggering.
-
-		FakedOutput fakedOutput;
-		AsyncCollector<Integer> collector1, collector2, collector3;
-
-		// 1. head element is a Watermark or LatencyMarker
-		buffer = getBuffer(5, AsyncDataStream.OutputMode.UNORDERED);
-		fakedOutput = (FakedOutput)output;
-
-		fakedOutput.expect(1);
-
-		buffer.startEmitterThread();
-
-		synchronized (lock) {
-			buffer.addWatermark(new Watermark(1L));
-		}
-
-		fakedOutput.waitToFinish();
-
-		Assert.assertEquals("", fakedOutput.getValue());
-		Assert.assertEquals("1,", fakedOutput.getTimestamp());
-
-
-		fakedOutput.expect(1);
-
-		synchronized (lock) {
-			buffer.addLatencyMarker(new LatencyMarker(2L, 0, 0));
-		}
-
-		fakedOutput.waitToFinish();
-
-		Assert.assertEquals("", fakedOutput.getValue());
-		Assert.assertEquals("1,2,", fakedOutput.getTimestamp());
-
-		synchronized (lock) {
-			buffer.waitEmpty();
-			buffer.stopEmitterThread();
-		}
-
-
-		// 2. buffer layout: LM -> SR1 -> SR2 -> WM1 -> SR3 -> WM2, where the order of completion is SR3, SR2, SR1
-		buffer = getBuffer(6, AsyncDataStream.OutputMode.UNORDERED);
-		fakedOutput = (FakedOutput)output;
-
-		synchronized (lock) {
-			buffer.addLatencyMarker(new LatencyMarker(1L, 0, 0));
-			collector1 = buffer.addStreamRecord(new StreamRecord<>(111, 2L));
-			collector2 = buffer.addStreamRecord(new StreamRecord<>(222, 3L));
-			buffer.addWatermark(new Watermark(4L));
-			collector3 = buffer.addStreamRecord(new StreamRecord<>(333, 5L));
-			buffer.addWatermark(new Watermark(6L));
-		}
-
-		fakedOutput.expect(1);
-
-		buffer.startEmitterThread();
-
-		fakedOutput.waitToFinish();
-
-		// in UNORDERED mode, the result of completed SR3 will not be emitted right now.
-		collector3.collect(Collections.singletonList(333));
-
-		Thread.sleep(1000);
-
-		Assert.assertEquals("", fakedOutput.getValue());
-		Assert.assertEquals("1,", fakedOutput.getTimestamp());
-
-		fakedOutput.expect(1);
-
-		// SR2 will be emitted
-		collector2.collect(Collections.singletonList(222));
-
-		fakedOutput.waitToFinish();
-
-		Assert.assertEquals("222,", fakedOutput.getValue());
-		Assert.assertEquals("1,3,", fakedOutput.getTimestamp());
-
-		// SR1 will be emitted first, then WM1, and then SR3 and WM2
-		fakedOutput.expect(4);
-		collector1.collect(Collections.singletonList(111));
-
-		fakedOutput.waitToFinish();
-
-		Assert.assertEquals("222,111,333,", fakedOutput.getValue());
-		Assert.assertEquals("1,3,2,4,5,6,", fakedOutput.getTimestamp());
-
-		synchronized (lock) {
-			buffer.waitEmpty();
-			buffer.stopEmitterThread();
-		}
-
-		// 3. buffer layout: WM1 -> SR1 -> SR2 -> LM -> SR3 -> WM2, where the order of completion is SR2, SR1, SR3
-		buffer = getBuffer(6, AsyncDataStream.OutputMode.UNORDERED);
-		fakedOutput = (FakedOutput)output;
-
-		synchronized (lock) {
-			buffer.addWatermark(new Watermark(1L));
-			collector1 = buffer.addStreamRecord(new StreamRecord<>(111, 2L));
-			collector2 = buffer.addStreamRecord(new StreamRecord<>(222, 3L));
-			buffer.addLatencyMarker(new LatencyMarker(4L, 0, 0));
-			collector3 = buffer.addStreamRecord(new StreamRecord<>(333, 5L));
-			buffer.addWatermark(new Watermark(6L));
-		}
-
-		// the result of SR2 will be emitted following WM1
-		collector2.collect(Collections.singletonList(222));
-
-		fakedOutput.expect(2);
-
-		buffer.startEmitterThread();
-
-		fakedOutput.waitToFinish();
-
-		Assert.assertEquals("222,", fakedOutput.getValue());
-		Assert.assertEquals("1,3,", fakedOutput.getTimestamp());
-
-		// SR1 and LM will be emitted
-		fakedOutput.expect(2);
-		collector1.collect(Collections.singletonList(111));
-
-		fakedOutput.waitToFinish();
-
-		Assert.assertEquals("222,111,", fakedOutput.getValue());
-		Assert.assertEquals("1,3,2,4,", fakedOutput.getTimestamp());
-
-		// SR3 and WM2 will be emitted
-		fakedOutput.expect(2);
-		collector3.collect(Collections.singletonList(333));
-
-		fakedOutput.waitToFinish();
-
-		Assert.assertEquals("222,111,333,", fakedOutput.getValue());
-		Assert.assertEquals("1,3,2,4,5,6,", fakedOutput.getTimestamp());
-
-		synchronized (lock) {
-			buffer.waitEmpty();
-			buffer.stopEmitterThread();
-		}
-
-	}
-
-
-
-	@Test
-	public void testBufferWithException() throws Exception {
-		buffer = getBuffer(3, AsyncDataStream.OutputMode.UNORDERED);
-
-		buffer.startEmitterThread();
-
-		IOException expected = null;
-		try {
-			work(true);
-		}
-		catch (IOException e) {
-			expected = e;
-		}
-
-		Assert.assertNotNull(expected);
-		Assert.assertEquals(expected.getMessage(), "wahahahaha...");
-
-		synchronized (lock) {
-			buffer.waitEmpty();
-		}
-
-		buffer.stopEmitterThread();
-	}
-
-	public class FakedOutput implements Output<StreamRecord<Integer>> {
-		private List<Long> outputs;
-		private List<Long> timestamps;
-
-		private CountDownLatch latch;
-
-		public FakedOutput() {
-			this.outputs = new ArrayList<>();
-			this.timestamps = new ArrayList<>();
-		}
-
-		@Override
-		public void collect(StreamRecord<Integer> record) {
-			outputs.add(record.getValue().longValue());
-			if (record.hasTimestamp()) {
-				timestamps.add(record.getTimestamp());
-			}
-
-			if (latch != null) {
-				latch.countDown();
-			}
-		}
-
-		@Override
-		public void emitWatermark(Watermark mark) {
-			timestamps.add(mark.getTimestamp());
-
-			if (latch != null) {
-				latch.countDown();
-			}
-		}
-
-		@Override
-		public void emitLatencyMarker(LatencyMarker latencyMarker) {
-			timestamps.add(latencyMarker.getMarkedTime());
-
-			if (latch != null) {
-				latch.countDown();
-			}
-		}
-
-		@Override
-		public void close() {
-		}
-
-		public String getValue() {
-			StringBuilder sb = new StringBuilder();
-			for (Long i : outputs) {
-				sb.append(i).append(",");
-			}
-			return sb.toString();
-		}
-
-		public String getTimestamp() {
-			StringBuilder sb = new StringBuilder();
-			for (Long i : timestamps) {
-				sb.append(i).append(",");
-			}
-			return sb.toString();
-		}
-
-		public List<Long> getRawValue() {
-			return outputs;
-		}
-
-		public List<Long> getRawTimestamp() {
-			return timestamps;
-		}
-
-		public void expect(int count) {
-			latch = new CountDownLatch(count);
-		}
-
-		public void waitToFinish() throws InterruptedException {
-			latch.await();
-		}
-	}
-}