Posted to commits@flink.apache.org by uc...@apache.org on 2015/05/22 13:58:42 UTC

[2/2] flink git commit: [FLINK-1954] [FLINK-1636] [runtime] Improve partition not found error handling

[FLINK-1954] [FLINK-1636] [runtime] Improve partition not found error handling

Problem: cancelling tasks sometimes leads to misleading error messages about
"not found partitions". This is an artifact of task cancelling. If a task
(consumer) consumes data from another remote task (producer), it sends a
partition request over the network. If the producer fails concurrently with this
request, the request returns with a PartitionNotFoundException to the consumer.
If this error message is received *before* the consumer is cancelled (as a
result of the failing producer), the misleading error is attributed to the
consumer. This makes it hard to trace the root cause of the problem (the
failing producer).

Solution: when a consumer receives a remote PartitionNotFoundException, it asks
the central job manager whether the producer is still running or has failed.

If the producer is still running, the partition request is sent again (with an
exponential back-off). If the retried requests keep failing until the back-off
is exhausted, the consumer fails with a PartitionNotFoundException.

If the producer has failed, the consumer is cancelled.

If the producer is not running and has not failed, there is a bug either in the
consumer task setup (e.g. requesting a non-existing result) or in the network
stack (e.g. unsafe publication of produced results), in which case the error is
attributed to the consumer.
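
The retry decision boils down to a small exponential back-off schedule (see
RemoteInputChannel#retriggerSubpartitionRequest in the patch below). A minimal
standalone sketch of that schedule, assuming an initial/max back-off pair in
milliseconds; the class and method names here are illustrative only, not part
of the patch:

	public class PartitionRequestBackoff {

		private final int maxRequestBackoffMs;

		private int nextRequestBackoffMs;

		public PartitionRequestBackoff(int initialBackoffMs, int maxBackoffMs) {
			this.nextRequestBackoffMs = initialBackoffMs;
			this.maxRequestBackoffMs = maxBackoffMs;
		}

		/**
		 * Returns the delay (in ms) before the next partition request, or -1 if
		 * the request should now fail with a PartitionNotFoundException.
		 */
		public int nextDelayOrFail() {
			if (nextRequestBackoffMs == 0) {
				return -1; // retries disabled
			}

			if (nextRequestBackoffMs > maxRequestBackoffMs) {
				return -1; // back-off exhausted
			}

			int delay = nextRequestBackoffMs;

			// Exponential back-off: double the delay, capped at the maximum.
			// Once the maximum has been used, the following call fails the request.
			nextRequestBackoffMs = nextRequestBackoffMs < maxRequestBackoffMs
					? Math.min(nextRequestBackoffMs * 2, maxRequestBackoffMs)
					: maxRequestBackoffMs + 1;

			return delay;
		}
	}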

---

The new Akka messages introduced with this change are only exchanged in error
cases and don't affect normal operation.

Normal operation (not affected by this change):
	- TM1=>TM2: request result
	- TM2=>TM1: result

Error case:
	- TM1=>TM2: request result
	- TM2=>TM1: PartitionNotFoundException
	- TM1=>JM: check partition state
	- JM=>TM1: retrigger request -OR- cancel consumer
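
For reference, the consumer-side reaction to the JobManager's answer
(implemented in Task#onPartitionStateUpdate below) can be summarized by the
following sketch; the helper class and enum are illustrative only, assuming
Flink's ExecutionState is on the classpath:

	import org.apache.flink.runtime.execution.ExecutionState;

	class PartitionStateAnswerSketch {

		// Possible reactions of the consumer task to the reported producer state.
		enum ConsumerReaction { RETRIGGER_REQUEST, CANCEL, FAIL }

		static ConsumerReaction react(ExecutionState producerState) {
			if (producerState == ExecutionState.RUNNING) {
				// Producer still alive: retrigger the partition request (with back-off).
				return ConsumerReaction.RETRIGGER_REQUEST;
			}
			else if (producerState == ExecutionState.CANCELING
					|| producerState == ExecutionState.CANCELED
					|| producerState == ExecutionState.FAILED) {
				// Producer gone: cancel the consumer instead of failing it.
				return ConsumerReaction.CANCEL;
			}
			else {
				// Neither running nor failed: bug in task setup or the network
				// stack, so the error is attributed to the consumer.
				return ConsumerReaction.FAIL;
			}
		}
	}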

This closes #705.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1aad5b75
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1aad5b75
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1aad5b75

Branch: refs/heads/master
Commit: 1aad5b759432f0b59a9dcc366a4b66c2681626f1
Parents: 1e57475
Author: Ufuk Celebi <uc...@apache.org>
Authored: Mon May 11 16:34:55 2015 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Fri May 22 13:57:54 2015 +0200

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java    |   1 -
 .../flink/runtime/executiongraph/Execution.java |   5 +-
 .../runtime/executiongraph/ExecutionVertex.java |   4 +
 .../runtime/io/network/NetworkEnvironment.java  |  55 +++++-
 .../runtime/io/network/netty/NettyConfig.java   |   2 +-
 .../runtime/io/network/netty/NettyMessage.java  |   2 +-
 .../network/netty/PartitionRequestClient.java   |  51 ++++--
 .../netty/PartitionRequestClientHandler.java    |  13 +-
 .../netty/PartitionRequestServerHandler.java    |  26 +--
 .../io/network/netty/PartitionStateChecker.java |  34 ++++
 .../partition/PartitionNotFoundException.java   |  41 +++++
 .../partition/PipelinedSubpartition.java        |   5 +-
 .../io/network/partition/ResultPartition.java   |   3 +
 .../partition/ResultPartitionManager.java       |   4 +-
 .../network/partition/consumer/InputGate.java   |  12 +-
 .../partition/consumer/RemoteInputChannel.java  |  92 ++++++++--
 .../partition/consumer/SingleInputGate.java     |  80 +++++++--
 .../partition/consumer/UnknownInputChannel.java |  16 +-
 .../apache/flink/runtime/taskmanager/Task.java  |  51 +++++-
 .../flink/runtime/jobmanager/JobManager.scala   |  23 ++-
 .../runtime/messages/JobManagerMessages.scala   |  17 +-
 .../flink/runtime/messages/TaskMessages.scala   |  17 +-
 .../NetworkEnvironmentConfiguration.scala       |  10 +-
 .../flink/runtime/taskmanager/TaskManager.scala |  15 +-
 .../io/network/NetworkEnvironmentTest.java      |   4 +-
 .../PartitionRequestClientHandlerTest.java      |  33 +++-
 .../consumer/LocalInputChannelTest.java         |   7 +-
 .../consumer/RemoteInputChannelTest.java        | 139 ++++++++++++++-
 .../partition/consumer/SingleInputGateTest.java |   9 +-
 .../partition/consumer/TestSingleInputGate.java |   6 +-
 .../partition/consumer/UnionInputGateTest.java  |   8 +-
 .../runtime/jobmanager/JobManagerTest.java      | 155 +++++++++++++++-
 ...askManagerComponentsStartupShutdownTest.java |   4 +-
 .../runtime/taskmanager/TaskManagerTest.java    | 176 +++++++++++++++++--
 .../flink/runtime/taskmanager/TaskTest.java     |  83 ++++++++-
 .../test/classloading/ClassLoaderITCase.java    |   6 +-
 .../exampleJavaPrograms/WordCountITCase.java    |   1 -
 37 files changed, 1076 insertions(+), 134 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1aad5b75/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 956943a..26d4fbe 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -41,7 +41,6 @@ public final class ConfigConstants {
 	@Deprecated
 	public static final String DEFAULT_PARALLELISM_KEY_OLD = "parallelization.degree.default";
 
-
 	/**
 	 * Config parameter for the number of re-tries for failed tasks. Setting this
 	 * value to 0 effectively disables fault tolerance.

http://git-wip-us.apache.org/repos/asf/flink/blob/1aad5b75/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index a3c82c2..eae696c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -46,8 +46,8 @@ import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.messages.Messages;
 import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
 import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.util.SerializedValue;
 import org.apache.flink.runtime.taskmanager.Task;
+import org.apache.flink.runtime.util.SerializedValue;
 import org.apache.flink.util.ExceptionUtils;
 import org.slf4j.Logger;
 import scala.concurrent.Future;
@@ -71,7 +71,6 @@ import static org.apache.flink.runtime.execution.ExecutionState.FAILED;
 import static org.apache.flink.runtime.execution.ExecutionState.FINISHED;
 import static org.apache.flink.runtime.execution.ExecutionState.RUNNING;
 import static org.apache.flink.runtime.execution.ExecutionState.SCHEDULED;
-
 import static org.apache.flink.runtime.messages.TaskMessages.CancelTask;
 import static org.apache.flink.runtime.messages.TaskMessages.FailIntermediateResultPartitions;
 import static org.apache.flink.runtime.messages.TaskMessages.SubmitTask;
@@ -434,7 +433,7 @@ public class Execution implements Serializable {
 	}
 
 	void scheduleOrUpdateConsumers(List<List<ExecutionEdge>> allConsumers) {
-		if (allConsumers.size() != 1) {
+		if (allConsumers.size() > 1) {
 			fail(new IllegalStateException("Currently, only a single consumer group per partition is supported."));
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1aad5b75/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 2ad3a55..d44cb6a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -219,6 +219,10 @@ public class ExecutionVertex implements Serializable {
 		return this.jobVertex.getGraph();
 	}
 
+	public Map<IntermediateResultPartitionID, IntermediateResultPartition> getProducedPartitions() {
+		return resultPartitions;
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Graph building
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/1aad5b75/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index 259ea55..079a7f4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -31,17 +31,21 @@ import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.netty.NettyConfig;
 import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState;
 import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Option;
+import scala.Tuple2;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -80,8 +84,9 @@ public class NetworkEnvironment {
 
 	private ResultPartitionConsumableNotifier partitionConsumableNotifier;
 
-	private boolean isShutdown;
+	private PartitionStateChecker partitionStateChecker;
 
+	private boolean isShutdown;
 
 	/**
 	 * Initializes all network I/O components.
@@ -130,6 +135,14 @@ public class NetworkEnvironment {
 		return partitionConsumableNotifier;
 	}
 
+	public PartitionStateChecker getPartitionStateChecker() {
+		return partitionStateChecker;
+	}
+
+	public Tuple2<Integer, Integer> getPartitionRequestInitialAndMaxBackoff() {
+		return configuration.partitionRequestInitialAndMaxBackoff();
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Association / Disassociation with JobManager / TaskManager
 	// --------------------------------------------------------------------------------------------
@@ -171,6 +184,9 @@ public class NetworkEnvironment {
 				this.partitionConsumableNotifier = new JobManagerResultPartitionConsumableNotifier(
 													jobManagerRef, taskManagerRef, new Timeout(jobManagerTimeout));
 
+				this.partitionStateChecker = new JobManagerPartitionStateChecker(
+						jobManagerRef, taskManagerRef);
+
 				// -----  Network connections  -----
 				final Option<NettyConfig> nettyConfig = configuration.nettyConfig();
 				connectionManager = nettyConfig.isDefined() ? new NettyConnectionManager(nettyConfig.get())
@@ -225,6 +241,8 @@ public class NetworkEnvironment {
 
 			partitionConsumableNotifier = null;
 
+			partitionStateChecker = null;
+
 			if (taskEventDispatcher != null) {
 				taskEventDispatcher.clearAll();
 				taskEventDispatcher = null;
@@ -235,8 +253,6 @@ public class NetworkEnvironment {
 		}
 	}
 
-
-
 	// --------------------------------------------------------------------------------------------
 	//  Task operations
 	// --------------------------------------------------------------------------------------------
@@ -404,9 +420,9 @@ public class NetworkEnvironment {
 
 		private final Timeout jobManagerMessageTimeout;
 
-		public JobManagerResultPartitionConsumableNotifier(ActorRef jobManager,
-															ActorRef taskManager,
-															Timeout jobManagerMessageTimeout) {
+		public JobManagerResultPartitionConsumableNotifier(
+				ActorRef jobManager, ActorRef taskManager, Timeout jobManagerMessageTimeout) {
+
 			this.jobManager = jobManager;
 			this.taskManager = taskManager;
 			this.jobManagerMessageTimeout = jobManagerMessageTimeout;
@@ -435,4 +451,29 @@ public class NetworkEnvironment {
 			}, AkkaUtils.globalExecutionContext());
 		}
 	}
+
+	private static class JobManagerPartitionStateChecker implements PartitionStateChecker {
+
+		private final ActorRef jobManager;
+
+		private final ActorRef taskManager;
+
+		public JobManagerPartitionStateChecker(ActorRef jobManager, ActorRef taskManager) {
+			this.jobManager = jobManager;
+			this.taskManager = taskManager;
+		}
+
+		@Override
+		public void triggerPartitionStateCheck(
+				JobID jobId,
+				ExecutionAttemptID executionAttemptID,
+				IntermediateDataSetID resultId,
+				ResultPartitionID partitionId) {
+
+			RequestPartitionState msg = new RequestPartitionState(
+					jobId, partitionId, executionAttemptID, resultId);
+
+			jobManager.tell(msg, taskManager);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1aad5b75/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
index 55057fe..9d4f078 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
@@ -47,7 +47,7 @@ public class NettyConfig {
 
 	// ------------------------------------------------------------------------
 
-	static enum TransportType {
+	enum TransportType {
 		NIO, EPOLL, AUTO
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1aad5b75/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
index bef2740..0606f4b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
@@ -394,7 +394,7 @@ abstract class NettyMessage {
 
 		@Override
 		public String toString() {
-			return String.format("PartitionRequest(%s)", partitionId);
+			return String.format("PartitionRequest(%s:%d)", partitionId, queueIndex);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1aad5b75/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
index b5f89e0..6d7725f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
@@ -26,8 +26,11 @@ import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.runtime.util.AtomicDisposableReferenceCounter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.concurrent.TimeUnit;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.flink.runtime.io.network.netty.NettyMessage.PartitionRequest;
@@ -41,6 +44,8 @@ import static org.apache.flink.runtime.io.network.netty.NettyMessage.TaskEventRe
  */
 public class PartitionRequestClient {
 
+	private static final Logger LOG = LoggerFactory.getLogger(PartitionRequestClient.class);
+
 	private final Channel tcpChannel;
 
 	private final PartitionRequestClientHandler partitionRequestHandler;
@@ -79,21 +84,41 @@ public class PartitionRequestClient {
 	 * The request goes to the remote producer, for which this partition
 	 * request client instance has been created.
 	 */
-	public void requestSubpartition(final ResultPartitionID partitionId, int requestedQueueIndex, final RemoteInputChannel inputChannel) throws IOException {
+	public void requestSubpartition(
+			final ResultPartitionID partitionId,
+			final int subpartitionIndex,
+			final RemoteInputChannel inputChannel,
+			int delayMs) throws IOException {
+
+		LOG.debug("Requesting subpartition {} of partition {} with {} ms delay.",
+				subpartitionIndex, partitionId, delayMs);
+
 		partitionRequestHandler.addInputChannel(inputChannel);
 
-		tcpChannel.writeAndFlush(new PartitionRequest(partitionId, requestedQueueIndex, inputChannel.getInputChannelId()))
-				.addListener(
-						new ChannelFutureListener() {
-							@Override
-							public void operationComplete(ChannelFuture future) throws Exception {
-								if (!future.isSuccess()) {
-									partitionRequestHandler.removeInputChannel(inputChannel);
-									inputChannel.onError(future.cause());
-								}
-							}
-						}
-				);
+		final PartitionRequest request = new PartitionRequest(
+				partitionId, subpartitionIndex, inputChannel.getInputChannelId());
+
+		final ChannelFutureListener listener = new ChannelFutureListener() {
+			@Override
+			public void operationComplete(ChannelFuture future) throws Exception {
+				if (!future.isSuccess()) {
+					partitionRequestHandler.removeInputChannel(inputChannel);
+					inputChannel.onError(future.cause());
+				}
+			}
+		};
+
+		if (delayMs == 0) {
+			tcpChannel.writeAndFlush(request).addListener(listener);
+		}
+		else {
+			tcpChannel.eventLoop().schedule(new Runnable() {
+				@Override
+				public void run() {
+					tcpChannel.writeAndFlush(request).addListener(listener);
+				}
+			}, delayMs, TimeUnit.MILLISECONDS);
+		}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/1aad5b75/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
index 382f385..c3e65b1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
@@ -24,6 +24,7 @@ import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
 import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.runtime.util.event.EventListener;
@@ -67,7 +68,9 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter {
 	void addInputChannel(RemoteInputChannel listener) {
 		checkState(!channelError.get(), "There has been an error in the channel.");
 
-		inputChannels.put(listener.getInputChannelId(), listener);
+		if (!inputChannels.containsKey(listener.getInputChannelId())) {
+			inputChannels.put(listener.getInputChannelId(), listener);
+		}
 	}
 
 	void removeInputChannel(RemoteInputChannel listener) {
@@ -166,6 +169,7 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter {
 		else if (msgClazz == NettyMessage.ErrorResponse.class) {
 			NettyMessage.ErrorResponse error = (NettyMessage.ErrorResponse) msg;
 
+
 			if (error.isFatalError()) {
 				notifyAllChannelsOfErrorAndClose(error.error);
 			}
@@ -173,7 +177,12 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter {
 				RemoteInputChannel inputChannel = inputChannels.get(error.receiverId);
 
 				if (inputChannel != null) {
-					inputChannel.onError(error.error);
+					if (error.error.getClass() == PartitionNotFoundException.class) {
+						inputChannel.onFailedPartitionRequest();
+					}
+					else {
+						inputChannel.onError(error.error);
+					}
 				}
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/1aad5b75/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
index 6f4becd..7fa37e8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
@@ -23,6 +23,7 @@ import io.netty.channel.SimpleChannelInboundHandler;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
@@ -32,6 +33,9 @@ import org.slf4j.LoggerFactory;
 import static org.apache.flink.runtime.io.network.netty.NettyMessage.PartitionRequest;
 import static org.apache.flink.runtime.io.network.netty.NettyMessage.TaskEventRequest;
 
+/**
+ * Channel handler to initiate data transfers and dispatch backwards flowing task events.
+ */
 class PartitionRequestServerHandler extends SimpleChannelInboundHandler<NettyMessage> {
 
 	private static final Logger LOG = LoggerFactory.getLogger(PartitionRequestServerHandler.class);
@@ -85,19 +89,19 @@ class PartitionRequestServerHandler extends SimpleChannelInboundHandler<NettyMes
 			if (msgClazz == PartitionRequest.class) {
 				PartitionRequest request = (PartitionRequest) msg;
 
-				LOG.debug("Read channel on {}: {}.",ctx.channel().localAddress(), request);
+				LOG.debug("Read channel on {}: {}.", ctx.channel().localAddress(), request);
 
-				ResultSubpartitionView queueIterator =
-						partitionProvider.createSubpartitionView(
-								request.partitionId,
-								request.queueIndex,
-								bufferPool);
+				try {
+					ResultSubpartitionView subpartition =
+							partitionProvider.createSubpartitionView(
+									request.partitionId,
+									request.queueIndex,
+									bufferPool);
 
-				if (queueIterator != null) {
-					outboundQueue.enqueue(queueIterator, request.receiverId);
+					outboundQueue.enqueue(subpartition, request.receiverId);
 				}
-				else {
-					respondWithError(ctx, new IllegalArgumentException("Partition not found"), request.receiverId);
+				catch (PartitionNotFoundException notFound) {
+					respondWithError(ctx, notFound, request.receiverId);
 				}
 			}
 			// ----------------------------------------------------------------
@@ -124,6 +128,8 @@ class PartitionRequestServerHandler extends SimpleChannelInboundHandler<NettyMes
 	}
 
 	private void respondWithError(ChannelHandlerContext ctx, Throwable error, InputChannelID sourceId) {
+		LOG.debug("Responding with error {}.", error);
+
 		ctx.writeAndFlush(new NettyMessage.ErrorResponse(error, sourceId));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1aad5b75/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionStateChecker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionStateChecker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionStateChecker.java
new file mode 100644
index 0000000..ecbcdaa
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionStateChecker.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+
+public interface PartitionStateChecker {
+
+	void triggerPartitionStateCheck(
+			JobID jobId,
+			ExecutionAttemptID executionId,
+			IntermediateDataSetID resultId,
+			ResultPartitionID partitionId);
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1aad5b75/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionNotFoundException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionNotFoundException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionNotFoundException.java
new file mode 100644
index 0000000..7479862
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionNotFoundException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import java.io.IOException;
+
+public class PartitionNotFoundException extends IOException {
+
+	private static final long serialVersionUID = 0L;
+
+	private final ResultPartitionID partitionId;
+
+	public PartitionNotFoundException(ResultPartitionID partitionId) {
+		this.partitionId = partitionId;
+	}
+
+	public ResultPartitionID getPartitionId() {
+		return partitionId;
+	}
+
+	@Override
+	public String getMessage() {
+		return "Partition " + partitionId + " not found.";
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1aad5b75/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index 4c8174a..931790a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -170,13 +170,14 @@ class PipelinedSubpartition extends ResultSubpartition {
 	public PipelinedSubpartitionView createReadView(BufferProvider bufferProvider) {
 		synchronized (buffers) {
 			if (readView != null) {
-				throw new IllegalStateException("Subpartition is being or already has been " +
+				throw new IllegalStateException("Subpartition " + index + " of "
+						+ parent.getPartitionId() + " is being or already has been " +
 						"consumed, but pipelined subpartitions can only be consumed once.");
 			}
 
 			readView = new PipelinedSubpartitionView(this);
 
-			LOG.debug("Created {}.", readView);
+			LOG.debug("Created read view for subpartition {} of partition {}.", index, parent.getPartitionId());
 
 			return readView;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/1aad5b75/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index df1f254..bd9499e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -38,6 +38,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkElementIndex;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
@@ -311,6 +312,8 @@ public class ResultPartition implements BufferPoolOwner {
 		checkState(refCnt != -1, "Partition released.");
 		checkState(refCnt > 0, "Partition not pinned.");
 
+		checkElementIndex(index, subpartitions.length, "Subpartition not found.");
+
 		return subpartitions[index].createReadView(bufferProvider);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1aad5b75/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java
index a666208..5f25a4c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java
@@ -73,10 +73,10 @@ public class ResultPartitionManager implements ResultPartitionProvider {
 					partitionId.getPartitionId());
 
 			if (partition == null) {
-				throw new IOException("Unknown partition " + partitionId + ".");
+				throw new PartitionNotFoundException(partitionId);
 			}
 
-			LOG.debug("Requested partition {}.", partition);
+			LOG.debug("Requesting subpartition {} of {}.", subpartitionIndex, partition);
 
 			return partition.createSubpartitionView(subpartitionIndex, bufferProvider);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/1aad5b75/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
index 43cdd29..af089fc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
@@ -25,16 +25,16 @@ import java.io.IOException;
 
 public interface InputGate {
 
-	public int getNumberOfInputChannels();
+	int getNumberOfInputChannels();
 
-	public boolean isFinished();
+	boolean isFinished();
 
-	public void requestPartitions() throws IOException, InterruptedException;
+	void requestPartitions() throws IOException, InterruptedException;
 
-	public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException;
+	BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException;
 
-	public void sendTaskEvent(TaskEvent event) throws IOException;
+	void sendTaskEvent(TaskEvent event) throws IOException;
 
-	public void registerListener(EventListener<InputGate> listener);
+	void registerListener(EventListener<InputGate> listener);
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1aad5b75/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index df653a4..449b1cf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -24,15 +24,19 @@ import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
+import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.Tuple2;
 
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.Queue;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
@@ -53,6 +57,12 @@ public class RemoteInputChannel extends InputChannel {
 	private final ConnectionManager connectionManager;
 
 	/**
+	 * An asynchronous error notification. Set by either the network I/O thread or the thread
+	 * failing a partition request.
+	 */
+	private final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
+
+	/**
 	 * The received buffers. Received buffers are enqueued by the network I/O thread and the queue
 	 * is consumed by the receiving task thread.
 	 */
@@ -65,7 +75,7 @@ public class RemoteInputChannel extends InputChannel {
 	private final AtomicBoolean isReleased = new AtomicBoolean();
 
 	/** Client to establish a (possibly shared) TCP connection and request the partition. */
-	private PartitionRequestClient partitionRequestClient;
+	private volatile PartitionRequestClient partitionRequestClient;
 
 	/**
 	 * The next expected sequence number for the next buffer. This is modified by the network
@@ -73,10 +83,11 @@ public class RemoteInputChannel extends InputChannel {
 	 */
 	private int expectedSequenceNumber = 0;
 
-	/**
-	 * An error possibly set by the network I/O thread.
-	 */
-	private volatile Throwable error;
+	/** The current backoff time (in ms) for partition requests. */
+	private int nextRequestBackoffMs;
+
+	/** The maximum backoff time (in ms) after which a request fails */
+	private final int maxRequestBackoffMs;
 
 	RemoteInputChannel(
 			SingleInputGate inputGate,
@@ -85,27 +96,67 @@ public class RemoteInputChannel extends InputChannel {
 			ConnectionID connectionId,
 			ConnectionManager connectionManager) {
 
+		this(inputGate, channelIndex, partitionId, connectionId, connectionManager,
+				new Tuple2<Integer, Integer>(0, 0));
+	}
+
+	RemoteInputChannel(
+			SingleInputGate inputGate,
+			int channelIndex,
+			ResultPartitionID partitionId,
+			ConnectionID connectionId,
+			ConnectionManager connectionManager,
+			Tuple2<Integer, Integer> initialAndMaxBackoff) {
+
 		super(inputGate, channelIndex, partitionId);
 
 		this.connectionId = checkNotNull(connectionId);
 		this.connectionManager = checkNotNull(connectionManager);
+
+		checkArgument(initialAndMaxBackoff._1() <= initialAndMaxBackoff._2());
+
+		this.nextRequestBackoffMs = initialAndMaxBackoff._1();
+		this.maxRequestBackoffMs = initialAndMaxBackoff._2();
 	}
 
 	// ------------------------------------------------------------------------
 	// Consume
 	// ------------------------------------------------------------------------
 
+	/**
+	 * Requests a remote subpartition.
+	 */
 	@Override
 	void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException {
 		if (partitionRequestClient == null) {
-			LOG.debug("{}: Requesting REMOTE subpartition {} of partition {}.",
-					this, subpartitionIndex, partitionId);
-
 			// Create a client and request the partition
 			partitionRequestClient = connectionManager
 					.createPartitionRequestClient(connectionId);
 
-			partitionRequestClient.requestSubpartition(partitionId, subpartitionIndex, this);
+			partitionRequestClient.requestSubpartition(partitionId, subpartitionIndex, this, 0);
+		}
+	}
+
+	/**
+	 * Retriggers a remote subpartition request.
+	 */
+	void retriggerSubpartitionRequest(int subpartitionIndex) throws IOException, InterruptedException {
+		checkState(partitionRequestClient != null, "Missing initial subpartition request.");
+
+		// Disabled
+		if (nextRequestBackoffMs == 0) {
+			failPartitionRequest();
+		}
+		else if (nextRequestBackoffMs <= maxRequestBackoffMs) {
+			partitionRequestClient.requestSubpartition(partitionId, subpartitionIndex, this, nextRequestBackoffMs);
+
+			// Exponential backoff
+			nextRequestBackoffMs = nextRequestBackoffMs < maxRequestBackoffMs
+					? Math.min(nextRequestBackoffMs * 2, maxRequestBackoffMs)
+					: maxRequestBackoffMs + 1; // Fail the next request
+		}
+		else {
+			failPartitionRequest();
 		}
 	}
 
@@ -178,6 +229,10 @@ public class RemoteInputChannel extends InputChannel {
 		}
 	}
 
+	public void failPartitionRequest() {
+		onError(new PartitionNotFoundException(partitionId));
+	}
+
 	@Override
 	public String toString() {
 		return "RemoteInputChannel [" + partitionId + " at " + connectionId + "]";
@@ -245,23 +300,30 @@ public class RemoteInputChannel extends InputChannel {
 		}
 	}
 
-	public void onError(Throwable cause) {
-		if (error == null) {
-			error = cause;
+	public void onFailedPartitionRequest() {
+		inputGate.triggerPartitionStateCheck(partitionId);
+	}
 
+	public void onError(Throwable cause) {
+		if (error.compareAndSet(null, cause)) {
 			// Notify the input gate to trigger querying of this channel
 			notifyAvailableBuffer();
 		}
 	}
 
 	/**
-	 * Checks whether this channel got notified by the network I/O thread about an error.
+	 * Checks whether this channel got notified about an error.
 	 */
 	private void checkError() throws IOException {
-		final Throwable t = error;
+		final Throwable t = error.get();
 
 		if (t != null) {
-			throw new IOException(t);
+			if (t instanceof IOException) {
+				throw (IOException) t;
+			}
+			else {
+				throw new IOException(t);
+			}
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1aad5b75/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index acda1d8..b4a0845 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -19,17 +19,20 @@
 package org.apache.flink.runtime.io.network.partition.consumer;
 
 import com.google.common.collect.Maps;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionLocation;
 import org.apache.flink.runtime.event.task.AbstractEvent;
 import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -103,6 +106,12 @@ public class SingleInputGate implements InputGate {
 	/** The name of the owning task, for logging purposes. */
 	private final String owningTaskName;
 
+	/** The job ID of the owning task. */
+	private final JobID jobId;
+
+	/** The execution attempt ID of the owning task. */
+	private final ExecutionAttemptID executionId;
+
 	/**
 	 * The ID of the consumed intermediate result. Each input gate consumes partitions of the
 	 * intermediate result specified by this ID. This ID also identifies the input gate at the
@@ -130,6 +139,9 @@ public class SingleInputGate implements InputGate {
 
 	private final BitSet channelsWithEndOfPartitionEvents;
 
+	/** The partition state checker to use for failed partition requests. */
+	private final PartitionStateChecker partitionStateChecker;
+
 	/**
 	 * Buffer pool for incoming buffers. Incoming data from remote channels is copied to buffers
 	 * from this pool.
@@ -153,11 +165,17 @@ public class SingleInputGate implements InputGate {
 
 	public SingleInputGate(
 			String owningTaskName,
+			JobID jobId,
+			ExecutionAttemptID executionId,
 			IntermediateDataSetID consumedResultId,
 			int consumedSubpartitionIndex,
-			int numberOfInputChannels) {
+			int numberOfInputChannels,
+			PartitionStateChecker partitionStateChecker) {
 
 		this.owningTaskName = checkNotNull(owningTaskName);
+		this.jobId = checkNotNull(jobId);
+		this.executionId = checkNotNull(executionId);
+
 		this.consumedResultId = checkNotNull(consumedResultId);
 
 		checkArgument(consumedSubpartitionIndex >= 0);
@@ -168,6 +186,8 @@ public class SingleInputGate implements InputGate {
 
 		this.inputChannels = Maps.newHashMapWithExpectedSize(numberOfInputChannels);
 		this.channelsWithEndOfPartitionEvents = new BitSet(numberOfInputChannels);
+
+		this.partitionStateChecker = checkNotNull(partitionStateChecker);
 	}
 
 	// ------------------------------------------------------------------------
@@ -260,6 +280,30 @@ public class SingleInputGate implements InputGate {
 		}
 	}
 
+	/**
+	 * Retriggers a partition request.
+	 */
+	public void retriggerPartitionRequest(IntermediateResultPartitionID partitionId) throws IOException, InterruptedException {
+		synchronized (requestLock) {
+			if (!isReleased) {
+				final InputChannel ch = inputChannels.get(partitionId);
+
+				checkNotNull(ch, "Unknown input channel with ID " + partitionId);
+
+				if (ch.getClass() != RemoteInputChannel.class) {
+					throw new IllegalArgumentException("Channel identified by " + partitionId
+							+ " is not a remote channel.");
+				}
+
+				final RemoteInputChannel rch = (RemoteInputChannel) ch;
+
+				LOG.debug("Retriggering partition request {}:{}.", ch.partitionId, consumedSubpartitionIndex);
+
+				rch.retriggerSubpartitionRequest(consumedSubpartitionIndex);
+			}
+		}
+	}
+
 	public void releaseAllResources() throws IOException {
 		synchronized (requestLock) {
 			if (!isReleased) {
@@ -303,14 +347,14 @@ public class SingleInputGate implements InputGate {
 
 	@Override
 	public void requestPartitions() throws IOException, InterruptedException {
-		if (!requestedPartitionsFlag) {
-			// Sanity check
-			if (numberOfInputChannels != inputChannels.size()) {
-				throw new IllegalStateException("Bug in input gate setup logic: mismatch between" +
-						"number of total input channels and the currently set number of input " +
-						"channels.");
-			}
+		// Sanity check
+		if (numberOfInputChannels != inputChannels.size()) {
+			throw new IllegalStateException("Bug in input gate setup logic: mismatch between" +
+					"number of total input channels and the currently set number of input " +
+					"channels.");
+		}
 
+		if (!requestedPartitionsFlag) {
 			synchronized (requestLock) {
 				for (InputChannel inputChannel : inputChannels.values()) {
 					inputChannel.requestSubpartition(consumedSubpartitionIndex);
@@ -403,6 +447,14 @@ public class SingleInputGate implements InputGate {
 		}
 	}
 
+	void triggerPartitionStateCheck(ResultPartitionID partitionId) {
+		partitionStateChecker.triggerPartitionStateCheck(
+				jobId,
+				executionId,
+				consumedResultId,
+				partitionId);
+	}
+
 	// ------------------------------------------------------------------------
 
 	/**
@@ -410,6 +462,8 @@ public class SingleInputGate implements InputGate {
 	 */
 	public static SingleInputGate create(
 			String owningTaskName,
+			JobID jobId,
+			ExecutionAttemptID executionId,
 			InputGateDeploymentDescriptor igdd,
 			NetworkEnvironment networkEnvironment) {
 
@@ -421,7 +475,7 @@ public class SingleInputGate implements InputGate {
 		final InputChannelDeploymentDescriptor[] icdd = checkNotNull(igdd.getInputChannelDeploymentDescriptors());
 
 		final SingleInputGate inputGate = new SingleInputGate(
-				owningTaskName, consumedResultId, consumedSubpartitionIndex, icdd.length);
+				owningTaskName, jobId, executionId, consumedResultId, consumedSubpartitionIndex, icdd.length, networkEnvironment.getPartitionStateChecker());
 
 		// Create the input channels. There is one input channel for each consumed partition.
 		final InputChannel[] inputChannels = new InputChannel[icdd.length];
@@ -439,13 +493,17 @@ public class SingleInputGate implements InputGate {
 			else if (partitionLocation.isRemote()) {
 				inputChannels[i] = new RemoteInputChannel(inputGate, i, partitionId,
 						partitionLocation.getConnectionId(),
-						networkEnvironment.getConnectionManager());
+						networkEnvironment.getConnectionManager(),
+						networkEnvironment.getPartitionRequestInitialAndMaxBackoff()
+				);
 			}
 			else if (partitionLocation.isUnknown()) {
 				inputChannels[i] = new UnknownInputChannel(inputGate, i, partitionId,
 						networkEnvironment.getPartitionManager(),
 						networkEnvironment.getTaskEventDispatcher(),
-						networkEnvironment.getConnectionManager());
+						networkEnvironment.getConnectionManager(),
+						networkEnvironment.getPartitionRequestInitialAndMaxBackoff()
+				);
 			}
 			else {
 				throw new IllegalStateException("Unexpected partition location.");

http://git-wip-us.apache.org/repos/asf/flink/blob/1aad5b75/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
index 4bde292..0aa7ea3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.io.network.api.reader.BufferReader;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import scala.Tuple2;
 
 import java.io.IOException;
 
@@ -43,19 +44,24 @@ public class UnknownInputChannel extends InputChannel {
 
 	private final ConnectionManager connectionManager;
 
+	/** Initial and maximum backoff (in ms) after failed partition requests. */
+	private final Tuple2<Integer, Integer> partitionRequestInitialAndMaxBackoff;
+
 	public UnknownInputChannel(
 			SingleInputGate gate,
 			int channelIndex,
 			ResultPartitionID partitionId,
 			ResultPartitionManager partitionManager,
 			TaskEventDispatcher taskEventDispatcher,
-			ConnectionManager connectionManager) {
+			ConnectionManager connectionManager,
+			Tuple2<Integer, Integer> partitionRequestInitialAndMaxBackoff) {
 
 		super(gate, channelIndex, partitionId);
 
-		this.partitionManager = partitionManager;
-		this.taskEventDispatcher = taskEventDispatcher;
-		this.connectionManager = connectionManager;
+		this.partitionManager = checkNotNull(partitionManager);
+		this.taskEventDispatcher = checkNotNull(taskEventDispatcher);
+		this.connectionManager = checkNotNull(connectionManager);
+		this.partitionRequestInitialAndMaxBackoff = checkNotNull(partitionRequestInitialAndMaxBackoff);
 	}
 
 	@Override
@@ -106,7 +112,7 @@ public class UnknownInputChannel extends InputChannel {
 	// ------------------------------------------------------------------------
 
 	public RemoteInputChannel toRemoteInputChannel(ConnectionID producerAddress) {
-		return new RemoteInputChannel(inputGate, channelIndex, partitionId, checkNotNull(producerAddress), connectionManager);
+		return new RemoteInputChannel(inputGate, channelIndex, partitionId, checkNotNull(producerAddress), connectionManager, partitionRequestInitialAndMaxBackoff);
 	}
 
 	public LocalInputChannel toLocalInputChannel() {

http://git-wip-us.apache.org/repos/asf/flink/blob/1aad5b75/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 8f613fc..51ce91f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.taskmanager;
 
 import akka.actor.ActorRef;
 import akka.util.Timeout;
-
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.configuration.Configuration;
@@ -43,23 +42,24 @@ import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCommittingOperator;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointedOperator;
 import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.messages.TaskManagerMessages.FatalError;
 import org.apache.flink.runtime.messages.TaskMessages;
 import org.apache.flink.runtime.messages.TaskMessages.TaskInFinalState;
-import org.apache.flink.runtime.messages.TaskManagerMessages.FatalError;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.state.StateUtils;
 import org.apache.flink.runtime.util.SerializedValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import scala.concurrent.duration.FiniteDuration;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -181,7 +181,6 @@ public class Task implements Runnable {
 
 	/** The thread that executes the task */
 	private final Thread executingThread;
-	
 
 	// ------------------------------------------------------------------------
 	//  Fields that control the task execution. All these fields are volatile
@@ -204,7 +203,6 @@ public class Task implements Runnable {
 	 * initialization, to be memory friendly */
 	private volatile SerializedValue<StateHandle<?>> operatorState;
 
-	
 	/**
 	 * <p><b>IMPORTANT:</b> This constructor may not start any work that would need to 
 	 * be undone in the case of a failing task deployment.</p>
@@ -287,7 +285,7 @@ public class Task implements Runnable {
 
 		for (int i = 0; i < this.inputGates.length; i++) {
 			SingleInputGate gate = SingleInputGate.create(
-					taskNameWithSubtasksAndId, consumedPartitions.get(i), networkEnvironment);
+					taskNameWithSubtasksAndId, jobId, executionId, consumedPartitions.get(i), networkEnvironment);
 
 			this.inputGates[i] = gate;
 			inputGatesById.put(gate.getConsumedResultId(), gate);
@@ -401,6 +399,7 @@ public class Task implements Runnable {
 	/**
 	 * The core work method that bootstraps the task and executes it code
 	 */
+	@Override
 	public void run() {
 
 		// ----------------------------
@@ -913,12 +912,52 @@ public class Task implements Runnable {
 			LOG.debug("Ignoring checkpoint commit notification for non-running task.");
 		}
 	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Answer to a partition state check issued after a failed partition request.
+	 */
+	public void onPartitionStateUpdate(
+			IntermediateDataSetID resultId,
+			IntermediateResultPartitionID partitionId,
+			ExecutionState partitionState) throws IOException, InterruptedException {
+
+		if (executionState == ExecutionState.RUNNING) {
+			final SingleInputGate inputGate = inputGatesById.get(resultId);
+
+			if (inputGate != null) {
+				if (partitionState == ExecutionState.RUNNING) {
+					// Retrigger the partition request
+					inputGate.retriggerPartitionRequest(partitionId);
+				}
+				else if (partitionState == ExecutionState.CANCELED
+						|| partitionState == ExecutionState.CANCELING
+						|| partitionState == ExecutionState.FAILED) {
+
+					cancelExecution();
+				}
+				else {
+					failExternally(new IllegalStateException("Received unexpected partition state "
+							+ partitionState + " for partition request. This is a bug."));
+				}
+			}
+			else {
+				failExternally(new IllegalStateException("Received partition state for " +
+						"unknown input gate " + resultId + ". This is a bug."));
+			}
+		}
+		else {
+			LOG.debug("Ignoring partition state notification for non-running task.");
+		}
+	}
 	
 	private void executeAsyncCallRunnable(Runnable runnable, String callName) {
 		Thread thread = new Thread(runnable, callName);
 		thread.setDaemon(true);
 		thread.start();
 	}
+
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/1aad5b75/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 4745fb6..0c71938 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -30,12 +30,14 @@ import org.apache.flink.core.io.InputSplitAssigner
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult
 import org.apache.flink.runtime.blob.BlobServer
 import org.apache.flink.runtime.client._
+import org.apache.flink.runtime.execution.ExecutionState
 import org.apache.flink.runtime.executiongraph.{ExecutionJobVertex, ExecutionGraph}
 import org.apache.flink.runtime.jobmanager.web.WebInfoServer
 import org.apache.flink.runtime.messages.ArchiveMessages.ArchiveExecutionGraph
 import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
 import org.apache.flink.runtime.messages.Messages.{Disconnect, Acknowledge}
-import org.apache.flink.runtime.messages.TaskMessages.UpdateTaskExecutionState
+import org.apache.flink.runtime.messages.TaskMessages
+import org.apache.flink.runtime.messages.TaskMessages.{FailTask, PartitionState, UpdateTaskExecutionState}
 import org.apache.flink.runtime.messages.accumulators._
 import org.apache.flink.runtime.messages.checkpoint.{AcknowledgeCheckpoint, AbstractCheckpointMessage}
 import org.apache.flink.runtime.process.ProcessReaper
@@ -280,7 +282,7 @@ class JobManager(protected val flinkConfiguration: Configuration,
 
     case checkpointMessage : AbstractCheckpointMessage =>
       handleCheckpointMessage(checkpointMessage)
-      
+
     case JobStatusChanged(jobID, newJobStatus, timeStamp, error) =>
       currentJobs.get(jobID) match {
         case Some((executionGraph, jobInfo)) => executionGraph.getJobName
@@ -338,6 +340,23 @@ class JobManager(protected val flinkConfiguration: Configuration,
             s"$jobId to schedule or update consumers."))
       }
 
+    case RequestPartitionState(jobId, partitionId, taskExecutionId, taskResultId) =>
+      val state = currentJobs.get(jobId) match {
+        case Some((executionGraph, _)) =>
+          val execution = executionGraph.getRegisteredExecutions.get(partitionId.getProducerId)
+
+          if (execution != null) execution.getState else null
+        case None =>
+          // Nothing to do. This is not an error, because the request is received when a sending
+          // task fails during a remote partition request.
+          log.debug(s"Cannot find execution graph for job $jobId.")
+
+          null
+      }
+
+      sender ! PartitionState(
+        taskExecutionId, taskResultId, partitionId.getPartitionId, state)
+
     case RequestJobStatus(jobID) =>
       currentJobs.get(jobID) match {
         case Some((executionGraph,_)) => sender ! CurrentJobStatus(jobID, executionGraph.getState)

http://git-wip-us.apache.org/repos/asf/flink/blob/1aad5b75/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
index 03e837d..c9a2878 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
@@ -23,7 +23,7 @@ import org.apache.flink.runtime.client.{SerializedJobExecutionResult, JobStatusM
 import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGraph}
 import org.apache.flink.runtime.instance.{InstanceID, Instance}
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID
-import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus, JobVertexID}
+import org.apache.flink.runtime.jobgraph.{IntermediateDataSetID, JobGraph, JobStatus, JobVertexID}
 
 import scala.collection.JavaConverters._
 
@@ -71,6 +71,21 @@ object JobManagerMessages {
   case class NextInputSplit(splitData: Array[Byte])
 
   /**
+   * Requests the current state of the partition.
+   *
+   * The state of a partition is currently bound to the state of the producing execution.
+   * 
+   * @param jobId The ID of the job that produces the partition.
+   * @param partitionId The ID of the partition whose state is requested.
+   * @param taskExecutionId The execution attempt ID of the task requesting the partition state.
+   * @param taskResultId The input gate ID of the task requesting the partition state.
+   */
+  case class RequestPartitionState(jobId: JobID,
+                                   partitionId: ResultPartitionID,
+                                   taskExecutionId: ExecutionAttemptID,
+                                   taskResultId: IntermediateDataSetID)
+
+  /**
    * Notifies the [[org.apache.flink.runtime.jobmanager.JobManager]] about available data for a
    * produced partition.
    * <p>

http://git-wip-us.apache.org/repos/asf/flink/blob/1aad5b75/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskMessages.scala
index b1a08ca..9373576 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskMessages.scala
@@ -18,14 +18,16 @@
 
 package org.apache.flink.runtime.messages
 
-import org.apache.flink.runtime.deployment.{TaskDeploymentDescriptor, InputChannelDeploymentDescriptor}
+import org.apache.flink.runtime.deployment.{InputChannelDeploymentDescriptor, TaskDeploymentDescriptor}
+import org.apache.flink.runtime.execution.ExecutionState
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
-import org.apache.flink.runtime.jobgraph.IntermediateDataSetID
+import org.apache.flink.runtime.jobgraph.{IntermediateDataSetID, IntermediateResultPartitionID}
+import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState
 import org.apache.flink.runtime.taskmanager.TaskExecutionState
 
 /**
  * A set of messages that control the deployment and the state of Tasks executed
- * on the TaskManager
+ * on the TaskManager.
  */
 object TaskMessages {
 
@@ -81,6 +83,15 @@ object TaskMessages {
   // --------------------------------------------------------------------------
 
   /**
+   * Answer to a [[RequestPartitionState]] with the state of the respective partition.
+   */
+  case class PartitionState(
+    taskExecutionId: ExecutionAttemptID,
+    taskResultId: IntermediateDataSetID,
+    partitionId: IntermediateResultPartitionID,
+    state: ExecutionState) extends TaskMessage
+
+  /**
    * Base class for messages that update the information about location of input partitions
    */
   abstract sealed class UpdatePartitionInfo extends TaskMessage {

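A quick way to picture the failure-path protocol that the RequestPartitionState / PartitionState pair above implements (and that Task.onPartitionStateUpdate earlier in this diff reacts to) is a stripped-down actor round trip. The sketch below is illustrative only: the message classes are simplified stand-ins with String IDs and states so that it compiles without the Flink runtime, all actor and ID names are placeholders, and it assumes the Akka 2.3-era API used at the time (e.g. system.shutdown()).

import akka.actor.{Actor, ActorRef, ActorSystem, Props}

// Simplified stand-ins for the new messages (String IDs/states instead of
// the Flink ID and ExecutionState types).
case class RequestPartitionState(jobId: String, partitionId: String,
                                 taskExecutionId: String, taskResultId: String)
case class PartitionState(taskExecutionId: String, taskResultId: String,
                          partitionId: String, state: String)

// JobManager side: answer a state check with the producer's current state.
class JobManagerStub(producerState: String) extends Actor {
  def receive = {
    case RequestPartitionState(_, partitionId, execId, resultId) =>
      sender ! PartitionState(execId, resultId, partitionId, producerState)
  }
}

// TaskManager side: a failed remote partition request triggers the state check;
// the answer decides between retriggering the request and cancelling the consumer.
class TaskManagerStub(jobManager: ActorRef) extends Actor {
  def receive = {
    case "partition request failed" =>
      jobManager ! RequestPartitionState("job", "partition", "execution", "result")
    case PartitionState(_, _, partitionId, state) =>
      if (state == "RUNNING") println(s"Producer of $partitionId running => retrigger request")
      else println(s"Producer of $partitionId is $state => cancel consumer")
  }
}

object PartitionStateRoundTrip extends App {
  val system = ActorSystem("sketch")
  val jobManager = system.actorOf(Props(new JobManagerStub("RUNNING")))
  val taskManager = system.actorOf(Props(new TaskManagerStub(jobManager)))
  taskManager ! "partition request failed"
  Thread.sleep(500)
  system.shutdown()
}
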
http://git-wip-us.apache.org/repos/asf/flink/blob/1aad5b75/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
index f99aac0..51ca90d 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
@@ -21,7 +21,9 @@ package org.apache.flink.runtime.taskmanager
 import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode
 import org.apache.flink.runtime.io.network.netty.NettyConfig
 
-case class NetworkEnvironmentConfiguration(numNetworkBuffers: Int,
-                                           networkBufferSize: Int,
-                                           ioMode: IOMode,
-                                           nettyConfig: Option[NettyConfig] = None)
+case class NetworkEnvironmentConfiguration(
+  numNetworkBuffers: Int,
+  networkBufferSize: Int,
+  ioMode: IOMode,
+  nettyConfig: Option[NettyConfig] = None,
+  partitionRequestInitialAndMaxBackoff: Tuple2[Integer, Integer] = (50, 3000))

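The new partitionRequestInitialAndMaxBackoff default of (50, 3000) feeds the delays applied when a remote partition request is retriggered. As a rough illustration of the intended schedule (start at the initial value, double on every further retrigger, cap at the maximum, give up afterwards, matching the expectations in RemoteInputChannelTest below), here is a small self-contained sketch; the object and method names are invented for the example.

// Illustrative only: the retry delays implied by an (initial, max) backoff pair.
// (500, 3000) -> 500, 1000, 2000, 3000; (500, 500) -> 500; (0, 0) -> no delayed retries.
object PartitionRequestBackoff {

  def delays(initial: Int, max: Int): Seq[Int] =
    if (initial <= 0) Nil // backoff disabled: the first retrigger already fails
    else Iterator.iterate(initial)(_ * 2).takeWhile(_ < max).toList :+ max

  def main(args: Array[String]): Unit = {
    println(delays(500, 3000)) // List(500, 1000, 2000, 3000)
    println(delays(500, 500))  // List(500)
    println(delays(0, 0))      // List()
  }
}
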
http://git-wip-us.apache.org/repos/asf/flink/blob/1aad5b75/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 6a89624..65ce7dc 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -20,10 +20,9 @@ package org.apache.flink.runtime.taskmanager
 
 import java.io.{File, IOException}
 import java.net.{InetAddress, InetSocketAddress}
-import java.util
 import java.util.concurrent.TimeUnit
 import java.lang.reflect.Method
-import java.lang.management.{GarbageCollectorMXBean, ManagementFactory, MemoryMXBean}
+import java.lang.management.ManagementFactory
 
 import akka.actor._
 import akka.pattern.ask
@@ -393,6 +392,14 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
           sender ! new TaskOperationResult(executionID, false,
               "No task with that execution ID was found.")
         }
+
+      case PartitionState(taskExecutionId, taskResultId, partitionId, state) =>
+        Option(runningTasks.get(taskExecutionId)) match {
+          case Some(task) =>
+            task.onPartitionStateUpdate(taskResultId, partitionId, state)
+          case None =>
+            log.debug(s"Cannot find task $taskExecutionId to respond with partition state.")
+        }
     }
   }
 
@@ -1560,8 +1567,8 @@ object TaskManager {
 
     val ioMode : IOMode = if (syncOrAsync == "async") IOMode.ASYNC else IOMode.SYNC
 
-    val networkConfig = NetworkEnvironmentConfiguration(numNetworkBuffers, pageSize,
-                                                        ioMode, nettyConfig)
+    val networkConfig = NetworkEnvironmentConfiguration(
+      numNetworkBuffers, pageSize, ioMode, nettyConfig)
 
     // ----> timeouts, library caching, profiling
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1aad5b75/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
index bb95f4b..7739dea 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
 import org.junit.Test;
 import org.mockito.Mockito;
 import scala.Some;
+import scala.Tuple2;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.net.InetAddress;
@@ -54,7 +55,8 @@ public class NetworkEnvironmentTest {
 		try {
 			NettyConfig nettyConf = new NettyConfig(InetAddress.getLocalHost(), port, BUFFER_SIZE, new Configuration());
 			NetworkEnvironmentConfiguration config = new NetworkEnvironmentConfiguration(
-					NUM_BUFFERS, BUFFER_SIZE, IOManager.IOMode.SYNC, new Some<NettyConfig>(nettyConf));
+					NUM_BUFFERS, BUFFER_SIZE, IOManager.IOMode.SYNC, new Some<NettyConfig>(nettyConf),
+					new Tuple2<Integer, Integer>(0, 0));
 
 			NetworkEnvironment env = new NetworkEnvironment(new FiniteDuration(30, TimeUnit.SECONDS), config);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1aad5b75/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
index c0e2bcb..3632d6c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
@@ -24,6 +24,9 @@ import io.netty.channel.ChannelHandlerContext;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse;
+import org.apache.flink.runtime.io.network.netty.NettyMessage.ErrorResponse;
+import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
 import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.runtime.io.network.util.TestBufferFactory;
@@ -35,6 +38,7 @@ import java.io.IOException;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -96,7 +100,6 @@ public class PartitionRequestClientHandlerTest {
 				emptyBuffer, 0, inputChannel.getInputChannelId());
 
 		final PartitionRequestClientHandler client = new PartitionRequestClientHandler();
-
 		client.addInputChannel(inputChannel);
 
 		// Read the empty buffer
@@ -107,6 +110,34 @@ public class PartitionRequestClientHandlerTest {
 	}
 
 	/**
+	 * Verifies that {@link RemoteInputChannel#onFailedPartitionRequest()} is called when a
+	 * {@link PartitionNotFoundException} is received.
+	 */
+	@Test
+	public void testReceivePartitionNotFoundException() throws Exception {
+		// Minimal mock of a remote input channel
+		final BufferProvider bufferProvider = mock(BufferProvider.class);
+		when(bufferProvider.requestBuffer()).thenReturn(TestBufferFactory.createBuffer());
+
+		final RemoteInputChannel inputChannel = mock(RemoteInputChannel.class);
+		when(inputChannel.getInputChannelId()).thenReturn(new InputChannelID());
+		when(inputChannel.getBufferProvider()).thenReturn(bufferProvider);
+
+		final ErrorResponse partitionNotFound = new ErrorResponse(
+				new PartitionNotFoundException(new ResultPartitionID()),
+				inputChannel.getInputChannelId());
+
+		final PartitionRequestClientHandler client = new PartitionRequestClientHandler();
+		client.addInputChannel(inputChannel);
+
+		client.channelRead(mock(ChannelHandlerContext.class), partitionNotFound);
+
+		verify(inputChannel, times(1)).onFailedPartitionRequest();
+	}
+
+	// ---------------------------------------------------------------------------------------------
+
+	/**
 	 * Returns a deserialized buffer message as it would be received during runtime.
 	 */
 	private BufferResponse createBufferResponse(

http://git-wip-us.apache.org/repos/asf/flink/blob/1aad5b75/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index fdf41f0..9bef886 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -21,11 +21,13 @@ package org.apache.flink.runtime.io.network.partition.consumer;
 import com.google.common.collect.Lists;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -224,9 +226,12 @@ public class LocalInputChannelTest {
 
 			this.inputGate = new SingleInputGate(
 					"Test Name",
+					new JobID(),
+					new ExecutionAttemptID(),
 					new IntermediateDataSetID(),
 					subpartitionIndex,
-					numberOfInputChannels);
+					numberOfInputChannels,
+					mock(PartitionStateChecker.class));
 
 			// Set buffer pool
 			inputGate.setBufferPool(bufferPool);

http://git-wip-us.apache.org/repos/asf/flink/blob/1aad5b75/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index 76fae5b..22a11f9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.util.TestBufferFactory;
 import org.junit.Test;
+import scala.Tuple2;
 
 import java.io.IOException;
 import java.util.List;
@@ -129,20 +130,154 @@ public class RemoteInputChannelTest {
 		}
 	}
 
+	@Test(expected = IllegalStateException.class)
+	public void testRetriggerWithoutPartitionRequest() throws Exception {
+		Tuple2<Integer, Integer> backoff = new Tuple2<Integer, Integer>(500, 3000);
+		PartitionRequestClient connClient = mock(PartitionRequestClient.class);
+		SingleInputGate inputGate = mock(SingleInputGate.class);
+
+		RemoteInputChannel ch = createRemoteInputChannel(inputGate, connClient, backoff);
+
+		ch.retriggerSubpartitionRequest(0);
+	}
+
+	@Test
+	public void testPartitionRequestExponentialBackoff() throws Exception {
+		// Config
+		Tuple2<Integer, Integer> backoff = new Tuple2<Integer, Integer>(500, 3000);
+
+		// Start with initial backoff, then keep doubling, and cap at max.
+		int[] expectedDelays = {backoff._1(), 1000, 2000, backoff._2()};
+
+		// Setup
+		PartitionRequestClient connClient = mock(PartitionRequestClient.class);
+		SingleInputGate inputGate = mock(SingleInputGate.class);
+
+		RemoteInputChannel ch = createRemoteInputChannel(inputGate, connClient, backoff);
+
+		// Initial request
+		ch.requestSubpartition(0);
+		verify(connClient).requestSubpartition(eq(ch.partitionId), eq(0), eq(ch), eq(0));
+
+		// Request subpartition and verify that the actual requests are delayed.
+		for (int expected : expectedDelays) {
+			ch.retriggerSubpartitionRequest(0);
+
+			verify(connClient).requestSubpartition(eq(ch.partitionId), eq(0), eq(ch), eq(expected));
+		}
+
+		// Exception once the backoff exceeds the maximum backoff.
+		try {
+			ch.retriggerSubpartitionRequest(0);
+			ch.getNextBuffer();
+			fail("Did not throw expected exception.");
+		}
+		catch (Exception expected) {
+		}
+	}
+
+	@Test
+	public void testPartitionRequestSingleBackoff() throws Exception {
+		// Config
+		Tuple2<Integer, Integer> backoff = new Tuple2<Integer, Integer>(500, 500);
+
+		// Setup
+		PartitionRequestClient connClient = mock(PartitionRequestClient.class);
+		SingleInputGate inputGate = mock(SingleInputGate.class);
+
+		RemoteInputChannel ch = createRemoteInputChannel(inputGate, connClient, backoff);
+
+		// No delay for first request
+		ch.requestSubpartition(0);
+		verify(connClient).requestSubpartition(eq(ch.partitionId), eq(0), eq(ch), eq(0));
+
+		// Initial delay for second request
+		ch.retriggerSubpartitionRequest(0);
+		verify(connClient).requestSubpartition(eq(ch.partitionId), eq(0), eq(ch), eq(backoff._1()));
+
+		// Exception once the backoff exceeds the maximum backoff.
+		try {
+			ch.retriggerSubpartitionRequest(0);
+			ch.getNextBuffer();
+			fail("Did not throw expected exception.");
+		}
+		catch (Exception expected) {
+		}
+	}
+
+	@Test
+	public void testPartitionRequestNoBackoff() throws Exception {
+		// Config
+		Tuple2<Integer, Integer> backoff = new Tuple2<Integer, Integer>(0, 0);
+
+		// Setup
+		PartitionRequestClient connClient = mock(PartitionRequestClient.class);
+		SingleInputGate inputGate = mock(SingleInputGate.class);
+
+		RemoteInputChannel ch = createRemoteInputChannel(inputGate, connClient, backoff);
+
+		// No delay for first request
+		ch.requestSubpartition(0);
+		verify(connClient).requestSubpartition(eq(ch.partitionId), eq(0), eq(ch), eq(0));
+
+		// Exception, because backoff is disabled.
+		try {
+			ch.retriggerSubpartitionRequest(0);
+			ch.getNextBuffer();
+			fail("Did not throw expected exception.");
+		}
+		catch (Exception expected) {
+		}
+	}
+
+	@Test
+	public void testOnFailedPartitionRequest() throws Exception {
+		final ConnectionManager connectionManager = mock(ConnectionManager.class);
+		when(connectionManager.createPartitionRequestClient(any(ConnectionID.class)))
+				.thenReturn(mock(PartitionRequestClient.class));
+
+		final ResultPartitionID partitionId = new ResultPartitionID();
+
+		final SingleInputGate inputGate = mock(SingleInputGate.class);
+
+		final RemoteInputChannel ch = new RemoteInputChannel(
+				inputGate,
+				0,
+				partitionId,
+				mock(ConnectionID.class),
+				connectionManager
+		);
+
+		ch.onFailedPartitionRequest();
+
+		verify(inputGate).triggerPartitionStateCheck(eq(partitionId));
+	}
+
 	// ---------------------------------------------------------------------------------------------
 
 	private RemoteInputChannel createRemoteInputChannel(SingleInputGate inputGate)
 			throws IOException, InterruptedException {
 
+		return createRemoteInputChannel(
+				inputGate, mock(PartitionRequestClient.class), new Tuple2<Integer, Integer>(0, 0));
+	}
+
+	private RemoteInputChannel createRemoteInputChannel(
+			SingleInputGate inputGate,
+			PartitionRequestClient partitionRequestClient,
+			Tuple2<Integer, Integer> initialAndMaxRequestBackoff)
+			throws IOException, InterruptedException {
+
 		final ConnectionManager connectionManager = mock(ConnectionManager.class);
 		when(connectionManager.createPartitionRequestClient(any(ConnectionID.class)))
-				.thenReturn(mock(PartitionRequestClient.class));
+				.thenReturn(partitionRequestClient);
 
 		return new RemoteInputChannel(
 				inputGate,
 				0,
 				new ResultPartitionID(),
 				mock(ConnectionID.class),
-				connectionManager);
+				connectionManager,
+				initialAndMaxRequestBackoff);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1aad5b75/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 9a7ffe5..2454399 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionLocation;
@@ -29,6 +30,7 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
@@ -36,6 +38,7 @@ import org.apache.flink.runtime.io.network.util.TestTaskEvent;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.junit.Test;
+import scala.Tuple2;
 
 import java.io.IOException;
 
@@ -59,7 +62,7 @@ public class SingleInputGateTest {
 	public void testBasicGetNextLogic() throws Exception {
 		// Setup
 		final SingleInputGate inputGate = new SingleInputGate(
-				"Test Task Name", new IntermediateDataSetID(), 0, 2);
+				"Test Task Name", new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 2, mock(PartitionStateChecker.class));
 
 		final TestInputChannel[] inputChannels = new TestInputChannel[]{
 				new TestInputChannel(inputGate, 0),
@@ -105,7 +108,7 @@ public class SingleInputGateTest {
 		// Setup reader with one local and one unknown input channel
 		final IntermediateDataSetID resultId = new IntermediateDataSetID();
 
-		final SingleInputGate inputGate = new SingleInputGate("Test Task Name", resultId, 0, 2);
+		final SingleInputGate inputGate = new SingleInputGate("Test Task Name", new JobID(), new ExecutionAttemptID(), resultId, 0, 2, mock(PartitionStateChecker.class));
 		final BufferPool bufferPool = mock(BufferPool.class);
 		when(bufferPool.getNumberOfRequiredMemorySegments()).thenReturn(2);
 
@@ -119,7 +122,7 @@ public class SingleInputGateTest {
 		// Unknown
 		ResultPartitionID unknownPartitionId = new ResultPartitionID(new IntermediateResultPartitionID(), new ExecutionAttemptID());
 
-		InputChannel unknown = new UnknownInputChannel(inputGate, 1, unknownPartitionId, partitionManager, taskEventDispatcher, mock(ConnectionManager.class));
+		InputChannel unknown = new UnknownInputChannel(inputGate, 1, unknownPartitionId, partitionManager, taskEventDispatcher, mock(ConnectionManager.class), new Tuple2<Integer, Integer>(0, 0));
 
 		// Set channels
 		inputGate.setInputChannel(localPartitionId.getPartitionId(), local);

http://git-wip-us.apache.org/repos/asf/flink/blob/1aad5b75/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
index 2dafaa2..c282519 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
@@ -18,7 +18,10 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 
@@ -29,6 +32,7 @@ import java.util.List;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkElementIndex;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 
 /**
@@ -48,7 +52,7 @@ public class TestSingleInputGate {
 		checkArgument(numberOfInputChannels >= 1);
 
 		this.inputGate = spy(new SingleInputGate(
-				"Test Task Name", new IntermediateDataSetID(), 0, numberOfInputChannels));
+				"Test Task Name", new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, numberOfInputChannels, mock(PartitionStateChecker.class)));
 
 		this.inputChannels = new TestInputChannel[numberOfInputChannels];
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1aad5b75/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
index 050f43a..d8714d1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 
 import org.junit.Test;
@@ -25,6 +28,7 @@ import org.junit.Test;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
 
 public class UnionInputGateTest {
 
@@ -39,8 +43,8 @@ public class UnionInputGateTest {
 	public void testBasicGetNextLogic() throws Exception {
 		// Setup
 		final String testTaskName = "Test Task";
-		final SingleInputGate ig1 = new SingleInputGate(testTaskName, new IntermediateDataSetID(), 0, 3);
-		final SingleInputGate ig2 = new SingleInputGate(testTaskName, new IntermediateDataSetID(), 0, 5);
+		final SingleInputGate ig1 = new SingleInputGate(testTaskName, new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 3, mock(PartitionStateChecker.class));
+		final SingleInputGate ig2 = new SingleInputGate(testTaskName, new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 5, mock(PartitionStateChecker.class));
 
 		final UnionInputGate union = new UnionInputGate(new SingleInputGate[]{ig1, ig2});