You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/09/30 07:07:57 UTC

[2/2] flink git commit: [FLINK-4711] Let the Task trigger partition state requests and handle their responses

[FLINK-4711] Let the Task trigger partition state requests and handle their responses

This PR makes changes the partition state check in a way that the Task is now responsible
for triggering the state check instead of the SingleInputGate. Furthermore, the operation
returns a future containing the JobManager's answer. That way we don't have to route the
response through the TaskManager and can add automatic retries in case of a timeout.

The PR removes the JobManagerCommunicationFactory and gets rid of the excessive
PartitionStateChecker and ResultPartitionConsumableNotifier creation. Instead of creating
for each SingleInputGate one PartitionStateChecker we create one for the TaskManager which
is reused across all SingleInputGates. The same applies to the
ResultPartitionConsumableNotifier.

This closes #2569.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/477d1c5d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/477d1c5d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/477d1c5d

Branch: refs/heads/master
Commit: 477d1c5d4ca6f469b3c87bc1f7962ece805cae1d
Parents: 7758571
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Sep 29 16:19:30 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Sep 30 09:07:29 2016 +0200

----------------------------------------------------------------------
 .../runtime/io/network/PartitionState.java      | 54 +++++++++++
 .../io/network/netty/PartitionStateChecker.java |  5 +-
 .../io/network/partition/ResultPartition.java   | 31 ++++---
 .../ResultPartitionConsumableNotifier.java      |  5 +-
 .../io/network/partition/ResultPartitionID.java |  2 +
 .../partition/consumer/SingleInputGate.java     | 16 ++--
 ...orGatewayJobManagerCommunicationFactory.java | 61 -------------
 .../ActorGatewayPartitionStateChecker.java      | 34 ++++---
 ...atewayResultPartitionConsumableNotifier.java |  8 +-
 .../JobManagerCommunicationFactory.java         | 47 ----------
 .../apache/flink/runtime/taskmanager/Task.java  | 95 +++++++++++++++-----
 .../flink/runtime/taskmanager/TaskActions.java  | 51 +++++++++++
 .../flink/runtime/jobmanager/JobManager.scala   |  6 +-
 .../runtime/messages/TaskControlMessages.scala  | 14 +--
 .../flink/runtime/taskmanager/TaskManager.scala | 75 +++++++++-------
 .../io/network/NetworkEnvironmentTest.java      | 46 ++++------
 .../consumer/LocalInputChannelTest.java         | 41 +++++----
 .../partition/consumer/SingleInputGateTest.java | 38 ++++----
 .../partition/consumer/TestSingleInputGate.java | 11 ++-
 .../partition/consumer/UnionInputGateTest.java  |  6 +-
 .../runtime/jobmanager/JobManagerTest.java      | 54 +++++------
 .../runtime/taskmanager/TaskAsyncCallTest.java  | 13 +--
 .../runtime/taskmanager/TaskManagerTest.java    |  3 +-
 .../flink/runtime/taskmanager/TaskStopTest.java |  9 +-
 .../flink/runtime/taskmanager/TaskTest.java     | 26 +++---
 .../tasks/InterruptSensitiveRestoreTest.java    | 10 ++-
 .../streaming/runtime/tasks/StreamTaskTest.java | 14 +--
 27 files changed, 420 insertions(+), 355 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/477d1c5d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionState.java
new file mode 100644
index 0000000..083412b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionState.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Contains information about the state of a result partition.
+ */
+public class PartitionState {
+	private final IntermediateDataSetID intermediateDataSetID;
+	private final IntermediateResultPartitionID intermediateResultPartitionID;
+	private final ExecutionState executionState;
+
+	public PartitionState(
+			IntermediateDataSetID intermediateDataSetID,
+			IntermediateResultPartitionID intermediateResultPartitionID,
+			ExecutionState executionState) {
+		this.intermediateDataSetID = Preconditions.checkNotNull(intermediateDataSetID);
+		this.intermediateResultPartitionID = Preconditions.checkNotNull(intermediateResultPartitionID);
+		this.executionState = executionState;
+	}
+
+	public IntermediateDataSetID getIntermediateDataSetID() {
+		return intermediateDataSetID;
+	}
+
+	public IntermediateResultPartitionID getIntermediateResultPartitionID() {
+		return intermediateResultPartitionID;
+	}
+
+	public ExecutionState getExecutionState() {
+		return executionState;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/477d1c5d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionStateChecker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionStateChecker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionStateChecker.java
index ecbcdaa..949f426 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionStateChecker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionStateChecker.java
@@ -19,13 +19,14 @@
 package org.apache.flink.runtime.io.network.netty;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.network.PartitionState;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 
 public interface PartitionStateChecker {
-
-	void triggerPartitionStateCheck(
+	Future<PartitionState> requestPartitionState(
 			JobID jobId,
 			ExecutionAttemptID executionId,
 			IntermediateDataSetID resultId,

http://git-wip-us.apache.org/repos/asf/flink/blob/477d1c5d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index 7bcdd31..5bbfab1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.taskmanager.TaskActions;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -79,6 +80,8 @@ public class ResultPartition implements BufferPoolOwner {
 	
 	private final String owningTaskName;
 
+	private final TaskActions taskActions;
+
 	private final JobID jobId;
 
 	private final ResultPartitionID partitionId;
@@ -129,18 +132,20 @@ public class ResultPartition implements BufferPoolOwner {
 	private long totalNumberOfBytes;
 
 	public ResultPartition(
-			String owningTaskName,
-			JobID jobId,
-			ResultPartitionID partitionId,
-			ResultPartitionType partitionType,
-			boolean doEagerDeployment,
-			int numberOfSubpartitions,
-			ResultPartitionManager partitionManager,
-			ResultPartitionConsumableNotifier partitionConsumableNotifier,
-			IOManager ioManager,
-			IOMode defaultIoMode) {
+		String owningTaskName,
+		TaskActions taskActions, // actions on the owning task
+		JobID jobId,
+		ResultPartitionID partitionId,
+		ResultPartitionType partitionType,
+		boolean doEagerDeployment,
+		int numberOfSubpartitions,
+		ResultPartitionManager partitionManager,
+		ResultPartitionConsumableNotifier partitionConsumableNotifier,
+		IOManager ioManager,
+		IOMode defaultIoMode) {
 
 		this.owningTaskName = checkNotNull(owningTaskName);
+		this.taskActions = checkNotNull(taskActions);
 		this.jobId = checkNotNull(jobId);
 		this.partitionId = checkNotNull(partitionId);
 		this.partitionType = checkNotNull(partitionType);
@@ -351,7 +356,7 @@ public class ResultPartition implements BufferPoolOwner {
 	 */
 	public void deployConsumers() {
 		if (doEagerDeployment) {
-			partitionConsumableNotifier.notifyPartitionConsumable(jobId, partitionId);
+			partitionConsumableNotifier.notifyPartitionConsumable(jobId, partitionId, taskActions);
 		}
 	}
 
@@ -436,9 +441,9 @@ public class ResultPartition implements BufferPoolOwner {
 	/**
 	 * Notifies pipelined consumers of this result partition once.
 	 */
-	private void notifyPipelinedConsumers() throws IOException {
+	private void notifyPipelinedConsumers() {
 		if (partitionType.isPipelined() && !hasNotifiedPipelinedConsumers) {
-			partitionConsumableNotifier.notifyPartitionConsumable(jobId, partitionId);
+			partitionConsumableNotifier.notifyPartitionConsumable(jobId, partitionId, taskActions);
 
 			hasNotifiedPipelinedConsumers = true;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/477d1c5d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionConsumableNotifier.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionConsumableNotifier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionConsumableNotifier.java
index 6e84b4c..02212ce 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionConsumableNotifier.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionConsumableNotifier.java
@@ -20,9 +20,8 @@ package org.apache.flink.runtime.io.network.partition;
 
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.taskmanager.TaskActions;
 
 public interface ResultPartitionConsumableNotifier {
-
-	void notifyPartitionConsumable(JobID jobId, ResultPartitionID partitionId);
-
+	void notifyPartitionConsumable(JobID jobId, ResultPartitionID partitionId, TaskActions taskActions);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/477d1c5d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java
index a18abde..b84c33b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionID.java
@@ -33,6 +33,8 @@ import java.io.Serializable;
  */
 public final class ResultPartitionID implements Serializable {
 
+	private static final long serialVersionUID = -902516386203787826L;
+
 	private final IntermediateResultPartitionID partitionId;
 
 	private final ExecutionAttemptID producerId;

http://git-wip-us.apache.org/repos/asf/flink/blob/477d1c5d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index aaf8887..0db30ee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.network.partition.consumer;
 
 import com.google.common.collect.Maps;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.taskmanager.TaskActions;
 import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
@@ -33,7 +34,6 @@ import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
-import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -140,8 +140,8 @@ public class SingleInputGate implements InputGate {
 
 	private final BitSet channelsWithEndOfPartitionEvents;
 
-	/** The partition state checker to use for failed partition requests. */
-	private final PartitionStateChecker partitionStateChecker;
+	/** The partition state listener listening to failed partition requests. */
+	private final TaskActions taskActions;
 
 	/**
 	 * Buffer pool for incoming buffers. Incoming data from remote channels is copied to buffers
@@ -174,7 +174,7 @@ public class SingleInputGate implements InputGate {
 			IntermediateDataSetID consumedResultId,
 			int consumedSubpartitionIndex,
 			int numberOfInputChannels,
-			PartitionStateChecker partitionStateChecker,
+			TaskActions taskActions,
 			IOMetricGroup metrics) {
 
 		this.owningTaskName = checkNotNull(owningTaskName);
@@ -192,7 +192,7 @@ public class SingleInputGate implements InputGate {
 		this.inputChannels = Maps.newHashMapWithExpectedSize(numberOfInputChannels);
 		this.channelsWithEndOfPartitionEvents = new BitSet(numberOfInputChannels);
 
-		this.partitionStateChecker = checkNotNull(partitionStateChecker);
+		this.taskActions = checkNotNull(taskActions);
 	}
 
 	// ------------------------------------------------------------------------
@@ -487,7 +487,7 @@ public class SingleInputGate implements InputGate {
 	}
 
 	void triggerPartitionStateCheck(ResultPartitionID partitionId) {
-		partitionStateChecker.triggerPartitionStateCheck(
+		taskActions.triggerPartitionStateCheck(
 				jobId,
 				executionId,
 				consumedResultId,
@@ -505,7 +505,7 @@ public class SingleInputGate implements InputGate {
 			ExecutionAttemptID executionId,
 			InputGateDeploymentDescriptor igdd,
 			NetworkEnvironment networkEnvironment,
-			PartitionStateChecker partitionStateChecker,
+			TaskActions taskActions,
 			IOMetricGroup metrics) {
 
 		final IntermediateDataSetID consumedResultId = checkNotNull(igdd.getConsumedResultId());
@@ -517,7 +517,7 @@ public class SingleInputGate implements InputGate {
 
 		final SingleInputGate inputGate = new SingleInputGate(
 				owningTaskName, jobId, executionId, consumedResultId, consumedSubpartitionIndex,
-				icdd.length, partitionStateChecker, metrics);
+				icdd.length, taskActions, metrics);
 
 		// Create the input channels. There is one input channel for each consumed partition.
 		final InputChannel[] inputChannels = new InputChannel[icdd.length];

http://git-wip-us.apache.org/repos/asf/flink/blob/477d1c5d/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayJobManagerCommunicationFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayJobManagerCommunicationFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayJobManagerCommunicationFactory.java
deleted file mode 100644
index 4697c79..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayJobManagerCommunicationFactory.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskmanager;
-
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
-import org.apache.flink.util.Preconditions;
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.duration.FiniteDuration;
-
-/**
- * Factory implementation which generates {@link ActorGateway} based job manager communication
- * components.
- */
-public class ActorGatewayJobManagerCommunicationFactory implements JobManagerCommunicationFactory {
-	private final ExecutionContext executionContext;
-	private final ActorGateway jobManagerGateway;
-	private final ActorGateway taskManagerGateway;
-	private final FiniteDuration jobManagerMessageTimeout;
-
-	public ActorGatewayJobManagerCommunicationFactory(
-		ExecutionContext executionContext,
-		ActorGateway jobManagerGateway,
-		ActorGateway taskManagerGateway,
-		FiniteDuration jobManagerMessageTimeout) {
-
-		this.executionContext = Preconditions.checkNotNull(executionContext);
-		this.jobManagerGateway = Preconditions.checkNotNull(jobManagerGateway);
-		this.taskManagerGateway = Preconditions.checkNotNull(taskManagerGateway);
-		this.jobManagerMessageTimeout = Preconditions.checkNotNull(jobManagerMessageTimeout);
-	}
-
-	public PartitionStateChecker createPartitionStateChecker() {
-		return new ActorGatewayPartitionStateChecker(jobManagerGateway, taskManagerGateway);
-	}
-
-	public ResultPartitionConsumableNotifier createResultPartitionConsumableNotifier(Task owningTask) {
-		return new ActorGatewayResultPartitionConsumableNotifier(
-			executionContext,
-			jobManagerGateway,
-			owningTask,
-			jobManagerMessageTimeout);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/477d1c5d/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayPartitionStateChecker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayPartitionStateChecker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayPartitionStateChecker.java
index e7c6690..efa6ec3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayPartitionStateChecker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayPartitionStateChecker.java
@@ -19,12 +19,18 @@
 package org.apache.flink.runtime.taskmanager;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.io.network.PartitionState;
 import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.util.Preconditions;
+import scala.concurrent.duration.FiniteDuration;
+import scala.reflect.ClassTag$;
 
 /**
  * This implementation uses {@link ActorGateway} to trigger the partition state check at the job
@@ -33,27 +39,29 @@ import org.apache.flink.runtime.messages.JobManagerMessages;
 public class ActorGatewayPartitionStateChecker implements PartitionStateChecker {
 
 	private final ActorGateway jobManager;
+	private final FiniteDuration timeout;
 
-	private final ActorGateway taskManager;
-
-	public ActorGatewayPartitionStateChecker(ActorGateway jobManager, ActorGateway taskManager) {
-		this.jobManager = jobManager;
-		this.taskManager = taskManager;
+	public ActorGatewayPartitionStateChecker(ActorGateway jobManager, FiniteDuration timeout) {
+		this.jobManager = Preconditions.checkNotNull(jobManager);
+		this.timeout = Preconditions.checkNotNull(timeout);
 	}
 
 	@Override
-	public void triggerPartitionStateCheck(
-		JobID jobId,
-		ExecutionAttemptID executionAttemptID,
-		IntermediateDataSetID resultId,
-		ResultPartitionID partitionId) {
-
+	public Future<PartitionState> requestPartitionState(
+			JobID jobId,
+			ExecutionAttemptID executionAttemptId,
+			IntermediateDataSetID resultId,
+			ResultPartitionID partitionId) {
 		JobManagerMessages.RequestPartitionState msg = new JobManagerMessages.RequestPartitionState(
 			jobId,
 			partitionId,
-			executionAttemptID,
+			executionAttemptId,
 			resultId);
 
-		jobManager.tell(msg, taskManager);
+		scala.concurrent.Future<PartitionState> futureResponse = jobManager
+			.ask(msg, timeout)
+			.mapTo(ClassTag$.MODULE$.<PartitionState>apply(PartitionState.class));
+
+		return new FlinkFuture<>(futureResponse);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/477d1c5d/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayResultPartitionConsumableNotifier.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayResultPartitionConsumableNotifier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayResultPartitionConsumableNotifier.java
index b91120b..ee7d3fb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayResultPartitionConsumableNotifier.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayResultPartitionConsumableNotifier.java
@@ -47,24 +47,20 @@ public class ActorGatewayResultPartitionConsumableNotifier implements ResultPart
 
 	private final ActorGateway jobManager;
 
-	private final Task owningTask;
-
 	private final FiniteDuration jobManagerMessageTimeout;
 
 	public ActorGatewayResultPartitionConsumableNotifier(
 		ExecutionContext executionContext,
 		ActorGateway jobManager,
-		Task owningTask,
 		FiniteDuration jobManagerMessageTimeout) {
 
 		this.executionContext = Preconditions.checkNotNull(executionContext);
 		this.jobManager = Preconditions.checkNotNull(jobManager);
-		this.owningTask = Preconditions.checkNotNull(owningTask);
 		this.jobManagerMessageTimeout = Preconditions.checkNotNull(jobManagerMessageTimeout);
 	}
 
 	@Override
-	public void notifyPartitionConsumable(JobID jobId, final ResultPartitionID partitionId) {
+	public void notifyPartitionConsumable(JobID jobId, final ResultPartitionID partitionId, final TaskActions taskActions) {
 
 		final JobManagerMessages.ScheduleOrUpdateConsumers msg = new JobManagerMessages.ScheduleOrUpdateConsumers(jobId, partitionId);
 
@@ -75,7 +71,7 @@ public class ActorGatewayResultPartitionConsumableNotifier implements ResultPart
 			public void onFailure(Throwable failure) {
 				LOG.error("Could not schedule or update consumers at the JobManager.", failure);
 
-				owningTask.failExternally(new RuntimeException("Could not notify JobManager to schedule or update consumers", failure));
+				taskActions.failExternally(new RuntimeException("Could not notify JobManager to schedule or update consumers", failure));
 			}
 		}, executionContext);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/477d1c5d/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/JobManagerCommunicationFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/JobManagerCommunicationFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/JobManagerCommunicationFactory.java
deleted file mode 100644
index 64cfcb1..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/JobManagerCommunicationFactory.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskmanager;
-
-import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
-
-/**
- * Factory to generate job manager specific communication components.
- */
-public interface JobManagerCommunicationFactory {
-
-	/**
-	 * Creates a {@link PartitionStateChecker} which communicates with the associated job manager of
-	 * this instance.
-	 *
-	 * @return PartitionStateChecker which communicates with the associated job manager of this
-	 * 			instance
-	 */
-	PartitionStateChecker createPartitionStateChecker();
-
-	/**
-	 * Creates a {@link ResultPartitionConsumableNotifier} which communicates with the associated
-	 * job manager of this instance.
-	 *
-	 * @param owningTask Task which is associated with the ResultPartitionConsumableNotifier
-	 * @return ResultPartitionConsumableNotifier which communicates with the associated job manager
-	 * 			of this instance
-	 */
-	ResultPartitionConsumableNotifier createResultPartitionConsumableNotifier(Task owningTask);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/477d1c5d/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index f7634cc..62dc8b7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -25,6 +25,8 @@ import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.concurrent.BiFunction;
+import org.apache.flink.runtime.io.network.PartitionState;
 import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
@@ -59,6 +61,7 @@ import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
 
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -69,10 +72,12 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
@@ -98,7 +103,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  *
  * <p>Each Task is run by one dedicated thread.
  */
-public class Task implements Runnable {
+public class Task implements Runnable, TaskActions {
 
 	/** The class logger. */
 	private static final Logger LOG = LoggerFactory.getLogger(Task.class);
@@ -197,6 +202,12 @@ public class Task implements Runnable {
 	/** Parent group for all metrics of this task */
 	private final TaskMetricGroup metrics;
 
+	/** Partition state checker to request partition states from */
+	private final PartitionStateChecker partitionStateChecker;
+
+	/** Executor to run future callbacks */
+	private final Executor executor;
+
 	// ------------------------------------------------------------------------
 	//  Fields that control the task execution. All these fields are volatile
 	//  (which means that they introduce memory barriers), to establish
@@ -242,7 +253,6 @@ public class Task implements Runnable {
 		MemoryManager memManager,
 		IOManager ioManager,
 		NetworkEnvironment networkEnvironment,
-		JobManagerCommunicationFactory jobManagerCommunicationFactory,
 		BroadcastVariableManager bcVarManager,
 		TaskManagerConnection taskManagerConnection,
 		InputSplitProvider inputSplitProvider,
@@ -250,7 +260,10 @@ public class Task implements Runnable {
 		LibraryCacheManager libraryCache,
 		FileCache fileCache,
 		TaskManagerRuntimeInfo taskManagerConfig,
-		TaskMetricGroup metricGroup) {
+		TaskMetricGroup metricGroup,
+		ResultPartitionConsumableNotifier resultPartitionConsumableNotifier,
+		PartitionStateChecker partitionStateChecker,
+		Executor executor) {
 
 		this.taskInfo = checkNotNull(tdd.getTaskInfo());
 		this.jobId = checkNotNull(tdd.getJobID());
@@ -287,6 +300,9 @@ public class Task implements Runnable {
 		this.taskExecutionStateListeners = new CopyOnWriteArrayList<>();
 		this.metrics = metricGroup;
 
+		this.partitionStateChecker = Preconditions.checkNotNull(partitionStateChecker);
+		this.executor = Preconditions.checkNotNull(executor);
+
 		// create the reader and writer structures
 
 		final String taskNameWithSubtaskAndId = taskNameWithSubtask + " (" + executionId + ')';
@@ -298,33 +314,29 @@ public class Task implements Runnable {
 		this.producedPartitions = new ResultPartition[partitions.size()];
 		this.writers = new ResultPartitionWriter[partitions.size()];
 
-		ResultPartitionConsumableNotifier resultPartitionConsumableNotifier =
-			jobManagerCommunicationFactory.createResultPartitionConsumableNotifier(this);
-
 		for (int i = 0; i < this.producedPartitions.length; i++) {
 			ResultPartitionDeploymentDescriptor desc = partitions.get(i);
 			ResultPartitionID partitionId = new ResultPartitionID(desc.getPartitionId(), executionId);
 
 			this.producedPartitions[i] = new ResultPartition(
-					taskNameWithSubtaskAndId,
-					jobId,
-					partitionId,
-					desc.getPartitionType(),
-					desc.getEagerlyDeployConsumers(),
-					desc.getNumberOfSubpartitions(),
-					networkEnvironment.getResultPartitionManager(),
-					resultPartitionConsumableNotifier,
-					ioManager,
-					networkEnvironment.getDefaultIOMode());
+				taskNameWithSubtaskAndId,
+				this,
+				jobId,
+				partitionId,
+				desc.getPartitionType(),
+				desc.getEagerlyDeployConsumers(),
+				desc.getNumberOfSubpartitions(),
+				networkEnvironment.getResultPartitionManager(),
+				resultPartitionConsumableNotifier,
+				ioManager,
+				networkEnvironment.getDefaultIOMode());
 
 			this.writers[i] = new ResultPartitionWriter(this.producedPartitions[i]);
 		}
 
 		// Consumed intermediate result partitions
 		this.inputGates = new SingleInputGate[consumedPartitions.size()];
-		this.inputGatesById = new HashMap<IntermediateDataSetID, SingleInputGate>();
-
-		PartitionStateChecker partitionStateChecker = jobManagerCommunicationFactory.createPartitionStateChecker();
+		this.inputGatesById = new HashMap<>();
 
 		for (int i = 0; i < this.inputGates.length; i++) {
 			SingleInputGate gate = SingleInputGate.create(
@@ -333,7 +345,7 @@ public class Task implements Runnable {
 				executionId,
 				consumedPartitions.get(i),
 				networkEnvironment,
-				partitionStateChecker,
+				this,
 				metricGroup.getIOMetricGroup());
 
 			this.inputGates[i] = gate;
@@ -920,6 +932,49 @@ public class Task implements Runnable {
 	}
 
 	// ------------------------------------------------------------------------
+	//  Partition State Listeners
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void triggerPartitionStateCheck(
+		JobID jobId,
+		ExecutionAttemptID executionId,
+		final IntermediateDataSetID resultId,
+		final ResultPartitionID partitionId) {
+		org.apache.flink.runtime.concurrent.Future<PartitionState> futurePartitionState = partitionStateChecker.requestPartitionState(
+			jobId,
+			executionId,
+			resultId,
+			partitionId);
+
+		futurePartitionState.handleAsync(new BiFunction<PartitionState, Throwable, Void>() {
+			@Override
+			public Void apply(PartitionState partitionState, Throwable throwable) {
+				try {
+					if (partitionState != null) {
+						onPartitionStateUpdate(
+							partitionState.getIntermediateDataSetID(),
+							partitionState.getIntermediateResultPartitionID(),
+							partitionState.getExecutionState());
+					} else if (throwable instanceof TimeoutException) {
+						// our request timed out, assume we're still running and try again
+						onPartitionStateUpdate(
+							resultId,
+							partitionId.getPartitionId(),
+							ExecutionState.RUNNING);
+					} else {
+						failExternally(throwable);
+					}
+				} catch (IOException | InterruptedException e) {
+					failExternally(e);
+				}
+
+				return null;
+			}
+		}, executor);
+	}
+
+	// ------------------------------------------------------------------------
 	//  Notifications on the invokable
 	// ------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/477d1c5d/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskActions.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskActions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskActions.java
new file mode 100644
index 0000000..4f12691
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskActions.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+
+/**
+ * Actions which can be performed on a {@link Task}.
+ */
+public interface TaskActions {
+
+	/**
+	 * Check the partition state of the given partition.
+	 *
+	 * @param jobId of the partition
+	 * @param executionId of the partition
+	 * @param resultId of the partition
+	 * @param partitionId of the partition
+	 */
+	void triggerPartitionStateCheck(
+		JobID jobId,
+		ExecutionAttemptID executionId,
+		IntermediateDataSetID resultId,
+		ResultPartitionID partitionId);
+
+	/**
+	 * Fail the owning task with the given throwawble.
+	 *
+	 * @param cause of the failure
+	 */
+	void failExternally(Throwable cause);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/477d1c5d/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 869f6e5..01f9cec 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -51,6 +51,7 @@ import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
 import org.apache.flink.runtime.executiongraph.{ExecutionGraph, ExecutionJobVertex, StatusListenerMessenger}
 import org.apache.flink.runtime.instance.{AkkaActorGateway, InstanceManager}
+import org.apache.flink.runtime.io.network.PartitionState
 import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator
 import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus, JobVertexID}
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener
@@ -62,7 +63,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages._
 import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
 import org.apache.flink.runtime.messages.RegistrationMessages._
 import org.apache.flink.runtime.messages.TaskManagerMessages.{Heartbeat, SendStackTrace}
-import org.apache.flink.runtime.messages.TaskMessages.{PartitionState, UpdateTaskExecutionState}
+import org.apache.flink.runtime.messages.TaskMessages.UpdateTaskExecutionState
 import org.apache.flink.runtime.messages.accumulators.{AccumulatorMessage, AccumulatorResultStringsFound, AccumulatorResultsErroneous, AccumulatorResultsFound, RequestAccumulatorResults, RequestAccumulatorResultsStringified}
 import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, AcknowledgeCheckpoint, DeclineCheckpoint}
 import org.apache.flink.runtime.messages.webmonitor.InfoMessage
@@ -873,8 +874,7 @@ class JobManager(
       }
 
       sender ! decorateMessage(
-        PartitionState(
-          taskExecutionId,
+        new PartitionState(
           taskResultId,
           partitionId.getPartitionId,
           state)

http://git-wip-us.apache.org/repos/asf/flink/blob/477d1c5d/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala
index 94762ee..158024d 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala
@@ -19,10 +19,8 @@
 package org.apache.flink.runtime.messages
 
 import org.apache.flink.runtime.deployment.{InputChannelDeploymentDescriptor, TaskDeploymentDescriptor}
-import org.apache.flink.runtime.execution.ExecutionState
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
-import org.apache.flink.runtime.jobgraph.{IntermediateDataSetID, IntermediateResultPartitionID}
-import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID
 import org.apache.flink.runtime.taskmanager.TaskExecutionState
 
 /**
@@ -92,16 +90,6 @@ object TaskMessages {
   // --------------------------------------------------------------------------
 
   /**
-   * Answer to a [[RequestPartitionState]] with the state of the respective partition.
-   */
-  case class PartitionState(
-      taskExecutionId: ExecutionAttemptID,
-      taskResultId: IntermediateDataSetID,
-      partitionId: IntermediateResultPartitionID,
-      state: ExecutionState)
-    extends TaskMessage with RequiresLeaderSessionID
-
-  /**
    * Base class for messages that update the information about location of input partitions
    */
   abstract sealed class UpdatePartitionInfo extends TaskMessage with RequiresLeaderSessionID {

http://git-wip-us.apache.org/repos/asf/flink/blob/477d1c5d/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index c6dcbb0..af2b38f 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -55,8 +55,8 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode
 import org.apache.flink.runtime.io.disk.iomanager.{IOManager, IOManagerAsync}
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool
 import org.apache.flink.runtime.io.network.{LocalConnectionManager, NetworkEnvironment, TaskEventDispatcher}
-import org.apache.flink.runtime.io.network.netty.{NettyConfig, NettyConnectionManager}
-import org.apache.flink.runtime.io.network.partition.ResultPartitionManager
+import org.apache.flink.runtime.io.network.netty.{NettyConfig, NettyConnectionManager, PartitionStateChecker}
+import org.apache.flink.runtime.io.network.partition.{ResultPartitionConsumableNotifier, ResultPartitionManager}
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID
 import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, LeaderRetrievalService}
 import org.apache.flink.runtime.memory.MemoryManager
@@ -196,7 +196,11 @@ class TaskManager(
   private var scheduledTaskManagerRegistration: Option[Cancellable] = None
   private var currentRegistrationRun: UUID = UUID.randomUUID()
 
-  private var jobManagerConnectionFactory: Option[JobManagerCommunicationFactory] = None
+  private var connectionUtils: Option[(
+    CheckpointResponder,
+    PartitionStateChecker,
+    ResultPartitionConsumableNotifier,
+    TaskManagerConnection)] = None
 
   // --------------------------------------------------------------------------
   //  Actor messages and life cycle
@@ -514,14 +518,6 @@ class TaskManager(
               "No task with that execution ID was found.")
             )
           }
-
-        case PartitionState(taskExecutionId, taskResultId, partitionId, state) =>
-          Option(runningTasks.get(taskExecutionId)) match {
-            case Some(task) =>
-              task.onPartitionStateUpdate(taskResultId, partitionId, state)
-            case None =>
-              log.debug(s"Cannot find task $taskExecutionId to respond with partition state.")
-          }
       }
       }
   }
@@ -930,7 +926,7 @@ class TaskManager(
       "starting network stack and library cache.")
 
     // sanity check that the JobManager dependent components are not set up currently
-    if (jobManagerConnectionFactory.isDefined || blobService.isDefined) {
+    if (connectionUtils.isDefined || blobService.isDefined) {
       throw new IllegalStateException("JobManager-specific components are already initialized.")
     }
 
@@ -938,14 +934,26 @@ class TaskManager(
     instanceID = id
 
     val jobManagerGateway = new AkkaActorGateway(jobManager, leaderSessionID.orNull)
-    val taskmanagerGateway = new AkkaActorGateway(self, leaderSessionID.orNull)
+    val taskManagerGateway = new AkkaActorGateway(self, leaderSessionID.orNull)
 
-    jobManagerConnectionFactory = Some(
-      new ActorGatewayJobManagerCommunicationFactory(
-        context.dispatcher,
-        jobManagerGateway,
-        taskmanagerGateway,
-        config.timeout))
+    val checkpointResponder = new ActorGatewayCheckpointResponder(jobManagerGateway);
+
+    val taskManagerConnection = new ActorGatewayTaskManagerConnection(taskManagerGateway)
+
+    val partitionStateChecker = new ActorGatewayPartitionStateChecker(
+      jobManagerGateway,
+      config.timeout)
+
+    val resultPartitionConsumableNotifier = new ActorGatewayResultPartitionConsumableNotifier(
+      context.dispatcher,
+      jobManagerGateway,
+      config.timeout)
+
+    connectionUtils = Some(
+      (checkpointResponder,
+        partitionStateChecker,
+        resultPartitionConsumableNotifier,
+        taskManagerConnection))
 
 
     val kvStateServer = network.getKvStateServer()
@@ -1052,7 +1060,7 @@ class TaskManager(
     blobService = None
 
     // disassociate the slot environment
-    jobManagerConnectionFactory = None
+    connectionUtils = None
 
     if (network.getKvStateRegistry != null) {
       network.getKvStateRegistry.unregisterListener()
@@ -1119,23 +1127,24 @@ class TaskManager(
         case None => throw new IllegalStateException("There is no valid library cache manager.")
       }
 
-      val jmFactory = jobManagerConnectionFactory match {
-        case Some(factory) => factory
-        case None =>
-          throw new IllegalStateException("TaskManager is not associated with a JobManager and, " +
-                                            "thus, the SlotEnvironment has not been initialized.")
-      }
-
       val slot = tdd.getTargetSlotNumber
       if (slot < 0 || slot >= numberOfSlots) {
         throw new IllegalArgumentException(s"Target slot $slot does not exist on TaskManager.")
       }
 
+      val (checkpointResponder,
+        partitionStateChecker,
+        resultPartitionConsumableNotifier,
+        taskManagerConnection) = connectionUtils match {
+        case Some(x) => x
+        case None => throw new IllegalStateException("The connection utils have not been " +
+                                                       "initialized.")
+      }
+
       // create the task. this does not grab any TaskManager resources or download
       // and libraries - the operation does not block
 
       val jobManagerGateway = new AkkaActorGateway(jobManagerActor, leaderSessionID.orNull)
-      val selfGateway = new AkkaActorGateway(self, leaderSessionID.orNull)
 
       var jobName = tdd.getJobName
       if (tdd.getJobName.length == 0) {
@@ -1153,16 +1162,11 @@ class TaskManager(
         tdd.getExecutionId,
         config.timeout)
 
-      val checkpointResponder = new ActorGatewayCheckpointResponder(jobManagerGateway);
-
-      val taskManagerConnection = new ActorGatewayTaskManagerConnection(selfGateway)
-
       val task = new Task(
         tdd,
         memoryManager,
         ioManager,
         network,
-        jmFactory,
         bcVarManager,
         taskManagerConnection,
         inputSplitProvider,
@@ -1170,7 +1174,10 @@ class TaskManager(
         libCache,
         fileCache,
         runtimeInfo,
-        taskMetricGroup)
+        taskMetricGroup,
+        resultPartitionConsumableNotifier,
+        partitionStateChecker,
+        context.dispatcher)
 
       log.info(s"Received task ${task.getTaskInfo.getTaskNameWithSubtasks()}")
 

http://git-wip-us.apache.org/repos/asf/flink/blob/477d1c5d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
index 9f39de1..a9ad75d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
@@ -35,12 +35,9 @@ import org.apache.flink.runtime.messages.JobManagerMessages.ScheduleOrUpdateCons
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.taskmanager.ActorGatewayResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
-import org.apache.flink.runtime.taskmanager.JobManagerCommunicationFactory;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 import scala.Some;
 import scala.concurrent.duration.FiniteDuration;
 import scala.concurrent.impl.Promise;
@@ -95,26 +92,18 @@ public class NetworkEnvironmentTest {
 
 		env.start();
 
-		JobManagerCommunicationFactory jobManagerCommunicationFactory = mock(JobManagerCommunicationFactory.class);
-
-		when(jobManagerCommunicationFactory.createResultPartitionConsumableNotifier(any(Task.class))).thenAnswer(new Answer<ResultPartitionConsumableNotifier>() {
-			@Override
-			public ResultPartitionConsumableNotifier answer(InvocationOnMock invocation) throws Throwable {
-				return new ActorGatewayResultPartitionConsumableNotifier(
-					TestingUtils.defaultExecutionContext(),
-					jobManager,
-					(Task)invocation.getArguments()[0],
-					new FiniteDuration(30, TimeUnit.SECONDS));
-			}
-		});
+		ResultPartitionConsumableNotifier resultPartitionConsumableNotifier = new ActorGatewayResultPartitionConsumableNotifier(
+			TestingUtils.defaultExecutionContext(),
+			jobManager,
+			new FiniteDuration(30L, TimeUnit.SECONDS));
 
 		// Register mock task
 		JobID jobId = new JobID();
 		Task mockTask = mock(Task.class);
 
 		ResultPartition[] partitions = new ResultPartition[2];
-		partitions[0] = createPartition(mockTask, "p1", jobId, true, env, jobManagerCommunicationFactory);
-		partitions[1] = createPartition(mockTask, "p2", jobId, false, env, jobManagerCommunicationFactory);
+		partitions[0] = createPartition(mockTask, "p1", jobId, true, env, resultPartitionConsumableNotifier);
+		partitions[1] = createPartition(mockTask, "p2", jobId, false, env, resultPartitionConsumableNotifier);
 
 		ResultPartitionWriter[] writers = new ResultPartitionWriter[2];
 		writers[0] = new ResultPartitionWriter(partitions[0]);
@@ -143,18 +132,19 @@ public class NetworkEnvironmentTest {
 		JobID jobId,
 		boolean eagerlyDeployConsumers,
 		NetworkEnvironment env,
-		JobManagerCommunicationFactory jobManagerCommunicationFactory) {
+		ResultPartitionConsumableNotifier resultPartitionConsumableNotifier) {
 
 		return new ResultPartition(
-				name,
-				jobId,
-				new ResultPartitionID(),
-				ResultPartitionType.PIPELINED,
-				eagerlyDeployConsumers,
-				1,
-				env.getResultPartitionManager(),
-				jobManagerCommunicationFactory.createResultPartitionConsumableNotifier(owningTask),
-				mock(IOManager.class),
-				env.getDefaultIOMode());
+			name,
+			owningTask,
+			jobId,
+			new ResultPartitionID(),
+			ResultPartitionType.PIPELINED,
+			eagerlyDeployConsumers,
+			1,
+			env.getResultPartitionManager(),
+			resultPartitionConsumableNotifier,
+			mock(IOManager.class),
+			env.getDefaultIOMode());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/477d1c5d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 18d9073..75f2bcc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -29,7 +29,6 @@ import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
-import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
 import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
@@ -44,6 +43,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 
 import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.taskmanager.TaskActions;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -101,6 +101,8 @@ public class LocalInputChannelTest {
 		final ResultPartitionConsumableNotifier partitionConsumableNotifier =
 				mock(ResultPartitionConsumableNotifier.class);
 
+		final TaskActions taskActions = mock(TaskActions.class);
+
 		final IOManager ioManager = mock(IOManager.class);
 
 		final JobID jobId = new JobID();
@@ -115,16 +117,17 @@ public class LocalInputChannelTest {
 			partitionIds[i] = new ResultPartitionID();
 
 			final ResultPartition partition = new ResultPartition(
-					"Test Name",
-					jobId,
-					partitionIds[i],
-					ResultPartitionType.PIPELINED,
-					false,
-					parallelism,
-					partitionManager,
-					partitionConsumableNotifier,
-					ioManager,
-					ASYNC);
+				"Test Name",
+				taskActions,
+				jobId,
+				partitionIds[i],
+				ResultPartitionType.PIPELINED,
+				false,
+				parallelism,
+				partitionManager,
+				partitionConsumableNotifier,
+				ioManager,
+				ASYNC);
 
 			// Create a buffer pool for this partition
 			partition.registerBufferPool(
@@ -343,14 +346,14 @@ public class LocalInputChannelTest {
 			checkArgument(numberOfExpectedBuffersPerChannel >= 1);
 
 			this.inputGate = new SingleInputGate(
-					"Test Name",
-					new JobID(),
-					new ExecutionAttemptID(),
-					new IntermediateDataSetID(),
-					subpartitionIndex,
-					numberOfInputChannels,
-					mock(PartitionStateChecker.class),
-					new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+				"Test Name",
+				new JobID(),
+				new ExecutionAttemptID(),
+				new IntermediateDataSetID(),
+				subpartitionIndex,
+				numberOfInputChannels,
+				mock(TaskActions.class),
+				new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
 			// Set buffer pool
 			inputGate.setBufferPool(bufferPool);

http://git-wip-us.apache.org/repos/asf/flink/blob/477d1c5d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index f55fee5..9e4ab86 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -31,7 +31,6 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
-import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
@@ -39,8 +38,8 @@ import org.apache.flink.runtime.io.network.util.TestTaskEvent;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.taskmanager.TaskActions;
 import org.junit.Test;
-import scala.Tuple2;
 
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicReference;
@@ -67,7 +66,7 @@ public class SingleInputGateTest {
 	public void testBasicGetNextLogic() throws Exception {
 		// Setup
 		final SingleInputGate inputGate = new SingleInputGate(
-				"Test Task Name", new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 2, mock(PartitionStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+				"Test Task Name", new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 2, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
 		final TestInputChannel[] inputChannels = new TestInputChannel[]{
 				new TestInputChannel(inputGate, 0),
@@ -114,7 +113,7 @@ public class SingleInputGateTest {
 		// Setup reader with one local and one unknown input channel
 		final IntermediateDataSetID resultId = new IntermediateDataSetID();
 
-		final SingleInputGate inputGate = new SingleInputGate("Test Task Name", new JobID(), new ExecutionAttemptID(), resultId, 0, 2, mock(PartitionStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+		final SingleInputGate inputGate = new SingleInputGate("Test Task Name", new JobID(), new ExecutionAttemptID(), resultId, 0, 2, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 		final BufferPool bufferPool = mock(BufferPool.class);
 		when(bufferPool.getNumberOfRequiredMemorySegments()).thenReturn(2);
 
@@ -163,13 +162,14 @@ public class SingleInputGateTest {
 	@Test
 	public void testUpdateChannelBeforeRequest() throws Exception {
 		SingleInputGate inputGate = new SingleInputGate(
-				"t1",
-				new JobID(),
-				new ExecutionAttemptID(),
-				new IntermediateDataSetID(),
-				0,
-				1,
-				mock(PartitionStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+			"t1",
+			new JobID(),
+			new ExecutionAttemptID(),
+			new IntermediateDataSetID(),
+			0,
+			1,
+			mock(TaskActions.class),
+			new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
 		ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
 
@@ -205,14 +205,14 @@ public class SingleInputGateTest {
 
 		// Setup the input gate with a single channel that does nothing
 		final SingleInputGate inputGate = new SingleInputGate(
-				"InputGate",
-				new JobID(),
-				new ExecutionAttemptID(),
-				new IntermediateDataSetID(),
-				0,
-				1,
-				mock(PartitionStateChecker.class),
-				new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+			"InputGate",
+			new JobID(),
+			new ExecutionAttemptID(),
+			new IntermediateDataSetID(),
+			0,
+			1,
+			mock(TaskActions.class),
+			new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
 		InputChannel unknown = new UnknownInputChannel(
 			inputGate,

http://git-wip-us.apache.org/repos/asf/flink/blob/477d1c5d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
index 607da94..867c273 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.io.network.partition.consumer;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import org.apache.flink.runtime.taskmanager.TaskActions;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
@@ -60,7 +60,14 @@ public class TestSingleInputGate {
 		checkArgument(numberOfInputChannels >= 1);
 
 		SingleInputGate realGate = new SingleInputGate(
-				"Test Task Name", new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, numberOfInputChannels, mock(PartitionStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+			"Test Task Name",
+			new JobID(),
+			new ExecutionAttemptID(),
+			new IntermediateDataSetID(),
+			0,
+			numberOfInputChannels,
+			mock(TaskActions.class),
+			new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
 		this.inputGate = spy(realGate);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/477d1c5d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
index 28f621f..466879e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
@@ -20,10 +20,10 @@ package org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 
 import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.taskmanager.TaskActions;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -44,8 +44,8 @@ public class UnionInputGateTest {
 	public void testBasicGetNextLogic() throws Exception {
 		// Setup
 		final String testTaskName = "Test Task";
-		final SingleInputGate ig1 = new SingleInputGate(testTaskName, new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 3, mock(PartitionStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
-		final SingleInputGate ig2 = new SingleInputGate(testTaskName, new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 5, mock(PartitionStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+		final SingleInputGate ig1 = new SingleInputGate(testTaskName, new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 3, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+		final SingleInputGate ig2 = new SingleInputGate(testTaskName, new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 5, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
 		final UnionInputGate union = new UnionInputGate(new SingleInputGate[]{ig1, ig2});
 

http://git-wip-us.apache.org/repos/asf/flink/blob/477d1c5d/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index d731b95..8d150ac 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.io.network.PartitionState;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -44,13 +45,11 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
-import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage;
 import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState;
 import org.apache.flink.runtime.messages.JobManagerMessages.StopJob;
 import org.apache.flink.runtime.messages.JobManagerMessages.StoppingFailure;
 import org.apache.flink.runtime.messages.JobManagerMessages.StoppingSuccess;
 import org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob;
-import org.apache.flink.runtime.messages.TaskMessages.PartitionState;
 import org.apache.flink.runtime.query.KvStateID;
 import org.apache.flink.runtime.query.KvStateLocation;
 import org.apache.flink.runtime.query.KvStateMessage.LookupKvStateLocation;
@@ -204,52 +203,43 @@ public class JobManagerTest {
 						for (ExecutionState state : ExecutionState.values()) {
 							ExecutionGraphTestUtils.setVertexState(vertex, state);
 
-							jobManagerGateway.tell(request, testActorGateway);
+							Future<PartitionState> futurePartitionState = jobManagerGateway
+								.ask(request, getRemainingTime())
+								.mapTo(ClassTag$.MODULE$.<PartitionState>apply(PartitionState.class));
 
-							LeaderSessionMessage lsm = expectMsgClass(LeaderSessionMessage.class);
+							PartitionState resp = Await.result(futurePartitionState, getRemainingTime());
 
-							assertEquals(PartitionState.class, lsm.message().getClass());
-
-							PartitionState resp = (PartitionState) lsm.message();
-
-							assertEquals(request.taskExecutionId(), resp.taskExecutionId());
-							assertEquals(request.taskResultId(), resp.taskResultId());
-							assertEquals(request.partitionId().getPartitionId(), resp.partitionId());
-							assertEquals(state, resp.state());
+							assertEquals(request.taskResultId(), resp.getIntermediateDataSetID());
+							assertEquals(request.partitionId().getPartitionId(), resp.getIntermediateResultPartitionID());
+							assertEquals(state, resp.getExecutionState());
 						}
 
 						// 2. Non-existing execution
 						request = new RequestPartitionState(jid, new ResultPartitionID(), receiver, rid);
 
-						jobManagerGateway.tell(request, testActorGateway);
-
-						LeaderSessionMessage lsm = expectMsgClass(LeaderSessionMessage.class);
+						Future<PartitionState> futurePartitionState = jobManagerGateway
+							.ask(request, getRemainingTime())
+							.mapTo(ClassTag$.MODULE$.<PartitionState>apply(PartitionState.class));
 
-						assertEquals(PartitionState.class, lsm.message().getClass());
+						PartitionState resp = Await.result(futurePartitionState, getRemainingTime());
 
-						PartitionState resp = (PartitionState) lsm.message();
-
-						assertEquals(request.taskExecutionId(), resp.taskExecutionId());
-						assertEquals(request.taskResultId(), resp.taskResultId());
-						assertEquals(request.partitionId().getPartitionId(), resp.partitionId());
-						assertNull(resp.state());
+						assertEquals(request.taskResultId(), resp.getIntermediateDataSetID());
+						assertEquals(request.partitionId().getPartitionId(), resp.getIntermediateResultPartitionID());
+						assertNull(resp.getExecutionState());
 
 						// 3. Non-existing job
 						request = new RequestPartitionState(
 							new JobID(), new ResultPartitionID(), receiver, rid);
 
-						jobManagerGateway.tell(request, testActorGateway);
-
-						lsm = expectMsgClass(LeaderSessionMessage.class);
-
-						assertEquals(PartitionState.class, lsm.message().getClass());
+						futurePartitionState = jobManagerGateway
+							.ask(request, getRemainingTime())
+							.mapTo(ClassTag$.MODULE$.<PartitionState>apply(PartitionState.class));
 
-						resp = (PartitionState) lsm.message();
+						resp = Await.result(futurePartitionState, getRemainingTime());
 
-						assertEquals(request.taskExecutionId(), resp.taskExecutionId());
-						assertEquals(request.taskResultId(), resp.taskResultId());
-						assertEquals(request.partitionId().getPartitionId(), resp.partitionId());
-						assertNull(resp.state());
+						assertEquals(request.taskResultId(), resp.getIntermediateDataSetID());
+						assertEquals(request.partitionId().getPartitionId(), resp.getIntermediateResultPartitionID());
+						assertNull(resp.getExecutionState());
 					} catch (Exception e) {
 						e.printStackTrace();
 						fail(e.getMessage());

http://git-wip-us.apache.org/repos/asf/flink/blob/477d1c5d/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index ed30fd7..454196f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.blob.BlobKey;
@@ -54,6 +55,7 @@ import org.junit.Test;
 import java.net.URL;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.Executor;
 
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.fail;
@@ -145,15 +147,14 @@ public class TaskAsyncCallTest {
 		
 		ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
 		ResultPartitionConsumableNotifier consumableNotifier = mock(ResultPartitionConsumableNotifier.class);
+		PartitionStateChecker partitionStateChecker = mock(PartitionStateChecker.class);
+		Executor executor = mock(Executor.class);
 		NetworkEnvironment networkEnvironment = mock(NetworkEnvironment.class);
 		when(networkEnvironment.getResultPartitionManager()).thenReturn(partitionManager);
 		when(networkEnvironment.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
 		when(networkEnvironment.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class)))
 				.thenReturn(mock(TaskKvStateRegistry.class));
 
-		JobManagerCommunicationFactory jobManagerCommunicationFactory = mock(JobManagerCommunicationFactory.class);
-		when(jobManagerCommunicationFactory.createResultPartitionConsumableNotifier(any(Task.class))).thenReturn(consumableNotifier);
-
 		TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
 				new JobID(), "Job Name", new JobVertexID(), new ExecutionAttemptID(),
 				new SerializedValue<>(new ExecutionConfig()),
@@ -170,7 +171,6 @@ public class TaskAsyncCallTest {
 			mock(MemoryManager.class),
 			mock(IOManager.class),
 			networkEnvironment,
-			jobManagerCommunicationFactory,
 			mock(BroadcastVariableManager.class),
 			mock(TaskManagerConnection.class),
 			mock(InputSplitProvider.class),
@@ -178,7 +178,10 @@ public class TaskAsyncCallTest {
 			libCache,
 			mock(FileCache.class),
 			new TaskManagerRuntimeInfo("localhost", new Configuration(), System.getProperty("java.io.tmpdir")),
-			mock(TaskMetricGroup.class));
+			mock(TaskMetricGroup.class),
+			consumableNotifier,
+			partitionStateChecker,
+			executor);
 	}
 
 	public static class CheckpointsInOrderInvokable extends AbstractInvokable implements StatefulTask {

http://git-wip-us.apache.org/repos/asf/flink/blob/477d1c5d/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 0774fd5..d4efd24 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -42,6 +42,7 @@ import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.io.network.ConnectionID;
+import org.apache.flink.runtime.io.network.PartitionState;
 import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -59,7 +60,6 @@ import org.apache.flink.runtime.messages.TaskManagerMessages;
 import org.apache.flink.runtime.messages.TaskManagerMessages.FatalError;
 import org.apache.flink.runtime.messages.TaskMessages;
 import org.apache.flink.runtime.messages.TaskMessages.CancelTask;
-import org.apache.flink.runtime.messages.TaskMessages.PartitionState;
 import org.apache.flink.runtime.messages.TaskMessages.StopTask;
 import org.apache.flink.runtime.messages.TaskMessages.SubmitTask;
 import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
@@ -1485,7 +1485,6 @@ public class TaskManagerTest extends TestLogger {
 				final RequestPartitionState msg = (RequestPartitionState) message;
 
 				PartitionState resp = new PartitionState(
-						msg.taskExecutionId(),
 						msg.taskResultId(),
 						msg.partitionId().getPartitionId(),
 						ExecutionState.RUNNING);

http://git-wip-us.apache.org/repos/asf/flink/blob/477d1c5d/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
index d041465..9791cee 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.taskmanager;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
@@ -42,6 +44,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.lang.reflect.Field;
+import java.util.concurrent.Executor;
 
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -71,7 +74,6 @@ public class TaskStopTest {
 			mock(MemoryManager.class),
 			mock(IOManager.class),
 			mock(NetworkEnvironment.class),
-			mock(JobManagerCommunicationFactory.class),
 			mock(BroadcastVariableManager.class),
 			mock(TaskManagerConnection.class),
 			mock(InputSplitProvider.class),
@@ -79,7 +81,10 @@ public class TaskStopTest {
 			mock(LibraryCacheManager.class),
 			mock(FileCache.class),
 			mock(TaskManagerRuntimeInfo.class),
-			mock(TaskMetricGroup.class));
+			mock(TaskMetricGroup.class),
+			mock(ResultPartitionConsumableNotifier.class),
+			mock(PartitionStateChecker.class),
+			mock(Executor.class));
 		Field f = task.getClass().getDeclaredField("invokable");
 		f.setAccessible(true);
 		f.set(task, taskMock);

http://git-wip-us.apache.org/repos/asf/flink/blob/477d1c5d/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index e5fdf32..9a13cde 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
@@ -60,6 +61,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
@@ -250,15 +252,14 @@ public class TaskTest {
 			// mock a network manager that rejects registration
 			ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
 			ResultPartitionConsumableNotifier consumableNotifier = mock(ResultPartitionConsumableNotifier.class);
+			PartitionStateChecker partitionStateChecker = mock(PartitionStateChecker.class);
+			Executor executor = mock(Executor.class);
 			NetworkEnvironment network = mock(NetworkEnvironment.class);
 			when(network.getResultPartitionManager()).thenReturn(partitionManager);
 			when(network.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
 			doThrow(new RuntimeException("buffers")).when(network).registerTask(any(Task.class));
 
-			JobManagerCommunicationFactory jobManagerCommunicationFactory = mock(JobManagerCommunicationFactory.class);
-			when(jobManagerCommunicationFactory.createResultPartitionConsumableNotifier(any(Task.class))).thenReturn(consumableNotifier);
-			
-			Task task = createTask(TestInvokableCorrect.class, libCache, network, jobManagerCommunicationFactory);
+			Task task = createTask(TestInvokableCorrect.class, libCache, network, consumableNotifier, partitionStateChecker, executor);
 
 			task.registerExecutionListener(listener);
 
@@ -606,23 +607,24 @@ public class TaskTest {
 
 		ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
 		ResultPartitionConsumableNotifier consumableNotifier = mock(ResultPartitionConsumableNotifier.class);
+		PartitionStateChecker partitionStateChecker = mock(PartitionStateChecker.class);
+		Executor executor = mock(Executor.class);
 		NetworkEnvironment network = mock(NetworkEnvironment.class);
-		JobManagerCommunicationFactory jobManagerCommunicationFactory = mock(JobManagerCommunicationFactory.class);
 		when(network.getResultPartitionManager()).thenReturn(partitionManager);
 		when(network.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
 		when(network.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class)))
 				.thenReturn(mock(TaskKvStateRegistry.class));
 
-		when(jobManagerCommunicationFactory.createResultPartitionConsumableNotifier(any(Task.class))).thenReturn(consumableNotifier);
-		
-		return createTask(invokable, libCache, network, jobManagerCommunicationFactory);
+		return createTask(invokable, libCache, network, consumableNotifier, partitionStateChecker, executor);
 	}
 	
 	private Task createTask(
 		Class<? extends AbstractInvokable> invokable,
 		LibraryCacheManager libCache,
 		NetworkEnvironment networkEnvironment,
-		JobManagerCommunicationFactory jobManagerCommunicationFactory) {
+		ResultPartitionConsumableNotifier consumableNotifier,
+		PartitionStateChecker partitionStateChecker,
+		Executor executor) {
 		
 		TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(invokable);
 
@@ -640,7 +642,6 @@ public class TaskTest {
 			mock(MemoryManager.class),
 			mock(IOManager.class),
 			networkEnvironment,
-			jobManagerCommunicationFactory,
 			mock(BroadcastVariableManager.class),
 			taskManagerConnection,
 			inputSplitProvider,
@@ -648,7 +649,10 @@ public class TaskTest {
 			libCache,
 			mock(FileCache.class),
 			new TaskManagerRuntimeInfo("localhost", new Configuration(), System.getProperty("java.io.tmpdir")),
-			mock(TaskMetricGroup.class));
+			mock(TaskMetricGroup.class),
+			consumableNotifier,
+			partitionStateChecker,
+			executor);
 	}
 
 	private TaskDeploymentDescriptor createTaskDeploymentDescriptor(Class<? extends AbstractInvokable> invokable) {

http://git-wip-us.apache.org/repos/asf/flink/blob/477d1c5d/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
index 9f52e9c..6a7b024 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
@@ -34,6 +34,8 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
@@ -41,7 +43,6 @@ import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractCloseableHandle;
 import org.apache.flink.runtime.state.ChainedStateHandle;
-import org.apache.flink.runtime.taskmanager.JobManagerCommunicationFactory;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
@@ -64,6 +65,7 @@ import java.io.Serializable;
 import java.net.URL;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.Executor;
 
 import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
@@ -150,7 +152,6 @@ public class InterruptSensitiveRestoreTest {
 			mock(MemoryManager.class),
 			mock(IOManager.class),
 			networkEnvironment,
-			mock(JobManagerCommunicationFactory.class),
 			mock(BroadcastVariableManager.class),
 				mock(TaskManagerConnection.class),
 				mock(InputSplitProvider.class),
@@ -159,7 +160,10 @@ public class InterruptSensitiveRestoreTest {
 			new FileCache(new Configuration()),
 			new TaskManagerRuntimeInfo(
 					"localhost", new Configuration(), EnvironmentInformation.getTemporaryFileDirectory()),
-			new UnregisteredTaskMetricsGroup());
+			new UnregisteredTaskMetricsGroup(),
+			mock(ResultPartitionConsumableNotifier.class),
+			mock(PartitionStateChecker.class),
+			mock(Executor.class));
 		
 	}