You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/12/02 08:42:40 UTC

[6/6] flink git commit: [FLINK-5169] [network] Make consumption of InputChannels fair

[FLINK-5169] [network] Make consumption of InputChannels fair


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f728129b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f728129b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f728129b

Branch: refs/heads/master
Commit: f728129bdb8c3176fba03c3e74c65ed254146061
Parents: dbe7073
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Nov 28 09:59:29 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Thu Dec 1 21:42:49 2016 +0100

----------------------------------------------------------------------
 .../io/network/api/reader/BufferReader.java     |  50 ---
 .../io/network/netty/PartitionRequestQueue.java | 255 +++++--------
 .../netty/PartitionRequestServerHandler.java    |  40 +-
 .../netty/SequenceNumberingViewReader.java      | 130 +++++++
 .../partition/BufferAvailabilityListener.java   |  33 ++
 .../partition/PipelinedSubpartition.java        | 148 ++++----
 .../partition/PipelinedSubpartitionView.java    |  18 +-
 .../io/network/partition/ResultPartition.java   |   9 +-
 .../partition/ResultPartitionManager.java       |   5 +-
 .../partition/ResultPartitionProvider.java      |   3 +-
 .../network/partition/ResultSubpartition.java   |   6 +-
 .../partition/ResultSubpartitionView.java       |   9 +-
 .../partition/SpillableSubpartition.java        | 179 ++++-----
 .../partition/SpillableSubpartitionView.java    | 210 ++++++-----
 .../partition/SpilledSubpartitionView.java      | 223 +++++++++++
 .../SpilledSubpartitionViewAsyncIO.java         | 377 -------------------
 .../SpilledSubpartitionViewSyncIO.java          | 196 ----------
 .../partition/consumer/BufferOrEvent.java       |  25 +-
 .../partition/consumer/InputChannel.java        |  43 ++-
 .../network/partition/consumer/InputGate.java   |   3 +-
 .../partition/consumer/InputGateListener.java   |  35 ++
 .../partition/consumer/LocalInputChannel.java   | 212 +++++------
 .../partition/consumer/RemoteInputChannel.java  |  95 +++--
 .../partition/consumer/SingleInputGate.java     | 144 ++++---
 .../partition/consumer/UnionInputGate.java      |  98 ++---
 .../partition/consumer/UnknownInputChannel.java |  11 +-
 .../apache/flink/runtime/taskmanager/Task.java  |   1 -
 27 files changed, 1175 insertions(+), 1383 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f728129b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java
deleted file mode 100644
index ca59609..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.api.reader;
-
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-
-import java.io.IOException;
-
-/**
- * A buffer-oriented reader.
- */
-public final class BufferReader extends AbstractReader {
-
-	public BufferReader(InputGate gate) {
-		super(gate);
-	}
-
-	public Buffer getNextBuffer() throws IOException, InterruptedException {
-		while (true) {
-			final BufferOrEvent bufferOrEvent = inputGate.getNextBufferOrEvent();
-
-			if (bufferOrEvent.isBuffer()) {
-				return bufferOrEvent.getBuffer();
-			}
-			else {
-				if (handleEvent(bufferOrEvent.getEvent())) {
-					return null;
-				}
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f728129b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
index 094c9c7..dc80675 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
@@ -27,10 +27,10 @@ import io.netty.channel.ChannelInboundHandlerAdapter;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.netty.NettyMessage.ErrorResponse;
 import org.apache.flink.runtime.io.network.partition.ProducerFailedException;
-import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
-import org.apache.flink.runtime.util.event.NotificationListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,11 +39,10 @@ import java.util.ArrayDeque;
 import java.util.Queue;
 import java.util.Set;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse;
 
 /**
- * A queue of partition queues, which listens for channel writability changed
+ * A nonEmptyReader of partition queues, which listens for channel writability changed
  * events before writing and flushing {@link Buffer} instances.
  */
 class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
@@ -52,12 +51,10 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
 
 	private final ChannelFutureListener writeListener = new WriteAndFlushNextMessageIfPossibleListener();
 
-	private final Queue<SequenceNumberingSubpartitionView> queue = new ArrayDeque<SequenceNumberingSubpartitionView>();
+	private final Queue<SequenceNumberingViewReader> nonEmptyReader = new ArrayDeque<>();
 
 	private final Set<InputChannelID> released = Sets.newHashSet();
 
-	private SequenceNumberingSubpartitionView currentPartitionQueue;
-
 	private boolean fatalError;
 
 	private ChannelHandlerContext ctx;
@@ -71,8 +68,22 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
 		super.channelRegistered(ctx);
 	}
 
-	public void enqueue(ResultSubpartitionView partitionQueue, InputChannelID receiverId) throws Exception {
-		ctx.pipeline().fireUserEventTriggered(new SequenceNumberingSubpartitionView(partitionQueue, receiverId));
+	void notifyReaderNonEmpty(final SequenceNumberingViewReader reader) {
+		// The notification might come from the same thread. For the initial writes this
+		// might happen before the reader has set its reference to the view, because
+		// creating the queue and the initial notification happen in the same method call.
+		// This can be resolved by separating the creation of the view and allowing
+		// notifications.
+
+		// TODO This could potentially have a bad performance impact as in the
+		// worst case (network consumes faster than the producer) each buffer
+		// will trigger a separate event loop task being scheduled.
+		ctx.executor().execute(new Runnable() {
+			@Override
+			public void run() {
+				ctx.pipeline().fireUserEventTriggered(reader);
+			}
+		});
 	}
 
 	public void cancel(InputChannelID receiverId) {
@@ -87,45 +98,37 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
 
 	@Override
 	public void userEventTriggered(ChannelHandlerContext ctx, Object msg) throws Exception {
-		if (msg.getClass() == SequenceNumberingSubpartitionView.class) {
-			boolean triggerWrite = queue.isEmpty();
-
-			queue.add((SequenceNumberingSubpartitionView) msg);
-
+		// The user event triggered event loop callback is used for thread-safe
+		// hand over of reader queues and cancelled producers.
+
+		if (msg.getClass() == SequenceNumberingViewReader.class) {
+			// Queue a non-empty reader for consumption. If the queue
+			// is empty, we try trigger the actual write. Otherwise this
+			// will be handled by the writeAndFlushIfPossible calls.
+			boolean triggerWrite = nonEmptyReader.isEmpty();
+			nonEmptyReader.add((SequenceNumberingViewReader) msg);
 			if (triggerWrite) {
 				writeAndFlushNextMessageIfPossible(ctx.channel());
 			}
-		}
-		else if (msg.getClass() == InputChannelID.class) {
+		} else if (msg.getClass() == InputChannelID.class) {
+			// Release partition view that get a cancel request.
 			InputChannelID toCancel = (InputChannelID) msg;
-
 			if (released.contains(toCancel)) {
 				return;
 			}
 
 			// Cancel the request for the input channel
-			if (currentPartitionQueue != null && currentPartitionQueue.getReceiverId().equals(toCancel)) {
-				currentPartitionQueue.releaseAllResources();
-				markAsReleased(currentPartitionQueue.receiverId);
-				currentPartitionQueue = null;
-			}
-			else {
-				int size = queue.size();
-
-				for (int i = 0; i < size; i++) {
-					SequenceNumberingSubpartitionView curr = queue.poll();
-
-					if (curr.getReceiverId().equals(toCancel)) {
-						curr.releaseAllResources();
-						markAsReleased(curr.receiverId);
-					}
-					else {
-						queue.add(curr);
-					}
+			int size = nonEmptyReader.size();
+			for (int i = 0; i < size; i++) {
+				SequenceNumberingViewReader reader = nonEmptyReader.poll();
+				if (reader.getReceiverId().equals(toCancel)) {
+					reader.releaseAllResources();
+					markAsReleased(reader.getReceiverId());
+				} else {
+					nonEmptyReader.add(reader);
 				}
 			}
-		}
-		else {
+		} else {
 			ctx.fireUserEventTriggered(msg);
 		}
 	}
@@ -140,64 +143,84 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
 			return;
 		}
 
-		Buffer buffer = null;
+		// The logic here is very similar to the combined input gate and local
+		// input channel logic. You can think of this class acting as the input
+		// gate and the consumed views as the local input channels.
 
+		BufferAndAvailability next = null;
 		try {
 			if (channel.isWritable()) {
 				while (true) {
-					if (currentPartitionQueue == null && (currentPartitionQueue = queue.poll()) == null) {
+					SequenceNumberingViewReader reader = nonEmptyReader.poll();
+
+					// No queue with available data. We allow this here, because
+					// of the write callbacks that are executed after each write.
+					if (reader == null) {
 						return;
 					}
 
-					buffer = currentPartitionQueue.getNextBuffer();
+					next = reader.getNextBuffer();
 
-					if (buffer == null) {
-						if (currentPartitionQueue.registerListener(null)) {
-							currentPartitionQueue = null;
-						}
-						else if (currentPartitionQueue.isReleased()) {
-							markAsReleased(currentPartitionQueue.getReceiverId());
-
-							Throwable cause = currentPartitionQueue.getFailureCause();
+					if (next == null) {
+						if (reader.isReleased()) {
+							markAsReleased(reader.getReceiverId());
+							Throwable cause = reader.getFailureCause();
 
 							if (cause != null) {
-								ctx.writeAndFlush(new NettyMessage.ErrorResponse(
-										new ProducerFailedException(cause),
-										currentPartitionQueue.receiverId));
-							}
+								ErrorResponse msg = new ErrorResponse(
+									new ProducerFailedException(cause),
+									reader.getReceiverId());
 
-							currentPartitionQueue = null;
+								ctx.writeAndFlush(msg);
+							}
+						} else {
+							IllegalStateException err = new IllegalStateException(
+								"Bug in Netty consumer logic: reader queue got notified by partition " +
+									"about available data, but none was available.");
+							handleException(ctx.channel(), err);
+							return;
+						}
+					} else {
+						// this channel was now removed from the non-empty reader queue
+						// we re-add it in case it has more data, because in that case no
+						// "non-empty" notification will come for that reader from the queue.
+						if (next.moreAvailable()) {
+							nonEmptyReader.add(reader);
 						}
-					}
-					else {
-						BufferResponse resp = new BufferResponse(buffer, currentPartitionQueue.getSequenceNumber(), currentPartitionQueue.getReceiverId());
 
-						if (!buffer.isBuffer() &&
-								EventSerializer.fromBuffer(buffer, getClass().getClassLoader()).getClass() == EndOfPartitionEvent.class) {
+						BufferResponse msg = new BufferResponse(
+							next.buffer(),
+							reader.getSequenceNumber(),
+							reader.getReceiverId());
 
-							currentPartitionQueue.notifySubpartitionConsumed();
-							currentPartitionQueue.releaseAllResources();
-							markAsReleased(currentPartitionQueue.getReceiverId());
+						if (isEndOfPartitionEvent(next.buffer())) {
+							reader.notifySubpartitionConsumed();
+							reader.releaseAllResources();
 
-							currentPartitionQueue = null;
+							markAsReleased(reader.getReceiverId());
 						}
 
-						channel.writeAndFlush(resp).addListener(writeListener);
+						// Write and flush and wait until this is done before
+						// trying to continue with the next buffer.
+						channel.writeAndFlush(msg).addListener(writeListener);
 
 						return;
 					}
 				}
 			}
-		}
-		catch (Throwable t) {
-			if (buffer != null) {
-				buffer.recycle();
+		} catch (Throwable t) {
+			if (next != null) {
+				next.buffer().recycle();
 			}
 
 			throw new IOException(t.getMessage(), t);
 		}
 	}
 
+	private boolean isEndOfPartitionEvent(Buffer buffer) throws IOException {
+		return !buffer.isBuffer() && EventSerializer.fromBuffer(buffer, getClass().getClassLoader()).getClass() == EndOfPartitionEvent.class;
+	}
+
 	@Override
 	public void channelInactive(ChannelHandlerContext ctx) throws Exception {
 		releaseAllResources();
@@ -215,22 +238,15 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
 		releaseAllResources();
 
 		if (channel.isActive()) {
-			channel.writeAndFlush(new NettyMessage.ErrorResponse(cause)).addListener(ChannelFutureListener.CLOSE);
+			channel.writeAndFlush(new ErrorResponse(cause)).addListener(ChannelFutureListener.CLOSE);
 		}
 	}
 
 	private void releaseAllResources() throws IOException {
-		if (currentPartitionQueue != null) {
-			currentPartitionQueue.releaseAllResources();
-			markAsReleased(currentPartitionQueue.getReceiverId());
-
-			currentPartitionQueue = null;
-		}
-
-		while ((currentPartitionQueue = queue.poll()) != null) {
-			currentPartitionQueue.releaseAllResources();
-
-			markAsReleased(currentPartitionQueue.getReceiverId());
+		SequenceNumberingViewReader reader;
+		while ((reader = nonEmptyReader.poll()) != null) {
+			reader.releaseAllResources();
+			markAsReleased(reader.getReceiverId());
 		}
 	}
 
@@ -241,7 +257,7 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
 		released.add(receiverId);
 	}
 
-	// This listener is called after an element of the current queue has been
+	// This listener is called after an element of the current nonEmptyReader has been
 	// flushed. If successful, the listener triggers further processing of the
 	// queues.
 	private class WriteAndFlushNextMessageIfPossibleListener implements ChannelFutureListener {
@@ -251,87 +267,14 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
 			try {
 				if (future.isSuccess()) {
 					writeAndFlushNextMessageIfPossible(future.channel());
-				}
-				else if (future.cause() != null) {
+				} else if (future.cause() != null) {
 					handleException(future.channel(), future.cause());
-				}
-				else {
+				} else {
 					handleException(future.channel(), new IllegalStateException("Sending cancelled by user."));
 				}
-			}
-			catch (Throwable t) {
+			} catch (Throwable t) {
 				handleException(future.channel(), t);
 			}
 		}
 	}
-
-	/**
-	 * Simple wrapper for the partition queue iterator, which increments a
-	 * sequence number for each returned buffer and remembers the receiver ID.
-	 */
-	private class SequenceNumberingSubpartitionView implements ResultSubpartitionView, NotificationListener {
-
-		private final ResultSubpartitionView queueIterator;
-
-		private final InputChannelID receiverId;
-
-		private int sequenceNumber = -1;
-
-		private SequenceNumberingSubpartitionView(ResultSubpartitionView queueIterator, InputChannelID receiverId) {
-			this.queueIterator = checkNotNull(queueIterator);
-			this.receiverId = checkNotNull(receiverId);
-		}
-
-		private InputChannelID getReceiverId() {
-			return receiverId;
-		}
-
-		private int getSequenceNumber() {
-			return sequenceNumber;
-		}
-
-		@Override
-		public Buffer getNextBuffer() throws IOException, InterruptedException {
-			Buffer buffer = queueIterator.getNextBuffer();
-
-			if (buffer != null) {
-				sequenceNumber++;
-			}
-
-			return buffer;
-		}
-
-		@Override
-		public void notifySubpartitionConsumed() throws IOException {
-			queueIterator.notifySubpartitionConsumed();
-		}
-
-		@Override
-		public boolean isReleased() {
-			return queueIterator.isReleased();
-		}
-
-		@Override
-		public Throwable getFailureCause() {
-			return queueIterator.getFailureCause();
-		}
-
-		@Override
-		public boolean registerListener(NotificationListener ignored) throws IOException {
-			return queueIterator.registerListener(this);
-		}
-
-		@Override
-		public void releaseAllResources() throws IOException {
-			queueIterator.releaseAllResources();
-		}
-
-		/**
-		 * Enqueue this iterator again after a notification.
-		 */
-		@Override
-		public void onNotification() {
-			ctx.pipeline().fireUserEventTriggered(this);
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f728129b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
index e278d07..12b52ec 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
@@ -27,7 +27,6 @@ import org.apache.flink.runtime.io.network.netty.NettyMessage.CancelPartitionReq
 import org.apache.flink.runtime.io.network.netty.NettyMessage.CloseRequest;
 import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
-import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,10 +52,10 @@ class PartitionRequestServerHandler extends SimpleChannelInboundHandler<NettyMes
 	private BufferPool bufferPool;
 
 	PartitionRequestServerHandler(
-			ResultPartitionProvider partitionProvider,
-			TaskEventDispatcher taskEventDispatcher,
-			PartitionRequestQueue outboundQueue,
-			NetworkBufferPool networkBufferPool) {
+		ResultPartitionProvider partitionProvider,
+		TaskEventDispatcher taskEventDispatcher,
+		PartitionRequestQueue outboundQueue,
+		NetworkBufferPool networkBufferPool) {
 
 		this.partitionProvider = partitionProvider;
 		this.taskEventDispatcher = taskEventDispatcher;
@@ -94,15 +93,16 @@ class PartitionRequestServerHandler extends SimpleChannelInboundHandler<NettyMes
 				LOG.debug("Read channel on {}: {}.", ctx.channel().localAddress(), request);
 
 				try {
-					ResultSubpartitionView subpartition =
-							partitionProvider.createSubpartitionView(
-									request.partitionId,
-									request.queueIndex,
-									bufferPool);
-
-					outboundQueue.enqueue(subpartition, request.receiverId);
-				}
-				catch (PartitionNotFoundException notFound) {
+					SequenceNumberingViewReader reader = new SequenceNumberingViewReader(
+						request.receiverId,
+						outboundQueue);
+
+					reader.requestSubpartitionView(
+						partitionProvider,
+						request.partitionId,
+						request.queueIndex,
+						bufferPool);
+				} catch (PartitionNotFoundException notFound) {
 					respondWithError(ctx, notFound, request.receiverId);
 				}
 			}
@@ -115,20 +115,16 @@ class PartitionRequestServerHandler extends SimpleChannelInboundHandler<NettyMes
 				if (!taskEventDispatcher.publish(request.partitionId, request.event)) {
 					respondWithError(ctx, new IllegalArgumentException("Task event receiver not found."), request.receiverId);
 				}
-			}
-			else if (msgClazz == CancelPartitionRequest.class) {
+			} else if (msgClazz == CancelPartitionRequest.class) {
 				CancelPartitionRequest request = (CancelPartitionRequest) msg;
 
 				outboundQueue.cancel(request.receiverId);
-			}
-			else if (msgClazz == CloseRequest.class) {
+			} else if (msgClazz == CloseRequest.class) {
 				outboundQueue.close();
-			}
-			else {
+			} else {
 				LOG.warn("Received unexpected client request: {}", msg);
 			}
-		}
-		catch (Throwable t) {
+		} catch (Throwable t) {
 			respondWithError(ctx, t);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/f728129b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
new file mode 100644
index 0000000..ef611eb
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
+import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Simple wrapper for the partition readerQueue iterator, which increments a
+ * sequence number for each returned buffer and remembers the receiver ID.
+ *
+ * <p>It also keeps track of available buffers and notifies the outbound
+ * handler about non-emptiness, similar to the {@link LocalInputChannel}.
+ */
+class SequenceNumberingViewReader implements BufferAvailabilityListener {
+
+	private final Object requestLock = new Object();
+
+	private final InputChannelID receiverId;
+
+	private final AtomicLong numBuffersAvailable = new AtomicLong();
+
+	private final PartitionRequestQueue requestQueue;
+
+	private volatile ResultSubpartitionView subpartitionView;
+
+	private int sequenceNumber = -1;
+
+	SequenceNumberingViewReader(InputChannelID receiverId, PartitionRequestQueue requestQueue) {
+		this.receiverId = receiverId;
+		this.requestQueue = requestQueue;
+	}
+
+	void requestSubpartitionView(
+		ResultPartitionProvider partitionProvider,
+		ResultPartitionID resultPartitionId,
+		int subPartitionIndex,
+		BufferProvider bufferProvider) throws IOException {
+
+		synchronized (requestLock) {
+			if (subpartitionView == null) {
+				// This this call can trigger a notification we have to
+				// schedule a separate task at the event loop that will
+				// start consuming this. Otherwise the reference to the
+				// view cannot be available in getNextBuffer().
+				this.subpartitionView = partitionProvider.createSubpartitionView(
+					resultPartitionId,
+					subPartitionIndex,
+					bufferProvider,
+					this);
+			} else {
+				throw new IllegalStateException("Subpartition already requested");
+			}
+		}
+	}
+
+	InputChannelID getReceiverId() {
+		return receiverId;
+	}
+
+	int getSequenceNumber() {
+		return sequenceNumber;
+	}
+
+	public BufferAndAvailability getNextBuffer() throws IOException, InterruptedException {
+		Buffer next = subpartitionView.getNextBuffer();
+		if (next != null) {
+			long remaining = numBuffersAvailable.decrementAndGet();
+			sequenceNumber++;
+
+			if (remaining >= 0) {
+				return new BufferAndAvailability(next, remaining > 0);
+			} else {
+				throw new IllegalStateException("no buffer available");
+			}
+		} else {
+			return null;
+		}
+	}
+
+	public void notifySubpartitionConsumed() throws IOException {
+		subpartitionView.notifySubpartitionConsumed();
+	}
+
+	public boolean isReleased() {
+		return subpartitionView.isReleased();
+	}
+
+	public Throwable getFailureCause() {
+		return subpartitionView.getFailureCause();
+	}
+
+	public void releaseAllResources() throws IOException {
+		subpartitionView.releaseAllResources();
+	}
+
+	@Override
+	public void notifyBuffersAvailable(long numBuffers) {
+		// if this request made the channel non-empty, notify the input gate
+		if (numBuffers > 0 && numBuffersAvailable.getAndAdd(numBuffers) == 0) {
+			requestQueue.notifyReaderNonEmpty(this);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f728129b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferAvailabilityListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferAvailabilityListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferAvailabilityListener.java
new file mode 100644
index 0000000..114ef7c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferAvailabilityListener.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+/**
+ * Listener interface implemented by consumers of {@link ResultSubpartitionView}
+ * that want to be notified of availability of further buffers.
+ */
+public interface BufferAvailabilityListener {
+
+	/**
+	 * Called whenever a new number of buffers becomes available.
+	 *
+	 * @param numBuffers The number of buffers that became available.
+	 */
+	void notifyBuffersAvailable(long numBuffers);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f728129b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index 3981a26..e9400f0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -22,7 +22,7 @@ import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
-import org.apache.flink.runtime.util.event.NotificationListener;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -30,6 +30,7 @@ import java.io.IOException;
 import java.util.ArrayDeque;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * A pipelined in-memory only subpartition, which can be consumed once.
@@ -38,51 +39,47 @@ class PipelinedSubpartition extends ResultSubpartition {
 
 	private static final Logger LOG = LoggerFactory.getLogger(PipelinedSubpartition.class);
 
+	// ------------------------------------------------------------------------
+
+	/** All buffers of this subpartition. Access to the buffers is synchronized on this object. */
+	private final ArrayDeque<Buffer> buffers = new ArrayDeque<>();
+
+	/** The read view to consume this subpartition. */
+	private PipelinedSubpartitionView readView;
+
 	/** Flag indicating whether the subpartition has been finished. */
 	private boolean isFinished;
 
 	/** Flag indicating whether the subpartition has been released. */
 	private volatile boolean isReleased;
 
-	/**
-	 * A data availability listener. Registered, when the consuming task is faster than the
-	 * producing task.
-	 */
-	private NotificationListener registeredListener;
-
-	/** The read view to consume this subpartition. */
-	private PipelinedSubpartitionView readView;
-
-	/** All buffers of this subpartition. Access to the buffers is synchronized on this object. */
-	final ArrayDeque<Buffer> buffers = new ArrayDeque<Buffer>();
+	// ------------------------------------------------------------------------
 
 	PipelinedSubpartition(int index, ResultPartition parent) {
 		super(index, parent);
 	}
 
 	@Override
-	public boolean add(Buffer buffer) {
+	public boolean add(Buffer buffer) throws IOException {
 		checkNotNull(buffer);
 
-		final NotificationListener listener;
+		// view reference accessible outside the lock, but assigned inside the locked scope
+		final PipelinedSubpartitionView reader;
 
 		synchronized (buffers) {
-			if (isReleased || isFinished) {
+			if (isFinished || isReleased) {
 				return false;
 			}
 
 			// Add the buffer and update the stats
 			buffers.add(buffer);
+			reader = readView;
 			updateStatistics(buffer);
-
-			// Get the listener...
-			listener = registeredListener;
-			registeredListener = null;
 		}
 
 		// Notify the listener outside of the synchronized block
-		if (listener != null) {
-			listener.onNotification();
+		if (reader != null) {
+			reader.notifyBuffersAvailable(1);
 		}
 
 		return true;
@@ -90,36 +87,34 @@ class PipelinedSubpartition extends ResultSubpartition {
 
 	@Override
 	public void finish() throws IOException {
-		final NotificationListener listener;
+		final Buffer buffer = EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);
+
+		// view reference accessible outside the lock, but assigned inside the locked scope
+		final PipelinedSubpartitionView reader;
 
 		synchronized (buffers) {
-			if (isReleased || isFinished) {
+			if (isFinished || isReleased) {
 				return;
 			}
 
-			final Buffer buffer = EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);
-
 			buffers.add(buffer);
+			reader = readView;
 			updateStatistics(buffer);
 
 			isFinished = true;
-
-			LOG.debug("Finished {}.", this);
-
-			// Get the listener...
-			listener = registeredListener;
-			registeredListener = null;
 		}
 
+		LOG.debug("Finished {}.", this);
+
 		// Notify the listener outside of the synchronized block
-		if (listener != null) {
-			listener.onNotification();
+		if (reader != null) {
+			reader.notifyBuffersAvailable(1);
 		}
 	}
 
 	@Override
 	public void release() {
-		final NotificationListener listener;
+		// view reference accessible outside the lock, but assigned inside the locked scope
 		final PipelinedSubpartitionView view;
 
 		synchronized (buffers) {
@@ -130,40 +125,35 @@ class PipelinedSubpartition extends ResultSubpartition {
 			// Release all available buffers
 			Buffer buffer;
 			while ((buffer = buffers.poll()) != null) {
-				if (!buffer.isRecycled()) {
-					buffer.recycle();
-				}
+				buffer.recycle();
 			}
 
 			// Get the view...
 			view = readView;
 			readView = null;
 
-			// Get the listener...
-			listener = registeredListener;
-			registeredListener = null;
-
 			// Make sure that no further buffers are added to the subpartition
 			isReleased = true;
-
-			LOG.debug("Released {}.", this);
 		}
 
+		LOG.debug("Released {}.", this);
+
 		// Release all resources of the view
 		if (view != null) {
 			view.releaseAllResources();
 		}
+	}
 
-		// Notify the listener outside of the synchronized block
-		if (listener != null) {
-			listener.onNotification();
+	Buffer pollBuffer() {
+		synchronized (buffers) {
+			return buffers.pollFirst();
 		}
 	}
 
 	@Override
 	public int releaseMemory() {
-		// The pipelined subpartition does not react to memory release requests. The buffers will be
-		// recycled by the consuming task.
+		// The pipelined subpartition does not react to memory release requests.
+		// The buffers will be recycled by the consuming task.
 		return 0;
 	}
 
@@ -173,53 +163,43 @@ class PipelinedSubpartition extends ResultSubpartition {
 	}
 
 	@Override
-	public PipelinedSubpartitionView createReadView(BufferProvider bufferProvider) {
-		synchronized (buffers) {
-			if (readView != null) {
-				throw new IllegalStateException("Subpartition " + index + " of "
-						+ parent.getPartitionId() + " is being or already has been " +
-						"consumed, but pipelined subpartitions can only be consumed once.");
-			}
+	public PipelinedSubpartitionView createReadView(BufferProvider bufferProvider, BufferAvailabilityListener availabilityListener) throws IOException {
+		final int queueSize;
 
-			readView = new PipelinedSubpartitionView(this);
+		synchronized (buffers) {
+			checkState(!isReleased);
+			checkState(readView == null,
+					"Subpartition %s of is being (or already has been) consumed, " +
+					"but pipelined subpartitions can only be consumed once.", index, parent.getPartitionId());
 
-			LOG.debug("Created read view for subpartition {} of partition {}.", index, parent.getPartitionId());
+			LOG.debug("Creating read view for subpartition {} of partition {}.", index, parent.getPartitionId());
 
-			return readView;
+			queueSize = buffers.size();
+			readView = new PipelinedSubpartitionView(this, availabilityListener);
 		}
+
+		readView.notifyBuffersAvailable(queueSize);
+
+		return readView;
 	}
 
 	@Override
 	public String toString() {
-		synchronized (buffers) {
-			return String.format("PipelinedSubpartition [number of buffers: %d (%d bytes), " +
-							"finished? %s, read view? %s]",
-					getTotalNumberOfBuffers(), getTotalNumberOfBytes(), isFinished, readView != null);
-		}
-	}
+		final long numBuffers;
+		final long numBytes;
+		final boolean finished;
+		final boolean hasReadView;
 
-	/**
-	 * Registers a listener with this subpartition and returns whether the registration was
-	 * successful.
-	 *
-	 * <p> A registered listener is notified when the state of the subpartition changes. After a
-	 * notification, the listener is unregistered. Only a single listener is allowed to be
-	 * registered.
-	 */
-	boolean registerListener(NotificationListener listener) {
 		synchronized (buffers) {
-			if (!buffers.isEmpty() || isReleased) {
-				return false;
-			}
-
-			if (registeredListener == null) {
-				registeredListener = listener;
-
-				return true;
-			}
-
-			throw new IllegalStateException("Already registered listener.");
+			numBuffers = getTotalNumberOfBuffers();
+			numBytes = getTotalNumberOfBytes();
+			finished = isFinished;
+			hasReadView = readView != null;
 		}
+
+		return String.format(
+				"PipelinedSubpartition [number of buffers: %d (%d bytes), finished? %s, read view? %s]",
+				numBuffers, numBytes, finished, hasReadView);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/f728129b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
index f8d81a4..52c78ea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
@@ -19,8 +19,8 @@
 package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.util.event.NotificationListener;
 
+import java.io.IOException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -33,23 +33,25 @@ class PipelinedSubpartitionView implements ResultSubpartitionView {
 	/** The subpartition this view belongs to. */
 	private final PipelinedSubpartition parent;
 
+	private final BufferAvailabilityListener availabilityListener;
+
 	/** Flag indicating whether this view has been released. */
-	private AtomicBoolean isReleased = new AtomicBoolean();
+	private final AtomicBoolean isReleased;
 
-	PipelinedSubpartitionView(PipelinedSubpartition parent) {
+	PipelinedSubpartitionView(PipelinedSubpartition parent, BufferAvailabilityListener listener) {
 		this.parent = checkNotNull(parent);
+		this.availabilityListener = checkNotNull(listener);
+		this.isReleased = new AtomicBoolean();
 	}
 
 	@Override
 	public Buffer getNextBuffer() {
-		synchronized (parent.buffers) {
-			return parent.buffers.poll();
-		}
+		return parent.pollBuffer();
 	}
 
 	@Override
-	public boolean registerListener(NotificationListener listener) {
-		return !isReleased.get() && parent.registerListener(listener);
+	public void notifyBuffersAvailable(long numBuffers) throws IOException {
+		availabilityListener.notifyBuffersAvailable(numBuffers);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/f728129b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index 834318c..474c25c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.io.network.partition;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.BufferPoolOwner;
@@ -135,7 +134,6 @@ public class ResultPartition implements BufferPoolOwner {
 		ResultPartitionManager partitionManager,
 		ResultPartitionConsumableNotifier partitionConsumableNotifier,
 		IOManager ioManager,
-		IOMode defaultIoMode,
 		boolean sendScheduleOrUpdateConsumersMessage) {
 
 		this.owningTaskName = checkNotNull(owningTaskName);
@@ -152,8 +150,7 @@ public class ResultPartition implements BufferPoolOwner {
 		switch (partitionType) {
 			case BLOCKING:
 				for (int i = 0; i < subpartitions.length; i++) {
-					subpartitions[i] = new SpillableSubpartition(
-							i, this, ioManager, defaultIoMode);
+					subpartitions[i] = new SpillableSubpartition(i, this, ioManager);
 				}
 
 				break;
@@ -340,7 +337,7 @@ public class ResultPartition implements BufferPoolOwner {
 	/**
 	 * Returns the requested subpartition.
 	 */
-	public ResultSubpartitionView createSubpartitionView(int index, BufferProvider bufferProvider) throws IOException {
+	public ResultSubpartitionView createSubpartitionView(int index, BufferProvider bufferProvider, BufferAvailabilityListener availabilityListener) throws IOException {
 		int refCnt = pendingReferences.get();
 
 		checkState(refCnt != -1, "Partition released.");
@@ -348,7 +345,7 @@ public class ResultPartition implements BufferPoolOwner {
 
 		checkElementIndex(index, subpartitions.length, "Subpartition not found.");
 
-		ResultSubpartitionView readView = subpartitions[index].createReadView(bufferProvider);
+		ResultSubpartitionView readView = subpartitions[index].createReadView(bufferProvider, availabilityListener);
 
 		LOG.debug("Created {}", readView);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f728129b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java
index 9da3e14..8ad3e34 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java
@@ -66,7 +66,8 @@ public class ResultPartitionManager implements ResultPartitionProvider {
 	public ResultSubpartitionView createSubpartitionView(
 			ResultPartitionID partitionId,
 			int subpartitionIndex,
-			BufferProvider bufferProvider) throws IOException {
+			BufferProvider bufferProvider,
+			BufferAvailabilityListener availabilityListener) throws IOException {
 
 		synchronized (registeredPartitions) {
 			final ResultPartition partition = registeredPartitions.get(partitionId.getProducerId(),
@@ -78,7 +79,7 @@ public class ResultPartitionManager implements ResultPartitionProvider {
 
 			LOG.debug("Requesting subpartition {} of {}.", subpartitionIndex, partition);
 
-			return partition.createSubpartitionView(subpartitionIndex, bufferProvider);
+			return partition.createSubpartitionView(subpartitionIndex, bufferProvider, availabilityListener);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f728129b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java
index 23dd1d3..3fbfd49 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionProvider.java
@@ -30,6 +30,7 @@ public interface ResultPartitionProvider {
 	ResultSubpartitionView createSubpartitionView(
 			ResultPartitionID partitionId,
 			int index,
-			BufferProvider bufferProvider) throws IOException;
+			BufferProvider bufferProvider,
+			BufferAvailabilityListener availabilityListener) throws IOException;
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f728129b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
index 31c8f73..dd0e152 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
@@ -37,7 +37,7 @@ public abstract class ResultSubpartition {
 	// - Statistics ----------------------------------------------------------
 
 	/** The total number of buffers (both data and event buffers) */
-	private int totalNumberOfBuffers;
+	private long totalNumberOfBuffers;
 
 	/** The total number of bytes (both data and event buffers) */
 	private long totalNumberOfBytes;
@@ -52,7 +52,7 @@ public abstract class ResultSubpartition {
 		totalNumberOfBytes += buffer.getSize();
 	}
 
-	protected int getTotalNumberOfBuffers() {
+	protected long getTotalNumberOfBuffers() {
 		return totalNumberOfBuffers;
 	}
 
@@ -77,7 +77,7 @@ public abstract class ResultSubpartition {
 
 	abstract public void release() throws IOException;
 
-	abstract public ResultSubpartitionView createReadView(BufferProvider bufferProvider) throws IOException;
+	abstract public ResultSubpartitionView createReadView(BufferProvider bufferProvider, BufferAvailabilityListener availabilityListener) throws IOException;
 
 	abstract int releaseMemory() throws IOException;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f728129b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
index cfc5455..98be90f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.util.event.NotificationListener;
 
 import java.io.IOException;
 
@@ -41,13 +40,7 @@ public interface ResultSubpartitionView {
 	 */
 	Buffer getNextBuffer() throws IOException, InterruptedException;
 
-	/**
-	 * Subscribes to data availability notifications.
-	 * <p>
-	 * Returns whether the subscription was successful. A subscription fails,
-	 * if there is data available.
-	 */
-	boolean registerListener(NotificationListener listener) throws IOException;
+	void notifyBuffersAvailable(long buffers) throws IOException;
 
 	void releaseAllResources() throws IOException;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f728129b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
index 3f19559..439e08d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
@@ -18,42 +18,54 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.ArrayList;
+import java.util.ArrayDeque;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * A blocking in-memory subpartition, which is able to spill to disk.
+ * A spillable sub partition starts out in-memory and spills to disk if asked
+ * to do so.
  *
- * <p> Buffers are kept in-memory as long as possible. If not possible anymore, all buffers are
- * spilled to disk.
+ * <p>Buffers for the partition come from a {@link BufferPool}. The buffer pool
+ * is also responsible to trigger the release of the buffers if it needs them
+ * back. At this point, the spillable sub partition will write all in-memory
+ * buffers to disk. All added buffers after that point directly go to disk.
+ *
+ * <p>This partition type is used for {@link ResultPartitionType#BLOCKING}
+ * results that are fully produced before they can be consumed. At the point
+ * when they are consumed, the buffers are (i) all in-memory, (ii) currently
+ * being spilled to disk, or (iii) completely spilled to disk. Depending on
+ * this state, different reader variants are returned (see
+ * {@link SpillableSubpartitionView} and {@link SpilledSubpartitionView}).
+ *
+ * <p>Since the network buffer pool size is usually quite small (default is
+ * {@link ConfigConstants#DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS}), most
+ * spillable partitions will be spilled for real-world data sets.
  */
 class SpillableSubpartition extends ResultSubpartition {
 
 	private static final Logger LOG = LoggerFactory.getLogger(SpillableSubpartition.class);
 
-	/** All buffers of this subpartition. */
-	final ArrayList<Buffer> buffers = new ArrayList<Buffer>();
+	/** Buffers are kept in this queue as long as we weren't ask to release any. */
+	private final ArrayDeque<Buffer> buffers = new ArrayDeque<>();
 
-	/** The I/O manager to create the spill writer from. */
-	final IOManager ioManager;
-
-	/** The default I/O mode to use. */
-	final IOMode ioMode;
+	/** The I/O manager used for spilling buffers to disk. */
+	private final IOManager ioManager;
 
 	/** The writer used for spilling. As long as this is null, we are in-memory. */
-	BufferFileWriter spillWriter;
+	private BufferFileWriter spillWriter;
 
 	/** Flag indicating whether the subpartition has been finished. */
 	private boolean isFinished;
@@ -64,11 +76,10 @@ class SpillableSubpartition extends ResultSubpartition {
 	/** The read view to consume this subpartition. */
 	private ResultSubpartitionView readView;
 
-	SpillableSubpartition(int index, ResultPartition parent, IOManager ioManager, IOMode ioMode) {
+	SpillableSubpartition(int index, ResultPartition parent, IOManager ioManager) {
 		super(index, parent);
 
 		this.ioManager = checkNotNull(ioManager);
-		this.ioMode = checkNotNull(ioMode);
 	}
 
 	@Override
@@ -80,7 +91,6 @@ class SpillableSubpartition extends ResultSubpartition {
 				return false;
 			}
 
-			// In-memory
 			if (spillWriter == null) {
 				buffers.add(buffer);
 
@@ -88,7 +98,7 @@ class SpillableSubpartition extends ResultSubpartition {
 			}
 		}
 
-		// Else: Spilling
+		// Didn't return early => go to disk
 		spillWriter.writeBlock(buffer);
 
 		return true;
@@ -102,7 +112,7 @@ class SpillableSubpartition extends ResultSubpartition {
 			}
 		}
 
-		// If we are spilling/have spilled, wait for the writer to finish.
+		// If we are spilling/have spilled, wait for the writer to finish
 		if (spillWriter != null) {
 			spillWriter.close();
 		}
@@ -117,51 +127,93 @@ class SpillableSubpartition extends ResultSubpartition {
 				return;
 			}
 
-			// Recycle all in-memory buffers
-			for (Buffer buffer : buffers) {
-				buffer.recycle();
-			}
-
-			buffers.clear();
-			buffers.trimToSize();
+			view = readView;
 
-			// If we are spilling/have spilled, wait for the writer to finish and delete the file.
-			if (spillWriter != null) {
-				spillWriter.closeAndDelete();
+			// No consumer yet, we are responsible to clean everything up. If
+			// one is available, the view is responsible is to clean up (see
+			// below).
+			if (view == null) {
+				for (Buffer buffer : buffers) {
+					buffer.recycle();
+				}
+				buffers.clear();
+
+				// TODO This can block until all buffers are written out to
+				// disk if a spill is in-progress before deleting the file.
+				// It is possibly called from the Netty event loop threads,
+				// which can bring down the network.
+				if (spillWriter != null) {
+					spillWriter.closeAndDelete();
+				}
 			}
 
-			// Get the view...
-			view = readView;
-			readView = null;
-
 			isReleased = true;
 		}
 
-		// Release the view outside of the synchronized block
 		if (view != null) {
-			view.notifySubpartitionConsumed();
+			view.releaseAllResources();
+		}
+	}
+
+	@Override
+	public ResultSubpartitionView createReadView(BufferProvider bufferProvider, BufferAvailabilityListener availabilityListener) throws IOException {
+		synchronized (buffers) {
+			if (!isFinished) {
+				throw new IllegalStateException("Subpartition has not been finished yet, " +
+					"but blocking subpartitions can only be consumed after they have " +
+					"been finished.");
+			}
+
+			if (readView != null) {
+				throw new IllegalStateException("Subpartition is being or already has been " +
+					"consumed, but we currently allow subpartitions to only be consumed once.");
+			}
+
+			if (spillWriter != null) {
+				readView = new SpilledSubpartitionView(
+					this,
+					bufferProvider.getMemorySegmentSize(),
+					spillWriter,
+					getTotalNumberOfBuffers(),
+					availabilityListener);
+			} else {
+				readView = new SpillableSubpartitionView(
+					this,
+					buffers,
+					ioManager,
+					bufferProvider.getMemorySegmentSize(),
+					availabilityListener);
+			}
+
+			return readView;
 		}
 	}
 
 	@Override
 	public int releaseMemory() throws IOException {
 		synchronized (buffers) {
-			if (spillWriter == null) {
-				// Create the spill writer
+			ResultSubpartitionView view = readView;
+
+			if (view != null && view.getClass() == SpillableSubpartitionView.class) {
+				// If there is a spilalble view, it's the responsibility of the
+				// view to release memory.
+				SpillableSubpartitionView spillableView = (SpillableSubpartitionView) view;
+				return spillableView.releaseMemory();
+			} else if (spillWriter == null) {
+				// No view and in-memory => spill to disk
 				spillWriter = ioManager.createBufferFileWriter(ioManager.createChannel());
 
-				final int numberOfBuffers = buffers.size();
-
+				int numberOfBuffers = buffers.size();
 				long spilledBytes = 0;
 
 				// Spill all buffers
 				for (int i = 0; i < numberOfBuffers; i++) {
-					Buffer buffer = buffers.remove(0);
+					Buffer buffer = buffers.remove();
 					spilledBytes += buffer.getSize();
 					spillWriter.writeBlock(buffer);
 				}
 
-				LOG.debug("Spilled {} bytes for sub partition {} of {}.", spilledBytes, index, parent.getPartitionId());
+				LOG.debug("Spilling {} bytes for sub partition {} of {}.", spilledBytes, index, parent.getPartitionId());
 
 				return numberOfBuffers;
 			}
@@ -177,47 +229,8 @@ class SpillableSubpartition extends ResultSubpartition {
 	}
 
 	@Override
-	public ResultSubpartitionView createReadView(BufferProvider bufferProvider) throws IOException {
-		synchronized (buffers) {
-			if (!isFinished) {
-				throw new IllegalStateException("Subpartition has not been finished yet, " +
-						"but blocking subpartitions can only be consumed after they have " +
-						"been finished.");
-			}
-
-			if (readView != null) {
-				throw new IllegalStateException("Subpartition is being or already has been " +
-						"consumed, but we currently allow subpartitions to only be consumed once.");
-			}
-
-			// Spilled if closed and no outstanding write requests
-			boolean isSpilled = spillWriter != null && (spillWriter.isClosed()
-					|| spillWriter.getNumberOfOutstandingRequests() == 0);
-
-			if (isSpilled) {
-				if (ioMode.isSynchronous()) {
-					readView = new SpilledSubpartitionViewSyncIO(
-							this,
-							bufferProvider.getMemorySegmentSize(),
-							spillWriter.getChannelID(),
-							0);
-				}
-				else {
-					readView = new SpilledSubpartitionViewAsyncIO(
-							this,
-							bufferProvider,
-							ioManager,
-							spillWriter.getChannelID(),
-							0);
-				}
-			}
-			else {
-				readView = new SpillableSubpartitionView(
-						this, bufferProvider, buffers.size(), ioMode);
-			}
-
-			return readView;
-		}
+	public int getNumberOfQueuedBuffers() {
+		return buffers.size();
 	}
 
 	@Override
@@ -228,8 +241,4 @@ class SpillableSubpartition extends ResultSubpartition {
 				spillWriter != null);
 	}
 
-	@Override
-	public int getNumberOfQueuedBuffers() {
-			return buffers.size();
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f728129b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
index 29c2002..8119ecc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
@@ -18,15 +18,14 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
-import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
+import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
-import org.apache.flink.runtime.util.event.NotificationListener;
 
 import java.io.IOException;
+import java.util.ArrayDeque;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 class SpillableSubpartitionView implements ResultSubpartitionView {
@@ -34,146 +33,163 @@ class SpillableSubpartitionView implements ResultSubpartitionView {
 	/** The subpartition this view belongs to. */
 	private final SpillableSubpartition parent;
 
-	/** The buffer provider to read buffers into (spilling case). */
-	private final BufferProvider bufferProvider;
+	/** All buffers of this subpartition. Access to the buffers is synchronized on this object. */
+	private final ArrayDeque<Buffer> buffers;
 
-	/** The number of buffers in-memory at the subpartition. */
-	private final int numberOfBuffers;
+	/** IO manager if we need to spill (for spilled case). */
+	private final IOManager ioManager;
 
-	/** The default I/O mode to use. */
-	private final IOMode ioMode;
+	/** Size of memory segments (for spilled case). */
+	private final int memorySegmentSize;
 
-	private ResultSubpartitionView spilledView;
-
-	private int currentQueuePosition;
-
-	private long currentBytesRead;
+	/**
+	 * The buffer availability listener. As long as in-memory, notifications
+	 * happen on a buffer per buffer basis as spilling may happen after a
+	 * notification has been sent out.
+	 */
+	private final BufferAvailabilityListener listener;
 
 	private final AtomicBoolean isReleased = new AtomicBoolean(false);
 
-	public SpillableSubpartitionView(
-			SpillableSubpartition parent,
-			BufferProvider bufferProvider,
-			int numberOfBuffers,
-			IOMode ioMode) {
-
-		this.parent = checkNotNull(parent);
-		this.bufferProvider = checkNotNull(bufferProvider);
-		checkArgument(numberOfBuffers >= 0);
-		this.numberOfBuffers = numberOfBuffers;
-		this.ioMode = checkNotNull(ioMode);
-	}
-
-	@Override
-	public Buffer getNextBuffer() throws IOException, InterruptedException {
-
-		if (isReleased.get()) {
-			return null;
-		}
-
-		// 1) In-memory
-		synchronized (parent.buffers) {
-			if (parent.isReleased()) {
-				return null;
-			}
+	/**
+	 * The next buffer to hand out. Everytime this is set to a non-null value,
+	 * a listener notification happens.
+	 */
+	private Buffer nextBuffer;
 
-			if (parent.spillWriter == null) {
-				if (currentQueuePosition < numberOfBuffers) {
-					Buffer buffer = parent.buffers.get(currentQueuePosition);
+	private volatile SpilledSubpartitionView spilledView;
 
-					buffer.retain();
+	SpillableSubpartitionView(
+		SpillableSubpartition parent,
+		ArrayDeque<Buffer> buffers,
+		IOManager ioManager,
+		int memorySegmentSize,
+		BufferAvailabilityListener listener) {
 
-					// TODO Fix hard coding of 8 bytes for the header
-					currentBytesRead += buffer.getSize() + 8;
-					currentQueuePosition++;
-
-					return buffer;
-				}
+		this.parent = checkNotNull(parent);
+		this.buffers = checkNotNull(buffers);
+		this.ioManager = checkNotNull(ioManager);
+		this.memorySegmentSize = memorySegmentSize;
+		this.listener = checkNotNull(listener);
 
-				return null;
-			}
+		synchronized (buffers) {
+			nextBuffer = buffers.poll();
 		}
 
-		// 2) Spilled
-		if (spilledView != null) {
-			return spilledView.getNextBuffer();
+		if (nextBuffer != null) {
+			listener.notifyBuffersAvailable(1);
 		}
+	}
 
-		// 3) Spilling
-		// Make sure that all buffers are written before consuming them. We can't block here,
-		// because this might be called from an network I/O thread.
-		if (parent.spillWriter.getNumberOfOutstandingRequests() > 0) {
-			return null;
-		}
+	int releaseMemory() throws IOException {
+		synchronized (buffers) {
+			if (spilledView != null || nextBuffer == null) {
+				// Already spilled or nothing in-memory
+				return 0;
+			} else {
+				// We don't touch next buffer, because a notification has
+				// already been sent for it. Only when it is consumed, will
+				// it be recycled.
+
+				// Create the spill writer and write all buffers to disk
+				BufferFileWriter spillWriter = ioManager.createBufferFileWriter(ioManager.createChannel());
+
+				int numBuffers = buffers.size();
+				for (int i = 0; i < numBuffers; i++) {
+					Buffer buffer = buffers.remove();
+					try {
+						spillWriter.writeBlock(buffer);
+					} finally {
+						buffer.recycle();
+					}
+				}
 
-		if (ioMode.isSynchronous()) {
-			spilledView = new SpilledSubpartitionViewSyncIO(
-					parent,
-					bufferProvider.getMemorySegmentSize(),
-					parent.spillWriter.getChannelID(),
-					currentBytesRead);
-		}
-		else {
-			spilledView = new SpilledSubpartitionViewAsyncIO(
+				spilledView = new SpilledSubpartitionView(
 					parent,
-					bufferProvider,
-					parent.ioManager,
-					parent.spillWriter.getChannelID(),
-					currentBytesRead);
-		}
+					memorySegmentSize,
+					spillWriter,
+					numBuffers,
+					listener);
 
-		return spilledView.getNextBuffer();
+				return numBuffers;
+			}
+		}
 	}
 
 	@Override
-	public boolean registerListener(NotificationListener listener) throws IOException {
-		if (spilledView == null) {
-			synchronized (parent.buffers) {
-				// Didn't spill yet, buffers should be in-memory
-				if (parent.spillWriter == null) {
-					return false;
+	public Buffer getNextBuffer() throws IOException, InterruptedException {
+		synchronized (buffers) {
+			if (isReleased.get()) {
+				return null;
+			} else if (nextBuffer != null) {
+				Buffer current = nextBuffer;
+				nextBuffer = buffers.poll();
+
+				if (nextBuffer != null) {
+					listener.notifyBuffersAvailable(1);
 				}
-			}
 
-			// Spilling
-			if (parent.spillWriter.getNumberOfOutstandingRequests() > 0) {
-				return parent.spillWriter.registerAllRequestsProcessedListener(listener);
+				return current;
 			}
+		} // else: spilled
 
-			return false;
+		SpilledSubpartitionView spilled = spilledView;
+		if (spilled != null) {
+			return spilled.getNextBuffer();
+		} else {
+			throw new IllegalStateException("No in-memory buffers available, but also nothing spilled.");
 		}
-
-		return spilledView.registerListener(listener);
 	}
 
 	@Override
-	public void notifySubpartitionConsumed() throws IOException {
-		parent.onConsumedSubpartition();
+	public void notifyBuffersAvailable(long buffers) throws IOException {
+		// We do the availability listener notification one by one
 	}
 
 	@Override
 	public void releaseAllResources() throws IOException {
 		if (isReleased.compareAndSet(false, true)) {
-			if (spilledView != null) {
-				spilledView.releaseAllResources();
+			SpilledSubpartitionView spilled = spilledView;
+			if (spilled != null) {
+				spilled.releaseAllResources();
 			}
 		}
 	}
 
 	@Override
+	public void notifySubpartitionConsumed() throws IOException {
+		SpilledSubpartitionView spilled = spilledView;
+		if (spilled != null) {
+			spilled.notifySubpartitionConsumed();
+		} else {
+			parent.onConsumedSubpartition();
+		}
+	}
+
+	@Override
 	public boolean isReleased() {
-		return parent.isReleased() || isReleased.get();
+		SpilledSubpartitionView spilled = spilledView;
+		if (spilled != null) {
+			return spilled.isReleased();
+		} else {
+			return parent.isReleased() || isReleased.get();
+		}
 	}
 
 	@Override
 	public Throwable getFailureCause() {
-		return parent.getFailureCause();
+		SpilledSubpartitionView spilled = spilledView;
+		if (spilled != null) {
+			return spilled.getFailureCause();
+		} else {
+			return parent.getFailureCause();
+		}
 	}
 
 	@Override
 	public String toString() {
 		return String.format("SpillableSubpartitionView(index: %d) of ResultPartition %s",
-				parent.index,
-				parent.parent.getPartitionId());
+			parent.index,
+			parent.parent.getPartitionId());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f728129b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
new file mode 100644
index 0000000..b087a4e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.disk.iomanager.BufferFileReader;
+import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
+import org.apache.flink.runtime.io.disk.iomanager.SynchronousBufferFileReader;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.util.event.NotificationListener;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Reader for a spilled sub partition.
+ *
+ * <p>The partition availability listener is notified about available buffers
+ * only when the spilling is done. Spilling is done async and if it is still
+ * in progress, we wait with the notification until the spilling is done.
+ *
+ * <p>Reads of the spilled file are done in synchronously.
+ */
+class SpilledSubpartitionView implements ResultSubpartitionView, NotificationListener {
+
+	/** The subpartition this view belongs to. */
+	private final ResultSubpartition parent;
+
+	/** Writer for spills. */
+	private final BufferFileWriter spillWriter;
+
+	/** The synchronous file reader to do the actual I/O. */
+	private final BufferFileReader fileReader;
+
+	/** The buffer pool to read data into. */
+	private final SpillReadBufferPool bufferPool;
+
+	/** Buffer availability listener. */
+	private final BufferAvailabilityListener availabilityListener;
+
+	/** The total number of spilled buffers. */
+	private final long numberOfSpilledBuffers;
+
+	/** Flag indicating whether all resources have been released. */
+	private AtomicBoolean isReleased = new AtomicBoolean();
+
+	/** Flag indicating whether a spill is still in progress. */
+	private volatile boolean isSpillInProgress = true;
+
+	SpilledSubpartitionView(
+		ResultSubpartition parent,
+		int memorySegmentSize,
+		BufferFileWriter spillWriter,
+		long numberOfSpilledBuffers,
+		BufferAvailabilityListener availabilityListener) throws IOException {
+
+		this.parent = checkNotNull(parent);
+		this.bufferPool = new SpillReadBufferPool(2, memorySegmentSize);
+		this.spillWriter = checkNotNull(spillWriter);
+		this.fileReader = new SynchronousBufferFileReader(spillWriter.getChannelID(), false);
+		checkArgument(numberOfSpilledBuffers >= 0);
+		this.numberOfSpilledBuffers = numberOfSpilledBuffers;
+		this.availabilityListener = checkNotNull(availabilityListener);
+
+		// Check whether async spilling is still in progress. If not, this returns
+		// false and we can notify our availability listener about all available buffers.
+		// Otherwise, we notify only when the spill writer callback happens.
+		if (!spillWriter.registerAllRequestsProcessedListener(this)) {
+			isSpillInProgress = false;
+			availabilityListener.notifyBuffersAvailable(numberOfSpilledBuffers);
+		}
+	}
+
+	/**
+	 * This is the call back method for the spill writer. If a spill is still
+	 * in progress when this view is created we wait until this method is called
+	 * before we notify the availability listener.
+	 */
+	@Override
+	public void onNotification() {
+		isSpillInProgress = false;
+		availabilityListener.notifyBuffersAvailable(numberOfSpilledBuffers);
+	}
+
+	@Override
+	public Buffer getNextBuffer() throws IOException, InterruptedException {
+		if (fileReader.hasReachedEndOfFile() || isSpillInProgress) {
+			return null;
+		}
+
+		// TODO This is fragile as we implicitly expect that multiple calls to
+		// this method don't happen before recycling buffers returned earlier.
+		Buffer buffer = bufferPool.requestBufferBlocking();
+		fileReader.readInto(buffer);
+
+		return buffer;
+	}
+
+	@Override
+	public void notifyBuffersAvailable(long buffers) throws IOException {
+		// We do the availability listener notification either directly on
+		// construction of this view (when everything has been spilled) or
+		// as soon as spilling is done and we are notified about it in the
+		// #onNotification callback.
+	}
+
+	@Override
+	public void notifySubpartitionConsumed() throws IOException {
+		parent.onConsumedSubpartition();
+	}
+
+	@Override
+	public void releaseAllResources() throws IOException {
+		if (isReleased.compareAndSet(false, true)) {
+			// TODO This can block until all buffers are written out to
+			// disk if a spill is in-progress before deleting the file.
+			// It is possibly called from the Netty event loop threads,
+			// which can bring down the network.
+			spillWriter.closeAndDelete();
+
+			fileReader.close();
+			bufferPool.destroy();
+		}
+	}
+
+	@Override
+	public boolean isReleased() {
+		return parent.isReleased() || isReleased.get();
+	}
+
+	@Override
+	public Throwable getFailureCause() {
+		return parent.getFailureCause();
+	}
+
+	@Override
+	public String toString() {
+		return String.format("SpilledSubpartitionView[sync](index: %d) of ResultPartition %s", parent.index, parent.parent.getPartitionId());
+	}
+
+	/**
+	 * A buffer pool to provide buffer to read the file into.
+	 *
+	 * <p>This pool ensures that a consuming input gate makes progress in all cases, even when all
+	 * buffers of the input gate buffer pool have been requested by remote input channels.
+	 */
+	private static class SpillReadBufferPool implements BufferRecycler {
+
+		private final Queue<Buffer> buffers;
+
+		private boolean isDestroyed;
+
+		SpillReadBufferPool(int numberOfBuffers, int memorySegmentSize) {
+			this.buffers = new ArrayDeque<>(numberOfBuffers);
+
+			synchronized (buffers) {
+				for (int i = 0; i < numberOfBuffers; i++) {
+					buffers.add(new Buffer(MemorySegmentFactory.allocateUnpooledSegment(memorySegmentSize), this));
+				}
+			}
+		}
+
+		@Override
+		public void recycle(MemorySegment memorySegment) {
+			synchronized (buffers) {
+				if (isDestroyed) {
+					memorySegment.free();
+				} else {
+					buffers.add(new Buffer(memorySegment, this));
+					buffers.notifyAll();
+				}
+			}
+		}
+
+		private Buffer requestBufferBlocking() throws InterruptedException {
+			synchronized (buffers) {
+				while (true) {
+					if (isDestroyed) {
+						return null;
+					}
+
+					Buffer buffer = buffers.poll();
+
+					if (buffer != null) {
+						return buffer;
+					}
+					// Else: wait for a buffer
+					buffers.wait();
+				}
+			}
+		}
+
+		private void destroy() {
+			synchronized (buffers) {
+				isDestroyed = true;
+				buffers.notifyAll();
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f728129b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIO.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIO.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIO.java
deleted file mode 100644
index ca25536..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIO.java
+++ /dev/null
@@ -1,377 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.partition;
-
-import org.apache.flink.runtime.io.disk.iomanager.BufferFileReader;
-import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.RequestDoneCallback;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.runtime.util.event.NotificationListener;
-
-import java.io.IOException;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * View over a spilled subpartition.
- *
- * <p> Reads are triggered asynchronously in batches of configurable size.
- */
-class SpilledSubpartitionViewAsyncIO implements ResultSubpartitionView {
-
-	private final static int DEFAULT_READ_BATCH_SIZE = 2;
-
-	private final Object lock = new Object();
-
-	/** The subpartition this view belongs to. */
-	private final ResultSubpartition parent;
-
-	/** The buffer provider to get the buffer read everything into. */
-	private final BufferProvider bufferProvider;
-
-	/** The buffer availability listener to be notified on available buffers. */
-	private final BufferProviderCallback bufferAvailabilityListener;
-
-	/** The size of read batches. */
-	private final int readBatchSize;
-
-	/**
-	 * The size of the current batch (>= 0 and <= the configured batch size). Reads are only
-	 * triggered when the size of the current batch is 0.
-	 */
-	private final AtomicInteger currentBatchSize = new AtomicInteger();
-
-	/** The asynchronous file reader to do the actual I/O. */
-	private final BufferFileReader asyncFileReader;
-
-	/** The buffers, which have been returned from the file reader. */
-	private final ConcurrentLinkedQueue<Buffer> returnedBuffers = new ConcurrentLinkedQueue<Buffer>();
-
-	/** A data availability listener. */
-	private final AtomicReference<NotificationListener> registeredListener;
-
-	/** Error, which has occurred in the I/O thread. */
-	private volatile IOException errorInIOThread;
-
-	/** Flag indicating whether all resources have been released. */
-	private volatile boolean isReleased;
-
-	/** Flag indicating whether we reached EOF at the file reader. */
-	private volatile boolean hasReachedEndOfFile;
-
-	/** Spilled file size */
-	private final long fileSize;
-
-	SpilledSubpartitionViewAsyncIO(
-			ResultSubpartition parent,
-			BufferProvider bufferProvider,
-			IOManager ioManager,
-			FileIOChannel.ID channelId,
-			long initialSeekPosition) throws IOException {
-
-		this(parent, bufferProvider, ioManager, channelId, initialSeekPosition, DEFAULT_READ_BATCH_SIZE);
-	}
-
-	SpilledSubpartitionViewAsyncIO(
-			ResultSubpartition parent,
-			BufferProvider bufferProvider,
-			IOManager ioManager,
-			FileIOChannel.ID channelId,
-			long initialSeekPosition,
-			int readBatchSize) throws IOException {
-
-		checkArgument(initialSeekPosition >= 0, "Initial seek position is < 0.");
-		checkArgument(readBatchSize >= 1, "Batch read size < 1.");
-
-		this.parent = checkNotNull(parent);
-		this.bufferProvider = checkNotNull(bufferProvider);
-		this.bufferAvailabilityListener = new BufferProviderCallback(this);
-		this.registeredListener = new AtomicReference<>();
-		
-		this.asyncFileReader = ioManager.createBufferFileReader(channelId, new IOThreadCallback(this));
-
-		if (initialSeekPosition > 0) {
-			asyncFileReader.seekToPosition(initialSeekPosition);
-		}
-
-		this.readBatchSize = readBatchSize;
-
-		this.fileSize = asyncFileReader.getSize();
-
-		// Trigger the initial read requests
-		readNextBatchAsync();
-	}
-
-	@Override
-	public Buffer getNextBuffer() throws IOException {
-		checkError();
-
-		final Buffer buffer = returnedBuffers.poll();
-
-		// No buffer returned from the I/O thread currently. Either the current batch is in progress
-		// or we trigger the next one.
-		if (buffer == null) {
-			if (currentBatchSize.get() == 0) {
-				readNextBatchAsync();
-			}
-		}
-		else {
-			currentBatchSize.decrementAndGet();
-		}
-
-		return buffer;
-	}
-
-	@Override
-	public boolean registerListener(NotificationListener listener) throws IOException {
-		checkNotNull(listener);
-
-		checkError();
-
-		synchronized (lock) {
-			if (isReleased || !returnedBuffers.isEmpty()) {
-				return false;
-			}
-
-			if (registeredListener.compareAndSet(null, listener)) {
-				return true;
-			} else {
-				throw new IllegalStateException("already registered listener");
-			}
-		}
-	}
-
-	@Override
-	public void notifySubpartitionConsumed() throws IOException {
-		parent.onConsumedSubpartition();
-	}
-
-	@Override
-	public void releaseAllResources() throws IOException {
-		try {
-			synchronized (lock) {
-				if (!isReleased) {
-					// Recycle all buffers. Buffers, which are in flight are recycled as soon as
-					// they return from the I/O thread.
-					Buffer buffer;
-					while ((buffer = returnedBuffers.poll()) != null) {
-						buffer.recycle();
-					}
-
-					isReleased = true;
-				}
-			}
-		}
-		finally {
-			asyncFileReader.close();
-		}
-	}
-
-	@Override
-	public boolean isReleased() {
-		return parent.isReleased() || isReleased;
-	}
-
-	@Override
-	public Throwable getFailureCause() {
-		return parent.getFailureCause();
-	}
-
-	/**
-	 * Requests buffers from the buffer provider and triggers asynchronous read requests to fill
-	 * them.
-	 *
-	 * <p> The number of requested buffers/triggered I/O read requests per call depends on the
-	 * configured size of batch reads.
-	 */
-	private void readNextBatchAsync() throws IOException {
-		// This does not need to be fully synchronized with actually reaching EOF as long as
-		// we eventually notice it. In the worst case, we trigger some discarded reads and
-		// notice it when the buffers are returned.
-		//
-		// We only trigger reads if the current batch size is 0.
-		if (hasReachedEndOfFile || currentBatchSize.get() != 0) {
-			return;
-		}
-
-		// Number of successful buffer requests or callback registrations. The call back will
-		// trigger the read as soon as a buffer becomes available again.
-		int i = 0;
-
-		while (i < readBatchSize) {
-			final Buffer buffer = bufferProvider.requestBuffer();
-
-			if (buffer == null) {
-				// Listen for buffer availability.
-				currentBatchSize.incrementAndGet();
-
-				if (bufferProvider.addListener(bufferAvailabilityListener)) {
-					i++;
-				}
-				else if (bufferProvider.isDestroyed()) {
-					currentBatchSize.decrementAndGet();
-					return;
-				}
-				else {
-					// Buffer available again
-					currentBatchSize.decrementAndGet();
-				}
-			}
-			else {
-				currentBatchSize.incrementAndGet();
-
-				asyncFileReader.readInto(buffer);
-			}
-		}
-	}
-
-	/**
-	 * Returns a buffer from the buffer provider.
-	 *
-	 * <p> Note: This method is called from the thread recycling the available buffer.
-	 */
-	private void onAvailableBuffer(Buffer buffer) {
-		try {
-			asyncFileReader.readInto(buffer);
-		}
-		catch (IOException e) {
-			notifyError(e);
-		}
-	}
-
-	/**
-	 * Returns a successful buffer read request.
-	 *
-	 * <p> Note: This method is always called from the same I/O thread.
-	 */
-	private void returnBufferFromIOThread(Buffer buffer) {
-		final NotificationListener listener;
-
-		synchronized (lock) {
-			if (hasReachedEndOfFile || isReleased) {
-				buffer.recycle();
-
-				return;
-			}
-
-			returnedBuffers.add(buffer);
-
-			// after this, the listener should be null
-			listener = registeredListener.getAndSet(null);
-
-			// If this was the last buffer before we reached EOF, set the corresponding flag to
-			// ensure that further buffers are correctly recycled and eventually no further reads
-			// are triggered.
-			if (asyncFileReader.hasReachedEndOfFile()) {
-				hasReachedEndOfFile = true;
-			}
-		}
-
-		if (listener != null) {
-			listener.onNotification();
-		}
-	}
-
-	/**
-	 * Notifies the view about an error.
-	 */
-	private void notifyError(IOException error) {
-		if (errorInIOThread == null) {
-			errorInIOThread = error;
-		}
-
-		final NotificationListener listener = registeredListener.getAndSet(null);
-		if (listener != null) {
-			listener.onNotification();
-		}
-	}
-
-	/**
-	 * Checks whether an error has been reported and rethrow the respective Exception, if available.
-	 */
-	private void checkError() throws IOException {
-		if (errorInIOThread != null) {
-			throw errorInIOThread;
-		}
-	}
-
-	/**
-	 * Callback from the I/O thread.
-	 *
-	 * <p> Successful buffer read requests add the buffer to the subpartition view, and failed ones
-	 * notify about the error.
-	 */
-	private static class IOThreadCallback implements RequestDoneCallback<Buffer> {
-
-		private final SpilledSubpartitionViewAsyncIO subpartitionView;
-
-		public IOThreadCallback(SpilledSubpartitionViewAsyncIO subpartitionView) {
-			this.subpartitionView = subpartitionView;
-		}
-
-		@Override
-		public void requestSuccessful(Buffer buffer) {
-			subpartitionView.returnBufferFromIOThread(buffer);
-		}
-
-		@Override
-		public void requestFailed(Buffer buffer, IOException error) {
-			// Recycle the buffer and forward the error
-			buffer.recycle();
-
-			subpartitionView.notifyError(error);
-		}
-	}
-
-	@Override
-	public String toString() {
-		return String.format("SpilledSubpartitionView[async](index: %d, file size: %d bytes) of ResultPartition %s",
-				parent.index,
-				fileSize,
-				parent.parent.getPartitionId());
-	}
-
-	/**
-	 * Callback from the buffer provider.
-	 */
-	private static class BufferProviderCallback implements EventListener<Buffer> {
-
-		private final SpilledSubpartitionViewAsyncIO subpartitionView;
-
-		private BufferProviderCallback(SpilledSubpartitionViewAsyncIO subpartitionView) {
-			this.subpartitionView = subpartitionView;
-		}
-
-		@Override
-		public void onEvent(Buffer buffer) {
-			if (buffer == null) {
-				return;
-			}
-
-			subpartitionView.onAvailableBuffer(buffer);
-		}
-	}
-}