You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/12/12 16:42:58 UTC

[2/2] flink git commit: [FLINK-5114] [network] Handle partition producer state check for unregistered executions

[FLINK-5114] [network] Handle partition producer state check for unregistered executions

This closes #2912.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a078666d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a078666d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a078666d

Branch: refs/heads/master
Commit: a078666d42ab4dae01dedaa50d55343ce141fcb8
Parents: 47db9cb
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Nov 22 16:15:04 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Mon Dec 12 17:39:16 2016 +0100

----------------------------------------------------------------------
 .../executiongraph/IntermediateResult.java      |  37 ++-
 .../runtime/io/network/PartitionState.java      |  64 -----
 .../netty/PartitionProducerStateChecker.java    |  52 +++++
 .../io/network/netty/PartitionStateChecker.java |  35 ---
 .../partition/consumer/SingleInputGate.java     |  13 +-
 .../PartitionProducerDisposedException.java     |  36 +++
 ...torGatewayPartitionProducerStateChecker.java |  66 ++++++
 .../ActorGatewayPartitionStateChecker.java      |  67 ------
 .../apache/flink/runtime/taskmanager/Task.java  | 121 ++++++----
 .../flink/runtime/taskmanager/TaskActions.java  |  20 +-
 .../flink/runtime/jobmanager/JobManager.scala   |  70 ++++--
 .../runtime/messages/JobManagerMessages.scala   |  24 +-
 .../flink/runtime/taskmanager/TaskManager.scala |   6 +-
 .../partition/InputGateConcurrentTest.java      |   4 -
 .../partition/InputGateFairnessTest.java        |   8 +-
 .../consumer/LocalInputChannelTest.java         |   3 -
 .../partition/consumer/SingleInputGateTest.java |   6 +-
 .../partition/consumer/TestSingleInputGate.java |   2 -
 .../partition/consumer/UnionInputGateTest.java  |   5 +-
 .../runtime/jobmanager/JobManagerTest.java      | 234 ++++++++++++++++---
 .../runtime/taskmanager/TaskAsyncCallTest.java  |   6 +-
 .../runtime/taskmanager/TaskManagerTest.java    |  14 +-
 .../flink/runtime/taskmanager/TaskStopTest.java |   4 +-
 .../flink/runtime/taskmanager/TaskTest.java     | 148 +++++++++++-
 .../runtime/tasks/BlockingCheckpointsTest.java  |   7 +-
 .../tasks/InterruptSensitiveRestoreTest.java    |   4 +-
 .../streaming/runtime/tasks/StreamTaskTest.java |   6 +-
 27 files changed, 698 insertions(+), 364 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
index c2c19d1..313272c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
@@ -20,7 +20,9 @@ package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 
+import java.util.HashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -34,6 +36,14 @@ public class IntermediateResult {
 
 	private final IntermediateResultPartition[] partitions;
 
+	/**
+	 * Maps intermediate result partition IDs to a partition index. This is
+	 * used for ID lookups of intermediate results. I didn't dare to change the
+	 * partition connect logic in other places that is tightly coupled to the
+	 * partitions being held as an array.
+	 */
+	private final HashMap<IntermediateResultPartitionID, Integer> partitionLookupHelper = new HashMap<>();
+
 	private final int numParallelProducers;
 
 	private final AtomicInteger numberOfRunningProducers;
@@ -54,10 +64,12 @@ public class IntermediateResult {
 
 		this.id = checkNotNull(id);
 		this.producer = checkNotNull(producer);
-		this.partitions = new IntermediateResultPartition[numParallelProducers];
+
 		checkArgument(numParallelProducers >= 1);
 		this.numParallelProducers = numParallelProducers;
 
+		this.partitions = new IntermediateResultPartition[numParallelProducers];
+
 		this.numberOfRunningProducers = new AtomicInteger(numParallelProducers);
 
 		// we do not set the intermediate result partitions here, because we let them be initialized by
@@ -80,6 +92,7 @@ public class IntermediateResult {
 		}
 
 		partitions[partitionNumber] = partition;
+		partitionLookupHelper.put(partition.getPartitionId(), partitionNumber);
 		partitionsAssigned++;
 	}
 
@@ -95,6 +108,28 @@ public class IntermediateResult {
 		return partitions;
 	}
 
+	/**
+	 * Returns the partition with the given ID.
+	 *
+	 * @param resultPartitionId ID of the partition to look up
+	 * @throws NullPointerException If the partition ID is <code>null</code>
+	 * @throws IllegalArgumentException If the partition ID is unknown
+	 * @return Intermediate result partition with the given ID
+	 */
+	public IntermediateResultPartition getPartitionById(IntermediateResultPartitionID resultPartitionId) {
+		// Looks up the partition number via the helper map and returns the
+		// partition. Currently, this happens infrequently enough that we could
+		// consider removing the map and scanning the partitions on every lookup.
+		// The lookup (currently) only happens when the producer of an intermediate
+		// result cannot be found via its registered execution.
+		Integer partitionNumber = partitionLookupHelper.get(checkNotNull(resultPartitionId, "IntermediateResultPartitionID"));
+		if (partitionNumber != null) {
+			return partitions[partitionNumber];
+		} else {
+			throw new IllegalArgumentException("Unknown intermediate result partition ID " + resultPartitionId);
+		}
+	}
+
 	public int getNumberOfAssignedPartitions() {
 		return partitionsAssigned;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionState.java
deleted file mode 100644
index 59357fc..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionState.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network;
-
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-import org.apache.flink.util.Preconditions;
-
-import javax.annotation.Nullable;
-import java.io.Serializable;
-
-/**
- * Contains information about the state of a result partition.
- */
-public class PartitionState implements Serializable {
-
-	private static final long serialVersionUID = -4693651272083825031L;
-
-	private final IntermediateDataSetID intermediateDataSetID;
-	private final IntermediateResultPartitionID intermediateResultPartitionID;
-	private final ExecutionState executionState;
-
-	public PartitionState(
-		IntermediateDataSetID intermediateDataSetID,
-		IntermediateResultPartitionID intermediateResultPartitionID,
-		@Nullable ExecutionState executionState) {
-
-		this.intermediateDataSetID = Preconditions.checkNotNull(intermediateDataSetID);
-		this.intermediateResultPartitionID = Preconditions.checkNotNull(intermediateResultPartitionID);
-		this.executionState = executionState;
-	}
-
-	public IntermediateDataSetID getIntermediateDataSetID() {
-		return intermediateDataSetID;
-	}
-
-	public IntermediateResultPartitionID getIntermediateResultPartitionID() {
-		return intermediateResultPartitionID;
-	}
-
-	/**
-	 * Returns the execution state of the partition producer or <code>null</code> if it is not available.
-	 */
-	public ExecutionState getExecutionState() {
-		return executionState;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionProducerStateChecker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionProducerStateChecker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionProducerStateChecker.java
new file mode 100644
index 0000000..d0b7e1e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionProducerStateChecker.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+
+/**
+ * Intermediate partition state checker to query the JobManager about the state
+ * of the producer of a result partition.
+ *
+ * <p>These checks are triggered when a partition request is answered with a
+ * PartitionNotFound event. This usually happens when the producer of that
+ * partition has not yet registered itself with the network stack or has terminated.
+ */
+public interface PartitionProducerStateChecker {
+
+	/**
+	 * Requests the execution state of the execution producing a result partition.
+	 *
+	 * @param jobId ID of the job the partition belongs to.
+	 * @param intermediateDataSetId ID of the parent intermediate data set.
+	 * @param resultPartitionId ID of the result partition to check. This
+	 * identifies the producing execution and partition.
+	 *
+	 * @return Future holding the execution state of the producing execution.
+	 */
+	Future<ExecutionState> requestPartitionProducerState(
+			JobID jobId,
+			IntermediateDataSetID intermediateDataSetId,
+			ResultPartitionID resultPartitionId);
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionStateChecker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionStateChecker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionStateChecker.java
deleted file mode 100644
index 949f426..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionStateChecker.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.netty;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.io.network.PartitionState;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
-
-public interface PartitionStateChecker {
-	Future<PartitionState> requestPartitionState(
-			JobID jobId,
-			ExecutionAttemptID executionId,
-			IntermediateDataSetID resultId,
-			ResultPartitionID partitionId);
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index b4d8d2c..d546559 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -108,9 +108,6 @@ public class SingleInputGate implements InputGate {
 	/** The job ID of the owning task. */
 	private final JobID jobId;
 
-	/** The execution attempt ID of the owning task. */
-	private final ExecutionAttemptID executionId;
-
 	/**
 	 * The ID of the consumed intermediate result. Each input gate consumes partitions of the
 	 * intermediate result specified by this ID. This ID also identifies the input gate at the
@@ -168,7 +165,6 @@ public class SingleInputGate implements InputGate {
 	public SingleInputGate(
 		String owningTaskName,
 		JobID jobId,
-		ExecutionAttemptID executionId,
 		IntermediateDataSetID consumedResultId,
 		int consumedSubpartitionIndex,
 		int numberOfInputChannels,
@@ -177,7 +173,6 @@ public class SingleInputGate implements InputGate {
 
 		this.owningTaskName = checkNotNull(owningTaskName);
 		this.jobId = checkNotNull(jobId);
-		this.executionId = checkNotNull(executionId);
 
 		this.consumedResultId = checkNotNull(consumedResultId);
 
@@ -530,11 +525,7 @@ public class SingleInputGate implements InputGate {
 	}
 
 	void triggerPartitionStateCheck(ResultPartitionID partitionId) {
-		taskActions.triggerPartitionStateCheck(
-			jobId,
-			executionId,
-			consumedResultId,
-			partitionId);
+		taskActions.triggerPartitionProducerStateCheck(jobId, consumedResultId, partitionId);
 	}
 
 	private void queueChannel(InputChannel channel) {
@@ -587,7 +578,7 @@ public class SingleInputGate implements InputGate {
 		final InputChannelDeploymentDescriptor[] icdd = checkNotNull(igdd.getInputChannelDeploymentDescriptors());
 
 		final SingleInputGate inputGate = new SingleInputGate(
-			owningTaskName, jobId, executionId, consumedResultId, consumedSubpartitionIndex,
+			owningTaskName, jobId, consumedResultId, consumedSubpartitionIndex,
 			icdd.length, taskActions, metrics);
 
 		// Create the input channels. There is one input channel for each consumed partition.

http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/PartitionProducerDisposedException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/PartitionProducerDisposedException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/PartitionProducerDisposedException.java
new file mode 100644
index 0000000..12f2433
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/PartitionProducerDisposedException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionProducerState;
+
+/**
+ * Exception returned to a TaskManager on {@link RequestPartitionProducerState}
+ * if the producer of a partition has been disposed.
+ */
+public class PartitionProducerDisposedException extends Exception {
+
+	public PartitionProducerDisposedException(ResultPartitionID resultPartitionID) {
+		super(String.format("Execution %s producing partition %s has already been disposed.",
+			resultPartitionID.getProducerId(),
+			resultPartitionID.getPartitionId()));
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayPartitionProducerStateChecker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayPartitionProducerStateChecker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayPartitionProducerStateChecker.java
new file mode 100644
index 0000000..5c229a9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayPartitionProducerStateChecker.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.util.Preconditions;
+import scala.concurrent.duration.FiniteDuration;
+import scala.reflect.ClassTag$;
+
+/**
+ * This implementation uses {@link ActorGateway} to trigger the partition producer state
+ * check at the job manager.
+ */
+public class ActorGatewayPartitionProducerStateChecker implements PartitionProducerStateChecker {
+
+	private final ActorGateway jobManager;
+	private final FiniteDuration timeout;
+
+	public ActorGatewayPartitionProducerStateChecker(ActorGateway jobManager, FiniteDuration timeout) {
+		this.jobManager = Preconditions.checkNotNull(jobManager);
+		this.timeout = Preconditions.checkNotNull(timeout);
+	}
+
+	@Override
+	public Future<ExecutionState> requestPartitionProducerState(
+			JobID jobId,
+			IntermediateDataSetID intermediateDataSetId,
+			ResultPartitionID resultPartitionId) {
+
+		JobManagerMessages.RequestPartitionProducerState msg = new JobManagerMessages.RequestPartitionProducerState(
+			jobId,
+			intermediateDataSetId, resultPartitionId
+		);
+
+		scala.concurrent.Future<ExecutionState> futureResponse = jobManager
+			.ask(msg, timeout)
+			.mapTo(ClassTag$.MODULE$.<ExecutionState>apply(ExecutionState.class));
+
+		return new FlinkFuture<>(futureResponse);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayPartitionStateChecker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayPartitionStateChecker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayPartitionStateChecker.java
deleted file mode 100644
index efa6ec3..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayPartitionStateChecker.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskmanager;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.io.network.PartitionState;
-import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.util.Preconditions;
-import scala.concurrent.duration.FiniteDuration;
-import scala.reflect.ClassTag$;
-
-/**
- * This implementation uses {@link ActorGateway} to trigger the partition state check at the job
- * manager.
- */
-public class ActorGatewayPartitionStateChecker implements PartitionStateChecker {
-
-	private final ActorGateway jobManager;
-	private final FiniteDuration timeout;
-
-	public ActorGatewayPartitionStateChecker(ActorGateway jobManager, FiniteDuration timeout) {
-		this.jobManager = Preconditions.checkNotNull(jobManager);
-		this.timeout = Preconditions.checkNotNull(timeout);
-	}
-
-	@Override
-	public Future<PartitionState> requestPartitionState(
-			JobID jobId,
-			ExecutionAttemptID executionAttemptId,
-			IntermediateDataSetID resultId,
-			ResultPartitionID partitionId) {
-		JobManagerMessages.RequestPartitionState msg = new JobManagerMessages.RequestPartitionState(
-			jobId,
-			partitionId,
-			executionAttemptId,
-			resultId);
-
-		scala.concurrent.Future<PartitionState> futureResponse = jobManager
-			.ask(msg, timeout)
-			.mapTo(ClassTag$.MODULE$.<PartitionState>apply(PartitionState.class));
-
-		return new FlinkFuture<>(futureResponse);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 184c3b1..a1fb35e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -46,20 +46,19 @@ import org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
-import org.apache.flink.runtime.io.network.PartitionState;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.jobgraph.tasks.StoppableTask;
+import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
@@ -206,8 +205,8 @@ public class Task implements Runnable, TaskActions {
 	/** Parent group for all metrics of this task */
 	private final TaskMetricGroup metrics;
 
-	/** Partition state checker to request partition states from */
-	private final PartitionStateChecker partitionStateChecker;
+	/** Partition producer state checker to request partition states from */
+	private final PartitionProducerStateChecker partitionProducerStateChecker;
 
 	/** Executor to run future callbacks */
 	private final Executor executor;
@@ -271,7 +270,7 @@ public class Task implements Runnable, TaskActions {
 		TaskManagerRuntimeInfo taskManagerConfig,
 		TaskMetricGroup metricGroup,
 		ResultPartitionConsumableNotifier resultPartitionConsumableNotifier,
-		PartitionStateChecker partitionStateChecker,
+		PartitionProducerStateChecker partitionProducerStateChecker,
 		Executor executor) {
 
 		Preconditions.checkNotNull(jobInformation);
@@ -321,7 +320,7 @@ public class Task implements Runnable, TaskActions {
 		this.taskExecutionStateListeners = new CopyOnWriteArrayList<>();
 		this.metrics = metricGroup;
 
-		this.partitionStateChecker = Preconditions.checkNotNull(partitionStateChecker);
+		this.partitionProducerStateChecker = Preconditions.checkNotNull(partitionProducerStateChecker);
 		this.executor = Preconditions.checkNotNull(executor);
 
 		// create the reader and writer structures
@@ -1036,32 +1035,37 @@ public class Task implements Runnable, TaskActions {
 	// ------------------------------------------------------------------------
 
 	@Override
-	public void triggerPartitionStateCheck(
+	public void triggerPartitionProducerStateCheck(
 		JobID jobId,
-		ExecutionAttemptID executionId,
-		final IntermediateDataSetID resultId,
-		final ResultPartitionID partitionId) {
-		org.apache.flink.runtime.concurrent.Future<PartitionState> futurePartitionState = partitionStateChecker.requestPartitionState(
-			jobId,
-			executionId,
-			resultId,
-			partitionId);
-
-		futurePartitionState.handleAsync(new BiFunction<PartitionState, Throwable, Void>() {
+		final IntermediateDataSetID intermediateDataSetId,
+		final ResultPartitionID resultPartitionId) {
+
+		org.apache.flink.runtime.concurrent.Future<ExecutionState> futurePartitionState =
+			partitionProducerStateChecker.requestPartitionProducerState(
+				jobId,
+				intermediateDataSetId,
+				resultPartitionId);
+
+		futurePartitionState.handleAsync(new BiFunction<ExecutionState, Throwable, Void>() {
 			@Override
-			public Void apply(PartitionState partitionState, Throwable throwable) {
+			public Void apply(ExecutionState executionState, Throwable throwable) {
 				try {
-					if (partitionState != null) {
+					if (executionState != null) {
 						onPartitionStateUpdate(
-							partitionState.getIntermediateDataSetID(),
-							partitionState.getIntermediateResultPartitionID(),
-							partitionState.getExecutionState());
+							intermediateDataSetId,
+							resultPartitionId,
+							executionState);
 					} else if (throwable instanceof TimeoutException) {
 						// our request timed out, assume we're still running and try again
 						onPartitionStateUpdate(
-							resultId,
-							partitionId.getPartitionId(),
+							intermediateDataSetId,
+							resultPartitionId,
 							ExecutionState.RUNNING);
+					} else if (throwable instanceof PartitionProducerDisposedException) {
+						String msg = String.format("Producer {} of partition {} disposed. Cancelling execution.",
+							resultPartitionId.getProducerId(), resultPartitionId.getPartitionId());
+						LOG.info(msg, throwable);
+						cancelExecution();
 					} else {
 						failExternally(throwable);
 					}
@@ -1183,41 +1187,58 @@ public class Task implements Runnable, TaskActions {
 	/**
 	 * Answer to a partition state check issued after a failed partition request.
 	 */
-	public void onPartitionStateUpdate(
-			IntermediateDataSetID resultId,
-			IntermediateResultPartitionID partitionId,
-			ExecutionState partitionState) throws IOException, InterruptedException {
+	@VisibleForTesting
+	void onPartitionStateUpdate(
+			IntermediateDataSetID intermediateDataSetId,
+			ResultPartitionID resultPartitionId,
+			ExecutionState producerState) throws IOException, InterruptedException {
 
 		if (executionState == ExecutionState.RUNNING) {
-			final SingleInputGate inputGate = inputGatesById.get(resultId);
+			final SingleInputGate inputGate = inputGatesById.get(intermediateDataSetId);
 
 			if (inputGate != null) {
-				if (partitionState == ExecutionState.RUNNING ||
-					partitionState == ExecutionState.FINISHED ||
-					partitionState == ExecutionState.SCHEDULED ||
-					partitionState == ExecutionState.DEPLOYING) {
+				if (producerState == ExecutionState.SCHEDULED
+					|| producerState == ExecutionState.DEPLOYING
+					|| producerState == ExecutionState.RUNNING
+					|| producerState == ExecutionState.FINISHED) {
 
 					// Retrigger the partition request
-					inputGate.retriggerPartitionRequest(partitionId);
-				}
-				else if (partitionState == ExecutionState.CANCELED
-						|| partitionState == ExecutionState.CANCELING
-						|| partitionState == ExecutionState.FAILED) {
+					inputGate.retriggerPartitionRequest(resultPartitionId.getPartitionId());
+
+				} else if (producerState == ExecutionState.CANCELING
+					|| producerState == ExecutionState.CANCELED
+					|| producerState == ExecutionState.FAILED) {
+
+					// The producing execution has been canceled or failed. We
+					// don't need to re-trigger the request since it cannot
+					// succeed.
+					if (LOG.isDebugEnabled()) {
+						LOG.debug("Cancelling task {} after the producer of partition {} with attempt ID {} has entered state {}.",
+							taskNameWithSubtask,
+							resultPartitionId.getPartitionId(),
+							resultPartitionId.getProducerId(),
+							producerState);
+					}
 
 					cancelExecution();
+				} else {
+					// Any other execution state is unexpected. Currently, only
+					// state CREATED is left out of the checked states. If we
+					// see a producer in this state, something went wrong with
+					// scheduling in topological order.
+					String msg = String.format("Producer with attempt ID %s of partition %s in unexpected state %s.",
+						resultPartitionId.getProducerId(),
+						resultPartitionId.getPartitionId(),
+						producerState);
+
+					failExternally(new IllegalStateException(msg));
 				}
-				else {
-					failExternally(new IllegalStateException("Received unexpected partition state "
-							+ partitionState + " for partition request. This is a bug."));
-				}
-			}
-			else {
-				failExternally(new IllegalStateException("Received partition state for " +
-						"unknown input gate " + resultId + ". This is a bug."));
+			} else {
+				failExternally(new IllegalStateException("Received partition producer state for " +
+						"unknown input gate " + intermediateDataSetId + "."));
 			}
-		}
-		else {
-			LOG.debug("Ignoring partition state notification for not running task.");
+		} else {
+			LOG.debug("Task {} ignored a partition producer state notification, because it's not running.", taskNameWithSubtask);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskActions.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskActions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskActions.java
index 4f12691..f7650d2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskActions.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskActions.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.taskmanager;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 
@@ -29,21 +28,20 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 public interface TaskActions {
 
 	/**
-	 * Check the partition state of the given partition.
+	 * Check the execution state of the execution producing a result partition.
 	 *
-	 * @param jobId of the partition
-	 * @param executionId of the partition
-	 * @param resultId of the partition
-	 * @param partitionId of the partition
+	 * @param jobId ID of the job the partition belongs to.
+	 * @param intermediateDataSetId ID of the parent intermediate data set.
+	 * @param resultPartitionId ID of the result partition to check. This
+	 * identifies the producing execution and partition.
 	 */
-	void triggerPartitionStateCheck(
+	void triggerPartitionProducerStateCheck(
 		JobID jobId,
-		ExecutionAttemptID executionId,
-		IntermediateDataSetID resultId,
-		ResultPartitionID partitionId);
+		IntermediateDataSetID intermediateDataSetId,
+		ResultPartitionID resultPartitionId);
 
 	/**
-	 * Fail the owning task with the given throwawble.
+	 * Fail the owning task with the given throwable.
 	 *
 	 * @param cause of the failure
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 1dfd3db..8c686cd 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -50,8 +50,7 @@ import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
 import org.apache.flink.runtime.executiongraph._
 import org.apache.flink.runtime.instance.{AkkaActorGateway, InstanceID, InstanceManager}
-import org.apache.flink.runtime.io.network.PartitionState
-import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus}
+import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus, JobVertexID}
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener
 import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler}
 import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway
@@ -78,7 +77,7 @@ import org.apache.flink.runtime.security.SecurityUtils.SecurityConfiguration
 import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.util._
 import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
-import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages}
+import org.apache.flink.runtime.{FlinkActor, JobException, LeaderSessionMessageFilter, LogMessages}
 import org.apache.flink.util.{ConfigurationUtil, InstantiationUtil, NetUtils}
 import org.jboss.netty.channel.ChannelException
 
@@ -935,27 +934,58 @@ class JobManager(
           )
       }
 
-    case RequestPartitionState(jobId, partitionId, taskExecutionId, taskResultId) =>
-      val state = currentJobs.get(jobId) match {
+    case RequestPartitionProducerState(jobId, intermediateDataSetId, resultPartitionId) =>
+      currentJobs.get(jobId) match {
         case Some((executionGraph, _)) =>
-          val execution = executionGraph.getRegisteredExecutions.get(partitionId.getProducerId)
+          try {
+            // Find the execution attempt producing the intermediate result partition.
+            val execution = executionGraph
+              .getRegisteredExecutions
+              .get(resultPartitionId.getProducerId)
+
+            if (execution != null) {
+              // Common case for pipelined exchanges => producing execution is
+              // still active.
+              sender ! decorateMessage(execution.getState)
+            } else {
+              // The producing execution might have terminated and been
+              // unregistered. We now look for the producing execution via the
+              // intermediate result itself.
+              val intermediateResult = executionGraph
+                .getAllIntermediateResults.get(intermediateDataSetId)
+
+              if (intermediateResult != null) {
+                // Try to find the producing execution
+                val producerExecution = intermediateResult
+                  .getPartitionById(resultPartitionId.getPartitionId)
+                  .getProducer
+                  .getCurrentExecutionAttempt
+
+                if (producerExecution.getAttemptId() == resultPartitionId.getProducerId()) {
+                  sender ! decorateMessage(producerExecution.getState)
+                } else {
+                  val cause = new PartitionProducerDisposedException(resultPartitionId)
+                  sender ! decorateMessage(Status.Failure(cause))
+                }
+              } else {
+                val cause = new IllegalArgumentException(
+                  s"Intermediate data set with ID $intermediateDataSetId not found.")
+                sender ! decorateMessage(Status.Failure(cause))
+              }
+            }
+          } catch {
+            case e: Exception =>
+              sender ! decorateMessage(
+                Status.Failure(new RuntimeException("Failed to look up execution state of " +
+                  s"producer with ID ${resultPartitionId.getProducerId}.", e)))
+          }
 
-          if (execution != null) execution.getState else null
         case None =>
-          // Nothing to do. This is not an error, because the request is received when a sending
-          // task fails or is not yet available during a remote partition request.
-          log.debug(s"Cannot find execution graph for job $jobId.")
+          sender ! decorateMessage(
+            Status.Failure(new IllegalArgumentException(s"Job with ID $jobId not found.")))
 
-          null
       }
 
-      sender ! decorateMessage(
-        new PartitionState(
-          taskResultId,
-          partitionId.getPartitionId,
-          state)
-      )
-
     case RequestJobStatus(jobID) =>
       currentJobs.get(jobID) match {
         case Some((executionGraph,_)) =>
@@ -1059,7 +1089,7 @@ class JobManager(
       taskManagerMap.get(taskManagerActorRef) match {
         case Some(instanceId) => handleTaskManagerTerminated(taskManagerActorRef, instanceId)
         case None =>  log.debug("Received terminated message for task manager " +
-                                  s"${taskManagerActorRef} which is not " +
+                                  s"$taskManagerActorRef which is not " +
                                   "connected to this job manager.")
       }
 
@@ -2092,7 +2122,7 @@ object JobManager {
     def sleepBeforeRetry() : Unit = {
       if (maxSleepBetweenRetries > 0) {
         val sleepTime = (Math.random() * maxSleepBetweenRetries).asInstanceOf[Long]
-        LOG.info(s"Retrying after bind exception. Sleeping for ${sleepTime} ms.")
+        LOG.info(s"Retrying after bind exception. Sleeping for $sleepTime ms.")
         Thread.sleep(sleepTime)
       }
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
index 3d72f1a..65819f4 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
@@ -157,20 +157,18 @@ object JobManagerMessages {
   case class NextInputSplit(splitData: Array[Byte])
 
   /**
-   * Requests the current state of the partition.
-   *
-   * The state of a partition is currently bound to the state of the producing execution.
-   *
-   * @param jobId The job ID of the job, which produces the partition.
-   * @param partitionId The partition ID of the partition to request the state of.
-   * @param taskExecutionId The execution attempt ID of the task requesting the partition state.
-   * @param taskResultId The input gate ID of the task requesting the partition state.
-   */
-  case class RequestPartitionState(
+    * Requests the execution state of the execution producing a result partition.
+    *
+    * @param jobId                 ID of the job the partition belongs to.
+    * @param intermediateDataSetId ID of the parent intermediate data set.
+    * @param resultPartitionId     ID of the result partition to check. This
+    *                              identifies the producing execution and
+    *                              partition.
+    */
+  case class RequestPartitionProducerState(
       jobId: JobID,
-      partitionId: ResultPartitionID,
-      taskExecutionId: ExecutionAttemptID,
-      taskResultId: IntermediateDataSetID)
+      intermediateDataSetId: IntermediateDataSetID,
+      resultPartitionId: ResultPartitionID)
     extends RequiresLeaderSessionID
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 271578f..41d3077 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -49,7 +49,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode
 import org.apache.flink.runtime.io.disk.iomanager.{IOManager, IOManagerAsync}
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool
 import org.apache.flink.runtime.io.network.{LocalConnectionManager, NetworkEnvironment, TaskEventDispatcher}
-import org.apache.flink.runtime.io.network.netty.{NettyConfig, NettyConnectionManager, PartitionStateChecker}
+import org.apache.flink.runtime.io.network.netty.{NettyConfig, NettyConnectionManager, PartitionProducerStateChecker}
 import org.apache.flink.runtime.io.network.partition.{ResultPartitionConsumableNotifier, ResultPartitionManager}
 import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, LeaderRetrievalService}
 import org.apache.flink.runtime.memory.MemoryManager
@@ -181,7 +181,7 @@ class TaskManager(
 
   private var connectionUtils: Option[(
     CheckpointResponder,
-    PartitionStateChecker,
+    PartitionProducerStateChecker,
     ResultPartitionConsumableNotifier,
     TaskManagerConnection)] = None
 
@@ -916,7 +916,7 @@ class TaskManager(
 
     val taskManagerConnection = new ActorGatewayTaskManagerConnection(taskManagerGateway)
 
-    val partitionStateChecker = new ActorGatewayPartitionStateChecker(
+    val partitionStateChecker = new ActorGatewayPartitionProducerStateChecker(
       jobManagerGateway,
       config.timeout)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
index 6570679..8cae04c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
@@ -60,7 +59,6 @@ public class InputGateConcurrentTest {
 		final SingleInputGate gate = new SingleInputGate(
 				"Test Task Name",
 				new JobID(),
-				new ExecutionAttemptID(),
 				new IntermediateDataSetID(),
 				0, numChannels,
 				mock(TaskActions.class),
@@ -96,7 +94,6 @@ public class InputGateConcurrentTest {
 		final SingleInputGate gate = new SingleInputGate(
 				"Test Task Name",
 				new JobID(),
-				new ExecutionAttemptID(),
 				new IntermediateDataSetID(),
 				0,
 				numChannels,
@@ -146,7 +143,6 @@ public class InputGateConcurrentTest {
 		final SingleInputGate gate = new SingleInputGate(
 				"Test Task Name",
 				new JobID(),
-				new ExecutionAttemptID(),
 				new IntermediateDataSetID(),
 				0,
 				numChannels,

http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
index b35612a..7e1d792 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
@@ -87,7 +86,6 @@ public class InputGateFairnessTest {
 		SingleInputGate gate = new FairnessVerifyingInputGate(
 				"Test Task Name",
 				new JobID(),
-				new ExecutionAttemptID(),
 				new IntermediateDataSetID(),
 				0, numChannels,
 				mock(TaskActions.class),
@@ -141,7 +139,6 @@ public class InputGateFairnessTest {
 		SingleInputGate gate = new FairnessVerifyingInputGate(
 				"Test Task Name",
 				new JobID(),
-				new ExecutionAttemptID(),
 				new IntermediateDataSetID(),
 				0, numChannels,
 				mock(TaskActions.class),
@@ -192,7 +189,6 @@ public class InputGateFairnessTest {
 		SingleInputGate gate = new FairnessVerifyingInputGate(
 				"Test Task Name",
 				new JobID(),
-				new ExecutionAttemptID(),
 				new IntermediateDataSetID(),
 				0, numChannels,
 				mock(TaskActions.class),
@@ -248,7 +244,6 @@ public class InputGateFairnessTest {
 		SingleInputGate gate = new FairnessVerifyingInputGate(
 				"Test Task Name",
 				new JobID(),
-				new ExecutionAttemptID(),
 				new IntermediateDataSetID(),
 				0, numChannels,
 				mock(TaskActions.class),
@@ -346,14 +341,13 @@ public class InputGateFairnessTest {
 		public FairnessVerifyingInputGate(
 				String owningTaskName,
 				JobID jobId,
-				ExecutionAttemptID executionId,
 				IntermediateDataSetID consumedResultId,
 				int consumedSubpartitionIndex,
 				int numberOfInputChannels,
 				TaskActions taskActions,
 				TaskIOMetricGroup metrics) {
 
-			super(owningTaskName, jobId, executionId, consumedResultId, consumedSubpartitionIndex,
+			super(owningTaskName, jobId, consumedResultId, consumedSubpartitionIndex,
 					numberOfInputChannels, taskActions, metrics);
 
 			try {

http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 35ed4c3..37ec751 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -22,7 +22,6 @@ import com.google.common.collect.Lists;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.execution.CancelTaskException;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
@@ -283,7 +282,6 @@ public class LocalInputChannelTest {
 		final SingleInputGate gate = new SingleInputGate(
 			"test task name",
 			new JobID(),
-			new ExecutionAttemptID(),
 			new IntermediateDataSetID(),
 			0,
 			1,
@@ -481,7 +479,6 @@ public class LocalInputChannelTest {
 			this.inputGate = new SingleInputGate(
 					"Test Name",
 					new JobID(),
-					new ExecutionAttemptID(),
 					new IntermediateDataSetID(),
 					subpartitionIndex,
 					numberOfInputChannels,

http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 7cae362..a25b8d5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -73,7 +73,7 @@ public class SingleInputGateTest {
 	public void testBasicGetNextLogic() throws Exception {
 		// Setup
 		final SingleInputGate inputGate = new SingleInputGate(
-			"Test Task Name", new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 2, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+			"Test Task Name", new JobID(), new IntermediateDataSetID(), 0, 2, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
 
 		final TestInputChannel[] inputChannels = new TestInputChannel[]{
 			new TestInputChannel(inputGate, 0),
@@ -127,7 +127,7 @@ public class SingleInputGateTest {
 		// Setup reader with one local and one unknown input channel
 		final IntermediateDataSetID resultId = new IntermediateDataSetID();
 
-		final SingleInputGate inputGate = new SingleInputGate("Test Task Name", new JobID(), new ExecutionAttemptID(), resultId, 0, 2, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+		final SingleInputGate inputGate = new SingleInputGate("Test Task Name", new JobID(), resultId, 0, 2, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
 		final BufferPool bufferPool = mock(BufferPool.class);
 		when(bufferPool.getNumberOfRequiredMemorySegments()).thenReturn(2);
 
@@ -178,7 +178,6 @@ public class SingleInputGateTest {
 		SingleInputGate inputGate = new SingleInputGate(
 			"t1",
 			new JobID(),
-			new ExecutionAttemptID(),
 			new IntermediateDataSetID(),
 			0,
 			1,
@@ -218,7 +217,6 @@ public class SingleInputGateTest {
 		final SingleInputGate inputGate = new SingleInputGate(
 			"InputGate",
 			new JobID(),
-			new ExecutionAttemptID(),
 			new IntermediateDataSetID(),
 			0,
 			1,

http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
index 126a96e..fe3b087 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
@@ -55,7 +54,6 @@ public class TestSingleInputGate {
 		SingleInputGate realGate = new SingleInputGate(
 			"Test Task Name",
 			new JobID(),
-			new ExecutionAttemptID(),
 			new IntermediateDataSetID(),
 			0,
 			numberOfInputChannels,

http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
index 84ec202..c05df0a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
 import org.apache.flink.runtime.taskmanager.TaskActions;
@@ -43,8 +42,8 @@ public class UnionInputGateTest {
 	public void testBasicGetNextLogic() throws Exception {
 		// Setup
 		final String testTaskName = "Test Task";
-		final SingleInputGate ig1 = new SingleInputGate(testTaskName, new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 3, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
-		final SingleInputGate ig2 = new SingleInputGate(testTaskName, new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 5, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+		final SingleInputGate ig1 = new SingleInputGate(testTaskName, new JobID(), new IntermediateDataSetID(), 0, 3, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+		final SingleInputGate ig2 = new SingleInputGate(testTaskName, new JobID(), new IntermediateDataSetID(), 0, 5, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
 
 		final UnionInputGate union = new UnionInputGate(new SingleInputGate[]{ig1, ig2});
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index f941c24..6d8c70b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -38,7 +38,6 @@ import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
-import org.apache.flink.runtime.io.network.PartitionState;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -56,7 +55,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
 import org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure;
 import org.apache.flink.runtime.messages.JobManagerMessages.CancellationResponse;
 import org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess;
-import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState;
+import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionProducerState;
 import org.apache.flink.runtime.messages.JobManagerMessages.StopJob;
 import org.apache.flink.runtime.messages.JobManagerMessages.StoppingFailure;
 import org.apache.flink.runtime.messages.JobManagerMessages.StoppingSuccess;
@@ -116,7 +115,6 @@ import static org.apache.flink.runtime.testingUtils.TestingUtils.startTestingClu
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -221,49 +219,227 @@ public class JobManagerTest {
 						// - The test ----------------------------------------------------------------------
 
 						// 1. All execution states
-						RequestPartitionState request = new RequestPartitionState(
-							jid, partitionId, receiver, rid);
+						RequestPartitionProducerState request = new RequestPartitionProducerState(
+							jid, rid, partitionId);
 
 						for (ExecutionState state : ExecutionState.values()) {
 							ExecutionGraphTestUtils.setVertexState(vertex, state);
 
-							Future<PartitionState> futurePartitionState = jobManagerGateway
+							Future<ExecutionState> futurePartitionState = jobManagerGateway
 								.ask(request, getRemainingTime())
-								.mapTo(ClassTag$.MODULE$.<PartitionState>apply(PartitionState.class));
+								.mapTo(ClassTag$.MODULE$.<ExecutionState>apply(ExecutionState.class));
 
-							PartitionState resp = Await.result(futurePartitionState, getRemainingTime());
-
-							assertEquals(request.taskResultId(), resp.getIntermediateDataSetID());
-							assertEquals(request.partitionId().getPartitionId(), resp.getIntermediateResultPartitionID());
-							assertEquals(state, resp.getExecutionState());
+							ExecutionState resp = Await.result(futurePartitionState, getRemainingTime());
+							assertEquals(state, resp);
 						}
 
 						// 2. Non-existing execution
-						request = new RequestPartitionState(jid, new ResultPartitionID(), receiver, rid);
+						request = new RequestPartitionProducerState(jid, rid, new ResultPartitionID());
+
+						Future<?> futurePartitionState = jobManagerGateway.ask(request, getRemainingTime());
+						try {
+							Await.result(futurePartitionState, getRemainingTime());
+							fail("Did not fail with expected RuntimeException");
+						} catch (RuntimeException e) {
+							assertEquals(IllegalArgumentException.class, e.getCause().getClass());
+						}
 
-						Future<PartitionState> futurePartitionState = jobManagerGateway
-							.ask(request, getRemainingTime())
-							.mapTo(ClassTag$.MODULE$.<PartitionState>apply(PartitionState.class));
+						// 3. Non-existing job
+						request = new RequestPartitionProducerState(new JobID(), rid, new ResultPartitionID());
+						futurePartitionState = jobManagerGateway.ask(request, getRemainingTime());
+
+						try {
+							Await.result(futurePartitionState, getRemainingTime());
+							fail("Did not fail with expected IllegalArgumentException");
+						} catch (IllegalArgumentException ignored) {
+						}
+					} catch (Exception e) {
+						e.printStackTrace();
+						fail(e.getMessage());
+					} finally {
+						if (cluster != null) {
+							cluster.shutdown();
+						}
+					}
+				}
+			};
+		}};
+	}
 
-						PartitionState resp = Await.result(futurePartitionState, getRemainingTime());
+	/**
+	 * Tests the JobManager response when the execution is not registered with
+	 * the ExecutionGraph.
+	 */
+	@Test
+	public void testRequestPartitionStateUnregisteredExecution() throws Exception {
+		new JavaTestKit(system) {{
+			new Within(duration("15 seconds")) {
+				@Override
+				protected void run() {
+					// Setup
+					TestingCluster cluster = null;
 
-						assertEquals(request.taskResultId(), resp.getIntermediateDataSetID());
-						assertEquals(request.partitionId().getPartitionId(), resp.getIntermediateResultPartitionID());
-						assertNull(resp.getExecutionState());
+					try {
+						cluster = startTestingCluster(4, 1, DEFAULT_AKKA_ASK_TIMEOUT());
 
-						// 3. Non-existing job
-						request = new RequestPartitionState(
-							new JobID(), new ResultPartitionID(), receiver, rid);
+						final IntermediateDataSetID rid = new IntermediateDataSetID();
+
+						// Create a task
+						final JobVertex sender = new JobVertex("Sender");
+						sender.setParallelism(1);
+						sender.setInvokableClass(Tasks.NoOpInvokable.class); // just finish
+						sender.createAndAddResultDataSet(rid, PIPELINED);
+
+						final JobVertex sender2 = new JobVertex("Blocking Sender");
+						sender2.setParallelism(1);
+						sender2.setInvokableClass(Tasks.BlockingNoOpInvokable.class); // just block
+						sender2.createAndAddResultDataSet(new IntermediateDataSetID(), PIPELINED);
+
+						final JobGraph jobGraph = new JobGraph("Fast finishing producer test job", sender, sender2);
+						final JobID jid = jobGraph.getJobID();
+
+						final ActorGateway jobManagerGateway = cluster.getLeaderGateway(
+							TestingUtils.TESTING_DURATION());
+
+						// the leader session ID can be null because this gateway is never used to send messages
+						final ActorGateway testActorGateway = new AkkaActorGateway(getTestActor(), null);
+
+						// Submit the job and wait for all vertices to be running
+						jobManagerGateway.tell(
+							new SubmitJob(
+								jobGraph,
+								ListeningBehaviour.EXECUTION_RESULT),
+							testActorGateway);
+						expectMsgClass(JobSubmitSuccess.class);
+
+						jobManagerGateway.tell(
+							new WaitForAllVerticesToBeRunningOrFinished(jid),
+							testActorGateway);
+
+						expectMsgClass(AllVerticesRunning.class);
+
+						Future<Object> egFuture = jobManagerGateway.ask(
+							new RequestExecutionGraph(jobGraph.getJobID()), remaining());
+
+						ExecutionGraphFound egFound = (ExecutionGraphFound) Await.result(egFuture, remaining());
+						ExecutionGraph eg = (ExecutionGraph) egFound.executionGraph();
+
+						ExecutionVertex vertex = eg.getJobVertex(sender.getID()).getTaskVertices()[0];
+						while (vertex.getExecutionState() != ExecutionState.FINISHED) {
+							Thread.sleep(1);
+						}
+
+						IntermediateResultPartition partition = vertex.getProducedPartitions()
+							.values().iterator().next();
+
+						ResultPartitionID partitionId = new ResultPartitionID(
+							partition.getPartitionId(),
+							vertex.getCurrentExecutionAttempt().getAttemptId());
 
-						futurePartitionState = jobManagerGateway
+						// Producer finished, request state
+						Object request = new RequestPartitionProducerState(jid, rid, partitionId);
+
+						Future<ExecutionState> producerStateFuture = jobManagerGateway
 							.ask(request, getRemainingTime())
-							.mapTo(ClassTag$.MODULE$.<PartitionState>apply(PartitionState.class));
+							.mapTo(ClassTag$.MODULE$.<ExecutionState>apply(ExecutionState.class));
+
+						assertEquals(ExecutionState.FINISHED, Await.result(producerStateFuture, getRemainingTime()));
+					} catch (Exception e) {
+						e.printStackTrace();
+						fail(e.getMessage());
+					} finally {
+						if (cluster != null) {
+							cluster.shutdown();
+						}
+					}
+				}
+			};
+		}};
+	}
+
+	/**
+	 * Tests the JobManager response when the execution is not registered with
+	 * the ExecutionGraph anymore and a new execution attempt is available.
+	 */
+	@Test
+	public void testRequestPartitionStateMoreRecentExecutionAttempt() throws Exception {
+		new JavaTestKit(system) {{
+			new Within(duration("15 seconds")) {
+				@Override
+				protected void run() {
+					// Setup
+					TestingCluster cluster = null;
+
+					try {
+						cluster = startTestingCluster(4, 1, DEFAULT_AKKA_ASK_TIMEOUT());
+
+						final IntermediateDataSetID rid = new IntermediateDataSetID();
+
+						// Create a task
+						final JobVertex sender = new JobVertex("Sender");
+						sender.setParallelism(1);
+						sender.setInvokableClass(Tasks.NoOpInvokable.class); // just finish
+						sender.createAndAddResultDataSet(rid, PIPELINED);
 
-						resp = Await.result(futurePartitionState, getRemainingTime());
+						final JobVertex sender2 = new JobVertex("Blocking Sender");
+						sender2.setParallelism(1);
+						sender2.setInvokableClass(Tasks.BlockingNoOpInvokable.class); // just block
+						sender2.createAndAddResultDataSet(new IntermediateDataSetID(), PIPELINED);
 
-						assertEquals(request.taskResultId(), resp.getIntermediateDataSetID());
-						assertEquals(request.partitionId().getPartitionId(), resp.getIntermediateResultPartitionID());
-						assertNull(resp.getExecutionState());
+						final JobGraph jobGraph = new JobGraph("Fast finishing producer test job", sender, sender2);
+						final JobID jid = jobGraph.getJobID();
+
+						final ActorGateway jobManagerGateway = cluster.getLeaderGateway(
+							TestingUtils.TESTING_DURATION());
+
+						// the leader session ID can be null because this gateway is never used to send messages
+						final ActorGateway testActorGateway = new AkkaActorGateway(getTestActor(), null);
+
+						// Submit the job and wait for all vertices to be running
+						jobManagerGateway.tell(
+							new SubmitJob(
+								jobGraph,
+								ListeningBehaviour.EXECUTION_RESULT),
+							testActorGateway);
+						expectMsgClass(JobManagerMessages.JobSubmitSuccess.class);
+
+						jobManagerGateway.tell(
+							new WaitForAllVerticesToBeRunningOrFinished(jid),
+							testActorGateway);
+
+						expectMsgClass(TestingJobManagerMessages.AllVerticesRunning.class);
+
+						Future<Object> egFuture = jobManagerGateway.ask(
+							new RequestExecutionGraph(jobGraph.getJobID()), remaining());
+
+						ExecutionGraphFound egFound = (ExecutionGraphFound) Await.result(egFuture, remaining());
+						ExecutionGraph eg = (ExecutionGraph) egFound.executionGraph();
+
+						ExecutionVertex vertex = eg.getJobVertex(sender.getID()).getTaskVertices()[0];
+						while (vertex.getExecutionState() != ExecutionState.FINISHED) {
+							Thread.sleep(1);
+						}
+
+						IntermediateResultPartition partition = vertex.getProducedPartitions()
+							.values().iterator().next();
+
+						ResultPartitionID partitionId = new ResultPartitionID(
+							partition.getPartitionId(),
+							vertex.getCurrentExecutionAttempt().getAttemptId());
+
+						// Reset execution => new execution attempt
+						vertex.resetForNewExecution();
+
+						// Request the producer state using the partition ID of the old (pre-reset) execution attempt
+						Object request = new JobManagerMessages.RequestPartitionProducerState(jid, rid, partitionId);
+
+						Future<?> producerStateFuture = jobManagerGateway.ask(request, getRemainingTime());
+
+						try {
+							Await.result(producerStateFuture, getRemainingTime());
+							fail("Did not fail with expected Exception");
+						} catch (PartitionProducerDisposedException ignored) {
+						}
 					} catch (Exception e) {
 						e.printStackTrace();
 						fail(e.getMessage());

http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index e37467b..a7ffa1a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -35,7 +35,7 @@ import org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
-import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -145,7 +145,7 @@ public class TaskAsyncCallTest {
 		
 		ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
 		ResultPartitionConsumableNotifier consumableNotifier = mock(ResultPartitionConsumableNotifier.class);
-		PartitionStateChecker partitionStateChecker = mock(PartitionStateChecker.class);
+		PartitionProducerStateChecker partitionProducerStateChecker = mock(PartitionProducerStateChecker.class);
 		Executor executor = mock(Executor.class);
 		NetworkEnvironment networkEnvironment = mock(NetworkEnvironment.class);
 		when(networkEnvironment.getResultPartitionManager()).thenReturn(partitionManager);
@@ -191,7 +191,7 @@ public class TaskAsyncCallTest {
 			new TaskManagerRuntimeInfo("localhost", new Configuration(), System.getProperty("java.io.tmpdir")),
 			mock(TaskMetricGroup.class),
 			consumableNotifier,
-			partitionStateChecker,
+			partitionProducerStateChecker,
 			executor);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index fd9ff05..5ccf8a9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -49,7 +49,6 @@ import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.io.network.ConnectionID;
-import org.apache.flink.runtime.io.network.PartitionState;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -102,7 +101,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState;
+import static org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionProducerState;
 import static org.apache.flink.runtime.messages.JobManagerMessages.ScheduleOrUpdateConsumers;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -1579,15 +1578,8 @@ public class TaskManagerTest extends TestLogger {
 
 		@Override
 		public void handleMessage(Object message) throws Exception {
-			if (message instanceof RequestPartitionState) {
-				final RequestPartitionState msg = (RequestPartitionState) message;
-
-				PartitionState resp = new PartitionState(
-						msg.taskResultId(),
-						msg.partitionId().getPartitionId(),
-						ExecutionState.RUNNING);
-
-				getSender().tell(decorateMessage(resp), getSelf());
+			if (message instanceof RequestPartitionProducerState) {
+				getSender().tell(decorateMessage(ExecutionState.RUNNING), getSelf());
 			}
 			else if (message instanceof TaskMessages.UpdateTaskExecutionState) {
 				final TaskExecutionState msg = ((TaskMessages.UpdateTaskExecutionState) message)

http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
index d80dab3..ae7d0b8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
@@ -24,7 +24,7 @@ import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.JobInformation;
 import org.apache.flink.runtime.executiongraph.TaskInformation;
-import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
@@ -95,7 +95,7 @@ public class TaskStopTest {
 			tmRuntimeInfo,
 			mock(TaskMetricGroup.class),
 			mock(ResultPartitionConsumableNotifier.class),
-			mock(PartitionStateChecker.class),
+			mock(PartitionProducerStateChecker.class),
 			mock(Executor.class));
 		Field f = task.getClass().getDeclaredField("invokable");
 		f.setAccessible(true);