Posted to commits@flink.apache.org by tr...@apache.org on 2016/12/20 05:05:39 UTC

[3/7] flink git commit: [FLINK-4391] Polish asynchronous I/O operations
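
For context, and not part of the patch below: a minimal sketch of how the asynchronous I/O API that this commit polishes is used from a user program, assembled only from calls that appear in the tests in this patch (AsyncDataStream.unorderedWait, RichAsyncFunction, AsyncCollector, DiscardingSink). The class names AsyncIOExample and DoublingFunction are illustrative, and the AsyncCollector-based interface shown here is the one at the time of this commit; later Flink releases changed it.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;

import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncIOExample {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		DataStream<Integer> input = env.fromElements(1, 2, 3, 4, 5);

		// unorderedWait(stream, asyncFunction, capacity): at most 'capacity'
		// asynchronous requests are in flight at the same time
		DataStream<Integer> doubled = AsyncDataStream.unorderedWait(input, new DoublingFunction(), 3);

		doubled.addSink(new DiscardingSink<Integer>());

		env.execute("async I/O sketch");
	}

	private static class DoublingFunction extends RichAsyncFunction<Integer, Integer> {
		private static final long serialVersionUID = 1L;

		private transient ExecutorService executor;

		@Override
		public void open(Configuration parameters) throws Exception {
			executor = Executors.newFixedThreadPool(10);
		}

		@Override
		public void asyncInvoke(final Integer value, final AsyncCollector<Integer> collector) throws Exception {
			executor.submit(new Runnable() {
				@Override
				public void run() {
					// complete the asynchronous request by handing the result list to the collector
					collector.collect(Collections.singletonList(value * 2));
				}
			});
		}

		@Override
		public void close() throws Exception {
			executor.shutdownNow();
		}
	}
}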

http://git-wip-us.apache.org/repos/asf/flink/blob/ad603d59/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
index 560ee5a..10ee14f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.api.operators.async;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
@@ -36,8 +37,10 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
 import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.async.queue.StreamElementQueue;
+import org.apache.flink.streaming.api.operators.async.queue.StreamElementQueueEntry;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
@@ -45,17 +48,15 @@ import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
 import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.util.TestLogger;
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.util.ArrayList;
+import java.util.ArrayDeque;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Queue;
-import java.util.Random;
-import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
@@ -74,19 +75,16 @@ import static org.junit.Assert.assertEquals;
  *     <li>Snapshot state and restore state</li>
  * </ul>
  */
-public class AsyncWaitOperatorTest {
-
-	// hold sink result
-	private static Queue<Object> sinkResult;
+public class AsyncWaitOperatorTest extends TestLogger {
 
 	private static class MyAsyncFunction extends RichAsyncFunction<Integer, Integer> {
-		final int SLEEP_FACTOR = 100;
-		final int THREAD_POOL_SIZE = 10;
+		private static final long serialVersionUID = 8522411971886428444L;
 
-		transient static ExecutorService executorService;
-		static int counter = 0;
+		private static final long TIMEOUT = 5000L;
+		private static final int THREAD_POOL_SIZE = 10;
 
-		static Random random = new Random();
+		static ExecutorService executorService;
+		static int counter = 0;
 
 		@Override
 		public void open(Configuration parameters) throws Exception {
@@ -105,33 +103,35 @@ public class AsyncWaitOperatorTest {
 		public void close() throws Exception {
 			super.close();
 
+			freeExecutor();
+		}
+
+		private void freeExecutor() {
 			synchronized (MyAsyncFunction.class) {
 				--counter;
 
 				if (counter == 0) {
 					executorService.shutdown();
-					executorService.awaitTermination(SLEEP_FACTOR * THREAD_POOL_SIZE, TimeUnit.MILLISECONDS);
+
+					try {
+						if (!executorService.awaitTermination(TIMEOUT, TimeUnit.MILLISECONDS)) {
+							executorService.shutdownNow();
+						}
+					} catch (InterruptedException interrupted) {
+						executorService.shutdownNow();
+
+						Thread.currentThread().interrupt();
+					}
 				}
 			}
 		}
 
 		@Override
 		public void asyncInvoke(final Integer input, final AsyncCollector<Integer> collector) throws Exception {
-			this.executorService.submit(new Runnable() {
+			executorService.submit(new Runnable() {
 				@Override
 				public void run() {
-					// wait for while to simulate async operation here
-					int sleep = (int) (random.nextFloat() * SLEEP_FACTOR);
-
-					try {
-						Thread.sleep(sleep);
-						List<Integer> ret = new ArrayList<>();
-						ret.add(input*2);
-						collector.collect(ret);
-					}
-					catch (InterruptedException e) {
-						// do nothing
-					}
+					collector.collect(Collections.singletonList(input * 2));
 				}
 			});
 		}
@@ -141,11 +141,13 @@ public class AsyncWaitOperatorTest {
 	 * A special {@link org.apache.flink.streaming.api.functions.async.AsyncFunction} which does not issue
 	 * {@link AsyncCollector#collect} until the latch has counted down to zero.
 	 * This function is used in testStateSnapshotAndRestore, ensuring
-	 * that {@link org.apache.flink.streaming.api.functions.async.buffer.StreamElementEntry} can stay
-	 * in the {@link org.apache.flink.streaming.api.functions.async.buffer.AsyncCollectorBuffer} to be
+	 * that {@link StreamElementQueueEntry} can stay
+	 * in the {@link StreamElementQueue} to be
 	 * snapshotted while checkpointing.
 	 */
 	private static class LazyAsyncFunction extends MyAsyncFunction {
+		private static final long serialVersionUID = 3537791752703154670L;
+
 		private static CountDownLatch latch;
 
 		public LazyAsyncFunction() {
@@ -200,17 +202,23 @@ public class AsyncWaitOperatorTest {
 		}
 	}
 
+	/**
+	 * Test the AsyncWaitOperator with ordered mode and event time.
+	 */
 	@Test
-	public void testWaterMarkOrdered() throws Exception {
-		testWithWatermark(AsyncDataStream.OutputMode.ORDERED);
+	public void testEventTimeOrdered() throws Exception {
+		testEventTime(AsyncDataStream.OutputMode.ORDERED);
 	}
 
+	/**
+	 * Test the AsyncWaitOperator with unordered mode and event time.
+	 */
 	@Test
 	public void testWaterMarkUnordered() throws Exception {
-		testWithWatermark(AsyncDataStream.OutputMode.UNORDERED);
+		testEventTime(AsyncDataStream.OutputMode.UNORDERED);
 	}
 
-	private void testWithWatermark(AsyncDataStream.OutputMode mode) throws Exception {
+	private void testEventTime(AsyncDataStream.OutputMode mode) throws Exception {
 		final AsyncWaitOperator<Integer, Integer> operator = new AsyncWaitOperator<>(new MyAsyncFunction(), 2, mode);
 
 		final OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
@@ -255,40 +263,42 @@ public class AsyncWaitOperatorTest {
 		}
 	}
 
+	/**
+	 * Test the AsyncWaitOperator with ordered mode and processing time.
+	 */
 	@Test
-	public void testOrdered() throws Exception {
-		testRun(AsyncDataStream.OutputMode.ORDERED);
+	public void testProcessingTimeOrdered() throws Exception {
+		testProcessingTime(AsyncDataStream.OutputMode.ORDERED);
 	}
 
+	/**
+	 * Test the AsyncWaitOperator with unordered mode and processing time.
+	 */
 	@Test
-	public void testUnordered() throws Exception {
-		testRun(AsyncDataStream.OutputMode.UNORDERED);
+	public void testProcessingUnordered() throws Exception {
+		testProcessingTime(AsyncDataStream.OutputMode.UNORDERED);
 	}
 
-	private void testRun(AsyncDataStream.OutputMode mode) throws Exception {
-		final OneInputStreamTask<Integer, Integer> task = new OneInputStreamTask<>();
-		final OneInputStreamTaskTestHarness<Integer, Integer> testHarness =
-				new OneInputStreamTaskTestHarness<>(task, 1, 1, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
-
+	private void testProcessingTime(AsyncDataStream.OutputMode mode) throws Exception {
 		final AsyncWaitOperator<Integer, Integer> operator = new AsyncWaitOperator<>(new MyAsyncFunction(), 6, mode);
 
-		final StreamConfig streamConfig = testHarness.getStreamConfig();
-		streamConfig.setStreamOperator(operator);
-
-		testHarness.invoke();
-		testHarness.waitForTaskRunning();
+		final OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = new OneInputStreamOperatorTestHarness<>(operator, IntSerializer.INSTANCE);
 
 		final long initialTime = 0L;
-		final ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
+		final Queue<Object> expectedOutput = new ArrayDeque<>();
 
-		testHarness.processElement(new StreamRecord<>(1, initialTime + 1));
-		testHarness.processElement(new StreamRecord<>(2, initialTime + 2));
-		testHarness.processElement(new StreamRecord<>(3, initialTime + 3));
-		testHarness.processElement(new StreamRecord<>(4, initialTime + 4));
-		testHarness.processElement(new StreamRecord<>(5, initialTime + 5));
-		testHarness.processElement(new StreamRecord<>(6, initialTime + 6));
-		testHarness.processElement(new StreamRecord<>(7, initialTime + 7));
-		testHarness.processElement(new StreamRecord<>(8, initialTime + 8));
+		testHarness.open();
+
+		synchronized (testHarness.getCheckpointLock()) {
+			testHarness.processElement(new StreamRecord<>(1, initialTime + 1));
+			testHarness.processElement(new StreamRecord<>(2, initialTime + 2));
+			testHarness.processElement(new StreamRecord<>(3, initialTime + 3));
+			testHarness.processElement(new StreamRecord<>(4, initialTime + 4));
+			testHarness.processElement(new StreamRecord<>(5, initialTime + 5));
+			testHarness.processElement(new StreamRecord<>(6, initialTime + 6));
+			testHarness.processElement(new StreamRecord<>(7, initialTime + 7));
+			testHarness.processElement(new StreamRecord<>(8, initialTime + 8));
+		}
 
 		expectedOutput.add(new StreamRecord<>(2, initialTime + 1));
 		expectedOutput.add(new StreamRecord<>(4, initialTime + 2));
@@ -299,11 +309,9 @@ public class AsyncWaitOperatorTest {
 		expectedOutput.add(new StreamRecord<>(14, initialTime + 7));
 		expectedOutput.add(new StreamRecord<>(16, initialTime + 8));
 
-		testHarness.waitForInputProcessing();
-
-		testHarness.endInput();
-
-		testHarness.waitForTaskCompletion();
+		synchronized (testHarness.getCheckpointLock()) {
+			testHarness.close();
+		}
 
 		if (mode == AsyncDataStream.OutputMode.ORDERED) {
 			TestHarnessUtil.assertOutputEquals("ORDERED Output was not correct.", expectedOutput, testHarness.getOutput());
@@ -317,6 +325,54 @@ public class AsyncWaitOperatorTest {
 		}
 	}
 
+	/**
+	 * Tests that the AsyncWaitOperator works together with operator chaining.
+	 */
+	@Test
+	public void testOperatorChainWithProcessingTime() throws Exception {
+
+		JobVertex chainedVertex = createChainedVertex(false);
+
+		final OneInputStreamTask<Integer, Integer> task = new OneInputStreamTask<>();
+		final OneInputStreamTaskTestHarness<Integer, Integer> testHarness =
+				new OneInputStreamTaskTestHarness<>(task, 1, 1, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
+
+		testHarness.taskConfig = chainedVertex.getConfiguration();
+
+		final StreamConfig streamConfig = testHarness.getStreamConfig();
+		final StreamConfig operatorChainStreamConfig = new StreamConfig(chainedVertex.getConfiguration());
+		final AsyncWaitOperator<Integer, Integer> headOperator =
+				operatorChainStreamConfig.getStreamOperator(AsyncWaitOperatorTest.class.getClassLoader());
+		streamConfig.setStreamOperator(headOperator);
+
+		testHarness.invoke();
+		testHarness.waitForTaskRunning();
+
+		long initialTimestamp = 0L;
+
+		testHarness.processElement(new StreamRecord<>(5, initialTimestamp));
+		testHarness.processElement(new StreamRecord<>(6, initialTimestamp + 1L));
+		testHarness.processElement(new StreamRecord<>(7, initialTimestamp + 2L));
+		testHarness.processElement(new StreamRecord<>(8, initialTimestamp + 3L));
+		testHarness.processElement(new StreamRecord<>(9, initialTimestamp + 4L));
+
+		testHarness.endInput();
+		testHarness.waitForTaskCompletion();
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+		expectedOutput.add(new StreamRecord<>(22, initialTimestamp));
+		expectedOutput.add(new StreamRecord<>(26, initialTimestamp + 1L));
+		expectedOutput.add(new StreamRecord<>(30, initialTimestamp + 2L));
+		expectedOutput.add(new StreamRecord<>(34, initialTimestamp + 3L));
+		expectedOutput.add(new StreamRecord<>(38, initialTimestamp + 4L));
+
+		TestHarnessUtil.assertOutputEqualsSorted(
+				"Test for chained operator with AsyncWaitOperator failed",
+				expectedOutput,
+				testHarness.getOutput(),
+				new StreamRecordComparator());
+	}
+
 	private JobVertex createChainedVertex(boolean withLazyFunction) {
 		StreamExecutionEnvironment chainEnv = StreamExecutionEnvironment.getExecutionEnvironment();
 
@@ -353,144 +409,23 @@ public class AsyncWaitOperatorTest {
 
 		input = AsyncDataStream.unorderedWait(input, new MyAsyncFunction(), 3);
 
-		input.addSink(new SinkFunction<Integer>() {
-			private static final long serialVersionUID = 1L;
+		input.map(new MapFunction<Integer, Integer>() {
+			private static final long serialVersionUID = 5162085254238405527L;
 
 			@Override
-			public void invoke(Integer value) throws Exception {
-				sinkResult.add(value);
+			public Integer map(Integer value) throws Exception {
+				return value;
 			}
-		});
+		}).startNewChain().addSink(new DiscardingSink<Integer>());
 
 		// we build our own OperatorChain
 		final JobGraph jobGraph = chainEnv.getStreamGraph().getJobGraph();
 
-		Assert.assertTrue(jobGraph.getVerticesSortedTopologicallyFromSources().size() == 2);
+		Assert.assertTrue(jobGraph.getVerticesSortedTopologicallyFromSources().size() == 3);
 
 		return jobGraph.getVerticesSortedTopologicallyFromSources().get(1);
 	}
 
-	/**
-	 * Get the {@link SubtaskState} for the operator chain. The state will keep several inputs.
-	 *
-	 * @return A {@link SubtaskState}
-	 * @throws Exception
-     */
-	private SubtaskState createTaskState() throws Exception {
-		sinkResult = new ConcurrentLinkedDeque<>();
-
-		final OneInputStreamTask<Integer, Integer> task = new OneInputStreamTask<>();
-		final OneInputStreamTaskTestHarness<Integer, Integer> testHarness =
-				new OneInputStreamTaskTestHarness<>(task, 1, 1, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
-
-		JobVertex chainedVertex = createChainedVertex(true);
-
-		testHarness.taskConfig = chainedVertex.getConfiguration();
-
-		final AcknowledgeStreamMockEnvironment env = new AcknowledgeStreamMockEnvironment(
-				testHarness.jobConfig,
-				testHarness.taskConfig,
-				testHarness.getExecutionConfig(),
-				testHarness.memorySize,
-				new MockInputSplitProvider(),
-				testHarness.bufferSize);
-
-		final StreamConfig streamConfig = testHarness.getStreamConfig();
-		final StreamConfig operatorChainStreamConfig = new StreamConfig(chainedVertex.getConfiguration());
-		final AsyncWaitOperator<Integer, Integer> headOperator =
-				operatorChainStreamConfig.getStreamOperator(Thread.currentThread().getContextClassLoader());
-		streamConfig.setStreamOperator(headOperator);
-
-		testHarness.invoke(env);
-		testHarness.waitForTaskRunning();
-
-		testHarness.processElement(new StreamRecord<>(1));
-		testHarness.processElement(new StreamRecord<>(2));
-		testHarness.processElement(new StreamRecord<>(3));
-		testHarness.processElement(new StreamRecord<>(4));
-
-		testHarness.waitForInputProcessing();
-
-		final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(1L, 1L);
-
-		task.triggerCheckpoint(checkpointMetaData);
-
-		env.getCheckpointLatch().await();
-
-		assertEquals(1L, env.getCheckpointId());
-
-		LazyAsyncFunction.countDown();
-
-		testHarness.endInput();
-		testHarness.waitForTaskCompletion();
-
-		return env.getCheckpointStateHandles();
-	}
-
-	@Test
-	public void testOperatorChain() throws Exception {
-
-		JobVertex chainedVertex = createChainedVertex(false);
-
-		final OneInputStreamTask<Integer, Integer> task = new OneInputStreamTask<>();
-		final OneInputStreamTaskTestHarness<Integer, Integer> testHarness =
-				new OneInputStreamTaskTestHarness<>(task, 1, 1, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
-
-		task.setInitialState(new TaskStateHandles(createTaskState()));
-
-		sinkResult = new ConcurrentLinkedDeque<>();
-
-		testHarness.taskConfig = chainedVertex.getConfiguration();
-
-		final AcknowledgeStreamMockEnvironment env = new AcknowledgeStreamMockEnvironment(
-				testHarness.jobConfig,
-				testHarness.taskConfig,
-				testHarness.getExecutionConfig(),
-				testHarness.memorySize,
-				new MockInputSplitProvider(),
-				testHarness.bufferSize);
-
-		final StreamConfig streamConfig = testHarness.getStreamConfig();
-		final StreamConfig operatorChainStreamConfig = new StreamConfig(chainedVertex.getConfiguration());
-		final AsyncWaitOperator<Integer, Integer> headOperator =
-				operatorChainStreamConfig.getStreamOperator(Thread.currentThread().getContextClassLoader());
-		streamConfig.setStreamOperator(headOperator);
-
-		testHarness.invoke(env);
-		testHarness.waitForTaskRunning();
-
-		testHarness.processElement(new StreamRecord<>(5));
-		testHarness.processElement(new StreamRecord<>(6));
-		testHarness.processElement(new StreamRecord<>(7));
-		testHarness.processElement(new StreamRecord<>(8));
-		testHarness.processElement(new StreamRecord<>(9));
-
-		testHarness.endInput();
-		testHarness.waitForTaskCompletion();
-
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-		expectedOutput.add(6);
-		expectedOutput.add(10);
-		expectedOutput.add(14);
-		expectedOutput.add(18);
-		expectedOutput.add(22);
-		expectedOutput.add(26);
-		expectedOutput.add(30);
-		expectedOutput.add(34);
-		expectedOutput.add(38);
-
-		TestHarnessUtil.assertOutputEqualsSorted(
-				"Test for chained operator with AsyncWaitOperator failed",
-				expectedOutput,
-				sinkResult,
-				new Comparator<Object>() {
-					@Override
-					public int compare(Object o1, Object o2) {
-						return (Integer)o1 - (Integer)o2;
-					}
-				});
-	}
-
 	@Test
 	public void testStateSnapshotAndRestore() throws Exception {
 		final OneInputStreamTask<Integer, Integer> task = new OneInputStreamTask<>();
@@ -498,7 +433,7 @@ public class AsyncWaitOperatorTest {
 				new OneInputStreamTaskTestHarness<>(task, 1, 1, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
 
 		AsyncWaitOperator<Integer, Integer> operator =
-				new AsyncWaitOperator<>(new LazyAsyncFunction(), 6, AsyncDataStream.OutputMode.ORDERED);
+				new AsyncWaitOperator<>(new LazyAsyncFunction(), 3, AsyncDataStream.OutputMode.ORDERED);
 
 		final StreamConfig streamConfig = testHarness.getStreamConfig();
 		streamConfig.setStreamOperator(operator);

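The hunks above replace the old task-level test (OneInputStreamTaskTestHarness plus a static sink collection) with a harness that drives the operator directly. Condensed, and under the assumption that MyAsyncFunction is the helper defined in this test class, the new pattern looks roughly like this fragment of a test method (imports as in the file above):

	AsyncWaitOperator<Integer, Integer> operator =
			new AsyncWaitOperator<>(new MyAsyncFunction(), 2, AsyncDataStream.OutputMode.ORDERED);

	OneInputStreamOperatorTestHarness<Integer, Integer> harness =
			new OneInputStreamOperatorTestHarness<>(operator, IntSerializer.INSTANCE);

	harness.open();

	// elements are fed under the checkpoint lock, as the real task would do
	synchronized (harness.getCheckpointLock()) {
		harness.processElement(new StreamRecord<>(1, 1L));
		harness.processElement(new StreamRecord<>(2, 2L));
	}

	// close under the lock as well; the test above relies on this flushing the in-flight elements
	synchronized (harness.getCheckpointLock()) {
		harness.close();
	}

	Queue<Object> expected = new ArrayDeque<>();
	expected.add(new StreamRecord<>(2, 1L));
	expected.add(new StreamRecord<>(4, 2L));

	TestHarnessUtil.assertOutputEquals("Output was not correct.", expected, harness.getOutput());
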
http://git-wip-us.apache.org/repos/asf/flink/blob/ad603d59/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/EmitterTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/EmitterTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/EmitterTest.java
new file mode 100644
index 0000000..c3a47aa
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/EmitterTest.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async;
+
+import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.async.queue.OrderedStreamElementQueue;
+import org.apache.flink.streaming.api.operators.async.queue.StreamElementQueue;
+import org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry;
+import org.apache.flink.streaming.api.operators.async.queue.WatermarkQueueEntry;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.CollectorOutput;
+import org.apache.flink.util.TestLogger;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+public class EmitterTest extends TestLogger {
+
+	private static final long timeout = 10000L;
+	private static ExecutorService executor;
+
+	@BeforeClass
+	public static void setup() {
+		executor = Executors.newFixedThreadPool(3);
+	}
+
+	@AfterClass
+	public static void shutdown() {
+		executor.shutdown();
+
+		try {
+			if (!executor.awaitTermination(timeout, TimeUnit.MILLISECONDS)) {
+				executor.shutdownNow();
+			}
+		} catch (InterruptedException interrupted) {
+			executor.shutdownNow();
+
+			Thread.currentThread().interrupt();
+		}
+	}
+
+	/**
+	 * Tests that the emitter outputs completed stream element queue entries.
+	 */
+	@Test
+	public void testEmitterWithOrderedQueue() throws Exception {
+		Object lock = new Object();
+		List<StreamElement> list = new ArrayList<>();
+		Output<StreamRecord<Integer>> output = new CollectorOutput<>(list);
+
+		List<StreamElement> expected = Arrays.asList(
+			new StreamRecord<>(1, 0L),
+			new StreamRecord<>(2, 0L),
+			new StreamRecord<>(3, 1L),
+			new StreamRecord<>(4, 1L),
+			new Watermark(3L),
+			new StreamRecord<>(5, 4L),
+			new StreamRecord<>(6, 4L));
+
+		OperatorActions operatorActions = mock(OperatorActions.class);
+
+		final int capacity = 5;
+
+		StreamElementQueue queue = new OrderedStreamElementQueue(capacity, executor, operatorActions);
+
+		final Emitter<Integer> emitter = new Emitter<>(lock, output, queue, operatorActions);
+
+		final Thread emitterThread = new Thread(emitter);
+		emitterThread.start();
+
+		try {
+			StreamRecordQueueEntry<Integer> record1 = new StreamRecordQueueEntry<>(new StreamRecord<>(1, 0L));
+			StreamRecordQueueEntry<Integer> record2 = new StreamRecordQueueEntry<>(new StreamRecord<>(2, 1L));
+			WatermarkQueueEntry watermark1 = new WatermarkQueueEntry(new Watermark(3L));
+			StreamRecordQueueEntry<Integer> record3 = new StreamRecordQueueEntry<>(new StreamRecord<>(3, 4L));
+
+			queue.put(record1);
+			queue.put(record2);
+			queue.put(watermark1);
+			queue.put(record3);
+
+			record2.collect(Arrays.asList(3, 4));
+			record1.collect(Arrays.asList(1, 2));
+			record3.collect(Arrays.asList(5, 6));
+
+			synchronized (lock) {
+				while (!queue.isEmpty()) {
+					lock.wait();
+				}
+			}
+
+			Assert.assertEquals(expected, list);
+		} finally {
+			emitter.stop();
+			emitterThread.interrupt();
+		}
+	}
+
+	/**
+	 * Tests that the emitter handles exceptions occurring in the {@link AsyncCollector} correctly.
+	 */
+	@Test
+	public void testEmitterWithExceptions() throws Exception {
+		Object lock = new Object();
+		List<StreamElement> list = new ArrayList<>();
+		Output<StreamRecord<Integer>> output = new CollectorOutput<>(list);
+
+		List<StreamElement> expected = Arrays.asList(
+			new StreamRecord<>(1, 0L),
+			new Watermark(3L));
+
+		OperatorActions operatorActions = mock(OperatorActions.class);
+
+		final int capacity = 3;
+
+		StreamElementQueue queue = new OrderedStreamElementQueue(capacity, executor, operatorActions);
+
+		final Emitter<Integer> emitter = new Emitter<>(lock, output, queue, operatorActions);
+
+		final Thread emitterThread = new Thread(emitter);
+		emitterThread.start();
+
+		final Exception testException = new Exception("Test exception");
+
+		try {
+			StreamRecordQueueEntry<Integer> record1 = new StreamRecordQueueEntry<>(new StreamRecord<>(1, 0L));
+			StreamRecordQueueEntry<Integer> record2 = new StreamRecordQueueEntry<>(new StreamRecord<>(2, 1L));
+			WatermarkQueueEntry watermark1 = new WatermarkQueueEntry(new Watermark(3L));
+
+			queue.put(record1);
+			queue.put(record2);
+			queue.put(watermark1);
+
+			record2.collect(testException);
+			record1.collect(Arrays.asList(1));
+
+			synchronized (lock) {
+				while (!queue.isEmpty()) {
+					lock.wait();
+				}
+			}
+
+			Assert.assertEquals(expected, list);
+
+			ArgumentCaptor<Throwable> argumentCaptor = ArgumentCaptor.forClass(Throwable.class);
+
+			verify(operatorActions).failOperator(argumentCaptor.capture());
+
+			Throwable failureCause = argumentCaptor.getValue();
+
+			Assert.assertNotNull(failureCause.getCause());
+			Assert.assertTrue(failureCause.getCause() instanceof ExecutionException);
+
+			Assert.assertNotNull(failureCause.getCause().getCause());
+			Assert.assertEquals(testException, failureCause.getCause().getCause());
+		} finally {
+			emitter.stop();
+			emitterThread.interrupt();
+		}
+	}
+}

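The EmitterTest above is also a compact illustration of how the new Emitter collaborates with a StreamElementQueue: the emitter runs on its own thread, takes completed entries out of the queue and forwards them to the Output under a shared lock. A stripped-down version of the same choreography, assumed to run inside a test method that declares throws Exception, with an executor and Mockito mock set up as in the test above:

	Object lock = new Object();
	List<StreamElement> collected = new ArrayList<>();
	Output<StreamRecord<Integer>> output = new CollectorOutput<>(collected);

	OperatorActions operatorActions = mock(OperatorActions.class);
	StreamElementQueue queue = new OrderedStreamElementQueue(5, executor, operatorActions);

	Emitter<Integer> emitter = new Emitter<>(lock, output, queue, operatorActions);
	Thread emitterThread = new Thread(emitter);
	emitterThread.start();

	try {
		StreamRecordQueueEntry<Integer> entry = new StreamRecordQueueEntry<>(new StreamRecord<>(1, 0L));
		queue.put(entry);

		// completing the entry makes it pollable; the emitter picks it up and
		// emits StreamRecord(2, 0L) to the collecting output
		entry.collect(Collections.singletonList(2));

		// the test above relies on the emitter notifying the lock once it has emitted
		synchronized (lock) {
			while (!queue.isEmpty()) {
				lock.wait();
			}
		}
	} finally {
		emitter.stop();
		emitterThread.interrupt();
	}
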
http://git-wip-us.apache.org/repos/asf/flink/blob/ad603d59/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java
new file mode 100644
index 0000000..0380512
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async.queue;
+
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.streaming.api.operators.async.OperatorActions;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.TestLogger;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+/**
+ * {@link OrderedStreamElementQueue} specific tests
+ */
+public class OrderedStreamElementQueueTest extends TestLogger {
+
+	private static final long timeout = 10000L;
+	private static ExecutorService executor;
+
+	@BeforeClass
+	public static void setup() {
+		executor = Executors.newFixedThreadPool(3);
+	}
+
+	@AfterClass
+	public static void shutdown() {
+		executor.shutdown();
+
+		try {
+			if (!executor.awaitTermination(timeout, TimeUnit.MILLISECONDS)) {
+				executor.shutdownNow();
+			}
+		} catch (InterruptedException interrupted) {
+			executor.shutdownNow();
+
+			Thread.currentThread().interrupt();
+		}
+	}
+
+	/**
+	 * Tests that only the head element is pulled from the ordered queue if it has been
+	 * completed.
+	 */
+	@Test
+	public void testCompletionOrder() throws Exception {
+		OperatorActions operatorActions = mock(OperatorActions.class);
+		final OrderedStreamElementQueue queue = new OrderedStreamElementQueue(4, executor, operatorActions);
+
+		StreamRecordQueueEntry<Integer> entry1 = new StreamRecordQueueEntry<>(new StreamRecord<>(1, 0L));
+		StreamRecordQueueEntry<Integer> entry2 = new StreamRecordQueueEntry<>(new StreamRecord<>(2, 1L));
+		WatermarkQueueEntry entry3 = new WatermarkQueueEntry(new Watermark(2L));
+		StreamRecordQueueEntry<Integer> entry4 = new StreamRecordQueueEntry<>(new StreamRecord<>(3, 3L));
+
+		List<StreamElementQueueEntry<?>> expected = Arrays.asList(entry1, entry2, entry3, entry4);
+
+		for (StreamElementQueueEntry<?> entry : expected) {
+			queue.put(entry);
+		}
+
+		Future<List<AsyncResult>> pollOperation = FlinkFuture.supplyAsync(new Callable<List<AsyncResult>>() {
+			@Override
+			public List<AsyncResult> call() throws Exception {
+				List<AsyncResult> result = new ArrayList<>(4);
+				while (!queue.isEmpty()) {
+					result.add(queue.poll());
+				}
+
+				return result;
+			}
+		}, executor);
+
+		Thread.sleep(10L);
+
+		Assert.assertFalse(pollOperation.isDone());
+
+		entry2.collect(Collections.<Integer>emptyList());
+
+		entry4.collect(Collections.<Integer>emptyList());
+
+		Thread.sleep(10L);
+
+		Assert.assertEquals(4, queue.size());
+
+		entry1.collect(Collections.<Integer>emptyList());
+
+		Assert.assertEquals(expected, pollOperation.get());
+
+		verify(operatorActions, never()).failOperator(any(Exception.class));
+	}
+}

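In other words, the ordered queue makes entries pollable strictly in insertion order, no matter in which order their results arrive. A shorter sketch of the same contract verified above (again a fragment of a test method, with the executor and Mockito mock as in the test):

	OrderedStreamElementQueue queue =
			new OrderedStreamElementQueue(2, executor, mock(OperatorActions.class));

	StreamRecordQueueEntry<Integer> first = new StreamRecordQueueEntry<>(new StreamRecord<>(1, 0L));
	StreamRecordQueueEntry<Integer> second = new StreamRecordQueueEntry<>(new StreamRecord<>(2, 1L));

	queue.put(first);
	queue.put(second);

	// completing only the second entry is not enough; poll() would still block
	second.collect(Collections.<Integer>emptyList());

	// once the head entry is completed, both can be polled, head first
	first.collect(Collections.<Integer>emptyList());

	Assert.assertEquals(first, queue.poll());
	Assert.assertEquals(second, queue.poll());
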
http://git-wip-us.apache.org/repos/asf/flink/blob/ad603d59/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueTest.java
new file mode 100644
index 0000000..c9e59c7
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueTest.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async.queue;
+
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.streaming.api.operators.async.OperatorActions;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.streaming.api.operators.async.queue.StreamElementQueueTest.StreamElementQueueType.OrderedStreamElementQueueType;
+import static org.apache.flink.streaming.api.operators.async.queue.StreamElementQueueTest.StreamElementQueueType.UnorderedStreamElementQueueType;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests for the basic functionality of {@link StreamElementQueue}. The basic operations consist
+ * of putting and polling elements from the queue.
+ */
+@RunWith(Parameterized.class)
+public class StreamElementQueueTest extends TestLogger {
+
+	private static final long timeout = 10000L;
+	private static ExecutorService executor;
+
+	@BeforeClass
+	public static void setup() {
+		executor = Executors.newFixedThreadPool(3);
+	}
+
+	@AfterClass
+	public static void shutdown() {
+		executor.shutdown();
+
+		try {
+			if (!executor.awaitTermination(timeout, TimeUnit.MILLISECONDS)) {
+				executor.shutdownNow();
+			}
+		} catch (InterruptedException interrupted) {
+			executor.shutdownNow();
+
+			Thread.currentThread().interrupt();
+		}
+	}
+
+	enum StreamElementQueueType {
+		OrderedStreamElementQueueType,
+		UnorderedStreamElementQueueType
+	}
+
+	@Parameterized.Parameters
+	public static Collection<StreamElementQueueType> streamElementQueueTypes() {
+		return Arrays.asList(OrderedStreamElementQueueType, UnorderedStreamElementQueueType);
+	}
+
+	private final StreamElementQueueType streamElementQueueType;
+
+	public StreamElementQueueTest(StreamElementQueueType streamElementQueueType) {
+		this.streamElementQueueType = Preconditions.checkNotNull(streamElementQueueType);
+	}
+
+	public StreamElementQueue createStreamElementQueue(int capacity, OperatorActions operatorActions) {
+		switch (streamElementQueueType) {
+			case OrderedStreamElementQueueType:
+				return new OrderedStreamElementQueue(capacity, executor, operatorActions);
+			case UnorderedStreamElementQueueType:
+				return new UnorderedStreamElementQueue(capacity, executor, operatorActions);
+			default:
+				throw new IllegalStateException("Unknown stream element queue type: " + streamElementQueueType);
+		}
+	}
+
+	@Test
+	public void testPut() throws InterruptedException {
+		OperatorActions operatorActions = mock(OperatorActions.class);
+		StreamElementQueue queue = createStreamElementQueue(2, operatorActions);
+
+		final Watermark watermark = new Watermark(0L);
+		final StreamRecord<Integer> streamRecord = new StreamRecord<>(42, 1L);
+		final Watermark nextWatermark = new Watermark(2L);
+
+		final WatermarkQueueEntry watermarkQueueEntry = new WatermarkQueueEntry(watermark);
+		final StreamRecordQueueEntry<Integer> streamRecordQueueEntry = new StreamRecordQueueEntry<>(streamRecord);
+
+		queue.put(watermarkQueueEntry);
+		queue.put(streamRecordQueueEntry);
+
+		Assert.assertEquals(2, queue.size());
+
+		Assert.assertFalse(queue.tryPut(new WatermarkQueueEntry(nextWatermark)));
+
+		Collection<StreamElementQueueEntry<?>> actualValues = queue.values();
+
+		List<StreamElementQueueEntry<?>> expectedValues = Arrays.asList(watermarkQueueEntry, streamRecordQueueEntry);
+
+		Assert.assertEquals(expectedValues, actualValues);
+
+		verify(operatorActions, never()).failOperator(any(Exception.class));
+	}
+
+	@Test
+	public void testPoll() throws InterruptedException {
+		OperatorActions operatorActions = mock(OperatorActions.class);
+		StreamElementQueue queue = createStreamElementQueue(2, operatorActions);
+
+		WatermarkQueueEntry watermarkQueueEntry = new WatermarkQueueEntry(new Watermark(0L));
+		StreamRecordQueueEntry<Integer> streamRecordQueueEntry = new StreamRecordQueueEntry<>(new StreamRecord<>(42, 1L));
+
+		queue.put(watermarkQueueEntry);
+		queue.put(streamRecordQueueEntry);
+
+		Assert.assertEquals(watermarkQueueEntry, queue.peekBlockingly());
+		Assert.assertEquals(2, queue.size());
+
+		Assert.assertEquals(watermarkQueueEntry, queue.poll());
+		Assert.assertEquals(1, queue.size());
+
+		streamRecordQueueEntry.collect(Collections.<Integer>emptyList());
+
+		Assert.assertEquals(streamRecordQueueEntry, queue.poll());
+
+		Assert.assertEquals(0, queue.size());
+		Assert.assertTrue(queue.isEmpty());
+
+		verify(operatorActions, never()).failOperator(any(Exception.class));
+	}
+
+	/**
+	 * Tests that a put operation blocks if the queue is full.
+	 */
+	@Test
+	public void testBlockingPut() throws Exception {
+		OperatorActions operatorActions = mock(OperatorActions.class);
+		final StreamElementQueue queue = createStreamElementQueue(1, operatorActions);
+
+		StreamRecordQueueEntry<Integer> streamRecordQueueEntry = new StreamRecordQueueEntry<>(new StreamRecord<>(42, 0L));
+		final StreamRecordQueueEntry<Integer> streamRecordQueueEntry2 = new StreamRecordQueueEntry<>(new StreamRecord<>(43, 1L));
+
+		queue.put(streamRecordQueueEntry);
+
+		Assert.assertEquals(1, queue.size());
+
+		Future<Void> putOperation = FlinkFuture.supplyAsync(new Callable<Void>() {
+			@Override
+			public Void call() throws Exception {
+				queue.put(streamRecordQueueEntry2);
+
+				return null;
+			}
+		}, executor);
+
+		// give the future a chance to complete
+		Thread.sleep(10L);
+
+		// but it shouldn't ;-)
+		Assert.assertFalse(putOperation.isDone());
+
+		streamRecordQueueEntry.collect(Collections.<Integer>emptyList());
+
+		// polling the completed head element frees the queue again
+		Assert.assertEquals(streamRecordQueueEntry, queue.poll());
+
+		// now the put operation should complete
+		putOperation.get();
+
+		verify(operatorActions, never()).failOperator(any(Exception.class));
+	}
+
+	/**
+	 * Test that a poll operation on an empty queue blocks.
+	 */
+	@Test
+	public void testBlockingPoll() throws Exception {
+		OperatorActions operatorActions = mock(OperatorActions.class);
+		final StreamElementQueue queue = createStreamElementQueue(1, operatorActions);
+
+		WatermarkQueueEntry watermarkQueueEntry = new WatermarkQueueEntry(new Watermark(1L));
+		StreamRecordQueueEntry<Integer> streamRecordQueueEntry = new StreamRecordQueueEntry<>(new StreamRecord<>(1, 2L));
+
+		Assert.assertTrue(queue.isEmpty());
+
+		Future<AsyncResult> peekOperation = FlinkFuture.supplyAsync(new Callable<AsyncResult>() {
+			@Override
+			public AsyncResult call() throws Exception {
+				return queue.peekBlockingly();
+			}
+		}, executor);
+
+		Thread.sleep(10L);
+
+		Assert.assertFalse(peekOperation.isDone());
+
+		queue.put(watermarkQueueEntry);
+
+		AsyncResult watermarkResult = peekOperation.get();
+
+		Assert.assertEquals(watermarkQueueEntry, watermarkResult);
+		Assert.assertEquals(1, queue.size());
+
+		Assert.assertEquals(watermarkQueueEntry, queue.poll());
+		Assert.assertTrue(queue.isEmpty());
+
+		Future<AsyncResult> pollOperation = FlinkFuture.supplyAsync(new Callable<AsyncResult>() {
+			@Override
+			public AsyncResult call() throws Exception {
+				return queue.poll();
+			}
+		}, executor);
+
+		Thread.sleep(10L);
+
+		Assert.assertFalse(pollOperation.isDone());
+
+		queue.put(streamRecordQueueEntry);
+
+		Thread.sleep(10L);
+
+		Assert.assertFalse(pollOperation.isDone());
+
+		streamRecordQueueEntry.collect(Collections.<Integer>emptyList());
+
+		Assert.assertEquals(streamRecordQueueEntry, pollOperation.get());
+
+		Assert.assertTrue(queue.isEmpty());
+
+		verify(operatorActions, never()).failOperator(any(Exception.class));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ad603d59/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueueTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueueTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueueTest.java
new file mode 100644
index 0000000..0a57f92
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueueTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async.queue;
+
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.streaming.api.operators.async.OperatorActions;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.TestLogger;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+/**
+ * {@link UnorderedStreamElementQueue} specific tests
+ */
+public class UnorderedStreamElementQueueTest extends TestLogger {
+	private static final long timeout = 10000L;
+	private static ExecutorService executor;
+
+	@BeforeClass
+	public static void setup() {
+		executor = Executors.newFixedThreadPool(3);
+	}
+
+	@AfterClass
+	public static void shutdown() {
+		executor.shutdown();
+
+		try {
+			if (!executor.awaitTermination(timeout, TimeUnit.MILLISECONDS)) {
+				executor.shutdownNow();
+			}
+		} catch (InterruptedException interrupted) {
+			executor.shutdownNow();
+
+			Thread.currentThread().interrupt();
+		}
+	}
+
+	/**
+	 * Tests that only elements before the oldest watermark are returned if they are completed.
+	 */
+	@Test
+	public void testCompletionOrder() throws Exception {
+		OperatorActions operatorActions = mock(OperatorActions.class);
+
+		final UnorderedStreamElementQueue queue = new UnorderedStreamElementQueue(8, executor, operatorActions);
+
+		StreamRecordQueueEntry<Integer> record1 = new StreamRecordQueueEntry<>(new StreamRecord<>(1, 0L));
+		StreamRecordQueueEntry<Integer> record2 = new StreamRecordQueueEntry<>(new StreamRecord<>(2, 1L));
+		WatermarkQueueEntry watermark1 = new WatermarkQueueEntry(new Watermark(2L));
+		StreamRecordQueueEntry<Integer> record3 = new StreamRecordQueueEntry<>(new StreamRecord<>(3, 3L));
+		StreamRecordQueueEntry<Integer> record4 = new StreamRecordQueueEntry<>(new StreamRecord<>(4, 4L));
+		WatermarkQueueEntry watermark2 = new WatermarkQueueEntry(new Watermark(5L));
+		StreamRecordQueueEntry<Integer> record5 = new StreamRecordQueueEntry<>(new StreamRecord<>(5, 6L));
+		StreamRecordQueueEntry<Integer> record6 = new StreamRecordQueueEntry<>(new StreamRecord<>(6, 7L));
+
+		List<StreamElementQueueEntry<?>> entries = Arrays.asList(record1, record2, watermark1, record3,
+			record4, watermark2, record5, record6);
+
+		// The queue should look like R1, R2, W1, R3, R4, W2, R5, R6
+		for (StreamElementQueueEntry<?> entry : entries) {
+			queue.put(entry);
+		}
+
+		Assert.assertTrue(8 == queue.size());
+
+		Future<AsyncResult> firstPoll = FlinkFuture.supplyAsync(new Callable<AsyncResult>() {
+			@Override
+			public AsyncResult call() throws Exception {
+				return queue.poll();
+			}
+		}, executor);
+
+		// this should not fulfill the poll, because R3 is behind W1
+		record3.collect(Collections.<Integer>emptyList());
+
+		Thread.sleep(10L);
+
+		Assert.assertFalse(firstPoll.isDone());
+
+		record2.collect(Collections.<Integer>emptyList());
+
+		Assert.assertEquals(record2, firstPoll.get());
+
+		Future<AsyncResult> secondPoll = FlinkFuture.supplyAsync(new Callable<AsyncResult>() {
+			@Override
+			public AsyncResult call() throws Exception {
+				return queue.poll();
+			}
+		}, executor);
+
+		record6.collect(Collections.<Integer>emptyList());
+		record4.collect(Collections.<Integer>emptyList());
+
+		Thread.sleep(10L);
+
+		// The future should not be completed because R1 has not been completed yet
+		Assert.assertFalse(secondPoll.isDone());
+
+		record1.collect(Collections.<Integer>emptyList());
+
+		Assert.assertEquals(record1, secondPoll.get());
+
+		// Now W1, R3, R4 and W2 are completed and should be pollable
+		Assert.assertEquals(watermark1, queue.poll());
+
+		// The order of R3 and R4 is not specified
+		Set<AsyncResult> expected = new HashSet<>(2);
+		expected.add(record3);
+		expected.add(record4);
+
+		Set<AsyncResult> actual = new HashSet<>(2);
+
+		actual.add(queue.poll());
+		actual.add(queue.poll());
+
+		Assert.assertEquals(expected, actual);
+
+		Assert.assertEquals(watermark2, queue.poll());
+
+		// since R6 has been completed before and W2 has been consumed, we should be able to poll
+		// this record as well
+		Assert.assertEquals(record6, queue.poll());
+
+		// only R5 left in the queue
+		Assert.assertTrue(1 == queue.size());
+
+		Future<AsyncResult> thirdPoll = FlinkFuture.supplyAsync(new Callable<AsyncResult>() {
+			@Override
+			public AsyncResult call() throws Exception {
+				return queue.poll();
+			}
+		}, executor);
+
+		Thread.sleep(10L);
+
+		Assert.assertFalse(thirdPoll.isDone());
+
+		record5.collect(Collections.<Integer>emptyList());
+
+		Assert.assertEquals(record5, thirdPoll.get());
+
+		Assert.assertTrue(queue.isEmpty());
+
+		verify(operatorActions, never()).failOperator(any(Exception.class));
+	}
+}

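Compared to the ordered queue, the unordered queue lets completed records overtake each other but never a watermark; watermarks act as barriers between groups of records. A reduced sketch of the rule checked above (test-method fragment, executor and Mockito mock as in the test):

	UnorderedStreamElementQueue queue =
			new UnorderedStreamElementQueue(4, executor, mock(OperatorActions.class));

	StreamRecordQueueEntry<Integer> r1 = new StreamRecordQueueEntry<>(new StreamRecord<>(1, 0L));
	StreamRecordQueueEntry<Integer> r2 = new StreamRecordQueueEntry<>(new StreamRecord<>(2, 1L));
	WatermarkQueueEntry w = new WatermarkQueueEntry(new Watermark(2L));
	StreamRecordQueueEntry<Integer> r3 = new StreamRecordQueueEntry<>(new StreamRecord<>(3, 3L));

	queue.put(r1);
	queue.put(r2);
	queue.put(w);
	queue.put(r3);

	// r3 sits behind the watermark, so completing it does not make it pollable yet
	r3.collect(Collections.<Integer>emptyList());

	// r2 is in front of the watermark; once completed it can be polled even
	// though r1 is still pending (this is the "unordered" part)
	r2.collect(Collections.<Integer>emptyList());
	Assert.assertEquals(r2, queue.poll());

	// only after r1 completes can the watermark, and then r3, be emitted
	r1.collect(Collections.<Integer>emptyList());
	Assert.assertEquals(r1, queue.poll());
	Assert.assertEquals(w, queue.poll());
	Assert.assertEquals(r3, queue.poll());
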
http://git-wip-us.apache.org/repos/asf/flink/blob/ad603d59/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
index f87b5ef..e600420 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
@@ -40,6 +40,7 @@ import org.apache.flink.streaming.runtime.tasks.StreamTask;
 
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.util.CollectorOutput;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
@@ -340,33 +341,4 @@ public class StreamSourceOperatorTest {
 			running = false;
 		}
 	}
-
-	// ------------------------------------------------------------------------
-	
-	private static class CollectorOutput<T> implements Output<StreamRecord<T>> {
-
-		private final List<StreamElement> list;
-
-		private CollectorOutput(List<StreamElement> list) {
-			this.list = list;
-		}
-
-		@Override
-		public void emitWatermark(Watermark mark) {
-			list.add(mark);
-		}
-
-		@Override
-		public void emitLatencyMarker(LatencyMarker latencyMarker) {
-			list.add(latencyMarker);
-		}
-
-		@Override
-		public void collect(StreamRecord<T> record) {
-			list.add(record);
-		}
-
-		@Override
-		public void close() {}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ad603d59/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectorOutput.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectorOutput.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectorOutput.java
new file mode 100644
index 0000000..fcc8a6c
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectorOutput.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util;
+
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.io.Serializable;
+import java.util.List;
+
+public class CollectorOutput<T> implements Output<StreamRecord<T>> {
+
+	private final List<StreamElement> list;
+
+	public CollectorOutput(List<StreamElement> list) {
+		this.list = list;
+	}
+
+	@Override
+	public void emitWatermark(Watermark mark) {
+		list.add(mark);
+	}
+
+	@Override
+	public void emitLatencyMarker(LatencyMarker latencyMarker) {
+		list.add(latencyMarker);
+	}
+
+	@Override
+	public void collect(StreamRecord<T> record) {
+		T copied = SerializationUtils.deserialize(SerializationUtils.serialize((Serializable) record.getValue()));
+		list.add(record.copy(copied));
+	}
+
+	@Override
+	public void close() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ad603d59/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
index ea99fe3..3631965 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
@@ -38,6 +38,7 @@ import org.apache.flink.util.MathUtils;
 import org.junit.*;
 
 import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
@@ -94,6 +95,9 @@ public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase
 
 		final MemorySinkFunction sinkFunction1 = new MemorySinkFunction(0);
 
+		final List<Integer> actualResult1 = new ArrayList<>();
+		MemorySinkFunction.registerCollection(0, actualResult1);
+
 		splittedResult.select("0").map(new MapFunction<Tuple2<Integer,Integer>, Integer>() {
 			private static final long serialVersionUID = 2114608668010092995L;
 
@@ -103,9 +107,11 @@ public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase
 			}
 		}).addSink(sinkFunction1);
 
-
 		final MemorySinkFunction sinkFunction2 = new MemorySinkFunction(1);
 
+		final List<Integer> actualResult2 = new ArrayList<>();
+		MemorySinkFunction.registerCollection(1, actualResult2);
+
 		splittedResult.select("1").map(new MapFunction<Tuple2<Integer, Integer>, Integer>() {
 			private static final long serialVersionUID = 5631104389744681308L;
 
@@ -132,13 +138,11 @@ public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase
 
 		env.execute();
 
-		Collection<Integer> result1 = sinkFunction1.getResult();
-		Collections.sort((ArrayList)result1);
-		Collection<Integer> result2 = sinkFunction2.getResult();
-		Collections.sort((ArrayList)result2);
+		Collections.sort(actualResult1);
+		Collections.sort(actualResult2);
 
-		Assert.assertArrayEquals(result1.toArray(), expected1.toArray());
-		Assert.assertArrayEquals(result2.toArray(), expected2.toArray());
+		Assert.assertEquals(expected1, actualResult1);
+		Assert.assertEquals(expected2, actualResult2);
 
 		MemorySinkFunction.clear();
 	}
@@ -155,6 +159,8 @@ public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase
 		DataStream<Tuple2<Integer, NonSerializable>> input = env.addSource(new NonSerializableTupleSource(numElements));
 
 		final MemorySinkFunction sinkFunction = new MemorySinkFunction(0);
+		final ArrayList<Integer> actualResult = new ArrayList<>();
+		MemorySinkFunction.registerCollection(0, actualResult);
 
 		input
 			.keyBy(0)
@@ -186,23 +192,28 @@ public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase
 
 		env.execute();
 
-		Collection<Integer> result = sinkFunction.getResult();
-		Collections.sort((ArrayList)result);
+		Collections.sort(actualResult);
 
-		Assert.assertArrayEquals(result.toArray(), expected.toArray());
+		Assert.assertEquals(expected, actualResult);
 
 		MemorySinkFunction.clear();
 	}
 
+	/**
+	 * Tests the basic functionality of the AsyncWaitOperator: Processing a limited stream of
+	 * elements by doubling their value. This is tested for both the ordered and the unordered mode.
+	 */
 	@Test
 	public void testAsyncWaitOperator() throws Exception {
-		final int numElements = 10;
+		final int numElements = 5;
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
 		DataStream<Tuple2<Integer, NonSerializable>> input = env.addSource(new NonSerializableTupleSource(numElements));
 
 		AsyncFunction<Tuple2<Integer, NonSerializable>, Integer> function = new RichAsyncFunction<Tuple2<Integer, NonSerializable>, Integer>() {
+			private static final long serialVersionUID = 7000343199829487985L;
+
 			transient ExecutorService executorService;
 
 			@Override
@@ -214,26 +225,16 @@ public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase
 			@Override
 			public void close() throws Exception {
 				super.close();
-				executorService.shutdown();
+				executorService.shutdownNow();
 			}
 
 			@Override
 			public void asyncInvoke(final Tuple2<Integer, NonSerializable> input,
 									final AsyncCollector<Integer> collector) throws Exception {
-				this.executorService.submit(new Runnable() {
+				executorService.submit(new Runnable() {
 					@Override
 					public void run() {
-						// wait for while to simulate async operation here
-						int sleep = (int) (new Random().nextFloat() * 10);
-						try {
-							Thread.sleep(sleep);
-							List<Integer> ret = new ArrayList<>();
-							ret.add(input.f0+input.f0);
-							collector.collect(ret);
-						}
-						catch (InterruptedException e) {
-							collector.collect(new ArrayList<Integer>(0));
-						}
+						collector.collect(Collections.singletonList(input.f0 + input.f0));
 					}
 				});
 			}
@@ -243,6 +244,8 @@ public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase
 
 		// save result from ordered process
 		final MemorySinkFunction sinkFunction1 = new MemorySinkFunction(0);
+		final List<Integer> actualResult1 = new ArrayList<>(numElements);
+		MemorySinkFunction.registerCollection(0, actualResult1);
 
 		orderedResult.addSink(sinkFunction1).setParallelism(1);
 
@@ -251,6 +254,8 @@ public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase
 
 		// save result from unordered process
 		final MemorySinkFunction sinkFunction2 = new MemorySinkFunction(1);
+		final List<Integer> actualResult2 = new ArrayList<>(numElements);
+		MemorySinkFunction.registerCollection(1, actualResult2);
 
 		unorderedResult.addSink(sinkFunction2);
 
@@ -263,11 +268,10 @@ public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase
 
 		env.execute();
 
-		Assert.assertArrayEquals(expected.toArray(), sinkFunction1.getResult().toArray());
+		Assert.assertEquals(expected, actualResult1);
 
-		Collection<Integer> result = sinkFunction2.getResult();
-		Collections.sort((ArrayList)result);
-		Assert.assertArrayEquals(expected.toArray(), result.toArray());
+		Collections.sort(actualResult2);
+		Assert.assertEquals(expected, actualResult2);
 
 		MemorySinkFunction.clear();
 	}
@@ -331,43 +335,31 @@ public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase
 	}
 
 	private static class MemorySinkFunction implements SinkFunction<Integer> {
-		private final static Collection<Integer> collection1 = new ArrayList<>(10);
+		private static Map<Integer, Collection<Integer>> collections = new ConcurrentHashMap<>();
 
-		private final static Collection<Integer> collection2 = new ArrayList<>(10);
+		private static final long serialVersionUID = -8815570195074103860L;
 
-		private  final long serialVersionUID = -8815570195074103860L;
+		private final int key;
 
-		private final int idx;
-
-		public MemorySinkFunction(int idx) {
-			this.idx = idx;
+		public MemorySinkFunction(int key) {
+			this.key = key;
 		}
 
 		@Override
 		public void invoke(Integer value) throws Exception {
-			if (idx == 0) {
-				synchronized (collection1) {
-					collection1.add(value);
-				}
-			}
-			else {
-				synchronized (collection2) {
-					collection2.add(value);
-				}
-			}
-		}
+			Collection<Integer> collection = collections.get(key);
 
-		public Collection<Integer> getResult() {
-			if (idx == 0) {
-				return collection1;
+			synchronized (collection) {
+				collection.add(value);
 			}
+		}
 
-			return collection2;
+		public static void registerCollection(int key, Collection<Integer> collection) {
+			collections.put(key, collection);
 		}
 
 		public static void clear() {
-			collection1.clear();
-			collection2.clear();
+			collections.clear();
 		}
 	}
 }
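
The reworked MemorySinkFunction above no longer owns the result collections; a test registers a collection under a key before executing the job and reads it afterwards. Roughly (someIntegerStream stands for any DataStream<Integer> built in the test, and env for its StreamExecutionEnvironment):

	final List<Integer> results = new ArrayList<>();
	MemorySinkFunction.registerCollection(0, results);

	someIntegerStream.addSink(new MemorySinkFunction(0));

	env.execute();

	// 'results' now holds whatever the pipeline emitted; sort before comparing
	// when the output order is not deterministic
	Collections.sort(results);

	MemorySinkFunction.clear();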