You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/12/09 13:20:43 UTC

flink git commit: [FLINK-5114] [network] Handle partition producer state check for unregistered executions

Repository: flink
Updated Branches:
  refs/heads/release-1.1 3ae6e9e09 -> 2b612f2d8


[FLINK-5114] [network] Handle partition producer state check for unregistered executions

This closes #2975.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2b612f2d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2b612f2d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2b612f2d

Branch: refs/heads/release-1.1
Commit: 2b612f2d8fa2493fa1d0d586bc0fe10afa9150ca
Parents: 3ae6e9e
Author: Ufuk Celebi <uc...@apache.org>
Authored: Thu Dec 8 23:48:39 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Fri Dec 9 14:20:08 2016 +0100

----------------------------------------------------------------------
 .../executiongraph/IntermediateResult.java      |  33 +++
 .../runtime/io/network/NetworkEnvironment.java  |  37 +--
 .../netty/PartitionProducerStateChecker.java    |  49 ++++
 .../io/network/netty/PartitionStateChecker.java |  34 ---
 .../partition/consumer/SingleInputGate.java     |  10 +-
 .../PartitionProducerDisposedException.java     |  36 +++
 .../flink/runtime/jobmanager/JobManager.scala   |  74 ++++--
 .../runtime/messages/JobManagerMessages.scala   |  29 ++-
 .../runtime/messages/TaskControlMessages.scala  |  25 +-
 .../flink/runtime/taskmanager/TaskManager.scala |  46 +++-
 .../partition/InputGateConcurrentTest.java      |  10 +-
 .../partition/InputGateFairnessTest.java        |  20 +-
 .../consumer/LocalInputChannelTest.java         |   6 +-
 .../partition/consumer/SingleInputGateTest.java |  12 +-
 .../partition/consumer/TestSingleInputGate.java |   4 +-
 .../partition/consumer/UnionInputGateTest.java  |   6 +-
 .../runtime/jobmanager/JobManagerTest.java      | 252 ++++++++++++++++---
 .../runtime/taskmanager/TaskManagerTest.java    |  22 +-
 18 files changed, 520 insertions(+), 185 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
index 67b1fe0..6da2cd3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
@@ -20,7 +20,9 @@ package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 
+import java.util.HashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -34,6 +36,14 @@ public class IntermediateResult {
 
 	private final IntermediateResultPartition[] partitions;
 
+	/**
+	 * Maps intermediate result partition IDs to a partition index. This is
+	 * used for ID lookups of intermediate results. I didn't dare to change the
+	 * partition connect logic in other places that is tightly coupled to the
+	 * partitions being held as an array.
+	 */
+	private final HashMap<IntermediateResultPartitionID, Integer> partitionLookupHelper = new HashMap<>();
+
 	private final int numParallelProducers;
 
 	private final AtomicInteger numberOfRunningProducers;
@@ -80,6 +90,7 @@ public class IntermediateResult {
 		}
 
 		partitions[partitionNumber] = partition;
+		partitionLookupHelper.put(partition.getPartitionId(), partitionNumber);
 		partitionsAssigned++;
 	}
 
@@ -95,6 +106,28 @@ public class IntermediateResult {
 		return partitions;
 	}
 
+	/**
+	 * Returns the partition with the given ID.
+	 *
+	 * @param resultPartitionId ID of the partition to look up
+	 * @throws NullPointerException If the partition ID is <code>null</code>
+	 * @throws IllegalArgumentException Thrown if unknown partition ID
+	 * @return Intermediate result partition with the given ID
+	 */
+	public IntermediateResultPartition getPartitionById(IntermediateResultPartitionID resultPartitionId) {
+		// Looks up the partition number via the helper map and returns the
+		// partition. Currently, this happens infrequently enough that we could
+		// consider removing the map and scanning the partitions on every lookup.
+		// The lookup (currently) only happens when the producer of an intermediate
+		// result cannot be found via its registered execution.
+		Integer partitionNumber = partitionLookupHelper.get(checkNotNull(resultPartitionId, "IntermediateResultPartitionID"));
+		if (partitionNumber != null) {
+			return partitions[partitionNumber];
+		} else {
+			throw new IllegalArgumentException("Unknown intermediate result partition ID " + resultPartitionId);
+		}
+	}
+
 	public int getNumberOfAssignedPartitions() {
 		return partitionsAssigned;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index d3715ed..7bac93b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -28,14 +28,14 @@ import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.netty.NettyConfig;
 import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
-import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
-import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState;
+import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionProducerState;
 import org.apache.flink.runtime.messages.TaskMessages.FailTask;
 import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
 import org.apache.flink.runtime.taskmanager.Task;
@@ -82,7 +82,7 @@ public class NetworkEnvironment {
 
 	private ResultPartitionConsumableNotifier partitionConsumableNotifier;
 
-	private PartitionStateChecker partitionStateChecker;
+	private PartitionProducerStateChecker partitionStateChecker;
 
 	private boolean isShutdown;
 
@@ -143,7 +143,7 @@ public class NetworkEnvironment {
 		return partitionConsumableNotifier;
 	}
 
-	public PartitionStateChecker getPartitionStateChecker() {
+	public PartitionProducerStateChecker getPartitionProducerStateChecker() {
 		return partitionStateChecker;
 	}
 
@@ -196,7 +196,7 @@ public class NetworkEnvironment {
 					taskManagerGateway,
 					jobManagerTimeout);
 
-				this.partitionStateChecker = new JobManagerPartitionStateChecker(
+				this.partitionStateChecker = new ActorGatewayPartitionProducerStateChecker(
 						jobManagerGateway, taskManagerGateway);
 
 				// -----  Network connections  -----
@@ -474,28 +474,31 @@ public class NetworkEnvironment {
 		}
 	}
 
-	private static class JobManagerPartitionStateChecker implements PartitionStateChecker {
+	private static class ActorGatewayPartitionProducerStateChecker implements PartitionProducerStateChecker {
 
 		private final ActorGateway jobManager;
 
 		private final ActorGateway taskManager;
 
-		public JobManagerPartitionStateChecker(ActorGateway jobManager, ActorGateway taskManager) {
+		ActorGatewayPartitionProducerStateChecker(ActorGateway jobManager, ActorGateway taskManager) {
 			this.jobManager = jobManager;
 			this.taskManager = taskManager;
 		}
 
 		@Override
-		public void triggerPartitionStateCheck(
-				JobID jobId,
-				ExecutionAttemptID executionAttemptID,
-				IntermediateDataSetID resultId,
-				ResultPartitionID partitionId) {
-
-			RequestPartitionState msg = new RequestPartitionState(
-					jobId, partitionId, executionAttemptID, resultId);
-
-			jobManager.tell(msg, taskManager);
+		public void requestPartitionProducerState(
+			JobID jobId,
+			ExecutionAttemptID receiverExecutionId,
+			IntermediateDataSetID intermediateDataSetId,
+			ResultPartitionID resultPartitionId) {
+
+			jobManager.tell(
+				new RequestPartitionProducerState(
+					jobId,
+					receiverExecutionId,
+					intermediateDataSetId,
+					resultPartitionId),
+				taskManager);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionProducerStateChecker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionProducerStateChecker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionProducerStateChecker.java
new file mode 100644
index 0000000..18d92b1
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionProducerStateChecker.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+
+/**
+ * Intermediate partition state checker to query the JobManager about the state
+ * of the producer of a result partition.
+ *
+ * <p>These checks are triggered when a partition request is answered with a
+ * PartitionNotFound event. This usually happens when the producer of that
+ * partition has not registered itself with the network stack or has terminated.
+ */
+public interface PartitionProducerStateChecker {
+
+	/**
+	 * Requests the execution state of the execution producing a result partition.
+	 *
+	 * @param jobId ID of the job the partition belongs to.
+	 * @param receiverExecutionId The execution attempt ID of the task who triggers the request.
+	 * @param intermediateDataSetId ID of the parent intermediate data set.
+	 * @param resultPartitionId ID of the result partition to check. This identifies the producing execution and partition.
+	 */
+	void requestPartitionProducerState(
+		JobID jobId, ExecutionAttemptID receiverExecutionId,
+		IntermediateDataSetID intermediateDataSetId,
+		ResultPartitionID resultPartitionId);
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionStateChecker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionStateChecker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionStateChecker.java
deleted file mode 100644
index ecbcdaa..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionStateChecker.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.netty;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
-
-public interface PartitionStateChecker {
-
-	void triggerPartitionStateCheck(
-			JobID jobId,
-			ExecutionAttemptID executionId,
-			IntermediateDataSetID resultId,
-			ResultPartitionID partitionId);
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 1550b0d..67bc8d2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -33,7 +33,7 @@ import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
-import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
@@ -139,7 +139,7 @@ public class SingleInputGate implements InputGate {
 	private final BitSet channelsWithEndOfPartitionEvents;
 
 	/** The partition state checker to use for failed partition requests. */
-	private final PartitionStateChecker partitionStateChecker;
+	private final PartitionProducerStateChecker partitionStateChecker;
 
 	/**
 	 * Buffer pool for incoming buffers. Incoming data from remote channels is copied to buffers
@@ -172,7 +172,7 @@ public class SingleInputGate implements InputGate {
 			IntermediateDataSetID consumedResultId,
 			int consumedSubpartitionIndex,
 			int numberOfInputChannels,
-			PartitionStateChecker partitionStateChecker,
+			PartitionProducerStateChecker partitionStateChecker,
 			IOMetricGroup metrics) {
 
 		this.owningTaskName = checkNotNull(owningTaskName);
@@ -510,7 +510,7 @@ public class SingleInputGate implements InputGate {
 	}
 
 	void triggerPartitionStateCheck(ResultPartitionID partitionId) {
-		partitionStateChecker.triggerPartitionStateCheck(
+		partitionStateChecker.requestPartitionProducerState(
 				jobId,
 				executionId,
 				consumedResultId,
@@ -567,7 +567,7 @@ public class SingleInputGate implements InputGate {
 
 		final SingleInputGate inputGate = new SingleInputGate(
 				owningTaskName, jobId, executionId, consumedResultId, consumedSubpartitionIndex,
-				icdd.length, networkEnvironment.getPartitionStateChecker(), metrics);
+				icdd.length, networkEnvironment.getPartitionProducerStateChecker(), metrics);
 
 		// Create the input channels. There is one input channel for each consumed partition.
 		final InputChannel[] inputChannels = new InputChannel[icdd.length];

http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/PartitionProducerDisposedException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/PartitionProducerDisposedException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/PartitionProducerDisposedException.java
new file mode 100644
index 0000000..12f2433
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/PartitionProducerDisposedException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionProducerState;
+
+/**
+ * Exception returned to a TaskManager on {@link RequestPartitionProducerState}
+ * if the producer of a partition has been disposed.
+ */
+public class PartitionProducerDisposedException extends Exception {
+
+	public PartitionProducerDisposedException(ResultPartitionID resultPartitionID) {
+		super(String.format("Execution %s producing partition %s has already been disposed.",
+			resultPartitionID.getProducerId(),
+			resultPartitionID.getPartitionId()));
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 2b455b7..d6d23d9 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -62,7 +62,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages._
 import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
 import org.apache.flink.runtime.messages.RegistrationMessages._
 import org.apache.flink.runtime.messages.TaskManagerMessages.{Heartbeat, SendStackTrace}
-import org.apache.flink.runtime.messages.TaskMessages.{PartitionState, UpdateTaskExecutionState}
+import org.apache.flink.runtime.messages.TaskMessages.{PartitionProducerState, UpdateTaskExecutionState}
 import org.apache.flink.runtime.messages.accumulators._
 import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, AcknowledgeCheckpoint, DeclineCheckpoint}
 import org.apache.flink.runtime.messages.webmonitor.{InfoMessage, _}
@@ -839,28 +839,68 @@ class JobManager(
           )
       }
 
-    case RequestPartitionState(jobId, partitionId, taskExecutionId, taskResultId) =>
-      val state = currentJobs.get(jobId) match {
+    case RequestPartitionProducerState(
+        jobId,
+        receiverId,
+        intermediateDataSetId,
+        resultPartitionId) =>
+
+      currentJobs.get(jobId) match {
         case Some((executionGraph, _)) =>
-          val execution = executionGraph.getRegisteredExecutions.get(partitionId.getProducerId)
+          try {
+            // Find the execution attempt producing the intermediate result partition.
+            val execution = executionGraph
+              .getRegisteredExecutions
+              .get(resultPartitionId.getProducerId)
+
+            if (execution != null) {
+              // Common case for pipelined exchanges => producing execution is
+              // still active.
+              val success = (intermediateDataSetId, resultPartitionId, execution.getState)
+              sender ! decorateMessage(PartitionProducerState(receiverId, Left(success)))
+            } else {
+              // The producing execution might have terminated and been
+              // unregistered. We now look for the producing execution via the
+              // intermediate result itself.
+              val intermediateResult = executionGraph
+                .getAllIntermediateResults.get(intermediateDataSetId)
+
+              if (intermediateResult != null) {
+                // Try to find the producing execution
+                val producerExecution = intermediateResult
+                  .getPartitionById(resultPartitionId.getPartitionId)
+                  .getProducer
+                  .getCurrentExecutionAttempt
+
+                if (producerExecution.getAttemptId() == resultPartitionId.getProducerId()) {
+                  val success = (
+                    intermediateDataSetId,
+                    resultPartitionId,
+                    producerExecution.getState)
+                  sender ! decorateMessage(PartitionProducerState(receiverId, Left(success)))
+                } else {
+                  val failure = new PartitionProducerDisposedException(resultPartitionId)
+                  sender ! decorateMessage(PartitionProducerState(receiverId, Right(failure)))
+                }
+              } else {
+                val failure = new IllegalArgumentException("Intermediate data set with ID" +
+                  s"$intermediateDataSetId not found.")
+                sender ! decorateMessage(PartitionProducerState(receiverId, Right(failure)))
+              }
+            }
+          } catch {
+            case e: Exception =>
+              val failure = new RuntimeException("Failed to look up execution state of " +
+                s"producer with ID ${resultPartitionId.getProducerId}.", e)
+              sender ! decorateMessage(PartitionProducerState(receiverId, Right(failure)))
+          }
 
-          if (execution != null) execution.getState else null
         case None =>
-          // Nothing to do. This is not an error, because the request is received when a sending
-          // task fails or is not yet available during a remote partition request.
-          log.debug(s"Cannot find execution graph for job $jobId.")
+          val failure = new IllegalArgumentException(s"Job with ID $jobId not found.")
+          sender ! decorateMessage(PartitionProducerState(receiverId, Right(failure)))
 
-          null
       }
 
-      sender ! decorateMessage(
-        PartitionState(
-          taskExecutionId,
-          taskResultId,
-          partitionId.getPartitionId,
-          state)
-      )
-
     case RequestJobStatus(jobID) =>
       currentJobs.get(jobID) match {
         case Some((executionGraph,_)) =>

http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
index 14f72b0..97006d2 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
@@ -127,21 +127,24 @@ object JobManagerMessages {
    */
   case class NextInputSplit(splitData: Array[Byte])
 
+
   /**
-   * Requests the current state of the partition.
-   *
-   * The state of a partition is currently bound to the state of the producing execution.
-   *
-   * @param jobId The job ID of the job, which produces the partition.
-   * @param partitionId The partition ID of the partition to request the state of.
-   * @param taskExecutionId The execution attempt ID of the task requesting the partition state.
-   * @param taskResultId The input gate ID of the task requesting the partition state.
-   */
-  case class RequestPartitionState(
+    * Requests the execution state of the execution producing a result partition.
+    *
+    * @param jobId                 ID of the job the partition belongs to.
+    * @param receiverExecutionId   Execution ID of the task who triggers the request. Required to
+    *                              return an answer to the TaskManager, which needs the ID to route
+    *                              this to the receiver task.
+    * @param intermediateDataSetId ID of the parent intermediate data set.
+    * @param resultPartitionId     ID of the result partition to check. This
+    *                              identifies the producing execution and
+    *                              partition.
+    */
+  case class RequestPartitionProducerState(
       jobId: JobID,
-      partitionId: ResultPartitionID,
-      taskExecutionId: ExecutionAttemptID,
-      taskResultId: IntermediateDataSetID)
+      receiverExecutionId: ExecutionAttemptID,
+      intermediateDataSetId: IntermediateDataSetID,
+      resultPartitionId: ResultPartitionID)
     extends RequiresLeaderSessionID
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala
index 94762ee..e73d651 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala
@@ -21,8 +21,9 @@ package org.apache.flink.runtime.messages
 import org.apache.flink.runtime.deployment.{InputChannelDeploymentDescriptor, TaskDeploymentDescriptor}
 import org.apache.flink.runtime.execution.ExecutionState
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
-import org.apache.flink.runtime.jobgraph.{IntermediateDataSetID, IntermediateResultPartitionID}
-import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID
+import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionProducerState
 import org.apache.flink.runtime.taskmanager.TaskExecutionState
 
 /**
@@ -92,13 +93,19 @@ object TaskMessages {
   // --------------------------------------------------------------------------
 
   /**
-   * Answer to a [[RequestPartitionState]] with the state of the respective partition.
-   */
-  case class PartitionState(
-      taskExecutionId: ExecutionAttemptID,
-      taskResultId: IntermediateDataSetID,
-      partitionId: IntermediateResultPartitionID,
-      state: ExecutionState)
+    * Answer to a [[RequestPartitionProducerState]] with the state of the partition producer.
+    *
+    * @param receiverExecutionId The execution attempt ID of the task who triggered the
+    *                            original request and should receive this update.
+    * @param result              Either a successful or failed partition producer state check
+    *                            result. On success, this is a 3-tuple of intermediate data set ID
+    *                            (to identify the input gate), the partition ID (to identify the
+    *                            channel) and the producer state. On failure, this contains the
+    *                            failure cause.
+    */
+  case class PartitionProducerState(
+      receiverExecutionId: ExecutionAttemptID,
+      result: Either[(IntermediateDataSetID, ResultPartitionID, ExecutionState), Exception])
     extends TaskMessage with RequiresLeaderSessionID
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 40ae234..a751865 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -24,7 +24,7 @@ import java.lang.reflect.Method
 import java.net.{InetAddress, InetSocketAddress}
 import java.util
 import java.util.UUID
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.{TimeUnit, TimeoutException}
 import javax.management.ObjectName
 
 import _root_.akka.actor._
@@ -42,11 +42,11 @@ import org.apache.flink.core.fs.FileSystem
 import org.apache.flink.core.memory.{HeapMemorySegment, HybridMemorySegment, MemorySegmentFactory, MemoryType}
 import org.apache.flink.metrics.{MetricGroup, Gauge => FlinkGauge}
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
-import org.apache.flink.runtime.clusterframework.messages.StopCluster
-import org.apache.flink.runtime.clusterframework.types.ResourceID
 import org.apache.flink.runtime.akka.{AkkaUtils, DefaultQuarantineHandler, QuarantineMonitor}
 import org.apache.flink.runtime.blob.{BlobCache, BlobClient, BlobService}
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager
+import org.apache.flink.runtime.clusterframework.messages.StopCluster
+import org.apache.flink.runtime.clusterframework.types.ResourceID
 import org.apache.flink.runtime.deployment.{InputChannelDeploymentDescriptor, TaskDeploymentDescriptor}
 import org.apache.flink.runtime.execution.ExecutionState
 import org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager, FallbackLibraryCacheManager, LibraryCacheManager}
@@ -58,16 +58,17 @@ import org.apache.flink.runtime.io.disk.iomanager.{IOManager, IOManagerAsync}
 import org.apache.flink.runtime.io.network.NetworkEnvironment
 import org.apache.flink.runtime.io.network.netty.NettyConfig
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID
+import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException
 import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, LeaderRetrievalService}
 import org.apache.flink.runtime.memory.MemoryManager
 import org.apache.flink.runtime.messages.Messages._
 import org.apache.flink.runtime.messages.RegistrationMessages._
-import org.apache.flink.runtime.messages.StackTraceSampleMessages.{ResponseStackTraceSampleFailure, ResponseStackTraceSampleSuccess, SampleTaskStackTrace, StackTraceSampleMessages, TriggerStackTraceSample}
+import org.apache.flink.runtime.messages.StackTraceSampleMessages._
 import org.apache.flink.runtime.messages.TaskManagerMessages._
 import org.apache.flink.runtime.messages.TaskMessages._
 import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, NotifyCheckpointComplete, TriggerCheckpoint}
-import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry}
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup
+import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry}
 import org.apache.flink.runtime.process.ProcessReaper
 import org.apache.flink.runtime.security.SecurityUtils
 import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner
@@ -80,7 +81,7 @@ import scala.concurrent._
 import scala.concurrent.duration._
 import scala.concurrent.forkjoin.ForkJoinPool
 import scala.language.postfixOps
-import scala.util.{Failure, Success, Try}
+import scala.util.{Failure, Success}
 
 /**
  * The TaskManager is responsible for executing the individual tasks of a Flink job. It is
@@ -503,15 +504,38 @@ class TaskManager(
             )
           }
 
-        case PartitionState(taskExecutionId, taskResultId, partitionId, state) =>
-          Option(runningTasks.get(taskExecutionId)) match {
+        // Updates the partition producer state
+        case PartitionProducerState(receiverExecutionId, result) =>
+          Option(runningTasks.get(receiverExecutionId)) match {
             case Some(task) =>
-              task.onPartitionStateUpdate(taskResultId, partitionId, state)
+              try {
+                result match {
+                  case Left((intermediateDataSetId, resultPartitionId, producerState)) =>
+                    // Forward the state update to the task
+                    task.onPartitionStateUpdate(
+                      intermediateDataSetId,
+                      resultPartitionId.getPartitionId,
+                      producerState)
+
+                  case Right(failure) =>
+                    // Cancel or fail the execution
+                    if (failure.isInstanceOf[PartitionProducerDisposedException]) {
+                      log.info("Partition producer disposed. Cancelling " +
+                        s"execution $receiverExecutionId.", failure)
+                      task.cancelExecution()
+                    } else {
+                      task.failExternally(failure)
+                    }
+                }
+              } catch {
+                case e: Exception => task.failExternally(e)
+              }
             case None =>
-              log.debug(s"Cannot find task $taskExecutionId to respond with partition state.")
+              log.debug(s"Cannot find task with ID $receiverExecutionId to forward partition " +
+                "state respond to.")
           }
       }
-      }
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
index a5f4c7d..955f335 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
@@ -24,7 +24,7 @@ import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
@@ -65,7 +65,7 @@ public class InputGateConcurrentTest {
 				new ExecutionAttemptID(),
 				new IntermediateDataSetID(),
 				0, numChannels,
-				mock(PartitionStateChecker.class),
+				mock(PartitionProducerStateChecker.class),
 				new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
 		for (int i = 0; i < numChannels; i++) {
@@ -102,7 +102,7 @@ public class InputGateConcurrentTest {
 				new IntermediateDataSetID(),
 				0,
 				numChannels,
-				mock(PartitionStateChecker.class),
+				mock(PartitionProducerStateChecker.class),
 				new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
 		for (int i = 0; i < numChannels; i++) {
@@ -152,7 +152,7 @@ public class InputGateConcurrentTest {
 				new IntermediateDataSetID(),
 				0,
 				numChannels,
-				mock(PartitionStateChecker.class),
+				mock(PartitionProducerStateChecker.class),
 				new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
 		for (int i = 0, local = 0; i < numChannels; i++) {
@@ -192,7 +192,7 @@ public class InputGateConcurrentTest {
 	// ------------------------------------------------------------------------
 
 	private static abstract class Source {
-	
+
 		abstract void addBuffer(Buffer buffer) throws Exception;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
index 192b0eb..c367018 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
@@ -26,7 +26,7 @@ import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
@@ -92,7 +92,7 @@ public class InputGateFairnessTest {
 				new ExecutionAttemptID(),
 				new IntermediateDataSetID(),
 				0, numChannels,
-				mock(PartitionStateChecker.class),
+				mock(PartitionProducerStateChecker.class),
 				new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
 		for (int i = 0; i < numChannels; i++) {
@@ -146,7 +146,7 @@ public class InputGateFairnessTest {
 				new ExecutionAttemptID(),
 				new IntermediateDataSetID(),
 				0, numChannels,
-				mock(PartitionStateChecker.class),
+				mock(PartitionProducerStateChecker.class),
 				new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
 		for (int i = 0; i < numChannels; i++) {
@@ -197,7 +197,7 @@ public class InputGateFairnessTest {
 				new ExecutionAttemptID(),
 				new IntermediateDataSetID(),
 				0, numChannels,
-				mock(PartitionStateChecker.class),
+				mock(PartitionProducerStateChecker.class),
 				new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
 		final ConnectionManager connManager = createDummyConnectionManager();
@@ -206,11 +206,11 @@ public class InputGateFairnessTest {
 
 		for (int i = 0; i < numChannels; i++) {
 			RemoteInputChannel channel = new RemoteInputChannel(
-					gate, i, new ResultPartitionID(), mock(ConnectionID.class), 
+					gate, i, new ResultPartitionID(), mock(ConnectionID.class),
 					connManager, new Tuple2<>(0, 0), new DummyIOMetricGroup());
 
 			channels[i] = channel;
-			
+
 			for (int p = 0; p < buffersPerChannel; p++) {
 				channel.onBuffer(mockBuffer, p);
 			}
@@ -253,7 +253,7 @@ public class InputGateFairnessTest {
 				new ExecutionAttemptID(),
 				new IntermediateDataSetID(),
 				0, numChannels,
-				mock(PartitionStateChecker.class),
+				mock(PartitionProducerStateChecker.class),
 				new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
 		final ConnectionManager connManager = createDummyConnectionManager();
@@ -335,7 +335,7 @@ public class InputGateFairnessTest {
 			partitions[i].onBuffer(buffer, sequenceNumbers[i]++);
 		}
 	}
-	
+
 	// ------------------------------------------------------------------------
 
 	private static class FairnessVerifyingInputGate extends SingleInputGate {
@@ -352,11 +352,11 @@ public class InputGateFairnessTest {
 				IntermediateDataSetID consumedResultId,
 				int consumedSubpartitionIndex,
 				int numberOfInputChannels,
-				PartitionStateChecker partitionStateChecker,
+				PartitionProducerStateChecker partitionProducerStateChecker,
 				IOMetricGroup metrics) {
 
 			super(owningTaskName, jobId, executionId, consumedResultId, consumedSubpartitionIndex,
-					numberOfInputChannels, partitionStateChecker, metrics);
+					numberOfInputChannels, partitionProducerStateChecker, metrics);
 
 			try {
 				Field f = SingleInputGate.class.getDeclaredField("inputChannelsWithData");

http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 5d0a106..411f344 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -28,7 +28,7 @@ import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
-import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
 import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
@@ -284,7 +284,7 @@ public class LocalInputChannelTest {
 			new IntermediateDataSetID(),
 			0,
 			1,
-			mock(PartitionStateChecker.class),
+			mock(PartitionProducerStateChecker.class),
 			new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()
 		);
 
@@ -481,7 +481,7 @@ public class LocalInputChannelTest {
 					new IntermediateDataSetID(),
 					subpartitionIndex,
 					numberOfInputChannels,
-					mock(PartitionStateChecker.class),
+					mock(PartitionProducerStateChecker.class),
 					new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
 			// Set buffer pool

http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index ec4b31d..f2fb2d9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -34,7 +34,7 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
-import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
@@ -74,7 +74,7 @@ public class SingleInputGateTest {
 	public void testBasicGetNextLogic() throws Exception {
 		// Setup
 		final SingleInputGate inputGate = new SingleInputGate(
-			"Test Task Name", new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 2, mock(PartitionStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+			"Test Task Name", new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 2, mock(PartitionProducerStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
 		final TestInputChannel[] inputChannels = new TestInputChannel[]{
 			new TestInputChannel(inputGate, 0),
@@ -128,7 +128,7 @@ public class SingleInputGateTest {
 		// Setup reader with one local and one unknown input channel
 		final IntermediateDataSetID resultId = new IntermediateDataSetID();
 
-		final SingleInputGate inputGate = new SingleInputGate("Test Task Name", new JobID(), new ExecutionAttemptID(), resultId, 0, 2, mock(PartitionStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+		final SingleInputGate inputGate = new SingleInputGate("Test Task Name", new JobID(), new ExecutionAttemptID(), resultId, 0, 2, mock(PartitionProducerStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 		final BufferPool bufferPool = mock(BufferPool.class);
 		when(bufferPool.getNumberOfRequiredMemorySegments()).thenReturn(2);
 
@@ -183,7 +183,7 @@ public class SingleInputGateTest {
 			new IntermediateDataSetID(),
 			0,
 			1,
-			mock(PartitionStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+			mock(PartitionProducerStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
 		ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
 
@@ -223,7 +223,7 @@ public class SingleInputGateTest {
 			new IntermediateDataSetID(),
 			0,
 			1,
-			mock(PartitionStateChecker.class),
+			mock(PartitionProducerStateChecker.class),
 			new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
 		InputChannel unknown = new UnknownInputChannel(
@@ -314,7 +314,7 @@ public class SingleInputGateTest {
 		NetworkEnvironment netEnv = mock(NetworkEnvironment.class);
 		when(netEnv.getPartitionManager()).thenReturn(new ResultPartitionManager());
 		when(netEnv.getTaskEventDispatcher()).thenReturn(new TaskEventDispatcher());
-		when(netEnv.getPartitionStateChecker()).thenReturn(mock(PartitionStateChecker.class));
+		when(netEnv.getPartitionProducerStateChecker()).thenReturn(mock(PartitionProducerStateChecker.class));
 		when(netEnv.getPartitionRequestInitialAndMaxBackoff()).thenReturn(new Tuple2<>(initialBackoff, maxBackoff));
 		when(netEnv.getConnectionManager()).thenReturn(new LocalConnectionManager());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
index 0749467..b1d9483 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
@@ -59,7 +59,7 @@ public class TestSingleInputGate {
 			new IntermediateDataSetID(),
 			0,
 			numberOfInputChannels,
-			mock(PartitionStateChecker.class),
+			mock(PartitionProducerStateChecker.class),
 			new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
 		this.inputGate = spy(realGate);

http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
index faec77e..8cfda0f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
 import org.junit.Test;
@@ -43,8 +43,8 @@ public class UnionInputGateTest {
 	public void testBasicGetNextLogic() throws Exception {
 		// Setup
 		final String testTaskName = "Test Task";
-		final SingleInputGate ig1 = new SingleInputGate(testTaskName, new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 3, mock(PartitionStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
-		final SingleInputGate ig2 = new SingleInputGate(testTaskName, new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 5, mock(PartitionStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+		final SingleInputGate ig1 = new SingleInputGate(testTaskName, new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 3, mock(PartitionProducerStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+		final SingleInputGate ig2 = new SingleInputGate(testTaskName, new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 5, mock(PartitionProducerStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
 		final UnionInputGate union = new UnionInputGate(new SingleInputGate[]{ig1, ig2});
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index b56bf29..e41982e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -48,12 +48,12 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage;
-import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState;
+import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionProducerState;
 import org.apache.flink.runtime.messages.JobManagerMessages.StopJob;
 import org.apache.flink.runtime.messages.JobManagerMessages.StoppingFailure;
 import org.apache.flink.runtime.messages.JobManagerMessages.StoppingSuccess;
 import org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob;
-import org.apache.flink.runtime.messages.TaskMessages.PartitionState;
+import org.apache.flink.runtime.messages.TaskMessages.PartitionProducerState;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingJobManager;
@@ -89,7 +89,6 @@ import static org.apache.flink.runtime.io.network.partition.ResultPartitionType.
 import static org.apache.flink.runtime.testingUtils.TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT;
 import static org.apache.flink.runtime.testingUtils.TestingUtils.startTestingCluster;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -178,7 +177,7 @@ public class JobManagerTest extends TestLogger {
 						// Request the execution graph to get the runtime info
 						jobManagerGateway.tell(new RequestExecutionGraph(jid), testActorGateway);
 
-						final ExecutionGraph eg = expectMsgClass(ExecutionGraphFound.class)
+						final ExecutionGraph eg = (ExecutionGraph) expectMsgClass(ExecutionGraphFound.class)
 							.executionGraph();
 
 						final ExecutionVertex vertex = eg.getJobVertex(sender.getID())
@@ -193,59 +192,236 @@ public class JobManagerTest extends TestLogger {
 
 						// - The test ----------------------------------------------------------------------
 
+						ExecutionAttemptID receiverId = new ExecutionAttemptID();
+
 						// 1. All execution states
-						RequestPartitionState request = new RequestPartitionState(
-							jid, partitionId, receiver, rid);
+						RequestPartitionProducerState request = new RequestPartitionProducerState(
+							jid, receiverId, rid, partitionId);
 
 						for (ExecutionState state : ExecutionState.values()) {
 							ExecutionGraphTestUtils.setVertexState(vertex, state);
 
-							jobManagerGateway.tell(request, testActorGateway);
+							Future<?> futurePartitionState = jobManagerGateway
+								.ask(request, getRemainingTime());
 
-							LeaderSessionMessage lsm = expectMsgClass(LeaderSessionMessage.class);
+							LeaderSessionMessage wrappedMsg = (LeaderSessionMessage) Await.result(futurePartitionState, getRemainingTime());
+							PartitionProducerState resp = (PartitionProducerState) (PartitionProducerState) wrappedMsg.message();
+							assertEquals(receiverId, resp.receiverExecutionId());
+							assertTrue("Responded with failure: " + resp, resp.result().isLeft());
+							assertEquals(state, resp.result().left().get()._3());
+						}
 
-							assertEquals(PartitionState.class, lsm.message().getClass());
+						// 2. Non-existing execution
+						request = new RequestPartitionProducerState(jid, receiverId, rid, new ResultPartitionID());
 
-							PartitionState resp = (PartitionState) lsm.message();
+						Future<?> futurePartitionState = jobManagerGateway.ask(request, getRemainingTime());
+						LeaderSessionMessage wrappedMsg = (LeaderSessionMessage) Await.result(futurePartitionState, getRemainingTime());
+						PartitionProducerState resp = (PartitionProducerState) wrappedMsg.message();
+						assertEquals(receiverId, resp.receiverExecutionId());
+						assertTrue("Responded with success: " + resp, resp.result().isRight());
+						assertTrue(resp.result().right().get() instanceof RuntimeException);
+						assertTrue(resp.result().right().get().getCause() instanceof IllegalArgumentException);
 
-							assertEquals(request.taskExecutionId(), resp.taskExecutionId());
-							assertEquals(request.taskResultId(), resp.taskResultId());
-							assertEquals(request.partitionId().getPartitionId(), resp.partitionId());
-							assertEquals(state, resp.state());
+						// 3. Non-existing job
+						request = new RequestPartitionProducerState(new JobID(), receiverId, rid, new ResultPartitionID());
+						futurePartitionState = jobManagerGateway.ask(request, getRemainingTime());
+						wrappedMsg = (LeaderSessionMessage) Await.result(futurePartitionState, getRemainingTime());
+						resp = (PartitionProducerState) wrappedMsg.message();
+						assertEquals(receiverId, resp.receiverExecutionId());
+						assertTrue("Responded with success: " + resp, resp.result().isRight());
+						assertTrue(resp.result().right().get() instanceof IllegalArgumentException);
+					} catch (Exception e) {
+						e.printStackTrace();
+						fail(e.getMessage());
+					} finally {
+						if (cluster != null) {
+							cluster.shutdown();
 						}
+					}
+				}
+			};
+		}};
+	}
 
-						// 2. Non-existing execution
-						request = new RequestPartitionState(jid, new ResultPartitionID(), receiver, rid);
+	/**
+	 * Tests the JobManager response when the execution is not registered with
+	 * the ExecutionGraph.
+	 */
+	@Test
+	public void testRequestPartitionStateUnregisteredExecution() throws Exception {
+		new JavaTestKit(system) {{
+			new Within(duration("15 seconds")) {
+				@Override
+				protected void run() {
+					// Setup
+					TestingCluster cluster = null;
+
+					try {
+						cluster = startTestingCluster(4, 1, DEFAULT_AKKA_ASK_TIMEOUT());
+
+						final IntermediateDataSetID rid = new IntermediateDataSetID();
+
+						// Create a task
+						final JobVertex sender = new JobVertex("Sender");
+						sender.setParallelism(1);
+						sender.setInvokableClass(Tasks.NoOpInvokable.class); // just finish
+						sender.createAndAddResultDataSet(rid, PIPELINED);
 
-						jobManagerGateway.tell(request, testActorGateway);
+						final JobVertex sender2 = new JobVertex("Blocking Sender");
+						sender2.setParallelism(1);
+						sender2.setInvokableClass(Tasks.BlockingNoOpInvokable.class); // just block
+						sender2.createAndAddResultDataSet(new IntermediateDataSetID(), PIPELINED);
 
-						LeaderSessionMessage lsm = expectMsgClass(LeaderSessionMessage.class);
+						final JobGraph jobGraph = new JobGraph("Fast finishing producer test job", sender, sender2);
+						final JobID jid = jobGraph.getJobID();
 
-						assertEquals(PartitionState.class, lsm.message().getClass());
+						final ActorGateway jobManagerGateway = cluster.getLeaderGateway(
+							TestingUtils.TESTING_DURATION());
 
-						PartitionState resp = (PartitionState) lsm.message();
+						// we can set the leader session ID to null because we don't use this gateway to send messages
+						final ActorGateway testActorGateway = new AkkaActorGateway(getTestActor(), null);
 
-						assertEquals(request.taskExecutionId(), resp.taskExecutionId());
-						assertEquals(request.taskResultId(), resp.taskResultId());
-						assertEquals(request.partitionId().getPartitionId(), resp.partitionId());
-						assertNull(resp.state());
+						// Submit the job and wait for all vertices to be running
+						jobManagerGateway.tell(
+							new SubmitJob(
+								jobGraph,
+								ListeningBehaviour.EXECUTION_RESULT),
+							testActorGateway);
+						expectMsgClass(JobManagerMessages.JobSubmitSuccess.class);
 
-						// 3. Non-existing job
-						request = new RequestPartitionState(
-							new JobID(), new ResultPartitionID(), receiver, rid);
+						jobManagerGateway.tell(
+							new WaitForAllVerticesToBeRunningOrFinished(jid),
+							testActorGateway);
+
+						expectMsgClass(TestingJobManagerMessages.AllVerticesRunning.class);
+
+						Future<Object> egFuture = jobManagerGateway.ask(
+							new RequestExecutionGraph(jobGraph.getJobID()), remaining());
+
+						ExecutionGraphFound egFound = (ExecutionGraphFound) Await.result(egFuture, remaining());
+						ExecutionGraph eg = egFound.executionGraph();
+
+						ExecutionVertex vertex = eg.getJobVertex(sender.getID()).getTaskVertices()[0];
+						while (vertex.getExecutionState() != ExecutionState.FINISHED) {
+							Thread.sleep(1);
+						}
+
+						IntermediateResultPartition partition = vertex.getProducedPartitions()
+							.values().iterator().next();
+
+						ResultPartitionID partitionId = new ResultPartitionID(
+							partition.getPartitionId(),
+							vertex.getCurrentExecutionAttempt().getAttemptId());
+
+						// Producer finished, request state
+						ExecutionAttemptID receiverId = new ExecutionAttemptID();
+
+						Future<?> producerStateFuture = jobManagerGateway.ask(
+							new RequestPartitionProducerState(jid, receiverId, rid, partitionId), getRemainingTime());
+
+						LeaderSessionMessage wrappedMsg = (LeaderSessionMessage) Await.result(producerStateFuture, getRemainingTime());
+						PartitionProducerState resp = (PartitionProducerState) wrappedMsg.message();
+						assertEquals(receiverId, resp.receiverExecutionId());
+						assertTrue("Responded with failure: " + resp, resp.result().isLeft());
+						assertEquals(ExecutionState.FINISHED, resp.result().left().get()._3());
+					} catch (Exception e) {
+						e.printStackTrace();
+						fail(e.getMessage());
+					} finally {
+						if (cluster != null) {
+							cluster.shutdown();
+						}
+					}
+				}
+			};
+		}};
+	}
+
+	/**
+	 * Tests the JobManager response when the execution is not registered with
+	 * the ExecutionGraph anymore and a new execution attempt is available.
+	 */
+	@Test
+	public void testRequestPartitionStateMoreRecentExecutionAttempt() throws Exception {
+		new JavaTestKit(system) {{
+			new Within(duration("15 seconds")) {
+				@Override
+				protected void run() {
+					// Setup
+					TestingCluster cluster = null;
+
+					try {
+						cluster = startTestingCluster(4, 1, DEFAULT_AKKA_ASK_TIMEOUT());
+
+						final IntermediateDataSetID rid = new IntermediateDataSetID();
+
+						// Create a task
+						final JobVertex sender = new JobVertex("Sender");
+						sender.setParallelism(1);
+						sender.setInvokableClass(Tasks.NoOpInvokable.class); // just finish
+						sender.createAndAddResultDataSet(rid, PIPELINED);
+
+						final JobVertex sender2 = new JobVertex("Blocking Sender");
+						sender2.setParallelism(1);
+						sender2.setInvokableClass(Tasks.BlockingNoOpInvokable.class); // just block
+						sender2.createAndAddResultDataSet(new IntermediateDataSetID(), PIPELINED);
 
-						jobManagerGateway.tell(request, testActorGateway);
+						final JobGraph jobGraph = new JobGraph("Fast finishing producer test job", sender, sender2);
+						final JobID jid = jobGraph.getJobID();
+
+						final ActorGateway jobManagerGateway = cluster.getLeaderGateway(
+							TestingUtils.TESTING_DURATION());
+
+						// we can set the leader session ID to null because we don't use this gateway to send messages
+						final ActorGateway testActorGateway = new AkkaActorGateway(getTestActor(), null);
+
+						// Submit the job and wait for all vertices to be running
+						jobManagerGateway.tell(
+							new SubmitJob(
+								jobGraph,
+								ListeningBehaviour.EXECUTION_RESULT),
+							testActorGateway);
+						expectMsgClass(JobManagerMessages.JobSubmitSuccess.class);
+
+						jobManagerGateway.tell(
+							new WaitForAllVerticesToBeRunningOrFinished(jid),
+							testActorGateway);
+
+						expectMsgClass(TestingJobManagerMessages.AllVerticesRunning.class);
+
+						Future<Object> egFuture = jobManagerGateway.ask(
+							new RequestExecutionGraph(jobGraph.getJobID()), remaining());
+
+						ExecutionGraphFound egFound = (ExecutionGraphFound) Await.result(egFuture, remaining());
+						ExecutionGraph eg = (ExecutionGraph) egFound.executionGraph();
+
+						ExecutionVertex vertex = eg.getJobVertex(sender.getID()).getTaskVertices()[0];
+						while (vertex.getExecutionState() != ExecutionState.FINISHED) {
+							Thread.sleep(1);
+						}
+
+						IntermediateResultPartition partition = vertex.getProducedPartitions()
+							.values().iterator().next();
+
+						ResultPartitionID partitionId = new ResultPartitionID(
+							partition.getPartitionId(),
+							vertex.getCurrentExecutionAttempt().getAttemptId());
 
-						lsm = expectMsgClass(LeaderSessionMessage.class);
+						// Reset execution => new execution attempt
+						vertex.resetForNewExecution();
 
-						assertEquals(PartitionState.class, lsm.message().getClass());
+						// Producer finished, request state
+						ExecutionAttemptID receiverId = new ExecutionAttemptID();
 
-						resp = (PartitionState) lsm.message();
+						Object request = new RequestPartitionProducerState(jid, receiverId, rid, partitionId);
 
-						assertEquals(request.taskExecutionId(), resp.taskExecutionId());
-						assertEquals(request.taskResultId(), resp.taskResultId());
-						assertEquals(request.partitionId().getPartitionId(), resp.partitionId());
-						assertNull(resp.state());
+						Future<?> producerStateFuture = jobManagerGateway.ask(request, getRemainingTime());
+
+						LeaderSessionMessage wrappedMsg = (LeaderSessionMessage) Await.result(producerStateFuture, getRemainingTime());
+						PartitionProducerState resp = (PartitionProducerState) wrappedMsg.message();
+						assertEquals(receiverId, resp.receiverExecutionId());
+						assertTrue("Responded with success: " + resp, resp.result().isRight());
+						assertTrue(resp.result().right().get() instanceof PartitionProducerDisposedException);
 					} catch (Exception e) {
 						e.printStackTrace();
 						fail(e.getMessage());
@@ -366,11 +542,7 @@ public class JobManagerTest extends TestLogger {
 	}
 
 	/**
-					system.dispatcher(),
-				actorSystem.dispatcher(),
-	 * Tests that we can trigger a
-	 *
-	 * @throws Exception
+	 * Tests that we can trigger a savepoint when periodic checkpointing is disabled.
 	 */
 	@Test
 	public void testSavepointWithDeactivatedPeriodicCheckpointing() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 6a696a0..87fb24c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -54,6 +54,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.Tasks;
+import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.Messages;
 import org.apache.flink.runtime.messages.RegistrationMessages;
 import org.apache.flink.runtime.messages.StackTraceSampleMessages.ResponseStackTraceSampleFailure;
@@ -63,7 +64,6 @@ import org.apache.flink.runtime.messages.TaskManagerMessages;
 import org.apache.flink.runtime.messages.TaskManagerMessages.FatalError;
 import org.apache.flink.runtime.messages.TaskMessages;
 import org.apache.flink.runtime.messages.TaskMessages.CancelTask;
-import org.apache.flink.runtime.messages.TaskMessages.PartitionState;
 import org.apache.flink.runtime.messages.TaskMessages.StopTask;
 import org.apache.flink.runtime.messages.TaskMessages.SubmitTask;
 import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
@@ -81,10 +81,12 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Option;
+import scala.Tuple3;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.Promise;
 import scala.concurrent.duration.FiniteDuration;
+import scala.util.Left;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -100,7 +102,6 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState;
 import static org.apache.flink.runtime.messages.JobManagerMessages.ScheduleOrUpdateConsumers;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -1586,14 +1587,15 @@ public class TaskManagerTest extends TestLogger {
 
 		@Override
 		public void handleMessage(Object message) throws Exception {
-			if (message instanceof RequestPartitionState) {
-				final RequestPartitionState msg = (RequestPartitionState) message;
-
-				PartitionState resp = new PartitionState(
-						msg.taskExecutionId(),
-						msg.taskResultId(),
-						msg.partitionId().getPartitionId(),
-						ExecutionState.RUNNING);
+			if (message instanceof JobManagerMessages.RequestPartitionProducerState) {
+				JobManagerMessages.RequestPartitionProducerState msg = (JobManagerMessages.RequestPartitionProducerState) message;
+
+				TaskMessages.PartitionProducerState resp = new TaskMessages.PartitionProducerState(
+					msg.receiverExecutionId(),
+					new Left<Tuple3<IntermediateDataSetID, ResultPartitionID, ExecutionState>, Exception>(new Tuple3<>(
+						msg.intermediateDataSetId(),
+						msg.resultPartitionId(),
+						ExecutionState.RUNNING)));
 
 				getSender().tell(decorateMessage(resp), getSelf());
 			}