You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2015/05/26 09:30:47 UTC

flink git commit: [FLINK-1954] [FLINK-1957] [runtime] Improve error handling of transport failures

Repository: flink
Updated Branches:
  refs/heads/master fdac963d5 -> 2a65b6221


[FLINK-1954] [FLINK-1957] [runtime] Improve error handling of transport failures

Problem: Failures in the network stack were neither properly handled nor
correctly attributed.

Solution: Failures are always attributed to the client (consumer). This change
introduces TransportException, which indicates whether the problem occurred
locally or remotely. This makes it easy to reason about the source of a problem.

This closes #713.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2a65b622
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2a65b622
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2a65b622

Branch: refs/heads/master
Commit: 2a65b62216e8fb73fce65209bf646ca67e5f96b0
Parents: fdac963
Author: Ufuk Celebi <uc...@apache.org>
Authored: Wed May 20 15:16:05 2015 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Tue May 26 09:30:17 2015 +0200

----------------------------------------------------------------------
 .../runtime/io/network/netty/NettyClient.java   |   6 +-
 .../runtime/io/network/netty/NettyMessage.java  |  54 ++-
 .../runtime/io/network/netty/NettyProtocol.java |   6 +-
 .../runtime/io/network/netty/NettyServer.java   |   8 +-
 .../network/netty/PartitionRequestClient.java   |  24 +-
 .../netty/PartitionRequestClientFactory.java    |  19 +-
 .../netty/PartitionRequestClientHandler.java    | 153 ++++---
 .../network/netty/PartitionRequestProtocol.java |  36 +-
 .../io/network/netty/PartitionRequestQueue.java | 120 ++----
 .../netty/PartitionRequestServerHandler.java    |   6 +
 .../exception/LocalTransportException.java      |  34 ++
 .../exception/RemoteTransportException.java     |  34 ++
 .../netty/exception/TransportException.java     |  43 ++
 .../partition/ResultSubpartitionView.java       |   1 -
 .../netty/CancelPartitionRequestTest.java       | 133 ++++++
 .../netty/ClientTransportErrorHandlingTest.java | 408 +++++++++++++++++++
 .../NettyServerLowAndHighWatermarkTest.java     |  51 +--
 .../runtime/io/network/netty/NettyTestUtil.java | 178 ++++++++
 .../PartitionRequestClientFactoryTest.java      |  10 +-
 .../PartitionRequestClientHandlerTest.java      |   9 +-
 .../netty/ServerTransportErrorHandlingTest.java | 116 ++++++
 21 files changed, 1218 insertions(+), 231 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2a65b622/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java
index 201ec33..fd6d980 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java
@@ -107,7 +107,7 @@ class NettyClient {
 		bootstrap.handler(new ChannelInitializer<SocketChannel>() {
 			@Override
 			public void initChannel(SocketChannel channel) throws Exception {
-				protocol.setClientChannelPipeline(channel.pipeline());
+				channel.pipeline().addLast(protocol.getClientChannelHandlers());
 			}
 		});
 
@@ -115,6 +115,10 @@ class NettyClient {
 		LOG.info("Successful initialization (took {} ms).", (end - start));
 	}
 
+	NettyConfig getConfig() {
+		return config;
+	}
+
 	void shutdown() {
 		long start = System.currentTimeMillis();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2a65b622/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
index 0606f4b..db5c542 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 
@@ -89,12 +90,20 @@ abstract class NettyMessage {
 		@Override
 		public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
 			if (msg instanceof NettyMessage) {
+
+				ByteBuf serialized = null;
+
 				try {
-					ctx.write(((NettyMessage) msg).write(ctx.alloc()), promise);
+					serialized = ((NettyMessage) msg).write(ctx.alloc());
 				}
 				catch (Throwable t) {
 					throw new IOException("Error while serializing message: " + msg, t);
 				}
+				finally {
+					if (serialized != null) {
+						ctx.write(serialized, promise);
+					}
+				}
 			}
 			else {
 				ctx.write(msg, promise);
@@ -311,6 +320,7 @@ abstract class NettyMessage {
 
 			return result;
 		}
+
 		@Override
 		void readFrom(ByteBuf buffer) throws Exception {
 			DataInputView inputView = new ByteBufDataInputView(buffer);
@@ -463,6 +473,48 @@ abstract class NettyMessage {
 		}
 	}
 
+	/**
+	 * Cancels the partition request of the {@link InputChannel} identified by
+	 * {@link InputChannelID}.
+	 *
+	 * <p> There is a 1:1 mapping between the input channel and partition per physical channel.
+	 * Therefore, the {@link InputChannelID} instance is enough to identify which request to cancel.
+	 */
+	static class CancelPartitionRequest extends NettyMessage {
+
+		final static byte ID = 4;
+
+		InputChannelID receiverId;
+
+		public CancelPartitionRequest(InputChannelID receiverId) {
+			this.receiverId = receiverId;
+		}
+
+		@Override
+		ByteBuf write(ByteBufAllocator allocator) throws Exception {
+			ByteBuf result = null;
+
+			try {
+				result = allocateBuffer(allocator, ID);
+				receiverId.writeTo(result);
+			}
+			catch (Throwable t) {
+				if (result != null) {
+					result.release();
+				}
+
+				throw new IOException(t);
+			}
+
+			return result;
+		}
+
+		@Override
+		void readFrom(ByteBuf buffer) throws Exception {
+			receiverId = InputChannelID.fromByteBuf(buffer);
+		}
+	}
+
 	// ------------------------------------------------------------------------
 
 	private static class ByteBufDataInputView implements DataInputView {

http://git-wip-us.apache.org/repos/asf/flink/blob/2a65b622/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyProtocol.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyProtocol.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyProtocol.java
index 7cc3ec6..6b33cc9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyProtocol.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyProtocol.java
@@ -18,12 +18,12 @@
 
 package org.apache.flink.runtime.io.network.netty;
 
-import io.netty.channel.ChannelPipeline;
+import io.netty.channel.ChannelHandler;
 
 public interface NettyProtocol {
 
-	void setServerChannelPipeline(ChannelPipeline channelPipeline);
+	ChannelHandler[] getServerChannelHandlers();
 
-	void setClientChannelPipeline(ChannelPipeline channelPipeline);
+	ChannelHandler[] getClientChannelHandlers();
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2a65b622/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
index fa38734..00fec87 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
@@ -119,7 +119,7 @@ class NettyServer {
 		bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
 			@Override
 			public void initChannel(SocketChannel channel) throws Exception {
-				protocol.setServerChannelPipeline(channel.pipeline());
+				channel.pipeline().addLast(protocol.getServerChannelHandlers());
 			}
 		});
 
@@ -130,7 +130,11 @@ class NettyServer {
 		bindFuture = bootstrap.bind().syncUninterruptibly();
 
 		long end = System.currentTimeMillis();
-		LOG.info("Successful initialization  (took {} ms). Listening on SocketAddress {}.", (end - start), bindFuture.channel().localAddress().toString());
+		LOG.info("Successful initialization (took {} ms). Listening on SocketAddress {}.", (end - start), bindFuture.channel().localAddress().toString());
+	}
+
+	NettyConfig getConfig() {
+		return config;
 	}
 
 	void shutdown() {

http://git-wip-us.apache.org/repos/asf/flink/blob/2a65b622/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
index 6d7725f..cb4ecb6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
@@ -23,6 +23,7 @@ import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import org.apache.flink.runtime.event.task.TaskEvent;
 import org.apache.flink.runtime.io.network.ConnectionID;
+import org.apache.flink.runtime.io.network.netty.exception.LocalTransportException;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.runtime.util.AtomicDisposableReferenceCounter;
@@ -84,7 +85,7 @@ public class PartitionRequestClient {
 	 * The request goes to the remote producer, for which this partition
 	 * request client instance has been created.
 	 */
-	public void requestSubpartition(
+	public ChannelFuture requestSubpartition(
 			final ResultPartitionID partitionId,
 			final int subpartitionIndex,
 			final RemoteInputChannel inputChannel,
@@ -103,21 +104,31 @@ public class PartitionRequestClient {
 			public void operationComplete(ChannelFuture future) throws Exception {
 				if (!future.isSuccess()) {
 					partitionRequestHandler.removeInputChannel(inputChannel);
-					inputChannel.onError(future.cause());
+					inputChannel.onError(
+							new LocalTransportException(
+									"Sending the partition request failed.",
+									future.channel().localAddress(), future.cause()
+							));
 				}
 			}
 		};
 
 		if (delayMs == 0) {
-			tcpChannel.writeAndFlush(request).addListener(listener);
+			ChannelFuture f = tcpChannel.writeAndFlush(request);
+			f.addListener(listener);
+			return f;
 		}
 		else {
+			final ChannelFuture[] f = new ChannelFuture[1];
 			tcpChannel.eventLoop().schedule(new Runnable() {
 				@Override
 				public void run() {
-					tcpChannel.writeAndFlush(request).addListener(listener);
+					f[0] = tcpChannel.writeAndFlush(request);
+					f[0].addListener(listener);
 				}
 			}, delayMs, TimeUnit.MILLISECONDS);
+
+			return f[0];
 		}
 	}
 
@@ -137,7 +148,10 @@ public class PartitionRequestClient {
 							@Override
 							public void operationComplete(ChannelFuture future) throws Exception {
 								if (!future.isSuccess()) {
-									inputChannel.onError(future.cause());
+									inputChannel.onError(new LocalTransportException(
+											"Sending the task event failed.",
+											future.channel().localAddress(), future.cause()
+									));
 								}
 							}
 						});

http://git-wip-us.apache.org/repos/asf/flink/blob/2a65b622/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
index f274b67..040a8ef 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
@@ -22,6 +22,8 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import org.apache.flink.runtime.io.network.ConnectionID;
+import org.apache.flink.runtime.io.network.netty.exception.LocalTransportException;
+import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
 import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 
 import java.io.IOException;
@@ -161,10 +163,11 @@ class PartitionRequestClientFactory {
 		private void handInChannel(Channel channel) {
 			synchronized (connectLock) {
 				try {
-					PartitionRequestClientHandler requestHandler =
-							(PartitionRequestClientHandler) channel.pipeline().get(PartitionRequestProtocol.CLIENT_REQUEST_HANDLER_NAME);
+					PartitionRequestClientHandler requestHandler = channel.pipeline()
+							.get(PartitionRequestClientHandler.class);
 
-				partitionRequestClient = new PartitionRequestClient(channel, requestHandler, connectionId, clientFactory);
+					partitionRequestClient = new PartitionRequestClient(
+							channel, requestHandler, connectionId, clientFactory);
 
 					if (disposeRequestClient) {
 						partitionRequestClient.disposeIfNotUsed();
@@ -209,10 +212,16 @@ class PartitionRequestClientFactory {
 				handInChannel(future.channel());
 			}
 			else if (future.cause() != null) {
-				notifyOfError(future.cause());
+				notifyOfError(new RemoteTransportException(
+						"Connecting to remote task manager + '" + connectionId.getAddress() +
+								"' has failed. This might indicate that the remote task " +
+								"manager has been lost.",
+						connectionId.getAddress(), future.cause()));
 			}
 			else {
-				notifyOfError(new IllegalStateException("Connecting the channel has been cancelled."));
+				notifyOfError(new LocalTransportException(
+						"Connecting to remote task manager + '" + connectionId.getAddress() +
+								"' has been cancelled.", null));
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/2a65b622/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
index c3e65b1..f0c08c8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
@@ -18,12 +18,16 @@
 
 package org.apache.flink.runtime.io.network.netty;
 
+import com.google.common.collect.Sets;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+import org.apache.flink.runtime.io.network.netty.exception.LocalTransportException;
+import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
+import org.apache.flink.runtime.io.network.netty.exception.TransportException;
 import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
 import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
@@ -31,12 +35,13 @@ import org.apache.flink.runtime.util.event.EventListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.net.SocketAddress;
 import java.util.ArrayDeque;
 import java.util.Queue;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -57,9 +62,13 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter {
 
 	private final StagedMessagesHandlerTask stagedMessagesHandler = new StagedMessagesHandlerTask();
 
-	private ChannelHandlerContext ctx;
+	/**
+	 * Set of cancelled partition requests. A request is cancelled iff an input channel is cleared
+	 * while data is still coming in for this channel.
+	 */
+	private volatile Set<InputChannelID> cancelled;
 
-	private ScheduledFuture<?> logOutputTask;
+	private ChannelHandlerContext ctx;
 
 	// ------------------------------------------------------------------------
 	// Input channel/receiver registration
@@ -87,31 +96,55 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter {
 			this.ctx = ctx;
 		}
 
-		if (LOG.isDebugEnabled()) {
-			logOutputTask = ctx.channel().eventLoop().scheduleWithFixedDelay(new DebugOutputTask(), 30, 30, TimeUnit.SECONDS);
-		}
-
 		super.channelActive(ctx);
 	}
 
 	@Override
 	public void channelInactive(ChannelHandlerContext ctx) throws Exception {
-		if (logOutputTask != null) {
-			logOutputTask.cancel(true);
+		// Unexpected close. In normal operation, the client closes the connection after all input
+		// channels have been removed. This indicates a problem with the remote task manager.
+		if (!inputChannels.isEmpty()) {
+			final SocketAddress remoteAddr = ctx.channel().remoteAddress();
+
+			notifyAllChannelsOfErrorAndClose(new RemoteTransportException(
+					"Connection unexpectedly closed by remote task manager '" + remoteAddr + "'. "
+							+ "This might indicate that the remote task manager was lost.",
+					remoteAddr));
 		}
 
-		super.channelActive(ctx);
+		super.channelInactive(ctx);
 	}
 
+	/**
+	 * Called on exceptions in the client handler pipeline.
+	 *
+	 * <p> Remote exceptions are received as regular payload.
+	 */
 	@Override
 	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
-		if (logOutputTask != null) {
-			logOutputTask.cancel(true);
+
+		if (cause instanceof TransportException) {
+			notifyAllChannelsOfErrorAndClose(cause);
 		}
+		else {
+			final SocketAddress remoteAddr = ctx.channel().remoteAddress();
 
-		notifyAllChannelsOfErrorAndClose(cause);
+			final TransportException tex;
 
-		super.exceptionCaught(ctx, cause);
+			// Improve on the connection reset by peer error message
+			if (cause instanceof IOException
+					&& cause.getMessage().equals("Connection reset by peer")) {
+
+				tex = new RemoteTransportException(
+						"Lost connection to task manager '" + remoteAddr + "'. This indicates "
+								+ "that the remote task manager was lost.", remoteAddr, cause);
+			}
+			else {
+				tex = new LocalTransportException(cause.getMessage(), ctx.channel().localAddress(), cause);
+			}
+
+			notifyAllChannelsOfErrorAndClose(tex);
+		}
 	}
 
 	@Override
@@ -131,18 +164,38 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter {
 
 	private void notifyAllChannelsOfErrorAndClose(Throwable cause) {
 		if (channelError.compareAndSet(false, true)) {
-			for (RemoteInputChannel inputChannel : inputChannels.values()) {
-				inputChannel.onError(cause);
+			try {
+				for (RemoteInputChannel inputChannel : inputChannels.values()) {
+					inputChannel.onError(cause);
+				}
 			}
+			catch (Throwable t) {
+				// We can only swallow the Exception at this point. :(
+				LOG.warn("An Exception was thrown during error notification of a "
+						+ "remote input channel.", t);
+			}
+			finally {
+				inputChannels.clear();
 
-			inputChannels.clear();
-
-			if (ctx != null) {
-				ctx.close();
+				if (ctx != null) {
+					ctx.close();
+				}
 			}
 		}
 	}
 
+	private void cancelRequestFor(InputChannelID inputChannelId) {
+		if (cancelled == null) {
+			cancelled = Sets.newConcurrentHashSet();
+		}
+
+		if (!cancelled.contains(inputChannelId)) {
+			ctx.writeAndFlush(new NettyMessage.CancelPartitionRequest(inputChannelId));
+
+			cancelled.add(inputChannelId);
+		}
+	}
+
 	// ------------------------------------------------------------------------
 
 	@Override
@@ -160,6 +213,9 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter {
 			RemoteInputChannel inputChannel = inputChannels.get(bufferOrEvent.receiverId);
 			if (inputChannel == null) {
 				bufferOrEvent.releaseBuffer();
+
+				cancelRequestFor(bufferOrEvent.receiverId);
+
 				return true;
 			}
 
@@ -169,9 +225,12 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter {
 		else if (msgClazz == NettyMessage.ErrorResponse.class) {
 			NettyMessage.ErrorResponse error = (NettyMessage.ErrorResponse) msg;
 
+			SocketAddress remoteAddr = ctx.channel().remoteAddress();
 
 			if (error.isFatalError()) {
-				notifyAllChannelsOfErrorAndClose(error.error);
+				notifyAllChannelsOfErrorAndClose(new RemoteTransportException(
+						"Fatal error at remote task manager '" + remoteAddr + "'.",
+						remoteAddr, error.error));
 			}
 			else {
 				RemoteInputChannel inputChannel = inputChannels.get(error.receiverId);
@@ -181,7 +240,9 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter {
 						inputChannel.onFailedPartitionRequest();
 					}
 					else {
-						inputChannel.onError(error.error);
+						inputChannel.onError(new RemoteTransportException(
+								"Error at remote task manager '" + remoteAddr + "'.",
+										remoteAddr, error.error));
 					}
 				}
 			}
@@ -210,6 +271,9 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter {
 				BufferProvider bufferProvider = inputChannel.getBufferProvider();
 
 				if (bufferProvider == null) {
+
+					cancelRequestFor(bufferOrEvent.receiverId);
+
 					return false; // receiver has been cancelled/failed
 				}
 
@@ -364,6 +428,9 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter {
 
 					success = true;
 				}
+				else {
+					cancelRequestFor(inputChannel.getInputChannelId());
+				}
 
 				stagedBufferResponse = null;
 
@@ -379,7 +446,6 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter {
 				notifyAllChannelsOfErrorAndClose(t);
 			}
 			finally {
-
 				if (!success) {
 					if (buffer != null) {
 						buffer.recycle();
@@ -409,43 +475,4 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter {
 			}
 		}
 	}
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Debug output task executed periodically by the network I/O thread.
-	 */
-	private class DebugOutputTask implements Runnable {
-
-		@Override
-		public void run() {
-			StringBuilder str = new StringBuilder();
-
-			str.append("Channel remote address: ");
-			str.append(ctx.channel().remoteAddress());
-			str.append(". ");
-
-			str.append("Channel active: ");
-			str.append(ctx.channel().isActive());
-			str.append(". ");
-
-			str.append("Number of registered input channels: ");
-			str.append(inputChannels.size());
-			str.append(". ");
-
-			str.append("Has staged buffer or event: ");
-			str.append(bufferListener.hasStagedBufferOrEvent());
-			str.append(". ");
-
-			str.append("Total number of staged messages: ");
-			str.append(stagedMessages.size());
-			str.append(". ");
-
-			str.append("Channel auto read? ");
-			str.append(ctx.channel().config().isAutoRead());
-			str.append(". ");
-
-			LOG.debug(str.toString());
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2a65b622/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestProtocol.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestProtocol.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestProtocol.java
index 61a0970..a39f085 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestProtocol.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestProtocol.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.io.network.netty;
 
-import io.netty.channel.ChannelPipeline;
+import io.netty.channel.ChannelHandler;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
@@ -28,8 +28,6 @@ import static org.apache.flink.runtime.io.network.netty.NettyMessage.NettyMessag
 
 class PartitionRequestProtocol implements NettyProtocol {
 
-	static final String CLIENT_REQUEST_HANDLER_NAME = "Client request handler";
-
 	private final NettyMessageEncoder messageEncoder = new NettyMessageEncoder();
 
 	private final NettyMessage.NettyMessageDecoder messageDecoder = new NettyMessage.NettyMessageDecoder();
@@ -64,7 +62,7 @@ class PartitionRequestProtocol implements NettyProtocol {
 	// |               |                                   |               |
 	// |    +----------+----------+                        |               |
 	// |    | Frame decoder       |                        |               |
-	// |    +----------+----------+                        |
+	// |    +----------+----------+                        |               |
 	// |              /|\                                  |               |
 	// +---------------+-----------------------------------+---------------+
 	// |               | (1) client request               \|/
@@ -76,18 +74,20 @@ class PartitionRequestProtocol implements NettyProtocol {
 	// +-------------------------------------------------------------------+
 
 	@Override
-	public void setServerChannelPipeline(ChannelPipeline channelPipeline) {
+	public ChannelHandler[] getServerChannelHandlers() {
 		PartitionRequestQueue queueOfPartitionQueues = new PartitionRequestQueue();
+		PartitionRequestServerHandler serverHandler = new PartitionRequestServerHandler(
+				partitionProvider, taskEventDispatcher, queueOfPartitionQueues, networkbufferPool);
 
-		channelPipeline
-				.addLast("Message encoder", messageEncoder)
-				.addLast("Frame decoder", createFrameLengthDecoder())
-				.addLast("Client request decoder", messageDecoder)
-				.addLast("Server request handler", new PartitionRequestServerHandler(partitionProvider, taskEventDispatcher, queueOfPartitionQueues, networkbufferPool))
-				.addLast("Queue of queues", queueOfPartitionQueues);
+		return new ChannelHandler[] {
+				messageEncoder,
+				createFrameLengthDecoder(),
+				messageDecoder,
+				serverHandler,
+				queueOfPartitionQueues
+		};
 	}
 
-
 	//     +-----------+----------+            +----------------------+
 	//     | Remote input channel |            | request client       |
 	//     +-----------+----------+            +-----------+----------+
@@ -119,11 +119,11 @@ class PartitionRequestProtocol implements NettyProtocol {
 	// +-------------------------------------------------------------------+
 
 	@Override
-	public void setClientChannelPipeline(ChannelPipeline channelPipeline) {
-		channelPipeline
-				.addLast("Message encoder", messageEncoder)
-				.addLast("Frame decoder", createFrameLengthDecoder())
-				.addLast("Server response decoder", messageDecoder)
-				.addLast(CLIENT_REQUEST_HANDLER_NAME, new PartitionRequestClientHandler());
+	public ChannelHandler[] getClientChannelHandlers() {
+		return new ChannelHandler[] {
+				messageEncoder,
+				createFrameLengthDecoder(),
+				messageDecoder,
+				new PartitionRequestClientHandler()};
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2a65b622/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
index 420a276..279ddbe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
@@ -35,9 +35,6 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.Queue;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse;
@@ -60,36 +57,6 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
 
 	private ChannelHandlerContext ctx;
 
-	private int numTotalSubscribeCalls;
-
-	private AtomicInteger numOutstandingSubscribeCalls = new AtomicInteger();
-
-	private int numConsumedPartitions;
-
-	private AtomicInteger numEnqueueCalls = new AtomicInteger();
-
-	private int numTotalEnqueueOperations;
-
-	private ScheduledFuture<?> logOutputTask;
-
-	@Override
-	public void channelActive(ChannelHandlerContext ctx) throws Exception {
-		if (LOG.isDebugEnabled()) {
-			logOutputTask = ctx.channel().eventLoop().scheduleWithFixedDelay(new DebugOutputTask(), 30, 30, TimeUnit.SECONDS);
-		}
-
-		super.channelActive(ctx);
-	}
-
-	@Override
-	public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
-		if (logOutputTask != null) {
-			logOutputTask.cancel(true);
-		}
-
-		super.channelUnregistered(ctx);
-	}
-
 	@Override
 	public void channelRegistered(final ChannelHandlerContext ctx) throws Exception {
 		if (this.ctx == null) {
@@ -100,22 +67,47 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
 	}
 
 	public void enqueue(ResultSubpartitionView partitionQueue, InputChannelID receiverId) throws Exception {
-		numEnqueueCalls.incrementAndGet();
 		ctx.pipeline().fireUserEventTriggered(new SequenceNumberingSubpartitionView(partitionQueue, receiverId));
 	}
 
+	public void cancel(InputChannelID receiverId) {
+		ctx.pipeline().fireUserEventTriggered(receiverId);
+	}
+
 	@Override
 	public void userEventTriggered(ChannelHandlerContext ctx, Object msg) throws Exception {
 		if (msg.getClass() == SequenceNumberingSubpartitionView.class) {
 			boolean triggerWrite = queue.isEmpty();
 
-			numTotalEnqueueOperations++;
 			queue.add((SequenceNumberingSubpartitionView) msg);
 
 			if (triggerWrite) {
 				writeAndFlushNextMessageIfPossible(ctx.channel());
 			}
 		}
+		else if (msg.getClass() == InputChannelID.class) {
+			InputChannelID toCancel = (InputChannelID) msg;
+
+			// Cancel the request for the input channel
+			if (currentPartitionQueue != null && currentPartitionQueue.getReceiverId().equals(toCancel)) {
+				currentPartitionQueue.releaseAllResources();
+				currentPartitionQueue = null;
+			}
+			else {
+				int size = queue.size();
+
+				for (int i = 0; i < size; i++) {
+					SequenceNumberingSubpartitionView curr = queue.poll();
+
+					if (curr.getReceiverId().equals(toCancel)) {
+						curr.releaseAllResources();
+					}
+					else {
+						queue.add(curr);
+					}
+				}
+			}
+		}
 		else {
 			ctx.fireUserEventTriggered(msg);
 		}
@@ -144,9 +136,6 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
 
 					if (buffer == null) {
 						if (currentPartitionQueue.registerListener(null)) {
-							numTotalSubscribeCalls++;
-							numOutstandingSubscribeCalls.incrementAndGet();
-
 							currentPartitionQueue = null;
 						}
 						else if (currentPartitionQueue.isReleased()) {
@@ -293,63 +282,8 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
 		 */
 		@Override
 		public void onNotification() {
-			numOutstandingSubscribeCalls.decrementAndGet();
 			ctx.pipeline().fireUserEventTriggered(this);
 		}
 	}
 
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Debug output task executed periodically by the network I/O thread.
-	 */
-	private class DebugOutputTask implements Runnable {
-
-		@Override
-		public void run() {
-			StringBuilder str = new StringBuilder();
-
-			str.append("Channel remote address: ");
-			str.append(ctx.channel().remoteAddress());
-			str.append(". ");
-
-			str.append("Channel active: ");
-			str.append(ctx.channel().isActive());
-			str.append(". ");
-
-			str.append("Total number of queue operations: ");
-			str.append(numTotalEnqueueOperations);
-			str.append(". ");
-
-			str.append("Number of enqueue calls: ");
-			str.append(numEnqueueCalls.get());
-			str.append(". ");
-
-			str.append("Number of consumed partitions: ");
-			str.append(numConsumedPartitions);
-			str.append(". ");
-
-			str.append("Number of currently queued partitions: ");
-			str.append(queue.size());
-			str.append(". ");
-
-			str.append("Current partition queue: ");
-			str.append(currentPartitionQueue);
-			str.append(". ");
-
-			str.append("Total number of subscribe calls: ");
-			str.append(numTotalSubscribeCalls);
-			str.append(". ");
-
-			str.append("Number of outstanding subscribe calls: ");
-			str.append(numOutstandingSubscribeCalls.get());
-			str.append(". ");
-
-			str.append("Channel writeable? ");
-			str.append(ctx.channel().isWritable());
-			str.append(". ");
-
-			LOG.debug(str.toString());
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2a65b622/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
index 7fa37e8..15a9951 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
@@ -23,6 +23,7 @@ import io.netty.channel.SimpleChannelInboundHandler;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.netty.NettyMessage.CancelPartitionRequest;
 import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
@@ -114,6 +115,11 @@ class PartitionRequestServerHandler extends SimpleChannelInboundHandler<NettyMes
 					respondWithError(ctx, new IllegalArgumentException("Task event receiver not found."), request.receiverId);
 				}
 			}
+			else if (msgClazz == CancelPartitionRequest.class) {
+				CancelPartitionRequest request = (CancelPartitionRequest) msg;
+
+				outboundQueue.cancel(request.receiverId);
+			}
 			else {
 				LOG.warn("Received unexpected client request: {}", msg);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/2a65b622/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/exception/LocalTransportException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/exception/LocalTransportException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/exception/LocalTransportException.java
new file mode 100644
index 0000000..37f6e53
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/exception/LocalTransportException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty.exception;
+
+import java.net.SocketAddress;
+
+public class LocalTransportException extends TransportException {
+
+	private static final long serialVersionUID = 2366708881288640674L;
+
+	public LocalTransportException(String message, SocketAddress address) {
+		super(message, address);
+	}
+
+	public LocalTransportException(String message, SocketAddress address, Throwable cause) {
+		super(message, address, cause);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2a65b622/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/exception/RemoteTransportException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/exception/RemoteTransportException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/exception/RemoteTransportException.java
new file mode 100644
index 0000000..5f81883
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/exception/RemoteTransportException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty.exception;
+
+import java.net.SocketAddress;
+
+public class RemoteTransportException extends TransportException {
+
+	private static final long serialVersionUID = 4373615529545893089L;
+
+	public RemoteTransportException(String message, SocketAddress address) {
+		super(message, address);
+	}
+
+	public RemoteTransportException(String message, SocketAddress address, Throwable cause) {
+		super(message, address, cause);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2a65b622/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/exception/TransportException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/exception/TransportException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/exception/TransportException.java
new file mode 100644
index 0000000..0438688
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/exception/TransportException.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty.exception;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+
+public abstract class TransportException extends IOException {
+
+	private static final long serialVersionUID = 3637820720589866570L;
+
+	private final SocketAddress address;
+
+	public TransportException(String message, SocketAddress address) {
+		this(message, address, null);
+	}
+
+	public TransportException(String message, SocketAddress address, Throwable cause) {
+		super(message, cause);
+
+		this.address = address;
+	}
+
+	public SocketAddress getAddress() {
+		return address;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2a65b622/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
index 82cee6c..dc445ba 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
@@ -49,7 +49,6 @@ public interface ResultSubpartitionView {
 	 */
 	boolean registerListener(NotificationListener listener) throws IOException;
 
-
 	void releaseAllResources() throws IOException;
 
 	void notifySubpartitionConsumed() throws IOException;

http://git-wip-us.apache.org/repos/asf/flink/blob/2a65b622/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
new file mode 100644
index 0000000..75f4284
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import io.netty.channel.Channel;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.netty.NettyTestUtil.NettyServerAndClient;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
+import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.util.event.NotificationListener;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.flink.runtime.io.network.netty.NettyMessage.*;
+import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.connect;
+import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.initServerAndClient;
+import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.shutdown;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class CancelPartitionRequestTest {
+
+	/**
+	 * Verifies that requests for non-existing (failed/cancelled) input channels are properly
+	 * cancelled.
+	 */
+	@Test
+	public void testCancelPartitionRequest() throws Exception {
+
+		NettyServerAndClient serverAndClient = null;
+
+		try {
+			TestPooledBufferProvider outboundBuffers = new TestPooledBufferProvider(16);
+
+			ResultPartitionManager partitions = mock(ResultPartitionManager.class);
+
+			ResultPartitionID pid = new ResultPartitionID();
+
+			CountDownLatch sync = new CountDownLatch(1);
+
+			// Return infinite subpartition
+			when(partitions.createSubpartitionView(eq(pid), eq(0), any(BufferProvider.class)))
+					.thenReturn(new InfiniteSubpartitionView(outboundBuffers, sync));
+
+			PartitionRequestProtocol protocol = new PartitionRequestProtocol(
+					partitions, mock(TaskEventDispatcher.class), mock(NetworkBufferPool.class));
+
+			serverAndClient = initServerAndClient(protocol);
+
+			Channel ch = connect(serverAndClient);
+
+			// Request for non-existing input channel => results in cancel request
+			ch.writeAndFlush(new PartitionRequest(pid, 0, new InputChannelID())).await();
+
+			// Wait for the notification
+			if (!sync.await(TestingUtils.TESTING_DURATION().toMillis(), TimeUnit.MILLISECONDS)) {
+				fail("Timed out after waiting for " + TestingUtils.TESTING_DURATION().toMillis() +
+						" ms to be notified about cancelled partition.");
+			}
+		}
+		finally {
+			shutdown(serverAndClient);
+		}
+	}
+
+	// ---------------------------------------------------------------------------------------------
+
+	static class InfiniteSubpartitionView implements ResultSubpartitionView {
+
+		private final BufferProvider bufferProvider;
+
+		private final CountDownLatch sync;
+
+		public InfiniteSubpartitionView(BufferProvider bufferProvider, CountDownLatch sync) {
+			this.bufferProvider = checkNotNull(bufferProvider);
+			this.sync = checkNotNull(sync);
+		}
+
+		@Override
+		public Buffer getNextBuffer() throws IOException, InterruptedException {
+			return bufferProvider.requestBufferBlocking();
+		}
+
+		@Override
+		public boolean registerListener(final NotificationListener listener) throws IOException {
+			return false;
+		}
+
+		@Override
+		public void releaseAllResources() throws IOException {
+			sync.countDown();
+		}
+
+		@Override
+		public void notifySubpartitionConsumed() throws IOException {
+		}
+
+		@Override
+		public boolean isReleased() {
+			return false;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2a65b622/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
new file mode 100644
index 0000000..ab96d4a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
@@ -0,0 +1,408 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.embedded.EmbeddedChannel;
+import org.apache.flink.runtime.io.network.ConnectionID;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.netty.NettyTestUtil.NettyServerAndClient;
+import org.apache.flink.runtime.io.network.netty.exception.LocalTransportException;
+import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
+import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.connect;
+import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.createConfig;
+import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.initServerAndClient;
+import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.shutdown;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class ClientTransportErrorHandlingTest {
+
+	/**
+	 * Verifies that failed client requests via {@link PartitionRequestClient} are correctly
+	 * attributed to the respective {@link RemoteInputChannel}.
+	 */
+	@Test
+	public void testExceptionOnWrite() throws Exception {
+
+		NettyProtocol protocol = new NettyProtocol() {
+			@Override
+			public ChannelHandler[] getServerChannelHandlers() {
+				return new ChannelHandler[0];
+			}
+
+			@Override
+			public ChannelHandler[] getClientChannelHandlers() {
+				return new PartitionRequestProtocol(
+						mock(ResultPartitionProvider.class),
+						mock(TaskEventDispatcher.class),
+						mock(NetworkBufferPool.class)).getClientChannelHandlers();
+			}
+		};
+
+		// We need a real server and client in this test, because Netty's EmbeddedChannel is
+		// not failing the ChannelPromise of failed writes.
+		NettyServerAndClient serverAndClient = initServerAndClient(protocol, createConfig());
+
+		Channel ch = connect(serverAndClient);
+
+		PartitionRequestClientHandler handler = getClientHandler(ch);
+
+		// Last outbound handler throws Exception after 1st write
+		ch.pipeline().addFirst(new ChannelOutboundHandlerAdapter() {
+			int writeNum = 0;
+
+			@Override
+			public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
+					throws Exception {
+
+				if (writeNum >= 1) {
+					throw new RuntimeException("Expected test exception.");
+				}
+
+				writeNum++;
+				ctx.write(msg, promise);
+			}
+		});
+
+		PartitionRequestClient requestClient = new PartitionRequestClient(
+				ch, handler, mock(ConnectionID.class), mock(PartitionRequestClientFactory.class));
+
+		// Create input channels
+		RemoteInputChannel[] rich = new RemoteInputChannel[] {
+				createRemoteInputChannel(), createRemoteInputChannel()};
+
+		final CountDownLatch sync = new CountDownLatch(1);
+
+		// Do this with explicit synchronization. Otherwise this is not robust against slow timings
+		// of the callback (e.g. we cannot just verify that it was called once, because there is
+		// a chance that we do this too early).
+		doAnswer(new Answer<Void>() {
+			@Override
+			public Void answer(InvocationOnMock invocation) throws Throwable {
+				sync.countDown();
+				return null;
+			}
+		}).when(rich[1]).onError(isA(LocalTransportException.class));
+
+		// First request is successful
+		ChannelFuture f = requestClient.requestSubpartition(new ResultPartitionID(), 0, rich[0], 0);
+		assertTrue(f.await().isSuccess());
+
+		// Second request is *not* successful
+		f = requestClient.requestSubpartition(new ResultPartitionID(), 0, rich[1], 0);
+		assertFalse(f.await().isSuccess());
+
+		// Only the second channel should be notified about the error
+		verify(rich[0], times(0)).onError(any(LocalTransportException.class));
+
+		// Wait for the notification
+		if (!sync.await(TestingUtils.TESTING_DURATION().toMillis(), TimeUnit.MILLISECONDS)) {
+			fail("Timed out after waiting for " + TestingUtils.TESTING_DURATION().toMillis() +
+					" ms to be notified about the channel error.");
+		}
+
+		shutdown(serverAndClient);
+	}
+
+	/**
+	 * Verifies that {@link NettyMessage.ErrorResponse} messages are correctly wrapped in
+	 * {@link RemoteTransportException} instances.
+	 */
+	@Test
+	public void testWrappingOfRemoteErrorMessage() throws Exception {
+		EmbeddedChannel ch = createEmbeddedChannel();
+
+		PartitionRequestClientHandler handler = getClientHandler(ch);
+
+		// Create input channels
+		RemoteInputChannel[] rich = new RemoteInputChannel[] {
+				createRemoteInputChannel(), createRemoteInputChannel()};
+
+		for (RemoteInputChannel r : rich) {
+			when(r.getInputChannelId()).thenReturn(new InputChannelID());
+			handler.addInputChannel(r);
+		}
+
+		// Error msg for channel[0]
+		ch.pipeline().fireChannelRead(new NettyMessage.ErrorResponse(
+				new RuntimeException("Expected test exception"),
+				rich[0].getInputChannelId()));
+
+		try {
+			// Exception should not reach end of pipeline...
+			ch.checkException();
+		}
+		catch (Exception e) {
+			fail("The exception reached the end of the pipeline and "
+					+ "was not handled correctly by the last handler.");
+		}
+
+		verify(rich[0], times(1)).onError(isA(RemoteTransportException.class));
+		verify(rich[1], never()).onError(any(Throwable.class));
+
+		// Fatal error for all channels
+		ch.pipeline().fireChannelRead(new NettyMessage.ErrorResponse(
+				new RuntimeException("Expected test exception")));
+
+		try {
+			// Exception should not reach end of pipeline...
+			ch.checkException();
+		}
+		catch (Exception e) {
+			fail("The exception reached the end of the pipeline and "
+					+ "was not handled correctly by the last handler.");
+		}
+
+		verify(rich[0], times(2)).onError(isA(RemoteTransportException.class));
+		verify(rich[1], times(1)).onError(isA(RemoteTransportException.class));
+	}
+
+	/**
+	 * Verifies that unexpected remote closes are reported as an instance of
+	 * {@link RemoteTransportException}.
+	 */
+	@Test
+	public void testExceptionOnRemoteClose() throws Exception {
+
+		NettyProtocol protocol = new NettyProtocol() {
+			@Override
+			public ChannelHandler[] getServerChannelHandlers() {
+				return new ChannelHandler[] {
+						// Close on read
+						new ChannelInboundHandlerAdapter() {
+							@Override
+							public void channelRead(ChannelHandlerContext ctx, Object msg)
+									throws Exception {
+
+								ctx.channel().close();
+							}
+						}
+				};
+			}
+
+			@Override
+			public ChannelHandler[] getClientChannelHandlers() {
+				return new PartitionRequestProtocol(
+						mock(ResultPartitionProvider.class),
+						mock(TaskEventDispatcher.class),
+						mock(NetworkBufferPool.class)).getClientChannelHandlers();
+			}
+		};
+
+		NettyServerAndClient serverAndClient = initServerAndClient(protocol, createConfig());
+
+		Channel ch = connect(serverAndClient);
+
+		PartitionRequestClientHandler handler = getClientHandler(ch);
+
+		// Create input channels
+		RemoteInputChannel[] rich = new RemoteInputChannel[] {
+				createRemoteInputChannel(), createRemoteInputChannel()};
+
+		final CountDownLatch sync = new CountDownLatch(rich.length);
+
+		Answer<Void> countDownLatch = new Answer<Void>() {
+			@Override
+			public Void answer(InvocationOnMock invocation) throws Throwable {
+				sync.countDown();
+				return null;
+			}
+		};
+
+		for (RemoteInputChannel r : rich) {
+			doAnswer(countDownLatch).when(r).onError(any(Throwable.class));
+			handler.addInputChannel(r);
+		}
+
+		// Write something to trigger close by server
+		ch.writeAndFlush(Unpooled.buffer().writerIndex(16));
+
+		// Wait for the notification
+		if (!sync.await(TestingUtils.TESTING_DURATION().toMillis(), TimeUnit.MILLISECONDS)) {
+			fail("Timed out after waiting for " + TestingUtils.TESTING_DURATION().toMillis() +
+					" ms to be notified about remote connection close.");
+		}
+
+		// All the registered channels should be notified.
+		for (RemoteInputChannel r : rich) {
+			verify(r).onError(isA(RemoteTransportException.class));
+		}
+
+		shutdown(serverAndClient);
+	}
+
+	/**
+	 * Verifies that fired Exceptions are handled correctly by the pipeline.
+	 */
+	@Test
+	public void testExceptionCaught() throws Exception {
+		EmbeddedChannel ch = createEmbeddedChannel();
+
+		PartitionRequestClientHandler handler = getClientHandler(ch);
+
+		// Create input channels
+		RemoteInputChannel[] rich = new RemoteInputChannel[] {
+				createRemoteInputChannel(), createRemoteInputChannel()};
+
+		for (RemoteInputChannel r : rich) {
+			when(r.getInputChannelId()).thenReturn(new InputChannelID());
+			handler.addInputChannel(r);
+		}
+
+		ch.pipeline().fireExceptionCaught(new Exception());
+
+		try {
+			// Exception should not reach end of pipeline...
+			ch.checkException();
+		}
+		catch (Exception e) {
+			fail("The exception reached the end of the pipeline and "
+					+ "was not handled correctly by the last handler.");
+		}
+
+		// ...but all the registered channels should be notified.
+		for (RemoteInputChannel r : rich) {
+			verify(r).onError(isA(LocalTransportException.class));
+		}
+	}
+
+	/**
+	 * Verifies that "Connection reset by peer" Exceptions are special-cased and are reported as
+	 * an instance of {@link RemoteTransportException}.
+	 */
+	@Test
+	public void testConnectionResetByPeer() throws Throwable {
+		EmbeddedChannel ch = createEmbeddedChannel();
+
+		PartitionRequestClientHandler handler = getClientHandler(ch);
+
+		RemoteInputChannel rich = addInputChannel(handler);
+
+		final Throwable[] error = new Throwable[1];
+
+		// Verify the Exception
+		doAnswer(new Answer<Void>() {
+			@Override
+			public Void answer(InvocationOnMock invocation) throws Throwable {
+				Throwable cause = (Throwable) invocation.getArguments()[0];
+
+				try {
+					assertEquals(RemoteTransportException.class, cause.getClass());
+					assertNotEquals("Connection reset by peer", cause.getMessage());
+
+					assertEquals(IOException.class, cause.getCause().getClass());
+					assertEquals("Connection reset by peer", cause.getCause().getMessage());
+				}
+				catch (Throwable t) {
+					error[0] = t;
+				}
+
+				return null;
+			}
+		}).when(rich).onError(any(Throwable.class));
+
+		ch.pipeline().fireExceptionCaught(new IOException("Connection reset by peer"));
+
+		assertNull(error[0]);
+	}
+
+	/**
+	 * Verifies that the channel is closed if there is an error *during* error notification.
+	 */
+	@Test
+	public void testChannelClosedOnExceptionDuringErrorNotification() throws Exception {
+		EmbeddedChannel ch = createEmbeddedChannel();
+
+		PartitionRequestClientHandler handler = getClientHandler(ch);
+
+		RemoteInputChannel rich = addInputChannel(handler);
+
+		doThrow(new RuntimeException("Expected test exception"))
+				.when(rich).onError(any(Throwable.class));
+
+		ch.pipeline().fireExceptionCaught(new Exception());
+
+		assertFalse(ch.isActive());
+	}
+
+	// ---------------------------------------------------------------------------------------------
+	// Helpers
+	// ---------------------------------------------------------------------------------------------
+
+	private EmbeddedChannel createEmbeddedChannel() {
+		PartitionRequestProtocol protocol = new PartitionRequestProtocol(
+				mock(ResultPartitionProvider.class),
+				mock(TaskEventDispatcher.class),
+				mock(NetworkBufferPool.class));
+
+		return new EmbeddedChannel(protocol.getClientChannelHandlers());
+	}
+
+	private RemoteInputChannel addInputChannel(PartitionRequestClientHandler clientHandler) {
+		RemoteInputChannel rich = createRemoteInputChannel();
+		clientHandler.addInputChannel(rich);
+
+		return rich;
+	}
+
+	private PartitionRequestClientHandler getClientHandler(Channel ch) {
+		return ch.pipeline().get(PartitionRequestClientHandler.class);
+	}
+
+	private RemoteInputChannel createRemoteInputChannel() {
+		return when(mock(RemoteInputChannel.class)
+				.getInputChannelId())
+				.thenReturn(new InputChannelID()).getMock();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2a65b622/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyServerLowAndHighWatermarkTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyServerLowAndHighWatermarkTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyServerLowAndHighWatermarkTest.java
index 43ccbcd..0038640 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyServerLowAndHighWatermarkTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyServerLowAndHighWatermarkTest.java
@@ -21,18 +21,19 @@ package org.apache.flink.runtime.io.network.netty;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.channel.ChannelPipeline;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.net.NetUtils;
 import org.junit.Test;
 
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.NettyServerAndClient;
+import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.awaitClose;
+import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.connect;
+import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.createConfig;
+import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.initServerAndClient;
+import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.shutdown;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
@@ -53,43 +54,31 @@ public class NettyServerLowAndHighWatermarkTest {
 	 */
 	@Test
 	public void testLowAndHighWatermarks() throws Throwable {
-		final NettyConfig conf = new NettyConfig(
-				InetAddress.getLocalHost(),
-				NetUtils.getAvailablePort(),
-				PageSize,
-				new Configuration());
-
 		final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
 		final NettyProtocol protocol = new NettyProtocol() {
 			@Override
-			public void setServerChannelPipeline(ChannelPipeline channelPipeline) {
+			public ChannelHandler[] getServerChannelHandlers() {
 				// The channel handler implements the test
-				channelPipeline.addLast(new TestLowAndHighWatermarkHandler(error));
+				return new ChannelHandler[] {new TestLowAndHighWatermarkHandler(error)};
 			}
 
 			@Override
-			public void setClientChannelPipeline(ChannelPipeline channelPipeline) {
+			public ChannelHandler[] getClientChannelHandlers() {
+				return new ChannelHandler[0];
 			}
 		};
 
-		final NettyServer server = new NettyServer(conf);
-		final NettyClient client = new NettyClient(conf);
+		final NettyConfig conf = createConfig(PageSize);
 
-		try {
-			server.init(protocol);
-			client.init(protocol);
+		final NettyServerAndClient serverAndClient = initServerAndClient(protocol, conf);
 
+		try {
 			// We can't just check the config of this channel as it is the client's channel. We need
 			// to check the server channel, because it is doing the data transfers.
-			final Channel ch = client
-					.connect(new InetSocketAddress(conf.getServerAddress(), conf.getServerPort()))
-					.sync()
-					.channel();
+			final Channel ch = connect(serverAndClient);
 
 			// Wait for the channel to be closed
-			while (ch.isActive()) {
-				ch.closeFuture().await(1, TimeUnit.SECONDS);
-			}
+			awaitClose(ch);
 
 			final Throwable t = error.get();
 			if (t != null) {
@@ -97,13 +86,7 @@ public class NettyServerLowAndHighWatermarkTest {
 			}
 		}
 		finally {
-			if (server != null) {
-				server.shutdown();
-			}
-
-			if (client != null) {
-				client.shutdown();
-			}
+			shutdown(serverAndClient);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2a65b622/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyTestUtil.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyTestUtil.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyTestUtil.java
new file mode 100644
index 0000000..538901f
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyTestUtil.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import io.netty.channel.Channel;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.net.NetUtils;
+import scala.Tuple2;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Test utility for Netty server and client setup.
+ */
+public class NettyTestUtil {
+
+	static final int DEFAULT_SEGMENT_SIZE = 1024;
+
+	// ---------------------------------------------------------------------------------------------
+	// NettyServer and NettyClient
+	// ---------------------------------------------------------------------------------------------
+
+	static NettyServer initServer(NettyConfig config, NettyProtocol protocol) throws Exception {
+		final NettyServer server = new NettyServer(config);
+
+		try {
+			server.init(protocol);
+		}
+		catch (Exception e) {
+			server.shutdown(); // don't leave a half-initialized server behind
+			throw e;
+		}
+
+		return server;
+	}
+
+	static NettyClient initClient(NettyConfig config, NettyProtocol protocol) throws Exception {
+		final NettyClient client = new NettyClient(config);
+
+		try {
+			client.init(protocol);
+		}
+		catch (Exception e) {
+			client.shutdown(); // don't leave a half-initialized client behind
+			throw e;
+		}
+
+		return client;
+	}
+
+	static NettyServerAndClient initServerAndClient(NettyProtocol protocol) throws Exception {
+		return initServerAndClient(protocol, createConfig());
+	}
+
+	/** Sets up client, then server; an already initialized client is shut down if the server fails. */
+	static NettyServerAndClient initServerAndClient(NettyProtocol protocol, NettyConfig config)
+			throws Exception {
+
+		final NettyClient client = initClient(config, protocol);
+		try {
+			return new NettyServerAndClient(initServer(config, protocol), client);
+		}
+		catch (Exception e) {
+			client.shutdown(); // the client is already up and would leak otherwise
+			throw e;
+		}
+	}
+
+	static Channel connect(NettyServerAndClient serverAndClient) throws Exception {
+		return connect(serverAndClient.client(), serverAndClient.server());
+	}
+
+	static Channel connect(NettyClient client, NettyServer server) throws Exception {
+		final NettyConfig config = server.getConfig();
+		final InetSocketAddress serverAddress =
+				new InetSocketAddress(config.getServerAddress(), config.getServerPort());
+
+		return client.connect(serverAddress).sync().channel();
+	}
+
+	/** Blocks until the channel is no longer active, waiting on its close future in 1s steps. */
+	static void awaitClose(Channel ch) throws InterruptedException {
+		while (ch.isActive()) {
+			ch.closeFuture().await(1, TimeUnit.SECONDS);
+		}
+	}
+
+	/** Shuts down server and client (null-tolerant); the client is handled even if the server throws. */
+	static void shutdown(NettyServerAndClient serverAndClient) {
+		if (serverAndClient != null) {
+			try {
+				if (serverAndClient.server() != null) {
+					serverAndClient.server().shutdown();
+				}
+			}
+			finally {
+				if (serverAndClient.client() != null) {
+					serverAndClient.client().shutdown();
+				}
+			}
+		}
+	}
+
+	// ---------------------------------------------------------------------------------------------
+	// NettyConfig
+	// ---------------------------------------------------------------------------------------------
+
+	static NettyConfig createConfig() throws Exception {
+		return createConfig(DEFAULT_SEGMENT_SIZE, new Configuration());
+	}
+
+	static NettyConfig createConfig(int segmentSize) throws Exception {
+		return createConfig(segmentSize, new Configuration());
+	}
+
+	static NettyConfig createConfig(Configuration config) throws Exception {
+		return createConfig(DEFAULT_SEGMENT_SIZE, config);
+	}
+
+	static NettyConfig createConfig(int segmentSize, Configuration config) throws Exception {
+		checkArgument(segmentSize > 0);
+		checkNotNull(config);
+
+		return new NettyConfig(
+				InetAddress.getLocalHost(), NetUtils.getAvailablePort(), segmentSize, config);
+	}
+
+	// ---------------------------------------------------------------------------------------------
+
+	/** Pair of a server and a client with named accessors for both. */
+	static class NettyServerAndClient extends Tuple2<NettyServer, NettyClient> {
+
+		private static final long serialVersionUID = 4440278728496341931L;
+
+		NettyServerAndClient(NettyServer _1, NettyClient _2) {
+			super(_1, _2);
+		}
+
+		NettyServer server() {
+			return _1();
+		}
+
+		NettyClient client() {
+			return _2();
+		}
+
+		@Override
+		public boolean canEqual(Object that) {
+			return false;
+		}
+
+		@Override
+		public boolean equals(Object that) {
+			return false; // NOTE(review): never equal, not even to itself - confirm this is intended
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2a65b622/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
index 3e7d3a3..24a2a5c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.runtime.io.network.netty;
 
+import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelOutboundHandlerAdapter;
-import io.netty.channel.ChannelPipeline;
 import io.netty.channel.ChannelPromise;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
@@ -54,12 +54,14 @@ public class PartitionRequestClientFactoryTest {
 		final Tuple2<NettyServer, NettyClient> netty = createNettyServerAndClient(
 				new NettyProtocol() {
 					@Override
-					public void setServerChannelPipeline(ChannelPipeline channelPipeline) {
+					public ChannelHandler[] getServerChannelHandlers() {
+						return new ChannelHandler[0];
 					}
 
 					@Override
-					public void setClientChannelPipeline(ChannelPipeline channelPipeline) {
-						channelPipeline.addLast(new CountDownLatchOnConnectHandler(syncOnConnect));
+					public ChannelHandler[] getClientChannelHandlers() {
+						return new ChannelHandler[] {
+								new CountDownLatchOnConnectHandler(syncOnConnect)};
 					}
 				});
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2a65b622/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
index 3632d6c..b8e9f25 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.network.netty;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.UnpooledByteBufAllocator;
+import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
@@ -130,7 +131,13 @@ public class PartitionRequestClientHandlerTest {
 		final PartitionRequestClientHandler client = new PartitionRequestClientHandler();
 		client.addInputChannel(inputChannel);
 
-		client.channelRead(mock(ChannelHandlerContext.class), partitionNotFound);
+		// Mock channel context
+		ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+		when(ctx.channel()).thenReturn(mock(Channel.class));
+
+		client.channelActive(ctx);
+
+		client.channelRead(ctx, partitionNotFound);
 
 		verify(inputChannel, times(1)).onFailedPartitionRequest();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/2a65b622/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
new file mode 100644
index 0000000..6978225
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
+import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.netty.CancelPartitionRequestTest.InfiniteSubpartitionView;
+import org.apache.flink.runtime.io.network.netty.NettyTestUtil.NettyServerAndClient;
+import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
+import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.junit.Test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.runtime.io.network.netty.NettyMessage.NettyMessageEncoder;
+import static org.apache.flink.runtime.io.network.netty.NettyMessage.PartitionRequest;
+import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.connect;
+import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.createConfig;
+import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.initServerAndClient;
+import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.shutdown;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class ServerTransportErrorHandlingTest {
+
+	/**
+	 * Verifies remote closes trigger the release of all resources.
+	 */
+	@Test
+	public void testRemoteClose() throws Exception {
+		final TestPooledBufferProvider outboundBuffers = new TestPooledBufferProvider(16);
+
+		final CountDownLatch sync = new CountDownLatch(1);
+
+		final ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
+
+		when(partitionManager
+				.createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferProvider.class)))
+				.thenReturn(new InfiniteSubpartitionView(outboundBuffers, sync));
+
+		NettyProtocol protocol = new NettyProtocol() {
+			@Override
+			public ChannelHandler[] getServerChannelHandlers() {
+				return new PartitionRequestProtocol(
+						partitionManager,
+						mock(TaskEventDispatcher.class),
+						mock(NetworkBufferPool.class)).getServerChannelHandlers();
+			}
+
+			@Override
+			public ChannelHandler[] getClientChannelHandlers() {
+				return new ChannelHandler[] {
+						new NettyMessageEncoder(),
+						// Close on read
+						new ChannelInboundHandlerAdapter() {
+							@Override
+							public void channelRead(ChannelHandlerContext ctx, Object msg) {
+								// The message is consumed here and not passed on, so release it here
+								io.netty.util.ReferenceCountUtil.release(msg);
+								ctx.channel().close();
+							}
+						}
+				};
+			}
+		};
+
+		NettyServerAndClient serverAndClient = null;
+
+		try {
+			serverAndClient = initServerAndClient(protocol, createConfig());
+
+			Channel ch = connect(serverAndClient);
+
+			// Write something to trigger close by server
+			ch.writeAndFlush(new PartitionRequest(new ResultPartitionID(), 0, new InputChannelID()));
+
+			// Wait for the notification
+			if (!sync.await(TestingUtils.TESTING_DURATION().toMillis(), TimeUnit.MILLISECONDS)) {
+				fail("Timed out after waiting for " + TestingUtils.TESTING_DURATION().toMillis() +
+						" ms to be notified about released partition.");
+			}
+		}
+		finally {
+			shutdown(serverAndClient);
+		}
+	}
+}