You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/11/01 08:37:38 UTC

[3/4] flink git commit: [FLINK-4887] [execution graph] Introduce TaskManagerGateway to encapsulate communication logic

http://git-wip-us.apache.org/repos/asf/flink/blob/cfb3790f/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 788dee4..c0ce799 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -18,49 +18,49 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import akka.dispatch.OnComplete;
-import akka.dispatch.OnFailure;
 import org.apache.flink.api.common.Archiveable;
 import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.ApplyFunction;
 import org.apache.flink.runtime.concurrent.BiFunction;
-import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionLocation;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.messages.Messages;
-import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.StackTraceSampleResponse;
 import org.apache.flink.runtime.state.TaskStateHandles;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.ExceptionUtils;
 import org.slf4j.Logger;
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.duration.FiniteDuration;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
-import static akka.dispatch.Futures.future;
 import static org.apache.flink.runtime.execution.ExecutionState.CANCELED;
 import static org.apache.flink.runtime.execution.ExecutionState.CANCELING;
 import static org.apache.flink.runtime.execution.ExecutionState.CREATED;
@@ -69,13 +69,6 @@ import static org.apache.flink.runtime.execution.ExecutionState.FAILED;
 import static org.apache.flink.runtime.execution.ExecutionState.FINISHED;
 import static org.apache.flink.runtime.execution.ExecutionState.RUNNING;
 import static org.apache.flink.runtime.execution.ExecutionState.SCHEDULED;
-import static org.apache.flink.runtime.messages.TaskMessages.CancelTask;
-import static org.apache.flink.runtime.messages.TaskMessages.FailIntermediateResultPartitions;
-import static org.apache.flink.runtime.messages.TaskMessages.StopTask;
-import static org.apache.flink.runtime.messages.TaskMessages.SubmitTask;
-import static org.apache.flink.runtime.messages.TaskMessages.UpdatePartitionInfo;
-import static org.apache.flink.runtime.messages.TaskMessages.UpdateTaskSinglePartitionInfo;
-import static org.apache.flink.runtime.messages.TaskMessages.createUpdateTaskMultiplePartitionInfos;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -117,7 +110,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 
 	private final int attemptNumber;
 
-	private final FiniteDuration timeout;
+	private final Time timeout;
 
 	private ConcurrentLinkedQueue<PartialInputChannelDeploymentDescriptor> partialInputChannelDeploymentDescriptors;
 
@@ -131,8 +124,8 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 
 	private TaskStateHandles taskStateHandles;
 
-	/** The execution context which is used to execute futures. */
-	private ExecutionContext executionContext;
+	/** The executor which is used to execute futures. */
+	private Executor executor;
 
 	// ------------------------- Accumulators ---------------------------------
 	
@@ -147,12 +140,12 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	// --------------------------------------------------------------------------------------------
 	
 	public Execution(
-			ExecutionContext executionContext,
+			Executor executor,
 			ExecutionVertex vertex,
 			int attemptNumber,
 			long startTimestamp,
-			FiniteDuration timeout) {
-		this.executionContext = checkNotNull(executionContext);
+			Time timeout) {
+		this.executor = checkNotNull(executor);
 
 		this.vertex = checkNotNull(vertex);
 		this.attemptId = new ExecutionAttemptID();
@@ -162,9 +155,9 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 		this.stateTimestamps = new long[ExecutionState.values().length];
 		markTimestamp(ExecutionState.CREATED, startTimestamp);
 
-		this.timeout = timeout;
+		this.timeout = checkNotNull(timeout);
 
-		this.partialInputChannelDeploymentDescriptors = new ConcurrentLinkedQueue<PartialInputChannelDeploymentDescriptor>();
+		this.partialInputChannelDeploymentDescriptors = new ConcurrentLinkedQueue<>();
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -280,9 +273,10 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 			//     in all cases where the deployment failed. we use many try {} finally {} clauses to assure that
 			final Future<SimpleSlot> slotAllocationFuture = slotProvider.allocateSlot(toSchedule, queued);
 
-			// IMPORTANT: We have to use the direct executor here so that we directly deploy the tasks
-			// if the slot allocation future is completed. This is necessary for immediate deployment
-			final Future<Void> deploymentFuture = slotAllocationFuture.handleAsync(new BiFunction<SimpleSlot, Throwable, Void>() {
+			// IMPORTANT: We have to use the synchronous handle operation (direct executor) here so
+			// that we directly deploy the tasks if the slot allocation future is completed. This is
+			// necessary for immediate deployment.
+			final Future<Void> deploymentFuture = slotAllocationFuture.handle(new BiFunction<SimpleSlot, Throwable, Void>() {
 				@Override
 				public Void apply(SimpleSlot simpleSlot, Throwable throwable) {
 					if (simpleSlot != null) {
@@ -301,7 +295,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 					}
 					return null;
 				}
-			}, Executors.directExecutor());
+			});
 
 			// if tasks have to scheduled immediately check that the task has been deployed
 			if (!queued) {
@@ -370,34 +364,26 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 			// register this execution at the execution graph, to receive call backs
 			vertex.getExecutionGraph().registerExecution(this);
 			
-			final ActorGateway gateway = slot.getTaskManagerActorGateway();
+			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
 
-			final scala.concurrent.Future<Object> deployAction = gateway.ask(new SubmitTask(deployment), timeout);
-
-			deployAction.onComplete(new OnComplete<Object>(){
+			final Future<Acknowledge> submitResultFuture = taskManagerGateway.submitTask(deployment, timeout);
 
+			submitResultFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
 				@Override
-				public void onComplete(Throwable failure, Object success) throws Throwable {
-					if (failure != null) {
-						if (failure instanceof TimeoutException) {
-							String taskname = deployment.getTaskInfo().getTaskNameWithSubtasks() + " (" + attemptId + ')';
-
-							markFailed(new Exception(
-									"Cannot deploy task " + taskname + " - TaskManager (" + assignedResourceLocation
-									+ ") not responding after a timeout of " + timeout, failure));
-						}
-						else {
-							markFailed(failure);
-						}
+				public Void apply(Throwable failure) {
+					if (failure instanceof TimeoutException) {
+						String taskname = deployment.getTaskInfo().getTaskNameWithSubtasks() + " (" + attemptId + ')';
+
+						markFailed(new Exception(
+							"Cannot deploy task " + taskname + " - TaskManager (" + assignedResourceLocation
+								+ ") not responding after a timeout of " + timeout, failure));
 					}
 					else {
-						if (!(success.equals(Messages.getAcknowledge()))) {
-							markFailed(new Exception("Failed to deploy the task to slot. Response was not of type 'Acknowledge', but was " + success
-									+ "\nSlot Details: " + slot));
-						}
+						markFailed(failure);
 					}
+					return null;
 				}
-			}, executionContext);
+			}, executor);
 		}
 		catch (Throwable t) {
 			markFailed(t);
@@ -409,31 +395,29 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	 * Sends stop RPC call.
 	 */
 	public void stop() {
-		final SimpleSlot slot = this.assignedResource;
+		final SimpleSlot slot = assignedResource;
 
 		if (slot != null) {
-			final ActorGateway gateway = slot.getTaskManagerActorGateway();
+			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
 
-			scala.concurrent.Future<Object> stopResult = gateway.retry(
-				new StopTask(attemptId),
+			Future<Acknowledge> stopResultFuture = FutureUtils.retry(
+				new Callable<Future<Acknowledge>>() {
+
+					@Override
+					public Future<Acknowledge> call() throws Exception {
+						return taskManagerGateway.stopTask(attemptId, timeout);
+					}
+				},
 				NUM_STOP_CALL_TRIES,
-				timeout,
-				executionContext);
+				executor);
 
-			stopResult.onComplete(new OnComplete<Object>() {
+			stopResultFuture.exceptionally(new ApplyFunction<Throwable, Void>() {
 				@Override
-				public void onComplete(Throwable failure, Object success) throws Throwable {
-					if (failure != null) {
-						fail(new Exception("Task could not be stopped.", failure));
-					} else {
-						TaskOperationResult result = (TaskOperationResult) success;
-						if (!result.success()) {
-							LOG.info("Stopping task was not successful. Description: {}",
-									result.description());
-						}
-					}
+				public Void apply(Throwable failure) {
+					LOG.info("Stopping task was not successful.", failure);
+					return null;
 				}
-			}, executionContext);
+			});
 		}
 	}
 
@@ -532,9 +516,9 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 				// TODO The current approach may send many update messages even though the consuming
 				// task has already been deployed with all necessary information. We have to check
 				// whether this is a problem and fix it, if it is.
-				future(new Callable<Boolean>(){
+				FlinkFuture.supplyAsync(new Callable<Void>(){
 					@Override
-					public Boolean call() throws Exception {
+					public Void call() throws Exception {
 						try {
 							consumerVertex.scheduleForExecution(
 									consumerVertex.getExecutionGraph().getSlotProvider(),
@@ -544,9 +528,9 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 									"vertex " + consumerVertex, t));
 						}
 
-						return true;
+						return null;
 					}
-				}, executionContext);
+				}, executor);
 
 				// double check to resolve race conditions
 				if(consumerVertex.getExecutionState() == RUNNING){
@@ -592,12 +576,11 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 					final InputChannelDeploymentDescriptor descriptor = new InputChannelDeploymentDescriptor(
 							partitionId, partitionLocation);
 
-					final UpdatePartitionInfo updateTaskMessage = new UpdateTaskSinglePartitionInfo(
-						consumer.getAttemptId(),
-						partition.getIntermediateResult().getId(),
-						descriptor);
-
-					sendUpdatePartitionInfoRpcCall(consumerSlot, updateTaskMessage);
+					consumer.sendUpdatePartitionInfoRpcCall(
+						Collections.singleton(
+							new PartitionInfo(
+								partition.getIntermediateResult().getId(),
+								descriptor)));
 				}
 				// ----------------------------------------------------------------
 				// Consumer is scheduled or deploying => cache input channel
@@ -629,6 +612,78 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 		processFail(t, false);
 	}
 
+	/**
+	 * Request a stack trace sample from the task of this execution.
+	 *
+	 * @param sampleId of the stack trace sample
+	 * @param numSamples the sample should contain
+	 * @param delayBetweenSamples to wait
+	 * @param maxStrackTraceDepth of the samples
+	 * @param timeout until the request times out
+	 * @return Future stack trace sample response
+	 */
+	public Future<StackTraceSampleResponse> requestStackTraceSample(
+			int sampleId,
+			int numSamples,
+			Time delayBetweenSamples,
+			int maxStrackTraceDepth,
+			Time timeout) {
+
+		final SimpleSlot slot = assignedResource;
+
+		if (slot != null) {
+			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
+
+			return taskManagerGateway.requestStackTraceSample(
+				attemptId,
+				sampleId,
+				numSamples,
+				delayBetweenSamples,
+				maxStrackTraceDepth,
+				timeout);
+		} else {
+			return FlinkCompletableFuture.completedExceptionally(new Exception("The execution has no slot assigned."));
+		}
+	}
+
+	/**
+	 * Notify the task of this execution about a completed checkpoint.
+	 *
+	 * @param checkpointId of the completed checkpoint
+	 * @param timestamp of the completed checkpoint
+	 */
+	public void notifyCheckpointComplete(long checkpointId, long timestamp) {
+		final SimpleSlot slot = assignedResource;
+
+		if (slot != null) {
+			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
+
+			taskManagerGateway.notifyCheckpointComplete(attemptId, getVertex().getJobId(), checkpointId, timestamp);
+		} else {
+			LOG.debug("The execution has no slot assigned. This indicates that the execution is " +
+				"no longer running.");
+		}
+	}
+
+	/**
+	 * Trigger a new checkpoint on the task of this execution.
+	 *
+	 * @param checkpointId of the checkpoint to trigger
+	 * @param timestamp of the checkpoint to trigger
+	 */
+	public void triggerCheckpoint(long checkpointId, long timestamp) {
+		final SimpleSlot slot = assignedResource;
+
+		if (slot != null) {
+			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
+
+			taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp);
+		} else {
+			LOG.debug("The execution has no slot assigned. This indicates that the execution is " +
+				"no longer running.");
+		}
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//   Callbacks
 	// --------------------------------------------------------------------------------------------
@@ -754,20 +809,16 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 
 			PartialInputChannelDeploymentDescriptor partialInputChannelDeploymentDescriptor;
 
-			List<IntermediateDataSetID> resultIDs = new ArrayList<IntermediateDataSetID>();
-			List<InputChannelDeploymentDescriptor> inputChannelDeploymentDescriptors = new ArrayList<InputChannelDeploymentDescriptor>();
+			List<PartitionInfo> partitionInfos = new ArrayList<>(partialInputChannelDeploymentDescriptors.size());
 
 			while ((partialInputChannelDeploymentDescriptor = partialInputChannelDeploymentDescriptors.poll()) != null) {
-				resultIDs.add(partialInputChannelDeploymentDescriptor.getResultId());
-				inputChannelDeploymentDescriptors.add(partialInputChannelDeploymentDescriptor.createInputChannelDeploymentDescriptor(this));
+				partitionInfos.add(
+					new PartitionInfo(
+						partialInputChannelDeploymentDescriptor.getResultId(),
+						partialInputChannelDeploymentDescriptor.createInputChannelDeploymentDescriptor(this)));
 			}
 
-			UpdatePartitionInfo updateTaskMessage = createUpdateTaskMultiplePartitionInfos(
-				attemptId,
-				resultIDs,
-				inputChannelDeploymentDescriptors);
-
-			sendUpdatePartitionInfoRpcCall(assignedResource, updateTaskMessage);
+			sendUpdatePartitionInfoRpcCall(partitionInfos);
 		}
 	}
 
@@ -888,70 +939,66 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	 * The sending is tried up to NUM_CANCEL_CALL_TRIES times.
 	 */
 	private void sendCancelRpcCall() {
-		final SimpleSlot slot = this.assignedResource;
+		final SimpleSlot slot = assignedResource;
 
 		if (slot != null) {
+			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
 
-			final ActorGateway gateway = slot.getTaskManagerActorGateway();
-
-			scala.concurrent.Future<Object> cancelResult = gateway.retry(
-				new CancelTask(attemptId),
+			Future<Acknowledge> cancelResultFuture = FutureUtils.retry(
+				new Callable<Future<Acknowledge>>() {
+					@Override
+					public Future<Acknowledge> call() throws Exception {
+						return taskManagerGateway.cancelTask(attemptId, timeout);
+					}
+				},
 				NUM_CANCEL_CALL_TRIES,
-				timeout,
-				executionContext);
-
-			cancelResult.onComplete(new OnComplete<Object>() {
+				executor);
 
+			cancelResultFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
 				@Override
-				public void onComplete(Throwable failure, Object success) throws Throwable {
-					if (failure != null) {
-						fail(new Exception("Task could not be canceled.", failure));
-					} else {
-						TaskOperationResult result = (TaskOperationResult) success;
-						if (!result.success()) {
-							LOG.debug("Cancel task call did not find task. Probably akka message call" +
-									" race.");
-						}
-					}
+				public Void apply(Throwable failure) {
+					fail(new Exception("Task could not be canceled.", failure));
+					return null;
 				}
-			}, executionContext);
+			}, executor);
 		}
 	}
 
 	private void sendFailIntermediateResultPartitionsRpcCall() {
-		final SimpleSlot slot = this.assignedResource;
+		final SimpleSlot slot = assignedResource;
 
 		if (slot != null) {
-			final ActorGateway gateway = slot.getTaskManagerActorGateway();
+			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
 
 			// TODO For some tests this could be a problem when querying too early if all resources were released
-			gateway.tell(new FailIntermediateResultPartitions(attemptId));
+			taskManagerGateway.failPartition(attemptId);
 		}
 	}
 
 	/**
-	 * Sends an UpdatePartitionInfo message to the instance of the consumerSlot.
+	 * Update the partition infos on the assigned resource.
 	 *
-	 * @param consumerSlot Slot to whose instance the message will be sent
-	 * @param updatePartitionInfo UpdatePartitionInfo message
+	 * @param partitionInfos for the remote task
 	 */
 	private void sendUpdatePartitionInfoRpcCall(
-			final SimpleSlot consumerSlot,
-			final UpdatePartitionInfo updatePartitionInfo) {
+			final Iterable<PartitionInfo> partitionInfos) {
 
-		if (consumerSlot != null) {
-			final ActorGateway gateway = consumerSlot.getTaskManagerActorGateway();
-			final TaskManagerLocation taskManagerLocation = consumerSlot.getTaskManagerLocation();
+		final SimpleSlot slot = assignedResource;
 
-			scala.concurrent.Future<Object> futureUpdate = gateway.ask(updatePartitionInfo, timeout);
+		if (slot != null) {
+			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
+			final TaskManagerLocation taskManagerLocation = slot.getTaskManagerLocation();
+
+			Future<Acknowledge> updatePartitionsResultFuture = taskManagerGateway.updatePartitions(attemptId, partitionInfos, timeout);
 
-			futureUpdate.onFailure(new OnFailure() {
+			updatePartitionsResultFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
 				@Override
-				public void onFailure(Throwable failure) throws Throwable {
+				public Void apply(Throwable failure) {
 					fail(new IllegalStateException("Update task on TaskManager " + taskManagerLocation +
-							" failed due to:", failure));
+						" failed due to:", failure));
+					return null;
 				}
-			}, executionContext);
+			}, executor);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/cfb3790f/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index e2701da..244a113 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.MetricGroup;
@@ -61,8 +62,6 @@ import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.duration.FiniteDuration;
 import scala.Option;
 
 import java.io.IOException;
@@ -80,6 +79,7 @@ import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -168,7 +168,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	private final long[] stateTimestamps;
 
 	/** The timeout for all messages that require a response/acknowledgement */
-	private final FiniteDuration timeout;
+	private final Time timeout;
 
 	// ------ Configuration of the Execution -------
 
@@ -214,8 +214,8 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	 * available after archiving. */
 	private CheckpointStatsTracker checkpointStatsTracker;
 
-	/** The execution context which is used to execute futures. */
-	private ExecutionContext executionContext;
+	/** The executor which is used to execute futures. */
+	private Executor executor;
 
 	/** Registered KvState instances reported by the TaskManagers. */
 	private KvStateLocationRegistry kvStateLocationRegistry;
@@ -231,15 +231,15 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	 * This constructor is for tests only, because it does not include class loading information.
 	 */
 	ExecutionGraph(
-			ExecutionContext executionContext,
+			Executor executor,
 			JobID jobId,
 			String jobName,
 			Configuration jobConfig,
 			SerializedValue<ExecutionConfig> serializedConfig,
-			FiniteDuration timeout,
+			Time timeout,
 			RestartStrategy restartStrategy) {
 		this(
-			executionContext,
+			executor,
 			jobId,
 			jobName,
 			jobConfig,
@@ -254,25 +254,25 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	}
 
 	public ExecutionGraph(
-			ExecutionContext executionContext,
+			Executor executor,
 			JobID jobId,
 			String jobName,
 			Configuration jobConfig,
 			SerializedValue<ExecutionConfig> serializedConfig,
-			FiniteDuration timeout,
+			Time timeout,
 			RestartStrategy restartStrategy,
 			List<BlobKey> requiredJarFiles,
 			List<URL> requiredClasspaths,
 			ClassLoader userClassLoader,
 			MetricGroup metricGroup) {
 
-		checkNotNull(executionContext);
+		checkNotNull(executor);
 		checkNotNull(jobId);
 		checkNotNull(jobName);
 		checkNotNull(jobConfig);
 		checkNotNull(userClassLoader);
 
-		this.executionContext = executionContext;
+		this.executor = executor;
 
 		this.jobID = jobId;
 		this.jobName = jobName;
@@ -594,8 +594,8 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	 *
 	 * @return ExecutionContext associated with this ExecutionGraph
 	 */
-	public ExecutionContext getExecutionContext() {
-		return executionContext;
+	public Executor getExecutor() {
+		return executor;
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/cfb3790f/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index dcd6a5d..756464d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -40,9 +40,6 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
 import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.slf4j.Logger;
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.ExecutionContext$;
-import scala.concurrent.duration.FiniteDuration;
 
 import javax.annotation.Nullable;
 import java.util.ArrayList;
@@ -55,8 +52,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * Utility class to encapsulate the logic of building an {@link ExecutionGraph} from a {@link JobGraph}.
  */
 public class ExecutionGraphBuilder {
-
-	/**
+		/**
 	 * Builds the ExecutionGraph from the JobGraph.
 	 * If a prior execution graph exists, the JobGraph will be attached. If no prior execution
 	 * graph exists, then the JobGraph will become attach to a new emoty execution graph.
@@ -73,32 +69,6 @@ public class ExecutionGraphBuilder {
 			MetricGroup metrics,
 			int parallelismForAutoMax,
 			Logger log)
-			throws JobExecutionException, JobException
-	{
-		final ExecutionContext executionContext = ExecutionContext$.MODULE$.fromExecutor(executor);
-		
-		return buildGraph(prior, jobGraph, jobManagerConfig, executionContext,
-				classLoader, recoveryFactory, timeout, restartStrategy,
-				metrics, parallelismForAutoMax, log);
-	}
-
-	/**
-	 * Builds the ExecutionGraph from the JobGraph.
-	 * If a prior execution graph exists, the JobGraph will be attached. If no prior execution
-	 * graph exists, then the JobGraph will become attach to a new emoty execution graph.
-	 */
-	public static ExecutionGraph buildGraph(
-			@Nullable ExecutionGraph prior,
-			JobGraph jobGraph,
-			Configuration jobManagerConfig,
-			ExecutionContext executionContext,
-			ClassLoader classLoader,
-			CheckpointRecoveryFactory recoveryFactory,
-			Time timeout,
-			RestartStrategy restartStrategy,
-			MetricGroup metrics,
-			int parallelismForAutoMax,
-			Logger log)
 		throws JobExecutionException, JobException
 	{
 		checkNotNull(jobGraph, "job graph cannot be null");
@@ -109,12 +79,12 @@ public class ExecutionGraphBuilder {
 		// create a new execution graph, if none exists so far
 		final ExecutionGraph executionGraph = (prior != null) ? prior :
 				new ExecutionGraph(
-						executionContext,
+						executor,
 						jobId,
 						jobName,
 						jobGraph.getJobConfiguration(),
 						jobGraph.getSerializedExecutionConfig(),
-						new FiniteDuration(timeout.getSize(), timeout.getUnit()),
+						timeout,
 						restartStrategy,
 						jobGraph.getUserJarBlobKeys(),
 						jobGraph.getClasspaths(),

http://git-wip-us.apache.org/repos/asf/flink/blob/cfb3790f/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 2d9ec88..2bb63d4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.core.io.InputSplitSource;
@@ -45,7 +46,6 @@ import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 
 import scala.Option;
-import scala.concurrent.duration.FiniteDuration;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -90,7 +90,7 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 		ExecutionGraph graph,
 		JobVertex jobVertex,
 		int defaultParallelism,
-		FiniteDuration timeout) throws JobException {
+		Time timeout) throws JobException {
 
 		this(graph, jobVertex, defaultParallelism, timeout, System.currentTimeMillis());
 	}
@@ -99,7 +99,7 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 		ExecutionGraph graph,
 		JobVertex jobVertex,
 		int defaultParallelism,
-		FiniteDuration timeout,
+		Time timeout,
 		long createTimestamp) throws JobException {
 
 		if (graph == null || jobVertex == null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/cfb3790f/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index b647385..8979d7c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.executiongraph;
 import org.apache.flink.api.common.Archiveable;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
@@ -29,7 +30,6 @@ import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescript
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -46,9 +46,7 @@ import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
-import scala.concurrent.duration.FiniteDuration;
 
-import java.io.Serializable;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -86,7 +84,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 
 	private final List<Execution> priorExecutions;
 
-	private final FiniteDuration timeout;
+	private final Time timeout;
 
 	/** The name in the format "myTask (2/7)", cached to avoid frequent string concatenations */
 	private final String taskNameWithSubtask;
@@ -103,7 +101,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 			ExecutionJobVertex jobVertex,
 			int subTaskIndex,
 			IntermediateResult[] producedDataSets,
-			FiniteDuration timeout) {
+			Time timeout) {
 		this(jobVertex, subTaskIndex, producedDataSets, timeout, System.currentTimeMillis());
 	}
 
@@ -111,7 +109,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 			ExecutionJobVertex jobVertex,
 			int subTaskIndex,
 			IntermediateResult[] producedDataSets,
-			FiniteDuration timeout,
+			Time timeout,
 			long createTimestamp) {
 
 		this.jobVertex = jobVertex;
@@ -133,7 +131,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 		this.priorExecutions = new CopyOnWriteArrayList<Execution>();
 
 		this.currentExecution = new Execution(
-			getExecutionGraph().getExecutionContext(),
+			getExecutionGraph().getExecutor(),
 			this,
 			0,
 			createTimestamp,
@@ -440,7 +438,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 			if (state == FINISHED || state == CANCELED || state == FAILED) {
 				priorExecutions.add(execution);
 				currentExecution = new Execution(
-					getExecutionGraph().getExecutionContext(),
+					getExecutionGraph().getExecutor(),
 					this,
 					execution.getAttemptNumber()+1,
 					System.currentTimeMillis(),
@@ -477,50 +475,6 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 		this.currentExecution.fail(t);
 	}
 
-	public boolean sendMessageToCurrentExecution(
-			Serializable message,
-			ExecutionAttemptID attemptID) {
-
-		return sendMessageToCurrentExecution(message, attemptID, null);
-	}
-
-	public boolean sendMessageToCurrentExecution(
-			Serializable message,
-			ExecutionAttemptID attemptID,
-			ActorGateway sender) {
-		Execution exec = getCurrentExecutionAttempt();
-
-		// check that this is for the correct execution attempt
-		if (exec != null && exec.getAttemptId().equals(attemptID)) {
-			SimpleSlot slot = exec.getAssignedResource();
-
-			// send only if we actually have a target
-			if (slot != null) {
-				ActorGateway gateway = slot.getTaskManagerActorGateway();
-				if (gateway != null) {
-					if (sender == null) {
-						gateway.tell(message);
-					} else {
-						gateway.tell(message, sender);
-					}
-
-					return true;
-				} else {
-					return false;
-				}
-			}
-			else {
-				LOG.debug("Skipping message to undeployed task execution {}/{}", getSimpleName(), attemptID);
-				return false;
-			}
-		}
-		else {
-			LOG.debug("Skipping message to {}/{} because it does not match the current execution",
-					getSimpleName(), attemptID);
-			return false;
-		}
-	}
-
 	/**
 	 * Schedules or updates the consumer tasks of the result partition with the given ID.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/cfb3790f/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/PartitionInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/PartitionInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/PartitionInfo.java
new file mode 100644
index 0000000..478550c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/PartitionInfo.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * Contains information about where to find a partition. The partition is identified by its
+ * {@link IntermediateDataSetID}, and its location is described by an
+ * {@link InputChannelDeploymentDescriptor}. Both values are checked to be non-null on construction.
+ */
+public class PartitionInfo implements Serializable {
+
+	private static final long serialVersionUID = 1724490660830968430L;
+	
+	private final IntermediateDataSetID intermediateDataSetID;
+	private final InputChannelDeploymentDescriptor inputChannelDeploymentDescriptor;
+
+	public PartitionInfo(IntermediateDataSetID intermediateResultPartitionID, InputChannelDeploymentDescriptor inputChannelDeploymentDescriptor) {
+		this.intermediateDataSetID = Preconditions.checkNotNull(intermediateResultPartitionID);
+		this.inputChannelDeploymentDescriptor = Preconditions.checkNotNull(inputChannelDeploymentDescriptor);
+	}
+
+	public IntermediateDataSetID getIntermediateDataSetID() {
+		return intermediateDataSetID;
+	}
+
+	public InputChannelDeploymentDescriptor getInputChannelDeploymentDescriptor() {
+		return inputChannelDeploymentDescriptor;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cfb3790f/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java
index 528eacb..3962e91 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.executiongraph.restart;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.util.Preconditions;
 import scala.concurrent.duration.Duration;
@@ -28,8 +29,6 @@ import scala.concurrent.duration.Duration;
 import java.util.ArrayDeque;
 import java.util.concurrent.TimeUnit;
 
-import static akka.dispatch.Futures.future;
-
 /**
  * Restart strategy which tries to restart the given {@link ExecutionGraph} when failure rate exceeded
  * with a fixed time delay in between.
@@ -71,7 +70,7 @@ public class FailureRateRestartStrategy implements RestartStrategy {
 			restartTimestampsDeque.remove();
 		}
 		restartTimestampsDeque.add(System.currentTimeMillis());
-		future(ExecutionGraphRestarter.restartWithDelay(executionGraph, delayInterval.toMilliseconds()), executionGraph.getExecutionContext());
+		FlinkFuture.supplyAsync(ExecutionGraphRestarter.restartWithDelay(executionGraph, delayInterval.toMilliseconds()), executionGraph.getExecutor());
 	}
 
 	private boolean isRestartTimestampsQueueFull() {

http://git-wip-us.apache.org/repos/asf/flink/blob/cfb3790f/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
index 8053e95..5337c6a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
@@ -20,12 +20,11 @@ package org.apache.flink.runtime.executiongraph.restart;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.util.Preconditions;
 import scala.concurrent.duration.Duration;
 
-import static akka.dispatch.Futures.future;
-
 /**
  * Restart strategy which tries to restart the given {@link ExecutionGraph} a fixed number of times
  * with a fixed time delay in between.
@@ -59,7 +58,7 @@ public class FixedDelayRestartStrategy implements RestartStrategy {
 	@Override
 	public void restart(final ExecutionGraph executionGraph) {
 		currentRestartAttempt++;
-		future(ExecutionGraphRestarter.restartWithDelay(executionGraph, delayBetweenRestartAttempts), executionGraph.getExecutionContext());
+		FlinkFuture.supplyAsync(ExecutionGraphRestarter.restartWithDelay(executionGraph, delayBetweenRestartAttempts), executionGraph.getExecutor());
 	}
 
 	/**
@@ -125,4 +124,4 @@ public class FixedDelayRestartStrategy implements RestartStrategy {
 			return new FixedDelayRestartStrategy(maxAttempts, delay);
 		}
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cfb3790f/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
index d63d475..b5c6f23 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
@@ -29,8 +29,10 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotAvailabilityListener;
 import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
+import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,7 +51,7 @@ public class Instance implements SlotOwner {
 	private final Object instanceLock = new Object();
 
 	/** The instance gateway to communicate with the instance */
-	private final ActorGateway actorGateway;
+	private final TaskManagerGateway taskManagerGateway;
 
 	/** The instance connection information for the data transfer. */
 	private final TaskManagerLocation location;
@@ -84,25 +86,25 @@ public class Instance implements SlotOwner {
 	/**
 	 * Constructs an instance reflecting a registered TaskManager.
 	 *
-	 * @param actorGateway The actor gateway to communicate with the remote instance
+	 * @param taskManagerGateway The gateway to communicate with the remote instance
 	 * @param location The remote connection where the task manager receives requests.
 	 * @param id The id under which the taskManager is registered.
 	 * @param resources The resources available on the machine.
 	 * @param numberOfSlots The number of task slots offered by this taskManager.
 	 */
 	public Instance(
-			ActorGateway actorGateway,
+			TaskManagerGateway taskManagerGateway,
 			TaskManagerLocation location,
 			InstanceID id,
 			HardwareDescription resources,
 			int numberOfSlots) {
-		this.actorGateway = actorGateway;
-		this.location = location;
-		this.instanceId = id;
-		this.resources = resources;
+		this.taskManagerGateway = Preconditions.checkNotNull(taskManagerGateway);
+		this.location = Preconditions.checkNotNull(location);
+		this.instanceId = Preconditions.checkNotNull(id);
+		this.resources = Preconditions.checkNotNull(resources);
 		this.numberOfSlots = numberOfSlots;
 
-		this.availableSlots = new ArrayDeque<Integer>(numberOfSlots);
+		this.availableSlots = new ArrayDeque<>(numberOfSlots);
 		for (int i = 0; i < numberOfSlots; i++) {
 			this.availableSlots.add(i);
 		}
@@ -230,7 +232,7 @@ public class Instance implements SlotOwner {
 				return null;
 			}
 			else {
-				SimpleSlot slot = new SimpleSlot(jobID, this, location, nextSlot, actorGateway);
+				SimpleSlot slot = new SimpleSlot(jobID, this, location, nextSlot, taskManagerGateway);
 				allocatedSlots.add(slot);
 				return slot;
 			}
@@ -268,7 +270,7 @@ public class Instance implements SlotOwner {
 			}
 			else {
 				SharedSlot slot = new SharedSlot(
-						jobID, this, location, nextSlot, actorGateway, sharingGroupAssignment);
+						jobID, this, location, nextSlot, taskManagerGateway, sharingGroupAssignment);
 				allocatedSlots.add(slot);
 				return slot;
 			}
@@ -335,8 +337,8 @@ public class Instance implements SlotOwner {
 	 *
 	 * @return InstanceGateway associated with this instance
 	 */
-	public ActorGateway getActorGateway() {
-		return actorGateway;
+	public TaskManagerGateway getTaskManagerGateway() {
+		return taskManagerGateway;
 	}
 
 	public TaskManagerLocation getTaskManagerLocation() {
@@ -390,6 +392,6 @@ public class Instance implements SlotOwner {
 	@Override
 	public String toString() {
 		return String.format("%s @ %s - %d slots - URL: %s", instanceId, location.getHostname(),
-				numberOfSlots, (actorGateway != null ? actorGateway.path() : "No instance gateway"));
+				numberOfSlots, (taskManagerGateway != null ? taskManagerGateway.getAddress() : "No instance gateway"));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/cfb3790f/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
index 132ee6f..65909db 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
@@ -25,10 +25,9 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.UUID;
 
-import akka.actor.ActorRef;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -131,19 +130,17 @@ public class InstanceManager {
 	 * Registers a task manager. Registration of a task manager makes it available to be used
 	 * for the job execution.
 	 *
-	 * @param taskManager ActorRef to the TaskManager which wants to be registered
+	 * @param taskManagerGateway gateway to the task manager
 	 * @param taskManagerLocation Location info of the TaskManager
 	 * @param resources Hardware description of the TaskManager
 	 * @param numberOfSlots Number of available slots on the TaskManager
-	 * @param leaderSessionID The current leader session ID of the JobManager
 	 * @return The assigned InstanceID of the registered task manager
 	 */
 	public InstanceID registerTaskManager(
-			ActorRef taskManager,
+			TaskManagerGateway taskManagerGateway,
 			TaskManagerLocation taskManagerLocation,
 			HardwareDescription resources,
-			int numberOfSlots,
-			UUID leaderSessionID) {
+			int numberOfSlots) {
 		
 		synchronized (this.lock) {
 			if (this.isShutdown) {
@@ -163,11 +160,14 @@ public class InstanceManager {
 						" which was marked as dead earlier because of a heart-beat timeout.");
 			}
 
-			ActorGateway actorGateway = new AkkaActorGateway(taskManager, leaderSessionID);
-
 			InstanceID instanceID = new InstanceID();
 
-			Instance host = new Instance(actorGateway, taskManagerLocation, instanceID, resources, numberOfSlots);
+			Instance host = new Instance(
+				taskManagerGateway,
+				taskManagerLocation,
+				instanceID,
+				resources,
+				numberOfSlots);
 
 			registeredHostsById.put(instanceID, host);
 			registeredHostsByResource.put(taskManagerLocation.getResourceID(), host);
@@ -179,7 +179,7 @@ public class InstanceManager {
 								"Current number of registered hosts is %d. " +
 								"Current number of alive task slots is %d.",
 						taskManagerLocation.getHostname(),
-						taskManager.path(),
+						taskManagerGateway.getAddress(),
 						instanceID,
 						registeredHostsById.size(),
 						totalNumberOfAliveTaskSlots));
@@ -195,7 +195,7 @@ public class InstanceManager {
 	}
 
 	/**
-	 * Unregisters the TaskManager with the given {@link ActorRef}. Unregistering means to mark
+	 * Unregisters the TaskManager with the given instance id. Unregistering means to mark
 	 * the given instance as dead and notify {@link InstanceListener} about the dead instance.
 	 *
 	 * @param instanceId TaskManager which is about to be marked dead.
@@ -204,8 +204,6 @@ public class InstanceManager {
 		Instance instance = registeredHostsById.get(instanceId);
 
 		if (instance != null){
-			ActorRef host = instance.getActorGateway().actor();
-
 			registeredHostsById.remove(instance.getId());
 			registeredHostsByResource.remove(instance.getTaskManagerID());
 
@@ -219,9 +217,10 @@ public class InstanceManager {
 
 			notifyDeadInstance(instance);
 
-			LOG.info("Unregistered task manager " + host.path() + ". Number of " +
-					"registered task managers " + getNumberOfRegisteredTaskManagers() + ". Number" +
-					" of available slots " + getTotalNumberOfSlots() + ".");
+			LOG.info(
+				"Unregistered task manager " + instance.getTaskManagerLocation().addressString() +
+				". Number of registered task managers " + getNumberOfRegisteredTaskManagers() +
+				". Number of available slots " + getTotalNumberOfSlots() + ".");
 		} else {
 			LOG.warn("Tried to unregister instance {} but it is not registered.", instanceId);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/cfb3790f/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
index 97385b1..106a8ef 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.instance;
 
 import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
 import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.AbstractID;
 import org.apache.flink.api.common.JobID;
@@ -63,15 +64,15 @@ public class SharedSlot extends Slot {
 	 * @param owner The component from which this slot is allocated.
 	 * @param location The location info of the TaskManager where the slot was allocated from
 	 * @param slotNumber The number of the slot.
-	 * @param taskManagerActorGateway The actor gateway to communicate with the TaskManager   
+	 * @param taskManagerGateway The gateway to communicate with the TaskManager
 	 * @param assignmentGroup The assignment group that this shared slot belongs to.
 	 */
 	public SharedSlot(
 			JobID jobID, SlotOwner owner, TaskManagerLocation location, int slotNumber,
-			ActorGateway taskManagerActorGateway,
+			TaskManagerGateway taskManagerGateway,
 			SlotSharingGroupAssignment assignmentGroup) {
 
-		this(jobID, owner, location, slotNumber, taskManagerActorGateway, assignmentGroup, null, null);
+		this(jobID, owner, location, slotNumber, taskManagerGateway, assignmentGroup, null, null);
 	}
 
 	/**
@@ -82,18 +83,22 @@ public class SharedSlot extends Slot {
 	 * @param owner The component from which this slot is allocated.
 	 * @param location The location info of the TaskManager where the slot was allocated from
 	 * @param slotNumber The number of the slot.
-	 * @param taskManagerActorGateway The actor gateway to communicate with the TaskManager   
+	 * @param taskManagerGateway The gateway to communicate with the TaskManager
 	 * @param assignmentGroup The assignment group that this shared slot belongs to.
 	 * @param parent The parent slot of this slot.
 	 * @param groupId The assignment group of this slot.
 	 */
 	public SharedSlot(
-			JobID jobID, SlotOwner owner, TaskManagerLocation location, int slotNumber,
-			ActorGateway taskManagerActorGateway,
+			JobID jobID,
+			SlotOwner owner,
+			TaskManagerLocation location,
+			int slotNumber,
+			TaskManagerGateway taskManagerGateway,
 			SlotSharingGroupAssignment assignmentGroup,
-			@Nullable SharedSlot parent, @Nullable AbstractID groupId) {
+			@Nullable SharedSlot parent,
+			@Nullable AbstractID groupId) {
 
-		super(jobID, owner, location, slotNumber, taskManagerActorGateway, parent, groupId);
+		super(jobID, owner, location, slotNumber, taskManagerGateway, parent, groupId);
 
 		this.assignmentGroup = checkNotNull(assignmentGroup);
 		this.subSlots = new HashSet<Slot>();
@@ -218,7 +223,7 @@ public class SharedSlot extends Slot {
 		if (isAlive()) {
 			SimpleSlot slot = new SimpleSlot(
 					getJobID(), getOwner(), getTaskManagerLocation(), subSlots.size(), 
-					getTaskManagerActorGateway(), this, groupId);
+					getTaskManagerGateway(), this, groupId);
 			subSlots.add(slot);
 			return slot;
 		}
@@ -240,7 +245,7 @@ public class SharedSlot extends Slot {
 		if (isAlive()) {
 			SharedSlot slot = new SharedSlot(
 					getJobID(), getOwner(), getTaskManagerLocation(), subSlots.size(), 
-					getTaskManagerActorGateway(), assignmentGroup, this, groupId);
+					getTaskManagerGateway(), assignmentGroup, this, groupId);
 			subSlots.add(slot);
 			return slot;
 		}
@@ -290,4 +295,4 @@ public class SharedSlot extends Slot {
 	public String toString() {
 		return "Shared " + super.toString();
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cfb3790f/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
index 479fa29..8c7ec01 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobmanager.scheduler.Locality;
 import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
 import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.AbstractID;
 
@@ -61,12 +62,12 @@ public class SimpleSlot extends Slot {
 	 * @param owner The component from which this slot is allocated.
 	 * @param location The location info of the TaskManager where the slot was allocated from
 	 * @param slotNumber The number of the task slot on the instance.
-	 * @param taskManagerActorGateway The actor gateway to communicate with the TaskManager of this slot   
+	 * @param taskManagerGateway The gateway to communicate with the TaskManager of this slot
 	 */
 	public SimpleSlot(
 			JobID jobID, SlotOwner owner, TaskManagerLocation location, int slotNumber,
-			ActorGateway taskManagerActorGateway) {
-		this(jobID, owner, location, slotNumber, taskManagerActorGateway, null, null);
+			TaskManagerGateway taskManagerGateway) {
+		this(jobID, owner, location, slotNumber, taskManagerGateway, null, null);
 	}
 
 	/**
@@ -77,18 +78,19 @@ public class SimpleSlot extends Slot {
 	 * @param owner The component from which this slot is allocated.
 	 * @param location The location info of the TaskManager where the slot was allocated from
 	 * @param slotNumber The number of the simple slot in its parent shared slot.
+	 * @param taskManagerGateway The gateway to communicate with the associated task manager.
 	 * @param parent The parent shared slot.
 	 * @param groupID The ID that identifies the group that the slot belongs to.
 	 */
 	public SimpleSlot(
 			JobID jobID, SlotOwner owner, TaskManagerLocation location, int slotNumber,
-			ActorGateway taskManagerActorGateway,
+			TaskManagerGateway taskManagerGateway,
 			@Nullable SharedSlot parent, @Nullable AbstractID groupID) {
 
 		super(parent != null ?
 				parent.getAllocatedSlot() :
 				new AllocatedSlot(NO_ALLOCATION_ID, jobID, location, slotNumber,
-						ResourceProfile.UNKNOWN, taskManagerActorGateway),
+						ResourceProfile.UNKNOWN, taskManagerGateway),
 				owner, slotNumber, parent, groupID);
 	}
 
@@ -237,4 +239,4 @@ public class SimpleSlot extends Slot {
 	public String toString() {
 		return "SimpleSlot " + super.toString();
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cfb3790f/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
index b840b0c..8f8b897 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
 import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.AbstractID;
 import org.apache.flink.api.common.JobID;
@@ -95,20 +96,29 @@ public abstract class Slot {
 	 * @param owner The component from which this slot is allocated.
 	 * @param location The location info of the TaskManager where the slot was allocated from
 	 * @param slotNumber The number of this slot.
-	 * @param taskManagerActorGateway The actor gateway to communicate with the TaskManager
+	 * @param taskManagerGateway The gateway to communicate with the TaskManager
 	 * @param parent The parent slot that contains this slot. May be null, if this slot is the root.
 	 * @param groupID The ID that identifies the task group for which this slot is allocated. May be null
 	 *                if the slot does not belong to any task group.   
 	 */
 	protected Slot(
-			JobID jobID, SlotOwner owner, TaskManagerLocation location, int slotNumber,
-			ActorGateway taskManagerActorGateway,
-			@Nullable SharedSlot parent, @Nullable AbstractID groupID) {
+			JobID jobID,
+			SlotOwner owner,
+			TaskManagerLocation location,
+			int slotNumber,
+			TaskManagerGateway taskManagerGateway,
+			@Nullable SharedSlot parent,
+			@Nullable AbstractID groupID) {
 
 		checkArgument(slotNumber >= 0);
 
 		this.allocatedSlot = new AllocatedSlot(
-				NO_ALLOCATION_ID, jobID, location, slotNumber, ResourceProfile.UNKNOWN, taskManagerActorGateway);
+			NO_ALLOCATION_ID,
+			jobID,
+			location,
+			slotNumber,
+			ResourceProfile.UNKNOWN,
+			taskManagerGateway);
 
 		this.owner = checkNotNull(owner);
 		this.parent = parent; // may be null
@@ -184,8 +194,8 @@ public abstract class Slot {
 	 *
 	 * @return The actor gateway that can be used to send messages to the TaskManager.
 	 */
-	public ActorGateway getTaskManagerActorGateway() {
-		return allocatedSlot.getTaskManagerActorGateway();
+	public TaskManagerGateway getTaskManagerGateway() {
+		return allocatedSlot.getTaskManagerGateway();
 	}
 
 	/**
@@ -361,4 +371,4 @@ public abstract class Slot {
 				return "(unknown)";
 		}
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cfb3790f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java
new file mode 100644
index 0000000..fe4ecfb
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.slots;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.PartitionInfo;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.Messages;
+import org.apache.flink.runtime.messages.StackTrace;
+import org.apache.flink.runtime.messages.StackTraceSampleMessages;
+import org.apache.flink.runtime.messages.StackTraceSampleResponse;
+import org.apache.flink.runtime.messages.TaskManagerMessages;
+import org.apache.flink.runtime.messages.TaskMessages;
+import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
+import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
+import org.apache.flink.util.Preconditions;
+import scala.concurrent.duration.FiniteDuration;
+import scala.reflect.ClassTag$;
+
+/**
+ * Implementation of the {@link TaskManagerGateway} for {@link ActorGateway}.
+ */
+public class ActorTaskManagerGateway implements TaskManagerGateway {
+	private final ActorGateway actorGateway;
+
+	public ActorTaskManagerGateway(ActorGateway actorGateway) {
+		this.actorGateway = Preconditions.checkNotNull(actorGateway);
+	}
+
+	public ActorGateway getActorGateway() {
+		return actorGateway;
+	}
+
+	//-------------------------------------------------------------------------------
+	// Task manager rpc methods
+	//-------------------------------------------------------------------------------
+
+	@Override
+	public String getAddress() {
+		return actorGateway.path();
+	}
+
+	@Override
+	public void disconnectFromJobManager(InstanceID instanceId, Exception cause) {
+		actorGateway.tell(new Messages.Disconnect(instanceId, cause));
+	}
+
+	@Override
+	public void stopCluster(final ApplicationStatus applicationStatus, final String message) {
+		actorGateway.tell(new StopCluster(applicationStatus, message));
+	}
+
+	@Override
+	public Future<StackTrace> requestStackTrace(final Time timeout) {
+		Preconditions.checkNotNull(timeout);
+
+		scala.concurrent.Future<StackTrace> stackTraceFuture = actorGateway.ask(
+			TaskManagerMessages.SendStackTrace$.MODULE$.get(),
+			new FiniteDuration(timeout.getSize(), timeout.getUnit()))
+			.mapTo(ClassTag$.MODULE$.<StackTrace>apply(StackTrace.class));
+
+		return new FlinkFuture<>(stackTraceFuture);
+	}
+
+	@Override
+	public Future<StackTraceSampleResponse> requestStackTraceSample(
+			ExecutionAttemptID executionAttemptID,
+			int sampleId,
+			int numSamples,
+			Time delayBetweenSamples,
+			int maxStackTraceDepth,
+			Time timeout) {
+		Preconditions.checkNotNull(executionAttemptID);
+		Preconditions.checkArgument(numSamples > 0, "The number of samples must be greater than 0.");
+		Preconditions.checkNotNull(delayBetweenSamples);
+		Preconditions.checkArgument(maxStackTraceDepth >= 0, "The max stack trace depth must be greater or equal than 0.");
+		Preconditions.checkNotNull(timeout);
+
+		scala.concurrent.Future<StackTraceSampleResponse> stackTraceSampleResponseFuture = actorGateway.ask(
+			new StackTraceSampleMessages.TriggerStackTraceSample(
+				sampleId,
+				executionAttemptID,
+				numSamples,
+				delayBetweenSamples,
+				maxStackTraceDepth),
+			new FiniteDuration(timeout.getSize(), timeout.getUnit()))
+			.mapTo(ClassTag$.MODULE$.<StackTraceSampleResponse>apply(StackTraceSampleResponse.class));
+
+		return new FlinkFuture<>(stackTraceSampleResponseFuture);
+	}
+
+	@Override
+	public Future<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, Time timeout) {
+		Preconditions.checkNotNull(tdd);
+		Preconditions.checkNotNull(timeout);
+
+		scala.concurrent.Future<Acknowledge> submitResult = actorGateway.ask(
+			new TaskMessages.SubmitTask(tdd),
+			new FiniteDuration(timeout.getSize(), timeout.getUnit()))
+			.mapTo(ClassTag$.MODULE$.<Acknowledge>apply(Acknowledge.class));
+
+		return new FlinkFuture<>(submitResult);
+	}
+
+	@Override
+	public Future<Acknowledge> stopTask(ExecutionAttemptID executionAttemptID, Time timeout) {
+		Preconditions.checkNotNull(executionAttemptID);
+		Preconditions.checkNotNull(timeout);
+
+		scala.concurrent.Future<Acknowledge> stopResult = actorGateway.ask(
+			new TaskMessages.StopTask(executionAttemptID),
+			new FiniteDuration(timeout.getSize(), timeout.getUnit()))
+			.mapTo(ClassTag$.MODULE$.<Acknowledge>apply(Acknowledge.class));
+
+		return new FlinkFuture<>(stopResult);
+	}
+
+	@Override
+	public Future<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
+		Preconditions.checkNotNull(executionAttemptID);
+		Preconditions.checkNotNull(timeout);
+
+		scala.concurrent.Future<Acknowledge> cancelResult = actorGateway.ask(
+			new TaskMessages.CancelTask(executionAttemptID),
+			new FiniteDuration(timeout.getSize(), timeout.getUnit()))
+			.mapTo(ClassTag$.MODULE$.<Acknowledge>apply(Acknowledge.class));
+
+		return new FlinkFuture<>(cancelResult);
+	}
+
+	@Override
+	public Future<Acknowledge> updatePartitions(ExecutionAttemptID executionAttemptID, Iterable<PartitionInfo> partitionInfos, Time timeout) {
+		Preconditions.checkNotNull(executionAttemptID);
+		Preconditions.checkNotNull(partitionInfos);
+		Preconditions.checkNotNull(timeout); // fail fast on a null timeout, before the message is built
+
+		TaskMessages.UpdatePartitionInfo updatePartitionInfoMessage =
+			new TaskMessages.UpdateTaskMultiplePartitionInfos(executionAttemptID, partitionInfos);
+
+		scala.concurrent.Future<Acknowledge> updatePartitionsResult = actorGateway.ask(
+			updatePartitionInfoMessage,
+			new FiniteDuration(timeout.getSize(), timeout.getUnit()))
+			.mapTo(ClassTag$.MODULE$.<Acknowledge>apply(Acknowledge.class));
+
+		return new FlinkFuture<>(updatePartitionsResult);
+	}
+
+	@Override
+	public void failPartition(ExecutionAttemptID executionAttemptID) {
+		Preconditions.checkNotNull(executionAttemptID);
+
+		actorGateway.tell(new TaskMessages.FailIntermediateResultPartitions(executionAttemptID));
+	}
+
+	@Override
+	public void notifyCheckpointComplete(
+			ExecutionAttemptID executionAttemptID,
+			JobID jobId,
+			long checkpointId,
+			long timestamp) {
+
+		Preconditions.checkNotNull(executionAttemptID);
+		Preconditions.checkNotNull(jobId);
+
+		actorGateway.tell(new NotifyCheckpointComplete(jobId, executionAttemptID, checkpointId, timestamp));
+	}
+
+	@Override
+	public void triggerCheckpoint(
+			ExecutionAttemptID executionAttemptID,
+			JobID jobId,
+			long checkpointId,
+			long timestamp) {
+
+		Preconditions.checkNotNull(executionAttemptID);
+		Preconditions.checkNotNull(jobId);
+
+		actorGateway.tell(new TriggerCheckpoint(jobId, executionAttemptID, checkpointId, timestamp));
+	}
+
+	@Override
+	public Future<BlobKey> requestTaskManagerLog(Time timeout) {
+		return requestTaskManagerLog((TaskManagerMessages.RequestTaskManagerLog) TaskManagerMessages.getRequestTaskManagerLog(), timeout);
+	}
+
+	@Override
+	public Future<BlobKey> requestTaskManagerStdout(Time timeout) {
+		return requestTaskManagerLog((TaskManagerMessages.RequestTaskManagerLog) TaskManagerMessages.getRequestTaskManagerStdout(), timeout);
+	}
+
+	private Future<BlobKey> requestTaskManagerLog(TaskManagerMessages.RequestTaskManagerLog request, Time timeout) {
+		Preconditions.checkNotNull(request);
+		Preconditions.checkNotNull(timeout);
+
+		scala.concurrent.Future<BlobKey> blobKeyFuture = actorGateway
+			.ask(
+				request,
+				new FiniteDuration(timeout.getSize(), timeout.getUnit()))
+			.mapTo(ClassTag$.MODULE$.<BlobKey>apply(BlobKey.class));
+
+		return new FlinkFuture<>(blobKeyFuture);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cfb3790f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
index 355524c..9419ab4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.jobmanager.slots;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -53,7 +52,7 @@ public class AllocatedSlot {
 	private final ResourceProfile resourceProfile;
 
 	/** TEMP until the new RPC is in place: The actor gateway to communicate with the TaskManager */
-	private final ActorGateway taskManagerActorGateway;
+	private final TaskManagerGateway taskManagerGateway;
 
 	/** The number of the slot on the TaskManager to which slot belongs. Purely informational. */
 	private final int slotNumber;
@@ -66,14 +65,14 @@ public class AllocatedSlot {
 			TaskManagerLocation location,
 			int slotNumber,
 			ResourceProfile resourceProfile,
-			ActorGateway actorGateway)
+			TaskManagerGateway taskManagerGateway)
 	{
 		this.slotAllocationId = checkNotNull(slotAllocationId);
 		this.jobID = checkNotNull(jobID);
 		this.taskManagerLocation = checkNotNull(location);
 		this.slotNumber = slotNumber;
 		this.resourceProfile = checkNotNull(resourceProfile);
-		this.taskManagerActorGateway = checkNotNull(actorGateway);
+		this.taskManagerGateway = checkNotNull(taskManagerGateway);
 	}
 
 	public AllocatedSlot(AllocatedSlot other) {
@@ -82,7 +81,7 @@ public class AllocatedSlot {
 		this.taskManagerLocation = other.taskManagerLocation;
 		this.slotNumber = other.slotNumber;
 		this.resourceProfile = other.resourceProfile;
-		this.taskManagerActorGateway = other.taskManagerActorGateway;
+		this.taskManagerGateway = other.taskManagerGateway;
 	}
 
 	// ------------------------------------------------------------------------
@@ -139,8 +138,8 @@ public class AllocatedSlot {
 	 *
 	 * @return The actor gateway that can be used to send messages to the TaskManager.
 	 */
-	public ActorGateway getTaskManagerActorGateway() {
-		return taskManagerActorGateway;
+	public TaskManagerGateway getTaskManagerGateway() {
+		return taskManagerGateway;
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/cfb3790f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
new file mode 100644
index 0000000..db0a3bf
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.slots;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.PartitionInfo;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.StackTrace;
+import org.apache.flink.runtime.messages.StackTraceSampleResponse;
+
+/**
+ * Task manager gateway interface to communicate with the task manager.
+ */
+public interface TaskManagerGateway {
+
+	/**
+	 * Return the address of the task manager with which the gateway is associated.
+	 *
+	 * @return Address of the task manager with which this gateway is associated.
+	 */
+	String getAddress();
+
+	/**
+	 * Disconnect the task manager from the job manager.
+	 *
+	 * @param instanceId identifying the task manager
+	 * @param cause of the disconnection
+	 */
+	void disconnectFromJobManager(InstanceID instanceId, Exception cause);
+
+	/**
+	 * Stop the cluster.
+	 *
+	 * @param applicationStatus to stop the cluster with
+	 * @param message to deliver
+	 */
+	void stopCluster(final ApplicationStatus applicationStatus, final String message);
+
+	/**
+	 * Request the stack trace from the task manager.
+	 *
+	 * @param timeout for the stack trace request
+	 * @return Future for a stack trace
+	 */
+	Future<StackTrace> requestStackTrace(final Time timeout);
+
+	/**
+	 * Request a stack trace sample from the given task.
+	 *
+	 * @param executionAttemptID identifying the task to sample
+	 * @param sampleId of the sample
+	 * @param numSamples to take from the given task
+	 * @param delayBetweenSamples to wait for
+	 * @param maxStackTraceDepth of the returned sample
+	 * @param timeout of the request
+	 * @return Future of stack trace sample response
+	 */
+	Future<StackTraceSampleResponse> requestStackTraceSample(
+		final ExecutionAttemptID executionAttemptID,
+		final int sampleId,
+		final int numSamples,
+		final Time delayBetweenSamples,
+		final int maxStackTraceDepth,
+		final Time timeout);
+
+	/**
+	 * Submit a task to the task manager.
+	 *
+	 * @param tdd describing the task to submit
+	 * @param timeout of the submit operation
+	 * @return Future acknowledge of the successful operation
+	 */
+	Future<Acknowledge> submitTask(
+		TaskDeploymentDescriptor tdd,
+		Time timeout);
+
+	/**
+	 * Stop the given task.
+	 *
+	 * @param executionAttemptID identifying the task
+	 * @param timeout of the stop operation
+	 * @return Future acknowledge if the task is successfully stopped
+	 */
+	Future<Acknowledge> stopTask(
+		ExecutionAttemptID executionAttemptID,
+		Time timeout);
+
+	/**
+	 * Cancel the given task.
+	 *
+	 * @param executionAttemptID identifying the task
+	 * @param timeout of the cancel operation
+	 * @return Future acknowledge if the task is successfully canceled
+	 */
+	Future<Acknowledge> cancelTask(
+		ExecutionAttemptID executionAttemptID,
+		Time timeout);
+
+	/**
+	 * Update the task where the given partitions can be found.
+	 *
+	 * @param executionAttemptID identifying the task
+	 * @param partitionInfos telling where the partition can be retrieved from
+	 * @param timeout of the update operation
+	 * @return Future acknowledge if the partitions have been successfully updated
+	 */
+	Future<Acknowledge> updatePartitions(
+		ExecutionAttemptID executionAttemptID,
+		Iterable<PartitionInfo> partitionInfos,
+		Time timeout);
+
+	/**
+	 * Fail all intermediate result partitions of the given task.
+	 *
+	 * @param executionAttemptID identifying the task
+	 */
+	void failPartition(ExecutionAttemptID executionAttemptID);
+
+	/**
+	 * Notify the given task about a completed checkpoint.
+	 *
+	 * @param executionAttemptID identifying the task
+	 * @param jobId identifying the job to which the task belongs
+	 * @param checkpointId of the completed checkpoint
+	 * @param timestamp of the completed checkpoint
+	 */
+	void notifyCheckpointComplete(
+		ExecutionAttemptID executionAttemptID,
+		JobID jobId,
+		long checkpointId,
+		long timestamp);
+
+	/**
+	 * Trigger a checkpoint for the given task.
+	 *
+	 * @param executionAttemptID identifying the task
+	 * @param jobId identifying the job to which the task belongs
+	 * @param checkpointId of the checkpoint to trigger
+	 * @param timestamp of the checkpoint to trigger
+	 */
+	void triggerCheckpoint(
+		ExecutionAttemptID executionAttemptID,
+		JobID jobId,
+		long checkpointId,
+		long timestamp);
+
+	/**
+	 * Request the task manager log from the task manager.
+	 *
+	 * @param timeout for the request
+	 * @return Future blob key under which the task manager log has been stored
+	 */
+	Future<BlobKey> requestTaskManagerLog(final Time timeout);
+
+	/**
+	 * Request the task manager stdout from the task manager.
+	 *
+	 * @param timeout for the request
+	 * @return Future blob key under which the task manager stdout file has been stored
+	 */
+	Future<BlobKey> requestTaskManagerStdout(final Time timeout);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cfb3790f/flink-runtime/src/main/java/org/apache/flink/runtime/messages/Acknowledge.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/Acknowledge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/Acknowledge.java
index 4df4a16..4bbc50a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/Acknowledge.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/Acknowledge.java
@@ -18,10 +18,12 @@
 
 package org.apache.flink.runtime.messages;
 
+import java.io.Serializable;
+
 /**
  * A generic acknowledgement message.
  */
-public class Acknowledge implements RequiresLeaderSessionID, java.io.Serializable {
+public class Acknowledge implements Serializable {
 
 	private static final long serialVersionUID = 7808628311617273755L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/cfb3790f/flink-runtime/src/main/java/org/apache/flink/runtime/messages/StackTrace.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/StackTrace.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/StackTrace.java
new file mode 100644
index 0000000..c041655
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/StackTrace.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages;
+
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+public class StackTrace implements Serializable {
+
+	private static final long serialVersionUID = -899464298250067416L;
+
+	private final InstanceID instanceId;
+	private final String stackTrace;
+
+	public StackTrace(InstanceID instanceId, String stackTrace) {
+		this.instanceId = Preconditions.checkNotNull(instanceId);
+		this.stackTrace = Preconditions.checkNotNull(stackTrace);
+	}
+
+	public InstanceID getInstanceId() {
+		return instanceId;
+	}
+
+	public String getStackTrace() {
+		return stackTrace;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cfb3790f/flink-runtime/src/main/java/org/apache/flink/runtime/messages/StackTraceSampleResponse.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/StackTraceSampleResponse.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/StackTraceSampleResponse.java
new file mode 100644
index 0000000..52dbe77
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/StackTraceSampleResponse.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Response to the TriggerStackTraceSample message.
+ */
+public class StackTraceSampleResponse implements Serializable {
+
+	private static final long serialVersionUID = -4786454630050578031L;
+
+	private final int sampleId;
+
+	private final ExecutionAttemptID executionAttemptID;
+
+	private final List<StackTraceElement[]> samples;
+
+	public StackTraceSampleResponse(
+			int sampleId,
+			ExecutionAttemptID executionAttemptID,
+			List<StackTraceElement[]> samples) {
+		this.sampleId = sampleId;
+		this.executionAttemptID = Preconditions.checkNotNull(executionAttemptID);
+		this.samples = Preconditions.checkNotNull(samples);
+	}
+
+	public int getSampleId() {
+		return sampleId;
+	}
+
+	public ExecutionAttemptID getExecutionAttemptID() {
+		return executionAttemptID;
+	}
+
+	public List<StackTraceElement[]> getSamples() {
+		return samples;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cfb3790f/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index 80bdb73..27100fd 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -20,16 +20,18 @@ package org.apache.flink.runtime.akka
 
 import java.io.IOException
 import java.net._
-import java.util.concurrent.{TimeUnit, Callable}
+import java.util.concurrent.{Callable, TimeUnit}
 
 import akka.actor._
 import akka.pattern.{ask => akkaAsk}
-import com.typesafe.config.{ConfigValueFactory, ConfigParseOptions, Config, ConfigFactory}
+import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions, ConfigValueFactory}
+import org.apache.flink.api.common.time.Time
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.net.SSLUtils
 import org.apache.flink.util.NetUtils
-import org.jboss.netty.logging.{Slf4JLoggerFactory, InternalLoggerFactory}
+import org.jboss.netty.logging.{InternalLoggerFactory, Slf4JLoggerFactory}
 import org.slf4j.LoggerFactory
+
 import scala.concurrent._
 import scala.concurrent.duration._
 import scala.language.postfixOps
@@ -545,10 +547,16 @@ object AkkaUtils {
     new FiniteDuration(duration.toMillis, TimeUnit.MILLISECONDS)
   }
 
-  def getDefaultTimeout: FiniteDuration = {
+  def getDefaultTimeout: Time = {
     val duration = Duration(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT)
 
-    new FiniteDuration(duration.toMillis, TimeUnit.MILLISECONDS)
+    Time.milliseconds(duration.toMillis)
+  }
+
+  def getDefaultTimeoutAsFiniteDuration: FiniteDuration = {
+    val timeout = getDefaultTimeout
+
+    new FiniteDuration(timeout.toMilliseconds, TimeUnit.MILLISECONDS)
   }
 
   def getLookupTimeout(config: Configuration): FiniteDuration = {