You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/12/02 08:42:37 UTC

[3/6] flink git commit: [FLINK-5169] [network] Adjust tests to new consumer logic

[FLINK-5169] [network] Adjust tests to new consumer logic


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d3ac0adf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d3ac0adf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d3ac0adf

Branch: refs/heads/master
Commit: d3ac0adfd7ed8878f0c80e0c454c580969e40cfc
Parents: f728129
Author: Ufuk Celebi <uc...@apache.org>
Authored: Mon Nov 28 09:59:58 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Thu Dec 1 21:42:49 2016 +0100

----------------------------------------------------------------------
 .../runtime/io/disk/SpillingBufferTest.java     |  40 ++--
 .../iomanager/BufferFileWriterReaderTest.java   |   1 -
 .../io/network/api/reader/BufferReaderTest.java | 115 -----------
 .../netty/CancelPartitionRequestTest.java       |  37 ++--
 .../netty/PartitionRequestQueueTest.java        |  23 ++-
 .../netty/ServerTransportErrorHandlingTest.java |  54 +++---
 .../PartialConsumePipelinedResultTest.java      |  18 +-
 .../partition/PipelinedSubpartitionTest.java    | 118 +++---------
 .../network/partition/ResultPartitionTest.java  |   1 -
 .../partition/SpillableSubpartitionTest.java    |  20 +-
 .../SpilledSubpartitionViewAsyncIOTest.java     |  65 -------
 .../SpilledSubpartitionViewSyncIOTest.java      | 103 ----------
 .../partition/SpilledSubpartitionViewTest.java  | 192 ++++++++++++-------
 .../network/partition/SubpartitionTestBase.java |  10 +-
 .../partition/consumer/InputChannelTest.java    |  13 +-
 .../IteratorWrappingTestSingleInputGate.java    |  23 +--
 .../consumer/LocalInputChannelTest.java         |  84 ++++----
 .../consumer/RemoteInputChannelTest.java        |   2 +-
 .../partition/consumer/SingleInputGateTest.java |  78 +++-----
 .../partition/consumer/TestInputChannel.java    |  32 +---
 .../partition/consumer/TestSingleInputGate.java | 101 ++--------
 .../partition/consumer/UnionInputGateTest.java  |  43 +++--
 .../network/util/TestSubpartitionConsumer.java  |  69 ++++---
 .../runtime/operators/DataSinkTaskTest.java     |   6 +-
 .../operators/chaining/ChainTaskTest.java       |  12 +-
 .../operators/testutils/MockEnvironment.java    |  12 +-
 .../operators/testutils/TaskTestBase.java       |   2 +-
 .../TaskCancelAsyncProducerConsumerITCase.java  |  34 +---
 .../consumer/StreamTestSingleInputGate.java     |  46 +++--
 .../io/BarrierBufferMassiveRandomTest.java      |  17 +-
 .../streaming/runtime/io/MockInputGate.java     |  28 +--
 .../tasks/OneInputStreamTaskTestHarness.java    |  27 +--
 .../runtime/tasks/StreamMockEnvironment.java    |  21 +-
 .../StreamTaskCancellationBarrierTest.java      |   2 -
 .../runtime/tasks/StreamTaskTestHarness.java    |  14 +-
 .../runtime/tasks/TwoInputStreamTaskTest.java   |   6 +-
 .../tasks/TwoInputStreamTaskTestHarness.java    |   1 -
 37 files changed, 554 insertions(+), 916 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java
index 538c416..01a9723 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java
@@ -109,7 +109,7 @@ public class SpillingBufferTest {
 		DataInputView inView = outView.flip();
 		generator.reset();
 		
-		// read and re-generate all records and compare them
+		// notifyNonEmpty and re-generate all records and compare them
 		final Tuple2<Integer, String> readRec = new Tuple2<>();
 		for (int i = 0; i < NUM_PAIRS_INMEM; i++) {
 			generator.next(rec);
@@ -121,14 +121,14 @@ public class SpillingBufferTest {
 			int k2 = readRec.f0;
 			String v2 = readRec.f1;
 			
-			Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2));
+			Assert.assertTrue("The re-generated and the notifyNonEmpty record do not match.", k1 == k2 && v1.equals(v2));
 		}
 		
-		// re-read the data
+		// re-notifyNonEmpty the data
 		inView = outView.flip();
 		generator.reset();
 		
-		// read and re-generate all records and compare them
+		// notifyNonEmpty and re-generate all records and compare them
 		for (int i = 0; i < NUM_PAIRS_INMEM; i++) {
 			generator.next(rec);
 			serializer.deserialize(readRec, inView);
@@ -139,7 +139,7 @@ public class SpillingBufferTest {
 			int k2 = readRec.f0;
 			String v2 = readRec.f1;
 			
-			Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2));
+			Assert.assertTrue("The re-generated and the notifyNonEmpty record do not match.", k1 == k2 && v1.equals(v2));
 		}
 		
 		this.memoryManager.release(outView.close());
@@ -169,7 +169,7 @@ public class SpillingBufferTest {
 		DataInputView inView = outView.flip();
 		generator.reset();
 		
-		// read and re-generate all records and compare them
+		// notifyNonEmpty and re-generate all records and compare them
 		final Tuple2<Integer, String> readRec = new Tuple2<>();
 		try {
 			for (int i = 0; i < NUM_PAIRS_INMEM + 1; i++) {
@@ -182,7 +182,7 @@ public class SpillingBufferTest {
 				int k2 = readRec.f0;
 				String v2 = readRec.f1;
 				
-				Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2));
+				Assert.assertTrue("The re-generated and the notifyNonEmpty record do not match.", k1 == k2 && v1.equals(v2));
 			}
 			Assert.fail("Read too much, expected EOFException.");
 		}
@@ -190,11 +190,11 @@ public class SpillingBufferTest {
 			// expected
 		}
 		
-		// re-read the data
+		// re-notifyNonEmpty the data
 		inView = outView.flip();
 		generator.reset();
 		
-		// read and re-generate all records and compare them
+		// notifyNonEmpty and re-generate all records and compare them
 		for (int i = 0; i < NUM_PAIRS_INMEM; i++) {
 			generator.next(rec);
 			serializer.deserialize(readRec, inView);
@@ -205,7 +205,7 @@ public class SpillingBufferTest {
 			int k2 = readRec.f0;
 			String v2 = readRec.f1;
 			
-			Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2));
+			Assert.assertTrue("The re-generated and the notifyNonEmpty record do not match.", k1 == k2 && v1.equals(v2));
 		}
 		
 		this.memoryManager.release(outView.close());
@@ -237,7 +237,7 @@ public class SpillingBufferTest {
 		DataInputView inView = outView.flip();
 		generator.reset();
 		
-		// read and re-generate all records and compare them
+		// notifyNonEmpty and re-generate all records and compare them
 		final Tuple2<Integer, String> readRec = new Tuple2<>();
 		for (int i = 0; i < NUM_PAIRS_EXTERNAL; i++) {
 			generator.next(rec);
@@ -249,14 +249,14 @@ public class SpillingBufferTest {
 			int k2 = readRec.f0;
 			String v2 = readRec.f1;
 			
-			Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2));
+			Assert.assertTrue("The re-generated and the notifyNonEmpty record do not match.", k1 == k2 && v1.equals(v2));
 		}
 		
-		// re-read the data
+		// re-notifyNonEmpty the data
 		inView = outView.flip();
 		generator.reset();
 		
-		// read and re-generate all records and compare them
+		// notifyNonEmpty and re-generate all records and compare them
 		for (int i = 0; i < NUM_PAIRS_EXTERNAL; i++) {
 			generator.next(rec);
 			serializer.deserialize(readRec, inView);
@@ -267,7 +267,7 @@ public class SpillingBufferTest {
 			int k2 = readRec.f0;
 			String v2 = readRec.f1;
 			
-			Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2));
+			Assert.assertTrue("The re-generated and the notifyNonEmpty record do not match.", k1 == k2 && v1.equals(v2));
 		}
 		
 		this.memoryManager.release(outView.close());
@@ -297,7 +297,7 @@ public class SpillingBufferTest {
 		DataInputView inView = outView.flip();
 		generator.reset();
 		
-		// read and re-generate all records and compare them
+		// notifyNonEmpty and re-generate all records and compare them
 		final Tuple2<Integer, String> readRec = new Tuple2<>();
 		try {
 			for (int i = 0; i < NUM_PAIRS_EXTERNAL + 1; i++) {
@@ -310,7 +310,7 @@ public class SpillingBufferTest {
 				int k2 = readRec.f0;
 				String v2 = readRec.f1;
 				
-				Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2));
+				Assert.assertTrue("The re-generated and the notifyNonEmpty record do not match.", k1 == k2 && v1.equals(v2));
 			}
 			Assert.fail("Read too much, expected EOFException.");
 		}
@@ -318,11 +318,11 @@ public class SpillingBufferTest {
 			// expected
 		}
 		
-		// re-read the data
+		// re-notifyNonEmpty the data
 		inView = outView.flip();
 		generator.reset();
 		
-		// read and re-generate all records and compare them
+		// notifyNonEmpty and re-generate all records and compare them
 		for (int i = 0; i < NUM_PAIRS_EXTERNAL; i++) {
 			generator.next(rec);
 			serializer.deserialize(readRec, inView);
@@ -333,7 +333,7 @@ public class SpillingBufferTest {
 			int k2 = readRec.f0;
 			String v2 = readRec.f1;
 			
-			Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2));
+			Assert.assertTrue("The re-generated and the notifyNonEmpty record do not match.", k1 == k2 && v1.equals(v2));
 		}
 		
 		this.memoryManager.release(outView.close());

http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java
index 375be45..2da0f7e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 import org.apache.flink.runtime.testutils.DiscardingRecycler;
-
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java
deleted file mode 100644
index 099b6fb..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderTest.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.api.reader;
-
-import org.apache.flink.runtime.event.TaskEvent;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.partition.consumer.TestSingleInputGate;
-import org.apache.flink.runtime.io.network.util.TestTaskEvent;
-import org.apache.flink.runtime.taskmanager.Task;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.io.IOException;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(Task.class)
-@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
-@SuppressWarnings("unchecked")
-public class BufferReaderTest {
-
-	@Test
-	public void testGetNextBufferOrEvent() throws IOException, InterruptedException {
-
-		final TestSingleInputGate inputGate = new TestSingleInputGate(1)
-				.readBuffer().readBuffer().readEvent()
-				.readBuffer().readBuffer().readEvent()
-				.readBuffer().readEndOfPartitionEvent();
-
-		final BufferReader reader = new BufferReader(inputGate.getInputGate());
-
-		// Task event listener to be notified...
-		final EventListener<TaskEvent> listener = mock(EventListener.class);
-		reader.registerTaskEventListener(listener, TestTaskEvent.class);
-
-		int numReadBuffers = 0;
-		while ((reader.getNextBuffer()) != null) {
-			numReadBuffers++;
-		}
-
-		assertEquals(5, numReadBuffers);
-		verify(listener, times(2)).onEvent(any(TaskEvent.class));
-	}
-
-	@Test
-	public void testIterativeGetNextBufferOrEvent() throws IOException, InterruptedException {
-
-		final TestSingleInputGate inputGate = new TestSingleInputGate(1)
-				.readBuffer().readBuffer().readEvent()
-				.readBuffer().readBuffer().readEvent()
-				.readBuffer().readEndOfSuperstepEvent()
-				.readBuffer().readBuffer().readEvent()
-				.readBuffer().readBuffer().readEvent()
-				.readBuffer().readEndOfPartitionEvent();
-
-		final BufferReader reader = new BufferReader(inputGate.getInputGate());
-
-		// Set reader iterative
-		reader.setIterativeReader();
-
-		// Task event listener to be notified...
-		final EventListener<TaskEvent> listener = mock(EventListener.class);
-		// Task event listener to be notified...
-		reader.registerTaskEventListener(listener, TestTaskEvent.class);
-
-		int numReadBuffers = 0;
-		int numEndOfSuperstepEvents = 0;
-
-		while (true) {
-			Buffer buffer = reader.getNextBuffer();
-
-			if (buffer != null) {
-				numReadBuffers++;
-			}
-			else if (reader.hasReachedEndOfSuperstep()) {
-				reader.startNextSuperstep();
-
-				numEndOfSuperstepEvents++;
-			}
-			else if (reader.isFinished()) {
-				break;
-			}
-		}
-
-		assertEquals(10, numReadBuffers);
-		assertEquals(1, numEndOfSuperstepEvents);
-
-		verify(listener, times(4)).onEvent(any(TaskEvent.class));
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
index 1ff1e99..a2f866a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
@@ -24,14 +24,16 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.netty.NettyTestUtil.NettyServerAndClient;
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
 import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.util.event.NotificationListener;
 import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import java.io.IOException;
 import java.util.concurrent.CountDownLatch;
@@ -73,11 +75,18 @@ public class CancelPartitionRequestTest {
 
 			CountDownLatch sync = new CountDownLatch(1);
 
-			ResultSubpartitionView view = spy(new InfiniteSubpartitionView(outboundBuffers, sync));
+			final ResultSubpartitionView view = spy(new InfiniteSubpartitionView(outboundBuffers, sync));
 
 			// Return infinite subpartition
-			when(partitions.createSubpartitionView(eq(pid), eq(0), any(BufferProvider.class)))
-					.thenReturn(view);
+			when(partitions.createSubpartitionView(eq(pid), eq(0), any(BufferProvider.class), any(BufferAvailabilityListener.class)))
+				.thenAnswer(new Answer<ResultSubpartitionView>() {
+					@Override
+					public ResultSubpartitionView answer(InvocationOnMock invocationOnMock) throws Throwable {
+						BufferAvailabilityListener listener = (BufferAvailabilityListener) invocationOnMock.getArguments()[3];
+						listener.notifyBuffersAvailable(Long.MAX_VALUE);
+						return view;
+					}
+				});
 
 			PartitionRequestProtocol protocol = new PartitionRequestProtocol(
 					partitions, mock(TaskEventDispatcher.class), mock(NetworkBufferPool.class));
@@ -109,19 +118,26 @@ public class CancelPartitionRequestTest {
 		NettyServerAndClient serverAndClient = null;
 
 		try {
-			TestPooledBufferProvider outboundBuffers = new TestPooledBufferProvider(16);
+			final TestPooledBufferProvider outboundBuffers = new TestPooledBufferProvider(16);
 
 			ResultPartitionManager partitions = mock(ResultPartitionManager.class);
 
 			ResultPartitionID pid = new ResultPartitionID();
 
-			CountDownLatch sync = new CountDownLatch(1);
+			final CountDownLatch sync = new CountDownLatch(1);
 
-			ResultSubpartitionView view = spy(new InfiniteSubpartitionView(outboundBuffers, sync));
+			final ResultSubpartitionView view = spy(new InfiniteSubpartitionView(outboundBuffers, sync));
 
 			// Return infinite subpartition
-			when(partitions.createSubpartitionView(eq(pid), eq(0), any(BufferProvider.class)))
-					.thenReturn(view);
+			when(partitions.createSubpartitionView(eq(pid), eq(0), any(BufferProvider.class), any(BufferAvailabilityListener.class)))
+					.thenAnswer(new Answer<ResultSubpartitionView>() {
+						@Override
+						public ResultSubpartitionView answer(InvocationOnMock invocationOnMock) throws Throwable {
+							BufferAvailabilityListener listener = (BufferAvailabilityListener) invocationOnMock.getArguments()[3];
+							listener.notifyBuffersAvailable(Long.MAX_VALUE);
+							return view;
+						}
+					});
 
 			PartitionRequestProtocol protocol = new PartitionRequestProtocol(
 					partitions, mock(TaskEventDispatcher.class), mock(NetworkBufferPool.class));
@@ -174,8 +190,7 @@ public class CancelPartitionRequestTest {
 		}
 
 		@Override
-		public boolean registerListener(final NotificationListener listener) throws IOException {
-			return false;
+		public void notifyBuffersAvailable(long buffers) throws IOException {
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
index 3f281bd..7224e96 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
@@ -20,12 +20,18 @@ package org.apache.flink.runtime.io.network.netty;
 
 import io.netty.channel.embedded.EmbeddedChannel;
 import org.apache.flink.runtime.execution.CancelTaskException;
+import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -35,14 +41,27 @@ public class PartitionRequestQueueTest {
 	public void testProducerFailedException() throws Exception {
 		PartitionRequestQueue queue = new PartitionRequestQueue();
 
-		EmbeddedChannel ch = new EmbeddedChannel(queue);
+		ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class);
+		ResultPartitionID rpid = new ResultPartitionID();
+		BufferProvider bufferProvider = mock(BufferProvider.class);
 
 		ResultSubpartitionView view = mock(ResultSubpartitionView.class);
 		when(view.isReleased()).thenReturn(true);
 		when(view.getFailureCause()).thenReturn(new RuntimeException("Expected test exception"));
 
+		when(partitionProvider.createSubpartitionView(
+			eq(rpid),
+			eq(0),
+			eq(bufferProvider),
+			any(BufferAvailabilityListener.class))).thenReturn(view);
+
+		EmbeddedChannel ch = new EmbeddedChannel(queue);
+
+		SequenceNumberingViewReader seqView = new SequenceNumberingViewReader(new InputChannelID(), queue);
+		seqView.requestSubpartitionView(partitionProvider, rpid, 0, bufferProvider);
+
 		// Enqueue the erroneous view
-		queue.enqueue(view, new InputChannelID());
+		queue.notifyReaderNonEmpty(seqView);
 		ch.runPendingTasks();
 
 		// Read the enqueued msg

http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
index 1515f83..1c3557e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
@@ -25,20 +25,20 @@ import io.netty.channel.ChannelInboundHandlerAdapter;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
-import org.apache.flink.runtime.io.network.netty.CancelPartitionRequestTest.InfiniteSubpartitionView;
-import org.apache.flink.runtime.io.network.netty.NettyTestUtil.NettyServerAndClient;
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
 import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.flink.runtime.io.network.netty.NettyMessage.NettyMessageEncoder;
-import static org.apache.flink.runtime.io.network.netty.NettyMessage.PartitionRequest;
 import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.connect;
 import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.createConfig;
 import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.initServerAndClient;
@@ -63,36 +63,43 @@ public class ServerTransportErrorHandlingTest {
 		final ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
 
 		when(partitionManager
-				.createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferProvider.class)))
-				.thenReturn(new InfiniteSubpartitionView(outboundBuffers, sync));
+			.createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferProvider.class), any(BufferAvailabilityListener.class)))
+			.thenAnswer(new Answer<ResultSubpartitionView>() {
+				@Override
+				public ResultSubpartitionView answer(InvocationOnMock invocationOnMock) throws Throwable {
+					BufferAvailabilityListener listener = (BufferAvailabilityListener) invocationOnMock.getArguments()[3];
+					listener.notifyBuffersAvailable(Long.MAX_VALUE);
+					return new CancelPartitionRequestTest.InfiniteSubpartitionView(outboundBuffers, sync);
+				}
+			});
 
 		NettyProtocol protocol = new NettyProtocol() {
 			@Override
 			public ChannelHandler[] getServerChannelHandlers() {
 				return new PartitionRequestProtocol(
-						partitionManager,
-						mock(TaskEventDispatcher.class),
-						mock(NetworkBufferPool.class)).getServerChannelHandlers();
+					partitionManager,
+					mock(TaskEventDispatcher.class),
+					mock(NetworkBufferPool.class)).getServerChannelHandlers();
 			}
 
 			@Override
 			public ChannelHandler[] getClientChannelHandlers() {
-				return new ChannelHandler[] {
-						new NettyMessageEncoder(),
-						// Close on read
-						new ChannelInboundHandlerAdapter() {
-							@Override
-							public void channelRead(ChannelHandlerContext ctx, Object msg)
-									throws Exception {
-
-								ctx.channel().close();
-							}
+				return new ChannelHandler[]{
+					new NettyMessage.NettyMessageEncoder(),
+					// Close on read
+					new ChannelInboundHandlerAdapter() {
+						@Override
+						public void channelRead(ChannelHandlerContext ctx, Object msg)
+							throws Exception {
+
+							ctx.channel().close();
 						}
+					}
 				};
 			}
 		};
 
-		NettyServerAndClient serverAndClient = null;
+		NettyTestUtil.NettyServerAndClient serverAndClient = null;
 
 		try {
 			serverAndClient = initServerAndClient(protocol, createConfig());
@@ -100,15 +107,14 @@ public class ServerTransportErrorHandlingTest {
 			Channel ch = connect(serverAndClient);
 
 			// Write something to trigger close by server
-			ch.writeAndFlush(new PartitionRequest(new ResultPartitionID(), 0, new InputChannelID()));
+			ch.writeAndFlush(new NettyMessage.PartitionRequest(new ResultPartitionID(), 0, new InputChannelID()));
 
 			// Wait for the notification
 			if (!sync.await(TestingUtils.TESTING_DURATION().toMillis(), TimeUnit.MILLISECONDS)) {
 				fail("Timed out after waiting for " + TestingUtils.TESTING_DURATION().toMillis() +
-						" ms to be notified about released partition.");
+					" ms to be notified about released partition.");
 			}
-		}
-		finally {
+		} finally {
 			shutdown(serverAndClient);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
index af8aa69..1ec4ad3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
@@ -21,12 +21,12 @@ package org.apache.flink.runtime.io.network.partition;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.io.network.api.reader.BufferReader;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
@@ -87,12 +87,12 @@ public class PartialConsumePipelinedResultTest {
 		// The partition needs to be pipelined, otherwise the original issue does not occur, because
 		// the sender and receiver are not online at the same time.
 		receiver.connectNewDataSetAsInput(
-				sender, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+			sender, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
 
 		final JobGraph jobGraph = new JobGraph("Partial Consume of Pipelined Result", sender, receiver);
 
 		final SlotSharingGroup slotSharingGroup = new SlotSharingGroup(
-				sender.getID(), receiver.getID());
+			sender.getID(), receiver.getID());
 
 		sender.setSlotSharingGroup(slotSharingGroup);
 		receiver.setSlotSharingGroup(slotSharingGroup);
@@ -127,11 +127,11 @@ public class PartialConsumePipelinedResultTest {
 
 		@Override
 		public void invoke() throws Exception {
-			final BufferReader reader = new BufferReader(getEnvironment().getInputGate(0));
-
-			final Buffer buffer = reader.getNextBuffer();
-
-			buffer.recycle();
+			InputGate gate = getEnvironment().getInputGate(0);
+			Buffer buffer = gate.getNextBufferOrEvent().getBuffer();
+			if (buffer != null) {
+				buffer.recycle();
+			}
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
index 8750a1a..a56177e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.util.TestConsumerCallback;
-import org.apache.flink.runtime.io.network.util.TestNotificationListener;
 import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
 import org.apache.flink.runtime.io.network.util.TestProducerSource;
 import org.apache.flink.runtime.io.network.util.TestSubpartitionConsumer;
@@ -38,12 +37,13 @@ import java.util.concurrent.Future;
 
 import static org.apache.flink.runtime.io.network.util.TestBufferFactory.createBuffer;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 
@@ -63,80 +63,25 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 	}
 
 	@Test
-	public void testRegisterListener() throws Exception {
-		final PipelinedSubpartition subpartition = createSubpartition();
-
-		final TestNotificationListener listener = new TestNotificationListener();
-
-		// Register a listener
-		assertTrue(subpartition.registerListener(listener));
-
-		// Try to register another listener
-		try {
-			subpartition.registerListener(listener);
-
-			fail("Did not throw expected exception after duplicate listener registration.");
-		}
-		catch (IllegalStateException expected) {
-		}
-	}
-
-	@Test
-	public void testListenerNotification() throws Exception {
-		final TestNotificationListener listener = new TestNotificationListener();
-		assertEquals(0, listener.getNumberOfNotifications());
-
-		{
-			final PipelinedSubpartition subpartition = createSubpartition();
-
-			// Register a listener
-			assertTrue(subpartition.registerListener(listener));
-
-			// Notify on add and remove listener
-			subpartition.add(mock(Buffer.class));
-			assertEquals(1, listener.getNumberOfNotifications());
-
-			// No notification, should have removed listener after first notification
-			subpartition.add(mock(Buffer.class));
-			assertEquals(1, listener.getNumberOfNotifications());
-		}
-
-		{
-			final PipelinedSubpartition subpartition = createSubpartition();
-
-			// Register a listener
-			assertTrue(subpartition.registerListener(listener));
-
-			// Notify on finish
-			subpartition.finish();
-			assertEquals(2, listener.getNumberOfNotifications());
-		}
-
-		{
-			final PipelinedSubpartition subpartition = createSubpartition();
-
-			// Register a listener
-			assertTrue(subpartition.registerListener(listener));
-
-			// Notify on release
-			subpartition.release();
-			assertEquals(3, listener.getNumberOfNotifications());
-		}
-	}
-
-	@Test
 	public void testIllegalReadViewRequest() throws Exception {
 		final PipelinedSubpartition subpartition = createSubpartition();
 
 		// Successful request
-		assertNotNull(subpartition.createReadView(null));
+		assertNotNull(subpartition.createReadView(null, new BufferAvailabilityListener() {
+			@Override
+			public void notifyBuffersAvailable(long numBuffers) {
+			}
+		}));
 
 		try {
-			subpartition.createReadView(null);
+			subpartition.createReadView(null, new BufferAvailabilityListener() {
+				@Override
+				public void notifyBuffersAvailable(long numBuffers) {
+				}
+			});
 
-			fail("Did not throw expected exception after duplicate read view request.");
-		}
-		catch (IllegalStateException expected) {
+			fail("Did not throw expected exception after duplicate notifyNonEmpty view request.");
+		} catch (IllegalStateException expected) {
 		}
 	}
 
@@ -144,23 +89,19 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 	public void testBasicPipelinedProduceConsumeLogic() throws Exception {
 		final PipelinedSubpartition subpartition = createSubpartition();
 
-		TestNotificationListener listener = new TestNotificationListener();
+		BufferAvailabilityListener listener = mock(BufferAvailabilityListener.class);
 
-		ResultSubpartitionView view = subpartition.createReadView(null);
+		ResultSubpartitionView view = subpartition.createReadView(null, listener);
 
 		// Empty => should return null
 		assertNull(view.getNextBuffer());
-
-		// Register listener for notifications
-		assertTrue(view.registerListener(listener));
-
-		assertEquals(0, listener.getNumberOfNotifications());
+		verify(listener, times(1)).notifyBuffersAvailable(eq(0L));
 
 		// Add data to the queue...
 		subpartition.add(createBuffer());
 
 		// ...should have resulted in a notification
-		assertEquals(1, listener.getNumberOfNotifications());
+		verify(listener, times(1)).notifyBuffersAvailable(eq(1L));
 
 		// ...and one available result
 		assertNotNull(view.getNextBuffer());
@@ -168,10 +109,7 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 
 		// Add data to the queue...
 		subpartition.add(createBuffer());
-		// ...don't allow to subscribe, if data is available
-		assertFalse(view.registerListener(listener));
-
-		assertEquals(1, listener.getNumberOfNotifications());
+		verify(listener, times(2)).notifyBuffersAvailable(eq(1L));
 	}
 
 	@Test
@@ -208,7 +146,6 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 
 			@Override
 			public BufferOrEvent getNextBufferOrEvent() throws Exception {
-
 				if (numberOfBuffers == producerNumberOfBuffersToProduce) {
 					return null;
 				}
@@ -261,16 +198,17 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 
 		final PipelinedSubpartition subpartition = createSubpartition();
 
-		final PipelinedSubpartitionView view = subpartition.createReadView(null);
+		TestSubpartitionConsumer consumer = new TestSubpartitionConsumer(isSlowConsumer, consumerCallback);
+		final PipelinedSubpartitionView view = subpartition.createReadView(null, consumer);
+		consumer.setSubpartitionView(view);
 
-		Future<Boolean> producer = executorService.submit(
-				new TestSubpartitionProducer(subpartition, isSlowProducer, producerSource));
+		Future<Boolean> producerResult = executorService.submit(
+			new TestSubpartitionProducer(subpartition, isSlowProducer, producerSource));
 
-		Future<Boolean> consumer = executorService.submit(
-				new TestSubpartitionConsumer(view, isSlowConsumer, consumerCallback));
+		Future<Boolean> consumerResult = executorService.submit(consumer);
 
 		// Wait for producer and consumer to finish
-		producer.get();
-		consumer.get();
+		producerResult.get();
+		consumerResult.get();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
index f6fddfa..4eb4fd1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
@@ -86,7 +86,6 @@ public class ResultPartitionTest {
 			mock(ResultPartitionManager.class),
 			notifier,
 			mock(IOManager.class),
-			IOManager.IOMode.SYNC,
 			sendScheduleOrUpdateConsumersMessage);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
index d7e56c8..b7a54d7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.runtime.io.disk.iomanager.AsynchronousBufferFileWriter;
 import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.util.TestInfiniteBufferProvider;
 import org.junit.AfterClass;
 import org.junit.Test;
@@ -34,7 +35,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
-import static org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode.SYNC;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doAnswer;
@@ -59,7 +60,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 
 	@Override
 	ResultSubpartition createSubpartition() {
-		return new SpillableSubpartition(0, mock(ResultPartition.class), ioManager, SYNC);
+		return new SpillableSubpartition(0, mock(ResultPartition.class), ioManager);
 	}
 
 	/**
@@ -87,14 +88,14 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 		// Mock I/O manager returning the blocking spill writer
 		IOManager ioManager = mock(IOManager.class);
 		when(ioManager.createBufferFileWriter(any(FileIOChannel.ID.class)))
-				.thenReturn(spillWriter);
+			.thenReturn(spillWriter);
 
 		// The partition
 		final SpillableSubpartition partition = new SpillableSubpartition(
-				0, mock(ResultPartition.class), ioManager, SYNC);
+			0, mock(ResultPartition.class), ioManager);
 
 		// Spill the partition initially (creates the spill writer)
-		partition.releaseMemory();
+		assertEquals(0, partition.releaseMemory());
 
 		ExecutorService executor = Executors.newSingleThreadExecutor();
 
@@ -130,13 +131,18 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 	public void testReleasePartitionAndGetNext() throws Exception {
 		// Create partition and add some buffers
 		SpillableSubpartition partition = new SpillableSubpartition(
-				0, mock(ResultPartition.class), ioManager, SYNC);
+			0, mock(ResultPartition.class), ioManager);
 
 		partition.finish();
 
 		// Create the read view
 		ResultSubpartitionView readView = spy(partition
-				.createReadView(new TestInfiniteBufferProvider()));
+			.createReadView(new TestInfiniteBufferProvider(), new BufferAvailabilityListener() {
+				@Override
+				public void notifyBuffersAvailable(long numBuffers) {
+
+				}
+			}));
 
 		// The released state check (of the parent) needs to be independent
 		// of the released state of the view.

http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIOTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIOTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIOTest.java
deleted file mode 100644
index 981c8ee..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIOTest.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.partition;
-
-import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.io.network.util.TestConsumerCallback;
-import org.apache.flink.runtime.io.network.util.TestInfiniteBufferProvider;
-import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
-import org.apache.flink.runtime.io.network.util.TestSubpartitionConsumer;
-import org.junit.AfterClass;
-import org.junit.Test;
-
-import static org.mockito.Mockito.mock;
-
-public class SpilledSubpartitionViewAsyncIOTest {
-
-	private static final IOManager ioManager = new IOManagerAsync();
-
-	@AfterClass
-	public static void shutdown() {
-		ioManager.shutdown();
-	}
-
-	@Test
-	public void testWriteConsume() throws Exception {
-		// Config
-		final int numberOfBuffersToWrite = 1024;
-
-		// Setup
-		final BufferFileWriter writer = SpilledSubpartitionViewTest
-				.createWriterAndWriteBuffers(ioManager, new TestInfiniteBufferProvider(), numberOfBuffersToWrite);
-
-		writer.close();
-
-		final TestPooledBufferProvider viewBufferPool = new TestPooledBufferProvider(1);
-
-		final SpilledSubpartitionViewAsyncIO view = new SpilledSubpartitionViewAsyncIO(
-				mock(ResultSubpartition.class), viewBufferPool, ioManager,
-				writer.getChannelID(), 0);
-
-		final TestSubpartitionConsumer consumer = new TestSubpartitionConsumer(view, false,
-				new TestConsumerCallback.RecyclingCallback());
-
-		// Consume subpartition
-		consumer.call();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewSyncIOTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewSyncIOTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewSyncIOTest.java
deleted file mode 100644
index f8baae4..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewSyncIOTest.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.partition;
-
-import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.io.network.util.TestConsumerCallback;
-import org.apache.flink.runtime.io.network.util.TestInfiniteBufferProvider;
-import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
-import org.apache.flink.runtime.io.network.util.TestSubpartitionConsumer;
-import org.junit.AfterClass;
-import org.junit.Test;
-
-import static org.mockito.Mockito.mock;
-
-public class SpilledSubpartitionViewSyncIOTest {
-
-	private static final IOManager ioManager = new IOManagerAsync();
-
-	private static final TestInfiniteBufferProvider writerBufferPool =
-			new TestInfiniteBufferProvider();
-
-	@AfterClass
-	public static void shutdown() {
-		ioManager.shutdown();
-	}
-
-	@Test
-	public void testWriteConsume() throws Exception {
-		// Config
-		final int numberOfBuffersToWrite = 512;
-
-		// Setup
-		final BufferFileWriter writer = SpilledSubpartitionViewTest
-				.createWriterAndWriteBuffers(ioManager, writerBufferPool, numberOfBuffersToWrite);
-
-		writer.close();
-
-		final TestPooledBufferProvider viewBufferPool = new TestPooledBufferProvider(1);
-
-		final SpilledSubpartitionViewSyncIO view = new SpilledSubpartitionViewSyncIO(
-				mock(ResultSubpartition.class),
-				viewBufferPool.getMemorySegmentSize(),
-				writer.getChannelID(),
-				0);
-
-		final TestSubpartitionConsumer consumer = new TestSubpartitionConsumer(view, false,
-				new TestConsumerCallback.RecyclingCallback());
-
-		// Consume subpartition
-		consumer.call();
-	}
-
-	@Test
-	public void testConsumeWithFewBuffers() throws Exception {
-		// Config
-		final int numberOfBuffersToWrite = 512;
-
-		// Setup
-		final BufferFileWriter writer = SpilledSubpartitionViewTest
-				.createWriterAndWriteBuffers(ioManager, writerBufferPool, numberOfBuffersToWrite);
-
-		writer.close();
-
-		final SpilledSubpartitionViewSyncIO view = new SpilledSubpartitionViewSyncIO(
-				mock(ResultSubpartition.class),
-				32 * 1024,
-				writer.getChannelID(),
-				0);
-
-		// No buffer available, don't deadlock. We need to make progress in situations when the view
-		// is consumed at an input gate with local and remote channels. The remote channels might
-		// eat up all the buffers, at which point the spilled view will not have any buffers
-		// available and the input gate can't make any progress if we don't return immediately.
-		//
-		// The current solution is straight-forward with a separate buffer per spilled subpartition,
-		// but introduces memory-overhead.
-		//
-		// TODO Replace with asynchronous buffer pool request as this introduces extra buffers per
-		// consumed subpartition.
-		final TestSubpartitionConsumer consumer = new TestSubpartitionConsumer(view, false,
-				new TestConsumerCallback.RecyclingCallback());
-
-		consumer.call();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
index 5722cac..8f8da93 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
@@ -18,26 +18,21 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
+import com.google.common.collect.Lists;
 import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
-import org.apache.flink.runtime.io.network.util.TestConsumerCallback.RecyclingCallback;
+import org.apache.flink.runtime.io.network.util.TestConsumerCallback;
 import org.apache.flink.runtime.io.network.util.TestInfiniteBufferProvider;
 import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
 import org.apache.flink.runtime.io.network.util.TestSubpartitionConsumer;
 import org.junit.AfterClass;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -47,55 +42,103 @@ import java.util.concurrent.TimeoutException;
 
 import static org.mockito.Mockito.mock;
 
-/**
- * Test for both the asynchronous and synchronous spilled subpartition view implementation.
- */
-@RunWith(Parameterized.class)
 public class SpilledSubpartitionViewTest {
 
-	private static final IOManager ioManager = new IOManagerAsync();
-
-	private static final ExecutorService executor = Executors.newCachedThreadPool();
+	private static final IOManager IO_MANAGER = new IOManagerAsync();
 
 	private static final TestInfiniteBufferProvider writerBufferPool =
-			new TestInfiniteBufferProvider();
-
-	private IOMode ioMode;
-
-	public SpilledSubpartitionViewTest(IOMode ioMode) {
-		this.ioMode = ioMode;
-	}
+		new TestInfiniteBufferProvider();
 
 	@AfterClass
 	public static void shutdown() {
-		ioManager.shutdown();
-		executor.shutdown();
+		IO_MANAGER.shutdown();
 	}
 
-	@Parameterized.Parameters
-	public static Collection<Object[]> ioMode() {
-		return Arrays.asList(new Object[][]{
-				{IOMode.SYNC},
-				{IOMode.ASYNC}});
+	@Test
+	public void testWriteConsume() throws Exception {
+		// Config
+		final int numberOfBuffersToWrite = 512;
+
+		// Setup
+		final BufferFileWriter writer = createWriterAndWriteBuffers(IO_MANAGER, writerBufferPool, numberOfBuffersToWrite);
+
+		writer.close();
+
+		TestPooledBufferProvider viewBufferPool = new TestPooledBufferProvider(1);
+
+		TestSubpartitionConsumer consumer = new TestSubpartitionConsumer(
+			false, new TestConsumerCallback.RecyclingCallback());
+
+		SpilledSubpartitionView view = new SpilledSubpartitionView(
+			mock(ResultSubpartition.class),
+			viewBufferPool.getMemorySegmentSize(),
+			writer,
+			numberOfBuffersToWrite + 1, // +1 for end-of-partition
+			consumer);
+
+		consumer.setSubpartitionView(view);
+
+		// Consume subpartition
+		consumer.call();
 	}
 
 	@Test
-	public void testReadMultipleFilesWithSingleBufferPool() throws Exception {
+	public void testConsumeWithFewBuffers() throws Exception {
+		// Config
+		final int numberOfBuffersToWrite = 512;
+
 		// Setup
-		BufferFileWriter[] writers = new BufferFileWriter[]{
-				createWriterAndWriteBuffers(ioManager, writerBufferPool, 512),
-				createWriterAndWriteBuffers(ioManager, writerBufferPool, 512)
-		};
+		final BufferFileWriter writer = createWriterAndWriteBuffers(IO_MANAGER, writerBufferPool, numberOfBuffersToWrite);
+
+		writer.close();
+
+		TestSubpartitionConsumer consumer = new TestSubpartitionConsumer(
+			false, new TestConsumerCallback.RecyclingCallback());
+
+		SpilledSubpartitionView view = new SpilledSubpartitionView(
+			mock(ResultSubpartition.class),
+			32 * 1024,
+			writer,
+			numberOfBuffersToWrite + 1,
+			consumer);
+
+		consumer.setSubpartitionView(view);
+
+		// No buffer available, don't deadlock. We need to make progress in situations when the view
+		// is consumed at an input gate with local and remote channels. The remote channels might
+		// eat up all the buffers, at which point the spilled view will not have any buffers
+		// available and the input gate can't make any progress if we don't return immediately.
+		//
+		// The current solution is straight-forward with a separate buffer per spilled subpartition,
+		// but introduces memory-overhead.
+		//
+		// TODO Replace with asynchronous buffer pool request as this introduces extra buffers per
+		// consumed subpartition.
+		consumer.call();
+	}
 
-		final ResultSubpartitionView[] readers = new ResultSubpartitionView[writers.length];
+	@Test
+	public void testReadMultipleFilesWithSingleBufferPool() throws Exception {
+		ExecutorService executor = null;
+		BufferFileWriter[] writers = null;
+		ResultSubpartitionView[] readers = null;
 
-		// Make this buffer pool small so that we can test the behaviour of the asynchronous view
-		// with few  buffers.
-		final BufferProvider inputBuffers = new TestPooledBufferProvider(2);
+		try {
+			executor = Executors.newCachedThreadPool();
 
-		final ResultSubpartition parent = mock(ResultSubpartition.class);
+			// Setup
+			writers = new BufferFileWriter[]{
+				createWriterAndWriteBuffers(IO_MANAGER, writerBufferPool, 512),
+				createWriterAndWriteBuffers(IO_MANAGER, writerBufferPool, 512)
+			};
+
+			readers = new ResultSubpartitionView[writers.length];
+			TestSubpartitionConsumer[] consumers = new TestSubpartitionConsumer[writers.length];
+
+			BufferProvider inputBuffers = new TestPooledBufferProvider(2);
+
+			ResultSubpartition parent = mock(ResultSubpartition.class);
 
-		try {
 			// Wait for writers to finish
 			for (BufferFileWriter writer : writers) {
 				writer.close();
@@ -103,56 +146,56 @@ public class SpilledSubpartitionViewTest {
 
 			// Create the views depending on the test configuration
 			for (int i = 0; i < readers.length; i++) {
-				if (ioMode.isSynchronous()) {
-					readers[i] = new SpilledSubpartitionViewSyncIO(
-							parent,
-							inputBuffers.getMemorySegmentSize(),
-							writers[i].getChannelID(),
-							0);
-				}
-				else {
-					// For the asynchronous view, it is important that a registered listener will
-					// eventually be notified even if the view never got a buffer to read data into.
-					//
-					// At runtime, multiple threads never share the same buffer pool as in test. We
-					// do it here to provoke the erroneous behaviour.
-					readers[i] = new SpilledSubpartitionViewAsyncIO(
-							parent, inputBuffers, ioManager, writers[i].getChannelID(), 0);
-				}
+				consumers[i] = new TestSubpartitionConsumer(
+					false, new TestConsumerCallback.RecyclingCallback());
+
+				readers[i] = new SpilledSubpartitionView(
+					parent,
+					inputBuffers.getMemorySegmentSize(),
+					writers[i],
+					512 + 1, // +1 for end of partition event
+					consumers[i]);
+
+				consumers[i].setSubpartitionView(readers[i]);
 			}
 
-			final List<Future<Boolean>> results = new ArrayList<>();
+			final List<Future<Boolean>> results = Lists.newArrayList();
 
 			// Submit the consuming tasks
-			for (ResultSubpartitionView view : readers) {
-				results.add(executor.submit(new TestSubpartitionConsumer(
-						view, false, new RecyclingCallback())));
+			for (TestSubpartitionConsumer consumer : consumers) {
+				results.add(executor.submit(consumer));
 			}
 
 			// Wait for the results
 			for (Future<Boolean> res : results) {
 				try {
 					res.get(2, TimeUnit.MINUTES);
-				}
-				catch (TimeoutException e) {
+				} catch (TimeoutException e) {
 					throw new TimeoutException("There has been a timeout in the test. This " +
-							"indicates that there is a bug/deadlock in the tested subpartition " +
-							"view. The timed out test was in " + ioMode + " mode.");
+						"indicates that there is a bug/deadlock in the tested subpartition " +
+						"view.");
 				}
 			}
-		}
-		finally {
-			for (BufferFileWriter writer : writers) {
-				if (writer != null) {
-					writer.deleteChannel();
+		} finally {
+			if (writers != null) {
+				for (BufferFileWriter writer : writers) {
+					if (writer != null) {
+						writer.deleteChannel();
+					}
 				}
 			}
 
-			for (ResultSubpartitionView reader : readers) {
-				if (reader != null) {
-					reader.releaseAllResources();
+			if (readers != null) {
+				for (ResultSubpartitionView reader : readers) {
+					if (reader != null) {
+						reader.releaseAllResources();
+					}
 				}
 			}
+
+			if (executor != null) {
+				executor.shutdown();
+			}
 		}
 	}
 
@@ -163,9 +206,9 @@ public class SpilledSubpartitionViewTest {
 	 * <p> Call {@link BufferFileWriter#close()} to ensure that all buffers have been written.
 	 */
 	static BufferFileWriter createWriterAndWriteBuffers(
-			IOManager ioManager,
-			BufferProvider bufferProvider,
-			int numberOfBuffers) throws IOException {
+		IOManager ioManager,
+		BufferProvider bufferProvider,
+		int numberOfBuffers) throws IOException {
 
 		final BufferFileWriter writer = ioManager.createBufferFileWriter(ioManager.createChannel());
 
@@ -177,4 +220,5 @@ public class SpilledSubpartitionViewTest {
 
 		return writer;
 	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
index 26a8f29..14942bc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
@@ -24,7 +24,6 @@ import org.apache.flink.runtime.io.network.util.TestInfiniteBufferProvider;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -50,8 +49,7 @@ public abstract class SubpartitionTestBase extends TestLogger {
 			subpartition.finish();
 
 			assertFalse(subpartition.add(mock(Buffer.class)));
-		}
-		finally {
+		} finally {
 			if (subpartition != null) {
 				subpartition.release();
 			}
@@ -66,8 +64,7 @@ public abstract class SubpartitionTestBase extends TestLogger {
 			subpartition.release();
 
 			assertFalse(subpartition.add(mock(Buffer.class)));
-		}
-		finally {
+		} finally {
 			if (subpartition != null) {
 				subpartition.release();
 			}
@@ -97,7 +94,8 @@ public abstract class SubpartitionTestBase extends TestLogger {
 		TestInfiniteBufferProvider buffers = new TestInfiniteBufferProvider();
 
 		// Create the view
-		ResultSubpartitionView view = partition.createReadView(buffers);
+		BufferAvailabilityListener listener = mock(BufferAvailabilityListener.class);
+		ResultSubpartitionView view = partition.createReadView(buffers, listener);
 
 		// The added buffer and end-of-partition event
 		assertNotNull(view.getNextBuffer());

http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
index 8884b29..cd75a7b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.event.TaskEvent;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.junit.Test;
 
@@ -114,11 +113,11 @@ public class InputChannelTest {
 	private static class MockInputChannel extends InputChannel {
 
 		private MockInputChannel(
-				SingleInputGate inputGate,
-				int channelIndex,
-				ResultPartitionID partitionId,
-				int initialBackoff,
-				int maxBackoff) {
+			SingleInputGate inputGate,
+			int channelIndex,
+			ResultPartitionID partitionId,
+			int initialBackoff,
+			int maxBackoff) {
 
 			super(inputGate, channelIndex, partitionId, initialBackoff, maxBackoff, new SimpleCounter());
 		}
@@ -128,7 +127,7 @@ public class InputChannelTest {
 		}
 
 		@Override
-		Buffer getNextBuffer() throws IOException, InterruptedException {
+		BufferAndAvailability getNextBuffer() throws IOException, InterruptedException {
 			return null;
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
index cfbe99e..fa44393 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
@@ -64,24 +64,25 @@ public class IteratorWrappingTestSingleInputGate<T extends IOReadableWritable> e
 
 		// The input iterator can produce an infinite stream. That's why we have to serialize each
 		// record on demand and cannot do it upfront.
-		final Answer<Buffer> answer = new Answer<Buffer>() {
+		final Answer<InputChannel.BufferAndAvailability> answer = new Answer<InputChannel.BufferAndAvailability>() {
+
+			private boolean hasData = inputIterator.next(reuse) != null;
+
 			@Override
-			public Buffer answer(InvocationOnMock invocationOnMock) throws Throwable {
-				if (inputIterator.next(reuse) != null) {
+			public InputChannel.BufferAndAvailability answer(InvocationOnMock invocationOnMock) throws Throwable {
+				if (hasData) {
 					final Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(bufferSize), mock(BufferRecycler.class));
 					serializer.setNextBuffer(buffer);
 					serializer.addRecord(reuse);
 
-					inputGate.onAvailableBuffer(inputChannel.getInputChannel());
+					hasData = inputIterator.next(reuse) != null;
 
 					// Call getCurrentBuffer to ensure size is set
-					return serializer.getCurrentBuffer();
-				}
-				else {
-
+					return new InputChannel.BufferAndAvailability(serializer.getCurrentBuffer(), true);
+				} else {
 					when(inputChannel.getInputChannel().isReleased()).thenReturn(true);
 
-					return EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);
+					return new InputChannel.BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), false);
 				}
 			}
 		};
@@ -93,8 +94,8 @@ public class IteratorWrappingTestSingleInputGate<T extends IOReadableWritable> e
 		return this;
 	}
 
-	public IteratorWrappingTestSingleInputGate<T> read() {
-		inputGate.onAvailableBuffer(inputChannel.getInputChannel());
+	public IteratorWrappingTestSingleInputGate<T> notifyNonEmpty() {
+		inputGate.notifyChannelNonEmpty(inputChannel.getInputChannel());
 
 		return this;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 4ca1d1f..0b72f95 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
 import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
@@ -57,7 +58,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
-import static org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode.ASYNC;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
@@ -93,11 +93,11 @@ public class LocalInputChannelTest {
 		final ExecutorService executor = Executors.newFixedThreadPool(2 * parallelism);
 
 		final NetworkBufferPool networkBuffers = new NetworkBufferPool(
-				(parallelism * producerBufferPoolSize) + (parallelism * parallelism),
-				TestBufferFactory.BUFFER_SIZE, MemoryType.HEAP);
+			(parallelism * producerBufferPoolSize) + (parallelism * parallelism),
+			TestBufferFactory.BUFFER_SIZE, MemoryType.HEAP);
 
 		final ResultPartitionConsumableNotifier partitionConsumableNotifier =
-				mock(ResultPartitionConsumableNotifier.class);
+			mock(ResultPartitionConsumableNotifier.class);
 
 		final TaskActions taskActions = mock(TaskActions.class);
 
@@ -124,21 +124,20 @@ public class LocalInputChannelTest {
 				partitionManager,
 				partitionConsumableNotifier,
 				ioManager,
-				ASYNC,
 				true);
 
 			// Create a buffer pool for this partition
 			partition.registerBufferPool(
-					networkBuffers.createBufferPool(producerBufferPoolSize, true));
+				networkBuffers.createBufferPool(producerBufferPoolSize, true));
 
 			// Create the producer
 			partitionProducers[i] = new TestPartitionProducer(
-					partition,
-					false,
-					new TestPartitionProducerBufferSource(
-							parallelism,
-							partition.getBufferProvider(),
-							numberOfBuffersPerChannel)
+				partition,
+				false,
+				new TestPartitionProducerBufferSource(
+					parallelism,
+					partition.getBufferProvider(),
+					numberOfBuffersPerChannel)
 			);
 
 			// Register with the partition manager in order to allow the local input channels to
@@ -150,7 +149,7 @@ public class LocalInputChannelTest {
 		try {
 			// Submit producer tasks
 			List<Future<?>> results = Lists.newArrayListWithCapacity(
-					parallelism + 1);
+				parallelism + 1);
 
 			for (int i = 0; i < parallelism; i++) {
 				results.add(executor.submit(partitionProducers[i]));
@@ -159,14 +158,14 @@ public class LocalInputChannelTest {
 			// Submit consumer
 			for (int i = 0; i < parallelism; i++) {
 				results.add(executor.submit(
-						new TestLocalInputChannelConsumer(
-								i,
-								parallelism,
-								numberOfBuffersPerChannel,
-								networkBuffers.createBufferPool(parallelism, true),
-								partitionManager,
-								new TaskEventDispatcher(),
-								partitionIds)));
+					new TestLocalInputChannelConsumer(
+						i,
+						parallelism,
+						numberOfBuffersPerChannel,
+						networkBuffers.createBufferPool(parallelism, true),
+						partitionManager,
+						new TaskEventDispatcher(),
+						partitionIds)));
 			}
 
 			// Wait for all to finish
@@ -183,7 +182,7 @@ public class LocalInputChannelTest {
 	@Test
 	public void testPartitionRequestExponentialBackoff() throws Exception {
 		// Config
-		Tuple2<Integer, Integer> backoff = new Tuple2<Integer, Integer>(500, 3000);
+		Tuple2<Integer, Integer> backoff = new Tuple2<>(500, 3000);
 
 		// Start with initial backoff, then keep doubling, and cap at max.
 		int[] expectedDelays = {backoff._1(), 1000, 2000, backoff._2()};
@@ -199,7 +198,7 @@ public class LocalInputChannelTest {
 		LocalInputChannel ch = createLocalInputChannel(inputGate, partitionManager, backoff);
 
 		when(partitionManager
-				.createSubpartitionView(eq(ch.partitionId), eq(0), eq(bufferProvider)))
+				.createSubpartitionView(eq(ch.partitionId), eq(0), eq(bufferProvider), any(BufferAvailabilityListener.class)))
 				.thenThrow(new PartitionNotFoundException(ch.partitionId));
 
 		Timer timer = mock(Timer.class);
@@ -215,7 +214,7 @@ public class LocalInputChannelTest {
 		// Initial request
 		ch.requestSubpartition(0);
 		verify(partitionManager)
-				.createSubpartitionView(eq(ch.partitionId), eq(0), eq(bufferProvider));
+				.createSubpartitionView(eq(ch.partitionId), eq(0), eq(bufferProvider), any(BufferAvailabilityListener.class));
 
 		// Request subpartition and verify that the actual requests are delayed.
 		for (long expected : expectedDelays) {
@@ -236,14 +235,13 @@ public class LocalInputChannelTest {
 
 	@Test(expected = CancelTaskException.class)
 	public void testProducerFailedException() throws Exception {
-
 		ResultSubpartitionView view = mock(ResultSubpartitionView.class);
 		when(view.isReleased()).thenReturn(true);
 		when(view.getFailureCause()).thenReturn(new Exception("Expected test exception"));
 
 		ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
 		when(partitionManager
-				.createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferProvider.class)))
+				.createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferProvider.class), any(BufferAvailabilityListener.class)))
 				.thenReturn(view);
 
 		SingleInputGate inputGate = mock(SingleInputGate.class);
@@ -251,7 +249,7 @@ public class LocalInputChannelTest {
 		when(inputGate.getBufferProvider()).thenReturn(bufferProvider);
 
 		LocalInputChannel ch = createLocalInputChannel(
-				inputGate, partitionManager, new Tuple2<Integer, Integer>(0, 0));
+				inputGate, partitionManager, new Tuple2<>(0, 0));
 
 		ch.requestSubpartition(0);
 
@@ -268,14 +266,14 @@ public class LocalInputChannelTest {
 			throws IOException, InterruptedException {
 
 		return new LocalInputChannel(
-			inputGate,
-			0,
-			new ResultPartitionID(),
-			partitionManager,
-			mock(TaskEventDispatcher.class),
-			initialAndMaxRequestBackoff._1(),
-			initialAndMaxRequestBackoff._2(),
-			new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+				inputGate,
+				0,
+				new ResultPartitionID(),
+				partitionManager,
+				mock(TaskEventDispatcher.class),
+				initialAndMaxRequestBackoff._1(),
+				initialAndMaxRequestBackoff._2(),
+				new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
 	}
 
 	/**
@@ -344,14 +342,14 @@ public class LocalInputChannelTest {
 			checkArgument(numberOfExpectedBuffersPerChannel >= 1);
 
 			this.inputGate = new SingleInputGate(
-				"Test Name",
-				new JobID(),
-				new ExecutionAttemptID(),
-				new IntermediateDataSetID(),
-				subpartitionIndex,
-				numberOfInputChannels,
-				mock(TaskActions.class),
-				new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+					"Test Name",
+					new JobID(),
+					new ExecutionAttemptID(),
+					new IntermediateDataSetID(),
+					subpartitionIndex,
+					numberOfInputChannels,
+					mock(TaskActions.class),
+					new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
 
 			// Set buffer pool
 			inputGate.setBufferPool(bufferPool);

http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index 2c2f966..1d30a9a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -70,7 +70,7 @@ public class RemoteInputChannelTest {
 
 		// Need to notify the input gate for the out-of-order buffer as well. Otherwise the
 		// receiving task will not notice the error.
-		verify(inputGate, times(2)).onAvailableBuffer(eq(inputChannel));
+		verify(inputGate, times(2)).notifyChannelNonEmpty(eq(inputChannel));
 	}
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 0b7b10d..7cae362 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
@@ -72,18 +73,18 @@ public class SingleInputGateTest {
 	public void testBasicGetNextLogic() throws Exception {
 		// Setup
 		final SingleInputGate inputGate = new SingleInputGate(
-				"Test Task Name", new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 2, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+			"Test Task Name", new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 2, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
 
 		final TestInputChannel[] inputChannels = new TestInputChannel[]{
-				new TestInputChannel(inputGate, 0),
-				new TestInputChannel(inputGate, 1)
+			new TestInputChannel(inputGate, 0),
+			new TestInputChannel(inputGate, 1)
 		};
 
 		inputGate.setInputChannel(
-				new IntermediateResultPartitionID(), inputChannels[0].getInputChannel());
+			new IntermediateResultPartitionID(), inputChannels[0].getInputChannel());
 
 		inputGate.setInputChannel(
-				new IntermediateResultPartitionID(), inputChannels[1].getInputChannel());
+			new IntermediateResultPartitionID(), inputChannels[1].getInputChannel());
 
 		// Test
 		inputChannels[0].readBuffer();
@@ -92,9 +93,12 @@ public class SingleInputGateTest {
 		inputChannels[1].readEndOfPartitionEvent();
 		inputChannels[0].readEndOfPartitionEvent();
 
-		verifyBufferOrEvent(inputGate, true, 0);
+		inputGate.notifyChannelNonEmpty(inputChannels[0].getInputChannel());
+		inputGate.notifyChannelNonEmpty(inputChannels[1].getInputChannel());
+
 		verifyBufferOrEvent(inputGate, true, 0);
 		verifyBufferOrEvent(inputGate, true, 1);
+		verifyBufferOrEvent(inputGate, true, 0);
 		verifyBufferOrEvent(inputGate, false, 1);
 		verifyBufferOrEvent(inputGate, false, 0);
 
@@ -111,10 +115,14 @@ public class SingleInputGateTest {
 
 		final ResultSubpartitionView iterator = mock(ResultSubpartitionView.class);
 		when(iterator.getNextBuffer()).thenReturn(
-				new Buffer(MemorySegmentFactory.allocateUnpooledSegment(1024), mock(BufferRecycler.class)));
+			new Buffer(MemorySegmentFactory.allocateUnpooledSegment(1024), mock(BufferRecycler.class)));
 
 		final ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
-		when(partitionManager.createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferProvider.class))).thenReturn(iterator);
+		when(partitionManager.createSubpartitionView(
+			any(ResultPartitionID.class),
+			anyInt(),
+			any(BufferProvider.class),
+			any(BufferAvailabilityListener.class))).thenReturn(iterator);
 
 		// Setup reader with one local and one unknown input channel
 		final IntermediateDataSetID resultId = new IntermediateDataSetID();
@@ -143,7 +151,7 @@ public class SingleInputGateTest {
 		inputGate.requestPartitions();
 
 		// Only the local channel can request
-		verify(partitionManager, times(1)).createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferProvider.class));
+		verify(partitionManager, times(1)).createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferProvider.class), any(BufferAvailabilityListener.class));
 
 		// Send event backwards and initialize unknown channel afterwards
 		final TaskEvent event = new TestTaskEvent();
@@ -155,7 +163,7 @@ public class SingleInputGateTest {
 		// After the update, the pending event should be send to local channel
 		inputGate.updateInputChannel(new InputChannelDeploymentDescriptor(new ResultPartitionID(unknownPartitionId.getPartitionId(), unknownPartitionId.getProducerId()), ResultPartitionLocation.createLocal()));
 
-		verify(partitionManager, times(2)).createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferProvider.class));
+		verify(partitionManager, times(2)).createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferProvider.class), any(BufferAvailabilityListener.class));
 		verify(taskEventDispatcher, times(2)).publish(any(ResultPartitionID.class), any(TaskEvent.class));
 	}
 
@@ -174,8 +182,7 @@ public class SingleInputGateTest {
 			new IntermediateDataSetID(),
 			0,
 			1,
-			mock(TaskActions.class),
-			new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+			mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
 
 		ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
 
@@ -186,19 +193,17 @@ public class SingleInputGateTest {
 			partitionManager,
 			new TaskEventDispatcher(),
 			new LocalConnectionManager(),
-			0,
-			0,
-			new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+			0, 0, new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
 
 		inputGate.setInputChannel(unknown.partitionId.getPartitionId(), unknown);
 
 		// Update to a local channel and verify that no request is triggered
 		inputGate.updateInputChannel(new InputChannelDeploymentDescriptor(
-				unknown.partitionId,
-				ResultPartitionLocation.createLocal()));
+			unknown.partitionId,
+			ResultPartitionLocation.createLocal()));
 
 		verify(partitionManager, never()).createSubpartitionView(
-				any(ResultPartitionID.class), anyInt(), any(BufferProvider.class));
+			any(ResultPartitionID.class), anyInt(), any(BufferProvider.class), any(BufferAvailabilityListener.class));
 	}
 
 	/**
@@ -227,8 +232,7 @@ public class SingleInputGateTest {
 			new ResultPartitionManager(),
 			new TaskEventDispatcher(),
 			new LocalConnectionManager(),
-			0,
-			0,
+			0, 0,
 			new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
 
 		inputGate.setInputChannel(unknown.partitionId.getPartitionId(), unknown);
@@ -249,16 +253,15 @@ public class SingleInputGateTest {
 		// Wait for blocking queue poll call and release input gate
 		boolean success = false;
 		for (int i = 0; i < 50; i++) {
-			if (asyncConsumer != null && asyncConsumer.isAlive()) {
-				StackTraceElement[] stackTrace = asyncConsumer.getStackTrace();
-				success = isInBlockingQueuePoll(stackTrace);
+			if (asyncConsumer.isAlive()) {
+				success = asyncConsumer.getState() == Thread.State.WAITING;
 			}
 
 			if (success) {
 				break;
 			} else {
 				// Retry
-				Thread.sleep(500);
+				Thread.sleep(100);
 			}
 		}
 
@@ -355,33 +358,12 @@ public class SingleInputGateTest {
 		}
 	}
 
-	/**
-	 * Returns whether the stack trace represents a Thread in a blocking queue
-	 * poll call.
-	 *
-	 * @param stackTrace Stack trace of the Thread to check
-	 *
-	 * @return Flag indicating whether the Thread is in a blocking queue poll
-	 * call.
-	 */
-	private boolean isInBlockingQueuePoll(StackTraceElement[] stackTrace) {
-		for (StackTraceElement elem : stackTrace) {
-			if (elem.getMethodName().equals("poll") &&
-					elem.getClassName().equals("java.util.concurrent.LinkedBlockingQueue")) {
-
-				return true;
-			}
-		}
-
-		return false;
-	}
-
 	// ---------------------------------------------------------------------------------------------
 
 	static void verifyBufferOrEvent(
-			InputGate inputGate,
-			boolean isBuffer,
-			int channelIndex) throws IOException, InterruptedException {
+		InputGate inputGate,
+		boolean isBuffer,
+		int channelIndex) throws IOException, InterruptedException {
 
 		final BufferOrEvent boe = inputGate.getNextBufferOrEvent();
 		assertEquals(isBuffer, boe.isBuffer());

http://git-wip-us.apache.org/repos/asf/flink/blob/d3ac0adf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
index 7ea67b3..a6597a2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
@@ -19,10 +19,8 @@
 package org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
-import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.util.TestTaskEvent;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
@@ -46,7 +44,7 @@ public class TestInputChannel {
 	private final SingleInputGate inputGate;
 
 	// Abusing Mockito here... ;)
-	protected OngoingStubbing<Buffer> stubbing;
+	protected OngoingStubbing<InputChannel.BufferAndAvailability> stubbing;
 
 	public TestInputChannel(SingleInputGate inputGate, int channelIndex) {
 		checkArgument(channelIndex >= 0);
@@ -57,13 +55,10 @@ public class TestInputChannel {
 
 	public TestInputChannel read(Buffer buffer) throws IOException, InterruptedException {
 		if (stubbing == null) {
-			stubbing = when(mock.getNextBuffer()).thenReturn(buffer);
+			stubbing = when(mock.getNextBuffer()).thenReturn(new InputChannel.BufferAndAvailability(buffer, true));
+		} else {
+			stubbing = stubbing.thenReturn(new InputChannel.BufferAndAvailability(buffer, true));
 		}
-		else {
-			stubbing = stubbing.thenReturn(buffer);
-		}
-
-		inputGate.onAvailableBuffer(mock);
 
 		return this;
 	}
@@ -75,34 +70,23 @@ public class TestInputChannel {
 		return read(buffer);
 	}
 
-	public TestInputChannel readEvent() throws IOException, InterruptedException {
-		return read(EventSerializer.toBuffer(new TestTaskEvent()));
-	}
-
-	public TestInputChannel readEndOfSuperstepEvent() throws IOException, InterruptedException {
-		return read(EventSerializer.toBuffer(EndOfSuperstepEvent.INSTANCE));
-	}
-
 	public TestInputChannel readEndOfPartitionEvent() throws IOException, InterruptedException {
-		final Answer<Buffer> answer = new Answer<Buffer>() {
+		final Answer<InputChannel.BufferAndAvailability> answer = new Answer<InputChannel.BufferAndAvailability>() {
 			@Override
-			public Buffer answer(InvocationOnMock invocationOnMock) throws Throwable {
+			public InputChannel.BufferAndAvailability answer(InvocationOnMock invocationOnMock) throws Throwable {
 				// Return true after finishing
 				when(mock.isReleased()).thenReturn(true);
 
-				return EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);
+				return new InputChannel.BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), false);
 			}
 		};
 
 		if (stubbing == null) {
 			stubbing = when(mock.getNextBuffer()).thenAnswer(answer);
-		}
-		else {
+		} else {
 			stubbing = stubbing.thenAnswer(answer);
 		}
 
-		inputGate.onAvailableBuffer(mock);
-
 		return this;
 	}