You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/12/02 08:42:36 UTC

[2/6] flink git commit: [FLINK-5169] [network] Adjust tests to new consumer logic

http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
index 3972917..126a96e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
@@ -20,24 +20,17 @@ package org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.taskmanager.TaskActions;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
-import org.apache.flink.runtime.util.event.EventListener;
+import org.apache.flink.runtime.taskmanager.TaskActions;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
-import java.io.IOException;
 import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
+import java.util.ArrayDeque;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkElementIndex;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
@@ -73,28 +66,29 @@ public class TestSingleInputGate {
 
 		// Notify about late registrations (added for DataSinkTaskTest#testUnionDataSinkTask).
 		// After merging registerInputOutput and invoke, we have to make sure that the test
-		// notifcations happen at the expected time. In real programs, this is guaranteed by
+		// notifications happen at the expected time. In real programs, this is guaranteed by
 		// the instantiation and request partition life cycle.
 		try {
 			Field f = realGate.getClass().getDeclaredField("inputChannelsWithData");
 			f.setAccessible(true);
-			final BlockingQueue<InputChannel> notifications = (BlockingQueue<InputChannel>) f.get(realGate);
+			final ArrayDeque<InputChannel> notifications = (ArrayDeque<InputChannel>) f.get(realGate);
 
 			doAnswer(new Answer<Void>() {
 				@Override
 				public Void answer(InvocationOnMock invocation) throws Throwable {
 					invocation.callRealMethod();
 
-					if (!notifications.isEmpty()) {
-						EventListener<InputGate> listener = (EventListener<InputGate>) invocation.getArguments()[0];
-						listener.onEvent(inputGate);
+					synchronized (notifications) {
+						if (!notifications.isEmpty()) {
+							InputGateListener listener = (InputGateListener) invocation.getArguments()[0];
+							listener.notifyInputGateNonEmpty(inputGate);
+						}
 					}
 
 					return null;
 				}
-			}).when(inputGate).registerListener(any(EventListener.class));
-		}
-		catch (Exception e) {
+			}).when(inputGate).registerListener(any(InputGateListener.class));
+		} catch (Exception e) {
 			throw new RuntimeException(e);
 		}
 
@@ -108,81 +102,8 @@ public class TestSingleInputGate {
 		}
 	}
 
-	public TestSingleInputGate read(Buffer buffer, int channelIndex) throws IOException, InterruptedException {
-		checkElementIndex(channelIndex, inputGate.getNumberOfInputChannels());
-
-		inputChannels[channelIndex].read(buffer);
-
-		return this;
-	}
-
-	public TestSingleInputGate readBuffer() throws IOException, InterruptedException {
-		return readBuffer(0);
-	}
-
-	public TestSingleInputGate readBuffer(int channelIndex) throws IOException, InterruptedException {
-		inputChannels[channelIndex].readBuffer();
-
-		return this;
-	}
-
-	public TestSingleInputGate readEvent() throws IOException, InterruptedException {
-		return readEvent(0);
-	}
-
-	public TestSingleInputGate readEvent(int channelIndex) throws IOException, InterruptedException {
-		inputChannels[channelIndex].readEvent();
-
-		return this;
-	}
-
-	public TestSingleInputGate readEndOfSuperstepEvent() throws IOException, InterruptedException {
-		for (TestInputChannel inputChannel : inputChannels) {
-			inputChannel.readEndOfSuperstepEvent();
-		}
-
-		return this;
-	}
-
-	public TestSingleInputGate readEndOfSuperstepEvent(int channelIndex) throws IOException, InterruptedException {
-		inputChannels[channelIndex].readEndOfSuperstepEvent();
-
-		return this;
-	}
-
-	public TestSingleInputGate readEndOfPartitionEvent() throws IOException, InterruptedException {
-		for (TestInputChannel inputChannel : inputChannels) {
-			inputChannel.readEndOfPartitionEvent();
-		}
-
-		return this;
-	}
-
-	public TestSingleInputGate readEndOfPartitionEvent(int channelIndex) throws IOException, InterruptedException {
-		inputChannels[channelIndex].readEndOfPartitionEvent();
-
-		return this;
-	}
-
 	public SingleInputGate getInputGate() {
 		return inputGate;
 	}
 
-	// ------------------------------------------------------------------------
-
-	public List<Integer> readAllChannels() throws IOException, InterruptedException {
-		final List<Integer> readOrder = new ArrayList<Integer>(inputChannels.length);
-
-		for (int i = 0; i < inputChannels.length; i++) {
-			readOrder.add(i);
-		}
-
-		Collections.shuffle(readOrder);
-
-		for (int channelIndex : readOrder) {
-			inputChannels[channelIndex].readBuffer();
-		}
-
-		return readOrder;
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
index cba3199..84ec202 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.io.network.partition.consumer;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
-
 import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
 import org.apache.flink.runtime.taskmanager.TaskActions;
 import org.junit.Test;
@@ -73,22 +72,32 @@ public class UnionInputGateTest {
 		inputChannels[1][1].readEndOfPartitionEvent(); // 0 => 3
 		inputChannels[1][0].readEndOfPartitionEvent(); // 0 => 3
 
-		SingleInputGateTest.verifyBufferOrEvent(union, true, 0);
-		SingleInputGateTest.verifyBufferOrEvent(union, false, 0);
-		SingleInputGateTest.verifyBufferOrEvent(union, true, 5);
-		SingleInputGateTest.verifyBufferOrEvent(union, false, 5);
-		SingleInputGateTest.verifyBufferOrEvent(union, true, 3);
-		SingleInputGateTest.verifyBufferOrEvent(union, true, 4);
-		SingleInputGateTest.verifyBufferOrEvent(union, true, 1);
-		SingleInputGateTest.verifyBufferOrEvent(union, true, 6);
-		SingleInputGateTest.verifyBufferOrEvent(union, false, 1);
-		SingleInputGateTest.verifyBufferOrEvent(union, false, 6);
-		SingleInputGateTest.verifyBufferOrEvent(union, true, 2);
-		SingleInputGateTest.verifyBufferOrEvent(union, false, 2);
-		SingleInputGateTest.verifyBufferOrEvent(union, true, 7);
-		SingleInputGateTest.verifyBufferOrEvent(union, false, 7);
-		SingleInputGateTest.verifyBufferOrEvent(union, false, 4);
-		SingleInputGateTest.verifyBufferOrEvent(union, false, 3);
+		ig1.notifyChannelNonEmpty(inputChannels[0][0].getInputChannel());
+		ig1.notifyChannelNonEmpty(inputChannels[0][1].getInputChannel());
+		ig1.notifyChannelNonEmpty(inputChannels[0][2].getInputChannel());
+
+		ig2.notifyChannelNonEmpty(inputChannels[1][0].getInputChannel());
+		ig2.notifyChannelNonEmpty(inputChannels[1][1].getInputChannel());
+		ig2.notifyChannelNonEmpty(inputChannels[1][2].getInputChannel());
+		ig2.notifyChannelNonEmpty(inputChannels[1][3].getInputChannel());
+		ig2.notifyChannelNonEmpty(inputChannels[1][4].getInputChannel());
+
+		SingleInputGateTest.verifyBufferOrEvent(union, true, 0); // gate 1, channel 0
+		SingleInputGateTest.verifyBufferOrEvent(union, true, 3); // gate 2, channel 0
+		SingleInputGateTest.verifyBufferOrEvent(union, true, 1); // gate 1, channel 1
+		SingleInputGateTest.verifyBufferOrEvent(union, true, 4); // gate 2, channel 1
+		SingleInputGateTest.verifyBufferOrEvent(union, true, 2); // gate 1, channel 2
+		SingleInputGateTest.verifyBufferOrEvent(union, true, 5); // gate 2, channel 2
+		SingleInputGateTest.verifyBufferOrEvent(union, false, 0); // gate 1, channel 0
+		SingleInputGateTest.verifyBufferOrEvent(union, true, 6); // gate 2, channel 3
+		SingleInputGateTest.verifyBufferOrEvent(union, false, 1); // gate 1, channel 1
+		SingleInputGateTest.verifyBufferOrEvent(union, true, 7); // gate 2, channel 4
+		SingleInputGateTest.verifyBufferOrEvent(union, false, 2); // gate 1, channel 2
+		SingleInputGateTest.verifyBufferOrEvent(union, false, 3); // gate 2, channel 0
+		SingleInputGateTest.verifyBufferOrEvent(union, false, 4); // gate 2, channel 1
+		SingleInputGateTest.verifyBufferOrEvent(union, false, 5); // gate 2, channel 2
+		SingleInputGateTest.verifyBufferOrEvent(union, false, 6); // gate 2, channel 3
+		SingleInputGateTest.verifyBufferOrEvent(union, false, 7); // gate 2, channel 4
 
 		// Return null when the input gate has received all end-of-partition events
 		assertTrue(union.isFinished());

http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java
index 1b51805..676a304 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java
@@ -22,26 +22,32 @@ import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
 
 import java.util.Random;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * A test subpartition view consumer.
+ * A test subpartition view consumer.
  *
  * <p> The behaviour of the consumer is customizable by specifying a callback.
  *
  * @see TestConsumerCallback
  */
-public class TestSubpartitionConsumer implements Callable<Boolean> {
+public class TestSubpartitionConsumer implements Callable<Boolean>, BufferAvailabilityListener {
 
 	private static final int MAX_SLEEP_TIME_MS = 20;
 
-	/** The subpartition view to consume. */
-	private final ResultSubpartitionView subpartitionView;
+	/** The subpartition view to consume. */
+	private volatile ResultSubpartitionView subpartitionView;
+
+	private BlockingQueue<ResultSubpartitionView> viewQueue = new ArrayBlockingQueue<>(1);
 
 	/**
 	 * Flag indicating whether the consumer is slow. If true, the consumer will sleep a random
@@ -49,33 +55,43 @@ public class TestSubpartitionConsumer implements Callable<Boolean> {
 	 */
 	private final boolean isSlowConsumer;
 
-	/** The callback to handle a read buffer. */
+	/** The callback to handle a read buffer. */
 	private final TestConsumerCallback callback;
 
 	/** Random source for sleeps. */
 	private final Random random;
 
+	private final AtomicLong numBuffersAvailable = new AtomicLong();
+
 	public TestSubpartitionConsumer(
-			ResultSubpartitionView subpartitionView,
-			boolean isSlowConsumer,
-			TestConsumerCallback callback) {
+		boolean isSlowConsumer,
+		TestConsumerCallback callback) {
 
-		this.subpartitionView = checkNotNull(subpartitionView);
 		this.isSlowConsumer = isSlowConsumer;
 		this.random = isSlowConsumer ? new Random() : null;
 		this.callback = checkNotNull(callback);
 	}
 
+	public void setSubpartitionView(ResultSubpartitionView subpartitionView) {
+		this.subpartitionView = checkNotNull(subpartitionView);
+	}
+
 	@Override
 	public Boolean call() throws Exception {
-		final TestNotificationListener listener = new TestNotificationListener();
-
 		try {
 			while (true) {
 				if (Thread.interrupted()) {
 					throw new InterruptedException();
 				}
 
+				if (numBuffersAvailable.get() == 0) {
+					synchronized (numBuffersAvailable) {
+						while (numBuffersAvailable.get() == 0) {
+							numBuffersAvailable.wait();
+						}
+					}
+				}
+
 				final Buffer buffer = subpartitionView.getNextBuffer();
 
 				if (isSlowConsumer) {
@@ -83,12 +99,13 @@ public class TestSubpartitionConsumer implements Callable<Boolean> {
 				}
 
 				if (buffer != null) {
+					numBuffersAvailable.decrementAndGet();
+
 					if (buffer.isBuffer()) {
 						callback.onBuffer(buffer);
-					}
-					else {
+					} else {
 						final AbstractEvent event = EventSerializer.fromBuffer(buffer,
-								getClass().getClassLoader());
+							getClass().getClassLoader());
 
 						callback.onEvent(event);
 
@@ -100,22 +117,22 @@ public class TestSubpartitionConsumer implements Callable<Boolean> {
 							return true;
 						}
 					}
-				}
-				else {
-					int current = listener.getNumberOfNotifications();
-
-					if (subpartitionView.registerListener(listener)) {
-						listener.waitForNotification(current);
-					}
-					else if (subpartitionView.isReleased()) {
-						return true;
-					}
+				} else if (subpartitionView.isReleased()) {
+					return true;
 				}
 			}
-		}
-		finally {
+		} finally {
 			subpartitionView.releaseAllResources();
 		}
 	}
 
+	@Override
+	public void notifyBuffersAvailable(long numBuffers) {
+		if (numBuffers > 0 && numBuffersAvailable.getAndAdd(numBuffers) == 0) {
+			synchronized (numBuffersAvailable) {
+				numBuffersAvailable.notifyAll();
+			}
+			;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
index 226dc91..f9aea89 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
@@ -129,7 +129,6 @@ public class DataSinkTaskTest extends TaskTestBase {
 
 	@Test
 	public void testUnionDataSinkTask() {
-
 		int keyCnt = 10;
 		int valCnt = 20;
 
@@ -147,9 +146,10 @@ public class DataSinkTaskTest extends TaskTestBase {
 
 		try {
 			// For the union reader to work, we need to start notifications *after* the union reader
-			// has been initialized.
+			// has been initialized. This is accomplished via a mockito hack in TestSingleInputGate,
+			// which checks for and forwards existing notifications on registerListener calls.
 			for (IteratorWrappingTestSingleInputGate<?> reader : readers) {
-				reader.read();
+				reader.notifyNonEmpty();
 			}
 
 			testTask.invoke();

http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
index 0c9fd79..fb8ed68 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
@@ -27,7 +27,9 @@ import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
 import org.apache.flink.runtime.testutils.recordutils.RecordComparatorFactory;
 import org.apache.flink.runtime.testutils.recordutils.RecordSerializerFactory;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.BatchTask;
 import org.apache.flink.runtime.operators.FlatMapDriver;
@@ -37,14 +39,20 @@ import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.testutils.TaskTestBase;
 import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
 import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
-
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
 
-
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({Task.class, ResultPartitionWriter.class})
+@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
 public class ChainTaskTest extends TaskTestBase {
 	
 	private static final int MEMORY_MANAGER_SIZE = 1024 * 1024 * 3;

http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 646c038..8f4bc77 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -67,9 +67,9 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class MockEnvironment implements Environment {
-	
+
 	private final TaskInfo taskInfo;
-	
+
 	private final ExecutionConfig executionConfig;
 
 	private final MemoryManager memManager;
@@ -184,7 +184,7 @@ public class MockEnvironment implements Environment {
 						}
 
 						if (result == RecordDeserializer.DeserializationResult.LAST_RECORD_FROM_BUFFER
-								|| result == RecordDeserializer.DeserializationResult.PARTIAL_RECORD) {
+							|| result == RecordDeserializer.DeserializationResult.PARTIAL_RECORD) {
 							break;
 						}
 					}
@@ -234,9 +234,9 @@ public class MockEnvironment implements Environment {
 	@Override
 	public TaskManagerRuntimeInfo getTaskManagerInfo() {
 		return new TaskManagerRuntimeInfo(
-				"localhost",
-				new UnmodifiableConfiguration(new Configuration()),
-				System.getProperty("java.io.tmpdir"));
+			"localhost",
+			new UnmodifiableConfiguration(new Configuration()),
+			System.getProperty("java.io.tmpdir"));
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
index eaf44db..53d75b3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
@@ -67,7 +67,7 @@ public abstract class TaskTestBase extends TestLogger {
 		conf.setInputSerializer(RecordSerializerFactory.get(), groupId);
 
 		if (read) {
-			reader.read();
+			reader.notifyNonEmpty();
 		}
 
 		return reader;

http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
index a093233..876e908 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.taskmanager;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.instance.ActorGateway;
@@ -43,6 +42,7 @@ import scala.concurrent.Future;
 import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 
+import java.util.Arrays;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
@@ -129,18 +129,17 @@ public class TaskCancelAsyncProducerConsumerITCase extends TestLogger {
 			}
 
 			// Verify that async producer is in blocking request
-			assertTrue("Producer thread is not blocked.", producerBlocked);
+			assertTrue("Producer thread is not blocked: " + Arrays.toString(ASYNC_CONSUMER_THREAD.getStackTrace()), producerBlocked);
 
-			boolean consumerBlocked = false;
+			boolean consumerWaiting = false;
 			for (int i = 0; i < 50; i++) {
 				Thread thread = ASYNC_CONSUMER_THREAD;
 
 				if (thread != null && thread.isAlive()) {
-					StackTraceElement[] stackTrace = thread.getStackTrace();
-					consumerBlocked = isInBlockingQueuePoll(stackTrace);
+					consumerWaiting = thread.getState() == Thread.State.WAITING;
 				}
 
-				if (consumerBlocked) {
+				if (consumerWaiting) {
 					break;
 				} else {
 					// Retry
@@ -149,7 +148,7 @@ public class TaskCancelAsyncProducerConsumerITCase extends TestLogger {
 			}
 
 			// Verify that async consumer is in blocking request
-			assertTrue("Consumer thread is not blocked.", consumerBlocked);
+			assertTrue("Consumer thread is not blocked.", consumerWaiting);
 
 			msg = new CancelJob(jobGraph.getJobID());
 			Future<?> cancelFuture = jobManager.ask(msg, deadline.timeLeft());
@@ -186,27 +185,6 @@ public class TaskCancelAsyncProducerConsumerITCase extends TestLogger {
 	}
 
 	/**
-	 * Returns whether the stack trace represents a Thread in a blocking queue
-	 * poll call.
-	 *
-	 * @param stackTrace Stack trace of the Thread to check
-	 *
-	 * @return Flag indicating whether the Thread is in a blocking queue poll
-	 * call.
-	 */
-	private boolean isInBlockingQueuePoll(StackTraceElement[] stackTrace) {
-		for (StackTraceElement elem : stackTrace) {
-			if (elem.getMethodName().equals("poll") &&
-					elem.getClassName().equals("java.util.concurrent.LinkedBlockingQueue")) {
-
-				return true;
-			}
-		}
-
-		return false;
-	}
-
-	/**
 	 * Invokable emitting records in a separate Thread (not the main Task
 	 * thread).
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
index 322a0f0..6dcb56b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
@@ -28,10 +28,11 @@ import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
 import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -62,9 +63,9 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate {
 
 	@SuppressWarnings("unchecked")
 	public StreamTestSingleInputGate(
-			int numInputChannels,
-			int bufferSize,
-			TypeSerializer<T> serializer) throws IOException, InterruptedException {
+		int numInputChannels,
+		int bufferSize,
+		TypeSerializer<T> serializer) throws IOException, InterruptedException {
 		super(numInputChannels, false);
 
 		this.bufferSize = bufferSize;
@@ -86,39 +87,36 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate {
 			final int channelIndex = i;
 			final RecordSerializer<SerializationDelegate<Object>> recordSerializer = new SpanningRecordSerializer<SerializationDelegate<Object>>();
 			final SerializationDelegate<Object> delegate = (SerializationDelegate<Object>) (SerializationDelegate<?>)
-					new SerializationDelegate<StreamElement>(new StreamElementSerializer<T>(serializer));
+				new SerializationDelegate<StreamElement>(new StreamElementSerializer<T>(serializer));
 
 			inputQueues[channelIndex] = new ConcurrentLinkedQueue<InputValue<Object>>();
 			inputChannels[channelIndex] = new TestInputChannel(inputGate, i);
 
 
-			final Answer<Buffer> answer = new Answer<Buffer>() {
+			final Answer<BufferAndAvailability> answer = new Answer<BufferAndAvailability>() {
 				@Override
-				public Buffer answer(InvocationOnMock invocationOnMock) throws Throwable {
+				public BufferAndAvailability answer(InvocationOnMock invocationOnMock) throws Throwable {
 					InputValue<Object> input = inputQueues[channelIndex].poll();
 					if (input != null && input.isStreamEnd()) {
 						when(inputChannels[channelIndex].getInputChannel().isReleased()).thenReturn(
-								true);
-						return EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);
-					}
-					else if (input != null && input.isStreamRecord()) {
+							true);
+						return new BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), false);
+					} else if (input != null && input.isStreamRecord()) {
 						Object inputElement = input.getStreamRecord();
 						final Buffer buffer = new Buffer(
-								MemorySegmentFactory.allocateUnpooledSegment(bufferSize),
-								mock(BufferRecycler.class));
-						
+							MemorySegmentFactory.allocateUnpooledSegment(bufferSize),
+							mock(BufferRecycler.class));
+
 						recordSerializer.setNextBuffer(buffer);
 						delegate.setInstance(inputElement);
 						recordSerializer.addRecord(delegate);
 
 						// Call getCurrentBuffer to ensure size is set
-						return recordSerializer.getCurrentBuffer();
-					}
-					else if (input != null && input.isEvent()) {
+						return new BufferAndAvailability(recordSerializer.getCurrentBuffer(), false);
+					} else if (input != null && input.isEvent()) {
 						AbstractEvent event = input.getEvent();
-						return EventSerializer.toBuffer(event);
-					}
-					else {
+						return new BufferAndAvailability(EventSerializer.toBuffer(event), false);
+					} else {
 						synchronized (inputQueues[channelIndex]) {
 							inputQueues[channelIndex].wait();
 							return answer(invocationOnMock);
@@ -130,7 +128,7 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate {
 			when(inputChannels[channelIndex].getInputChannel().getNextBuffer()).thenAnswer(answer);
 
 			inputGate.setInputChannel(new IntermediateResultPartitionID(),
-					inputChannels[channelIndex].getInputChannel());
+				inputChannels[channelIndex].getInputChannel());
 		}
 	}
 
@@ -139,7 +137,7 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate {
 			inputQueues[channel].add(InputValue.element(element));
 			inputQueues[channel].notifyAll();
 		}
-		inputGate.onAvailableBuffer(inputChannels[channel].getInputChannel());
+		inputGate.notifyChannelNonEmpty(inputChannels[channel].getInputChannel());
 	}
 
 	public void sendEvent(AbstractEvent event, int channel) {
@@ -147,7 +145,7 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate {
 			inputQueues[channel].add(InputValue.event(event));
 			inputQueues[channel].notifyAll();
 		}
-		inputGate.onAvailableBuffer(inputChannels[channel].getInputChannel());
+		inputGate.notifyChannelNonEmpty(inputChannels[channel].getInputChannel());
 	}
 
 	public void endInput() {
@@ -156,7 +154,7 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate {
 				inputQueues[i].add(InputValue.streamEnd());
 				inputQueues[i].notifyAll();
 			}
-			inputGate.onAvailableBuffer(inputChannels[i].getInputChannel());
+			inputGate.notifyChannelNonEmpty(inputChannels[i].getInputChannel());
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
index a8a989b..0cf866a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
@@ -17,25 +17,24 @@
 
 package org.apache.flink.streaming.runtime.io;
 
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.util.Random;
-
 import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
-
+import org.apache.flink.runtime.io.network.partition.consumer.InputGateListener;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.util.Random;
+
+import static org.junit.Assert.fail;
+
 /**
  * The test generates two random streams (input channels) which independently
  * and randomly generate checkpoint barriers. The two streams are very
@@ -165,7 +164,7 @@ public class BarrierBufferMassiveRandomTest {
 		public void sendTaskEvent(TaskEvent event) {}
 
 		@Override
-		public void registerListener(EventListener<InputGate> listener) {}
+		public void registerListener(InputGateListener listener) {}
 
 		@Override
 		public int getPageSize() {

http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
index cb8a058..3e2a75a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
@@ -22,7 +22,7 @@ import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.util.event.EventListener;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGateListener;
 
 import java.util.ArrayDeque;
 import java.util.List;
@@ -31,16 +31,15 @@ import java.util.Queue;
 public class MockInputGate implements InputGate {
 
 	private final int pageSize;
-	
+
 	private final int numChannels;
-	
+
 	private final Queue<BufferOrEvent> boes;
 
 	private final boolean[] closed;
-	
+
 	private int closedChannels;
 
-	
 	public MockInputGate(int pageSize, int numChannels, List<BufferOrEvent> boes) {
 		this.pageSize = pageSize;
 		this.numChannels = numChannels;
@@ -52,7 +51,7 @@ public class MockInputGate implements InputGate {
 	public int getPageSize() {
 		return pageSize;
 	}
-	
+
 	@Override
 	public int getNumberOfInputChannels() {
 		return numChannels;
@@ -69,11 +68,11 @@ public class MockInputGate implements InputGate {
 		if (next == null) {
 			return null;
 		}
-		
+
 		int channelIdx = next.getChannelIndex();
 		if (closed[channelIdx]) {
 			throw new RuntimeException("Inconsistent: Channel " + channelIdx
-					+ " has data even though it is already closed.");
+				+ " has data even though it is already closed.");
 		}
 		if (next.isEvent() && next.getEvent() instanceof EndOfPartitionEvent) {
 			closed[channelIdx] = true;
@@ -83,12 +82,15 @@ public class MockInputGate implements InputGate {
 	}
 
 	@Override
-	public void requestPartitions() {}
+	public void requestPartitions() {
+	}
 
 	@Override
-	public void sendTaskEvent(TaskEvent event) {}
+	public void sendTaskEvent(TaskEvent event) {
+	}
 
 	@Override
-	public void registerListener(EventListener<InputGate> listener) {}
-	
-}
\ No newline at end of file
+	public void registerListener(InputGateListener listener) {
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
index 3126d71..d86c809 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
@@ -45,7 +45,7 @@ import java.io.IOException;
  *
  * <p>
  * When Elements or Events are offered to the Task they are put into a queue. The input gates
- * of the Task read from this queue. Use {@link #waitForInputProcessing()} to wait until all
+ * of the Task read from this queue. Use {@link #waitForInputProcessing()} to wait until all
  * queues are empty. This must be used after entering some elements before checking the
  * desired output.
  */
@@ -58,11 +58,13 @@ public class OneInputStreamTaskTestHarness<IN, OUT> extends StreamTaskTestHarnes
 	 * Creates a test harness with the specified number of input gates and specified number
 	 * of channels per input gate.
 	 */
-	public OneInputStreamTaskTestHarness(OneInputStreamTask<IN, OUT> task,
-			int numInputGates,
-			int numInputChannelsPerGate,
-			TypeInformation<IN> inputType,
-			TypeInformation<OUT> outputType) {
+	public OneInputStreamTaskTestHarness(
+		OneInputStreamTask<IN, OUT> task,
+		int numInputGates,
+		int numInputChannelsPerGate,
+		TypeInformation<IN> inputType,
+		TypeInformation<OUT> outputType) {
+		
 		super(task, outputType);
 
 		this.inputType = inputType;
@@ -75,9 +77,10 @@ public class OneInputStreamTaskTestHarness<IN, OUT> extends StreamTaskTestHarnes
 	/**
 	 * Creates a test harness with one input gate that has one input channel.
 	 */
-	public OneInputStreamTaskTestHarness(OneInputStreamTask<IN, OUT> task,
-			TypeInformation<IN> inputType,
-			TypeInformation<OUT> outputType) {
+	public OneInputStreamTaskTestHarness(
+		OneInputStreamTask<IN, OUT> task,
+		TypeInformation<IN> inputType,
+		TypeInformation<OUT> outputType) {
 		this(task, 1, 1, inputType, outputType);
 	}
 
@@ -87,9 +90,9 @@ public class OneInputStreamTaskTestHarness<IN, OUT> extends StreamTaskTestHarnes
 
 		for (int i = 0; i < numInputGates; i++) {
 			inputGates[i] = new StreamTestSingleInputGate<IN>(
-					numInputChannelsPerGate,
-					bufferSize,
-					inputSerializer);
+				numInputChannelsPerGate,
+				bufferSize,
+				inputSerializer);
 			this.mockEnv.addInputGate(inputGates[i].getInputGate());
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index 52daf6f..17a0857 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -102,13 +102,13 @@ public class StreamMockEnvironment implements Environment {
 	private volatile boolean wasFailedExternally = false;
 
 	public StreamMockEnvironment(Configuration jobConfig, Configuration taskConfig, ExecutionConfig executionConfig,
-									long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize) {
+								 long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize) {
 		this.taskInfo = new TaskInfo(
-				"", /* task name */
-				1, /* num key groups / max parallelism */
-				0, /* index of this subtask */
-				1, /* num subtasks */
-				0 /* attempt number */);
+			"", /* task name */
+			1, /* num key groups / max parallelism */
+			0, /* index of this subtask */
+			1, /* num subtasks */
+			0 /* attempt number */);
 		this.jobConfiguration = jobConfig;
 		this.taskConfiguration = taskConfig;
 		this.inputs = new LinkedList<InputGate>();
@@ -146,8 +146,8 @@ public class StreamMockEnvironment implements Environment {
 				@Override
 				public Buffer answer(InvocationOnMock invocationOnMock) throws Throwable {
 					return new Buffer(
-							MemorySegmentFactory.allocateUnpooledSegment(bufferSize),
-							mock(BufferRecycler.class));
+						MemorySegmentFactory.allocateUnpooledSegment(bufferSize),
+						mock(BufferRecycler.class));
 				}
 			});
 
@@ -175,7 +175,7 @@ public class StreamMockEnvironment implements Environment {
 						}
 
 						if (result == RecordDeserializer.DeserializationResult.LAST_RECORD_FROM_BUFFER
-								|| result == RecordDeserializer.DeserializationResult.PARTIAL_RECORD) {
+							|| result == RecordDeserializer.DeserializationResult.PARTIAL_RECORD) {
 							break;
 						}
 					}
@@ -318,7 +318,7 @@ public class StreamMockEnvironment implements Environment {
 
 	@Override
 	public void acknowledgeCheckpoint(
-			CheckpointMetaData checkpointMetaData, SubtaskState subtaskState) {
+		CheckpointMetaData checkpointMetaData, SubtaskState subtaskState) {
 	}
 
 	@Override
@@ -343,4 +343,3 @@ public class StreamMockEnvironment implements Environment {
 		return new UnregisteredTaskMetricsGroup();
 	}
 }
-

http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
index 95828f8..ebe5285 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
@@ -28,13 +28,11 @@ import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamMap;
 import org.apache.flink.streaming.api.operators.co.CoStreamMap;
-
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
-
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.eq;

http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index b71e38d..b20b3a3 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -92,7 +92,7 @@ public class StreamTaskTestHarness<OUT> {
 	// input related methods only need to be implemented once, in generic form
 	protected int numInputGates;
 	protected int numInputChannelsPerGate;
-	
+
 	@SuppressWarnings("rawtypes")
 	protected StreamTestSingleInputGate[] inputGates;
 
@@ -329,7 +329,7 @@ public class StreamTaskTestHarness<OUT> {
 			try {
 				Thread.sleep(10);
 			} catch (InterruptedException ignored) {}
-			
+
 			if (allEmpty) {
 				break;
 			}
@@ -337,7 +337,7 @@ public class StreamTaskTestHarness<OUT> {
 
 		// then wait for the Task Thread to be in a blocked state
 		// Check whether the state is blocked, this should be the case if it cannot
-		// read more input, i.e. all currently available input has been processed.
+		// read more input, i.e. all currently available input has been processed.
 		while (true) {
 			Thread.State state = taskThread.getState();
 			if (state == Thread.State.BLOCKED || state == Thread.State.TERMINATED ||
@@ -360,13 +360,13 @@ public class StreamTaskTestHarness<OUT> {
 			inputGates[i].endInput();
 		}
 	}
-	
+
 	// ------------------------------------------------------------------------
-	
+
 	private class TaskThread extends Thread {
-		
+
 		private final AbstractInvokable task;
-		
+
 		private volatile Throwable error;
 
 		TaskThread(AbstractInvokable task) {

http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
index 1076eeb..3cd9c9a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
@@ -199,6 +199,8 @@ public class TwoInputStreamTaskTest {
 		testHarness.processElement(new StreamRecord<String>("Ciao-0-0", initialTime), 0, 1);
 		expectedOutput.add(new StreamRecord<String>("Ciao-0-0", initialTime));
 
+		testHarness.waitForInputProcessing();
+
 		// These elements should be forwarded, since we did not yet receive a checkpoint barrier
 		// on that input, only add to same input, otherwise we would not know the ordering
 		// of the output since the Task might read the inputs in any order
@@ -221,8 +223,8 @@ public class TwoInputStreamTaskTest {
 
 		// we should not yet see the barrier, only the two elements from non-blocked input
 		TestHarnessUtil.assertOutputEquals("Output was not correct.",
-				testHarness.getOutput(),
-				expectedOutput);
+			expectedOutput,
+			testHarness.getOutput());
 
 		testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 1);
 		testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 0);

http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
index edb1642..f5b7566 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
@@ -124,7 +124,6 @@ public class TwoInputStreamTaskTestHarness<IN1, IN2, OUT> extends StreamTaskTest
 							bufferSize,
 							inputSerializer1);
 
-
 					StreamEdge streamEdge = new StreamEdge(sourceVertexDummy,
 							targetVertexDummy,
 							1,