You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/04/06 17:49:37 UTC

[4/7] flink git commit: [FLINK-1580] [taskmanager] Improve TaskManager startup robustness

    [FLINK-1580] [taskmanager] Improve TaskManager startup robustness

     - Initialize network buffer pool and memory manager before the
       asynchronous actor start

     - Split messages into more topic-related groups

     - Split message handling logic on TaskManager into topic-related functions

     - Simplify registration logic

     - Add a lot of tests that validate
       - task manager configuration checking
       - task manager registration
       - task manager re-registration on failures/disconnects


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e74521c1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e74521c1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e74521c1

Branch: refs/heads/master
Commit: e74521c1fe5f5842185be5c454d27adc3bd254d5
Parents: 1da4b64
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Apr 6 14:49:09 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Apr 6 15:30:44 2015 +0200

----------------------------------------------------------------------
 .../IllegalConfigurationException.java          |   35 +-
 .../flink/runtime/execution/ExecutionState.java |   29 +-
 .../flink/runtime/executiongraph/Execution.java |   21 +-
 .../instance/InstanceConnectionInfo.java        |    3 +-
 .../org/apache/flink/runtime/instance/Slot.java |    2 +-
 .../runtime/io/network/NetworkEnvironment.java  |    6 +-
 .../memorymanager/DefaultMemoryManager.java     |    6 +
 .../runtime/memorymanager/MemoryManager.java    |    9 +-
 .../apache/flink/runtime/taskmanager/Task.java  |    8 +-
 .../taskmanager/TaskInputSplitProvider.java     |   16 +-
 .../apache/flink/runtime/ActorLogMessages.scala |    4 +-
 .../apache/flink/runtime/akka/AkkaUtils.scala   |   11 +
 .../flink/runtime/jobmanager/JobManager.scala   |  119 +-
 .../StreamCheckpointCoordinator.scala           |   51 +-
 .../messages/CheckpointingMessages.scala        |   52 +
 .../messages/ExecutionGraphMessages.scala       |    2 +-
 .../runtime/messages/JobManagerMessages.scala   |   17 +-
 .../flink/runtime/messages/Messages.scala       |   25 +-
 .../runtime/messages/RegistrationMessages.scala |   59 +-
 .../runtime/messages/TaskManagerMessages.scala  |  177 +-
 .../flink/runtime/messages/TaskMessages.scala   |  171 ++
 .../minicluster/LocalFlinkMiniCluster.scala     |   14 +-
 .../flink/runtime/taskmanager/TaskManager.scala | 1768 +++++++++++-------
 .../taskmanager/TaskManagerConfiguration.scala  |   11 +-
 .../executiongraph/ExecutionGraphTestUtils.java |   24 +-
 .../ExecutionVertexCancelTest.java              |   10 +-
 .../ExecutionVertexDeploymentTest.java          |    2 +-
 .../runtime/taskmanager/RegistrationTest.java   |  379 ++++
 ...askManagerComponentsStartupShutdownTest.java |  141 ++
 .../TaskManagerConfigurationTest.java           |  140 ++
 .../TaskManagerProcessReapingTest.java          |    2 +-
 .../runtime/taskmanager/TaskManagerTest.java    |  447 +++--
 .../flink/runtime/taskmanager/TaskTest.java     |    1 -
 .../jobmanager/JobManagerConnectionTest.scala   |   14 +-
 .../TaskManagerRegistrationITCase.scala         |  132 --
 .../TaskManagerRegistrationTest.scala           |  139 ++
 .../runtime/testingUtils/TestingCluster.scala   |   14 +-
 .../testingUtils/TestingTaskManager.scala       |   33 +-
 .../runtime/testingUtils/TestingUtils.scala     |   37 +-
 .../api/streamvertex/StreamVertex.java          |    7 +-
 .../test/util/ForkableFlinkMiniCluster.scala    |   10 +-
 .../AbstractProcessFailureRecoveryTest.java     |    6 +-
 .../yarn/appMaster/YarnTaskManagerRunner.java   |    2 +-
 .../org/apache/flink/yarn/YarnTaskManager.scala |   17 +-
 44 files changed, 2842 insertions(+), 1331 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-core/src/main/java/org/apache/flink/configuration/IllegalConfigurationException.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/IllegalConfigurationException.java b/flink-core/src/main/java/org/apache/flink/configuration/IllegalConfigurationException.java
index 1e71fc7..e6a2022 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/IllegalConfigurationException.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/IllegalConfigurationException.java
@@ -16,42 +16,35 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.configuration;
 
 /**
- * An <code>IllegalConfigurationException</code> is thrown when the user
- * has configured job vertices in a way that either conflicts
- * with the expected usage of the respective task of the configuration
- * of the Nephele framework.
+ * An {@code IllegalConfigurationException} is thrown when
+ * the values in a given {@link Configuration} are not valid. This may refer
+ * to the Flink configuration with which the framework is started,
+ * or a Configuration passed internally between components.
  */
 public class IllegalConfigurationException extends RuntimeException {
 
-	/**
-	 * Generated serial UID.
-	 */
 	private static final long serialVersionUID = 695506964810499989L;
 
 	/**
-	 * Constructs an new illegal configuration exception with the given error message.
+	 * Constructs an new IllegalConfigurationException with the given error message.
 	 * 
-	 * @param errorMsg
-	 *        the error message to be included in the exception
+	 * @param message The error message for the exception.
 	 */
-	public IllegalConfigurationException(final String errorMsg) {
-		super(errorMsg);
+	public IllegalConfigurationException(String message) {
+		super(message);
 	}
 
 	/**
-	 * Constructs an new illegal configuration exception with the given error message
+	 * Constructs an new IllegalConfigurationException with the given error message
 	 * and a given cause.
-	 * 
-	 * @param errorMsg
-	 *        The error message to be included in the exception.
-	 * @param cause
-	 *        The exception that caused this exception.
+	 *
+	 * @param message The error message for the exception.
+	 * @param cause The exception that caused this exception.
 	 */
-	public IllegalConfigurationException(final String errorMsg, final Throwable cause) {
-		super(errorMsg, cause);
+	public IllegalConfigurationException(String message, Throwable cause) {
+		super(message, cause);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionState.java
index e5235e3..2fcaea1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionState.java
@@ -18,6 +18,28 @@
 
 package org.apache.flink.runtime.execution;
 
+/**
+ * An enumeration of all states that a task can be in during its execution.
+ * Tasks usually start in the state {@code CREATED} and switch states according to
+ * this diagram:
+ * <pre>
+ *
+ *     CREATED  -> SCHEDULED -> DEPLOYING -> RUNNING -> FINISHED
+ *                     |            |          |
+ *                     |            |   +------+
+ *                     |            V   V
+ *                     |         CANCELLING -----+----> CANCELED
+ *                     |                         |
+ *                     +-------------------------+
+ *
+ *                                               ... -> FAILED
+ * </pre>
+ *
+ * It is possible to enter the {@code FAILED} state from any other state.
+ *
+ * The states {@code FINISHED}, {@code CANCELED}, and {@code FAILED} are
+ * considered terminal states.
+ */
 public enum ExecutionState {
 
 	CREATED,
@@ -34,5 +56,10 @@ public enum ExecutionState {
 	
 	CANCELED,
 	
-	FAILED
+	FAILED;
+
+
+	public boolean isTerminal() {
+		return this == FINISHED || this == CANCELED || this == FAILED;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index a9ee105..3ba378c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -43,7 +43,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFutureAction;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.messages.TaskManagerMessages.TaskOperationResult;
+import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.util.ExceptionUtils;
 import org.slf4j.Logger;
@@ -68,12 +68,13 @@ import static org.apache.flink.runtime.execution.ExecutionState.FAILED;
 import static org.apache.flink.runtime.execution.ExecutionState.FINISHED;
 import static org.apache.flink.runtime.execution.ExecutionState.RUNNING;
 import static org.apache.flink.runtime.execution.ExecutionState.SCHEDULED;
-import static org.apache.flink.runtime.messages.TaskManagerMessages.CancelTask;
-import static org.apache.flink.runtime.messages.TaskManagerMessages.FailIntermediateResultPartitions;
-import static org.apache.flink.runtime.messages.TaskManagerMessages.SubmitTask;
-import static org.apache.flink.runtime.messages.TaskManagerMessages.UpdateTask;
-import static org.apache.flink.runtime.messages.TaskManagerMessages.UpdateTaskSinglePartitionInfo;
-import static org.apache.flink.runtime.messages.TaskManagerMessages.createUpdateTaskMultiplePartitionInfos;
+
+import static org.apache.flink.runtime.messages.TaskMessages.CancelTask;
+import static org.apache.flink.runtime.messages.TaskMessages.FailIntermediateResultPartitions;
+import static org.apache.flink.runtime.messages.TaskMessages.SubmitTask;
+import static org.apache.flink.runtime.messages.TaskMessages.UpdatePartitionInfo;
+import static org.apache.flink.runtime.messages.TaskMessages.UpdateTaskSinglePartitionInfo;
+import static org.apache.flink.runtime.messages.TaskMessages.createUpdateTaskMultiplePartitionInfos;
 
 /**
  * A single execution of a vertex. While an {@link ExecutionVertex} can be executed multiple times (for recovery,
@@ -526,7 +527,7 @@ public class Execution implements Serializable {
 					final InputChannelDeploymentDescriptor descriptor = new InputChannelDeploymentDescriptor(
 							partitionId, partitionLocation);
 
-					final UpdateTask updateTaskMessage = new UpdateTaskSinglePartitionInfo(
+					final UpdatePartitionInfo updateTaskMessage = new UpdateTaskSinglePartitionInfo(
 							consumer.getAttemptId(), partition.getIntermediateResult().getId(), descriptor);
 
 					sendUpdateTaskRpcCall(consumerSlot, updateTaskMessage);
@@ -685,7 +686,7 @@ public class Execution implements Serializable {
 				inputChannelDeploymentDescriptors.add(partialInputChannelDeploymentDescriptor.createInputChannelDeploymentDescriptor(this));
 			}
 
-			UpdateTask updateTaskMessage =
+			UpdatePartitionInfo updateTaskMessage =
 					createUpdateTaskMultiplePartitionInfos(attemptId, resultIDs,
 							inputChannelDeploymentDescriptors);
 
@@ -845,7 +846,7 @@ public class Execution implements Serializable {
 	}
 
 	private void sendUpdateTaskRpcCall(final SimpleSlot consumerSlot,
-									final UpdateTask updateTaskMsg) {
+										final UpdatePartitionInfo updateTaskMsg) {
 
 		if (consumerSlot != null) {
 			final Instance instance = consumerSlot.getInstance();

http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
index ee79c23..eb87292 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
@@ -101,7 +101,8 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In
 			// take IP textual representation
 			this.hostName = this.fqdnHostName;
 			LOG.warn("No hostname could be resolved for the IP address {}, using IP address as host name. "
-					+ "Local input split assignment (such as for HDFS files) may be impacted.");
+					+ "Local input split assignment (such as for HDFS files) may be impacted.",
+					this.inetAddress.getHostAddress());
 		}
 		else {
 			this.hostName = NetUtils.getHostnameFromFQDN(this.fqdnHostName);

http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
index bf8464c..082fbf2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
@@ -184,7 +184,7 @@ public abstract class Slot {
 		return "(" + slotNumber + ")" + (getParent() != null ? getParent().hierarchy() : "");
 	}
 
-	private static final String getStateName(int state) {
+	private static String getStateName(int state) {
 		switch (state) {
 			case ALLOCATED_AND_ALIVE:
 				return "ALLOCATED/ALIVE";

http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index 55b89b4..dbf1586 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -49,7 +49,7 @@ import java.io.IOException;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.flink.runtime.messages.JobManagerMessages.ScheduleOrUpdateConsumers;
-import static org.apache.flink.runtime.messages.TaskManagerMessages.FailTask;
+import static org.apache.flink.runtime.messages.TaskMessages.FailTask;
 
 /**
  * Network I/O components of each {@link TaskManager} instance. The network environment contains
@@ -165,6 +165,7 @@ public class NetworkEnvironment {
 			{
 				// good, not currently associated. start the individual components
 
+				LOG.debug("Starting result partition manager and network connection manager");
 				this.partitionManager = new ResultPartitionManager();
 				this.taskEventDispatcher = new TaskEventDispatcher();
 				this.partitionConsumableNotifier = new JobManagerResultPartitionConsumableNotifier(
@@ -176,6 +177,7 @@ public class NetworkEnvironment {
 															: new LocalConnectionManager();
 
 				try {
+					LOG.debug("Starting network connection manager");
 					connectionManager.start(partitionManager, taskEventDispatcher, networkBufferPool);
 				}
 				catch (Throwable t) {
@@ -312,7 +314,7 @@ public class NetworkEnvironment {
 	}
 
 	public void unregisterTask(Task task) {
-		LOG.debug("Unregistering task {} ({}) from network environment (state: {}).",
+		LOG.debug("Unregister task {} from network environment (state: {}).",
 				task.getTaskNameWithSubtasks(), task.getExecutionState());
 
 		final ExecutionAttemptID executionId = task.getExecutionId();

http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/DefaultMemoryManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/DefaultMemoryManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/DefaultMemoryManager.java
index cd677ac..28ebe13 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/DefaultMemoryManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/DefaultMemoryManager.java
@@ -165,6 +165,12 @@ public class DefaultMemoryManager implements MemoryManager {
 		// -------------------- END CRITICAL SECTION -------------------
 	}
 
+	@Override
+	public boolean isShutdown() {
+		return this.isShutDown;
+	}
+
+	@Override
 	public boolean verifyEmpty() {
 		synchronized (this.lock) {
 			return this.freeSegments.size() == this.totalNumPages;

http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/MemoryManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/MemoryManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/MemoryManager.java
index 1ab6931..631f0b8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/MemoryManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/MemoryManager.java
@@ -117,7 +117,14 @@ public interface MemoryManager {
 	 * code that allocated them from the memory manager.
 	 */
 	void shutdown();
-	
+
+	/**
+	 * Checks whether the MemoryManager has been shut down.
+	 *
+	 * @return True, if the memory manager is shut down, false otherwise.
+	 */
+	boolean isShutdown();
+
 	/**
 	 * Checks if the memory manager all memory available.
 	 * 

http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 3d1419a..f6eb907 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -30,8 +30,8 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
 import org.apache.flink.runtime.messages.ExecutionGraphMessages;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.messages.TaskManagerMessages.UnregisterTask;
+import org.apache.flink.runtime.messages.TaskMessages;
+import org.apache.flink.runtime.messages.TaskMessages.UnregisterTask;
 import org.apache.flink.runtime.profiling.TaskManagerProfiler;
 import org.apache.flink.util.ExceptionUtils;
 import org.slf4j.Logger;
@@ -304,7 +304,7 @@ public class Task {
 				}
 			}
 			else {
-				throw new RuntimeException("unexpected state for cancelling: " + current);
+				throw new RuntimeException("unexpected state for failing the task: " + current);
 			}
 		}
 	}
@@ -361,7 +361,7 @@ public class Task {
 											Throwable optionalError) {
 		LOG.info("Update execution state of {} ({}) to {}.", this.getTaskName(),
 				this.getExecutionId(), executionState);
-		taskManager.tell(new JobManagerMessages.UpdateTaskExecutionState(
+		taskManager.tell(new TaskMessages.UpdateTaskExecutionState(
 				new TaskExecutionState(jobId, executionId, executionState, optionalError)),
 				ActorRef.noSender());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
index 6e446de..1bdc346 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
@@ -28,12 +28,10 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.messages.TaskManagerMessages;
 import org.apache.flink.util.InstantiationUtil;
 
 import scala.concurrent.Await;
 import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
 
 public class TaskInputSplitProvider implements InputSplitProvider {
 
@@ -47,11 +45,11 @@ public class TaskInputSplitProvider implements InputSplitProvider {
 
 	private final ClassLoader usercodeClassLoader;
 	
-	private final FiniteDuration timeout;
+	private final Timeout timeout;
 	
 	public TaskInputSplitProvider(ActorRef jobManager, JobID jobId, JobVertexID vertexId,
 								ExecutionAttemptID executionID, ClassLoader userCodeClassLoader,
-								FiniteDuration timeout)
+								Timeout timeout)
 	{
 		this.jobManager = jobManager;
 		this.jobId = jobId;
@@ -66,20 +64,20 @@ public class TaskInputSplitProvider implements InputSplitProvider {
 		try {
 			final Future<Object> response = Patterns.ask(jobManager,
 					new JobManagerMessages.RequestNextInputSplit(jobId, vertexId, executionID),
-					new Timeout(timeout));
+					timeout);
 
-			final Object result = Await.result(response, timeout);
+			final Object result = Await.result(response, timeout.duration());
 
 			if (result == null) {
 				return null;
 			}
 
-			if(!(result instanceof TaskManagerMessages.NextInputSplit)){
+			if(!(result instanceof JobManagerMessages.NextInputSplit)){
 				throw new RuntimeException("RequestNextInputSplit requires a response of type " +
 						"NextInputSplit. Instead response is of type " + result.getClass() + ".");
 			} else {
-				final TaskManagerMessages.NextInputSplit nextInputSplit =
-						(TaskManagerMessages.NextInputSplit) result;
+				final JobManagerMessages.NextInputSplit nextInputSplit =
+						(JobManagerMessages.NextInputSplit) result;
 
 				byte[] serializedData = nextInputSplit.splitData();
 				Object deserialized = InstantiationUtil.deserializeObject(serializedData,

http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala
index 5d4f89c..acd4346 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/ActorLogMessages.scala
@@ -37,14 +37,14 @@ trait ActorLogMessages {
         _receiveWithLogMessages(x)
       }
       else {
-        log.debug(s"Received message $x at ${that.self.path} from ${that.sender}.")
+        log.debug(s"Received message $x at ${that.self.path} from ${that.sender()}.")
 
         val start = System.nanoTime()
 
         _receiveWithLogMessages(x)
 
         val duration = (System.nanoTime() - start) / 1000000
-        log.debug(s"Handled message $x in $duration ms from ${that.sender}.")
+        log.debug(s"Handled message $x in $duration ms from ${that.sender()}.")
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index e898f44..5b33017 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -44,6 +44,17 @@ object AkkaUtils {
   var globalExecutionContext: ExecutionContext = ExecutionContext.global
 
   /**
+   * Creates a local actor system without remoting.
+   *
+   * @param configuration instance containing the user provided configuration values
+   * @return The created actor system
+   */
+  def createLocalActorSystem(configuration: Configuration): ActorSystem = {
+    val akkaConfig = getAkkaConfig(configuration, None)
+    createActorSystem(akkaConfig)
+  }
+
+  /**
    * Creates an actor system. If a listening address is specified, then the actor system will listen
    * on that address for messages from a remote actor system. If not, then a local actor system
    * will be instantiated.

http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 4b0a55b..49bb1d5 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -30,8 +30,10 @@ import org.apache.flink.runtime.client.{JobStatusMessage, JobSubmissionException
 import org.apache.flink.runtime.executiongraph.{ExecutionJobVertex, ExecutionGraph}
 import org.apache.flink.runtime.jobmanager.web.WebInfoServer
 import org.apache.flink.runtime.messages.ArchiveMessages.ArchiveExecutionGraph
+import org.apache.flink.runtime.messages.CheckpointingMessages.{StateBarrierAck, BarrierAck}
 import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
 import org.apache.flink.runtime.messages.Messages.{Disconnect, Acknowledge}
+import org.apache.flink.runtime.messages.TaskMessages.UpdateTaskExecutionState
 import org.apache.flink.runtime.process.ProcessReaper
 import org.apache.flink.runtime.security.SecurityUtils
 import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner
@@ -46,9 +48,9 @@ import org.apache.flink.runtime.jobmanager.accumulators.AccumulatorManager
 import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler}
 import org.apache.flink.runtime.messages.JobManagerMessages._
 import org.apache.flink.runtime.messages.RegistrationMessages._
-import org.apache.flink.runtime.messages.TaskManagerMessages.{SendStackTrace, NextInputSplit, Heartbeat}
+import org.apache.flink.runtime.messages.TaskManagerMessages.{SendStackTrace, Heartbeat}
 import org.apache.flink.runtime.profiling.ProfilingUtils
-import org.apache.flink.util.InstantiationUtil
+import org.apache.flink.util.{ExceptionUtils, InstantiationUtil}
 
 import org.slf4j.LoggerFactory
 
@@ -76,7 +78,7 @@ import scala.collection.JavaConverters._
  *  is indicated by [[CancellationSuccess]] and a failure by [[CancellationFailure]]
  *
  * - [[UpdateTaskExecutionState]] is sent by a TaskManager to update the state of an
- * [[org.apache.flink.runtime.executiongraph.ExecutionVertex]] contained in the [[ExecutionGraph]].
+     ExecutionVertex contained in the [[ExecutionGraph]].
  * A successful update is acknowledged by true and otherwise false.
  *
  * - [[RequestNextInputSplit]] requests the next input split for a running task on a
@@ -103,17 +105,17 @@ class JobManager(val flinkConfiguration: Configuration,
 
   /** List of current jobs running jobs */
   val currentJobs = scala.collection.mutable.HashMap[JobID, (ExecutionGraph, JobInfo)]()
-  
+
 
   /**
    * Run when the job manager is started. Simply logs an informational message.
    */
   override def preStart(): Unit = {
-    LOG.info(s"Starting JobManager at ${self.path}.")
+    LOG.info(s"Starting JobManager at ${self.path.toSerializationFormat}.")
   }
 
   override def postStop(): Unit = {
-    log.info(s"Stopping job manager ${self.path}.")
+    log.info(s"Stopping JobManager ${self.path.toSerializationFormat}.")
 
     // disconnect the registered task managers
     instanceManager.getAllRegisteredInstances.asScala.foreach {
@@ -148,29 +150,38 @@ class JobManager(val flinkConfiguration: Configuration,
    */
   override def receiveWithLogMessages: Receive = {
 
-    case RegisterTaskManager(connectionInfo, hardwareInformation, numberOfSlots) =>
-      val taskManager = sender
+    case RegisterTaskManager(taskManager, connectionInfo, hardwareInformation, numberOfSlots) =>
 
       if (instanceManager.isRegistered(taskManager)) {
         val instanceID = instanceManager.getRegisteredInstance(taskManager).getId
-        taskManager ! AlreadyRegistered(instanceID, libraryCacheManager.getBlobServerPort, profiler)
-      } else {
 
-        val instanceID = try {
-           instanceManager.registerTaskManager(taskManager, connectionInfo,
+        // IMPORTANT: Send the response to the "sender", which is not the
+        //            TaskManager actor, but the ask future!
+        sender() ! AlreadyRegistered(self, instanceID, libraryCacheManager.getBlobServerPort)
+      }
+      else {
+        try {
+          val instanceID = instanceManager.registerTaskManager(taskManager, connectionInfo,
             hardwareInformation, numberOfSlots)
-        } catch {
+
+          // IMPORTANT: Send the response to the "sender", which is not the
+          //            TaskManager actor, but the ask future!
+          sender() ! AcknowledgeRegistration(self, instanceID,
+                                             libraryCacheManager.getBlobServerPort)
+
+          // to be notified when the taskManager is no longer reachable
+          context.watch(taskManager)
+        }
+        catch {
           // registerTaskManager throws an IllegalStateException if it is already shut down
           // let the actor crash and restart itself in this case
-          case ex: Exception => throw new RuntimeException(s"Could not register the task manager " +
-            s"${taskManager.path} at the instance manager.", ex)
-        }
-
-        // to be notified when the taskManager is no longer reachable
-        context.watch(taskManager)
+          case e: Exception =>
+            log.error(e, "Failed to register TaskManager at instance manager")
 
-        taskManager ! AcknowledgeRegistration(instanceID, libraryCacheManager.getBlobServerPort,
-          profiler)
+            // IMPORTANT: Send the response to the "sender", which is not the
+            //            TaskManager actor, but the ask future!
+            sender() ! RefuseRegistration(ExceptionUtils.stringifyException(e))
+        }
       }
 
     case RequestNumberRegisteredTaskManager =>
@@ -205,7 +216,7 @@ class JobManager(val flinkConfiguration: Configuration,
       } else {
         currentJobs.get(taskExecutionState.getJobID) match {
           case Some((executionGraph, _)) =>
-            val originalSender = sender
+            val originalSender = sender()
 
             Future {
               val result = executionGraph.updateState(taskExecutionState)
@@ -302,7 +313,7 @@ class JobManager(val flinkConfiguration: Configuration,
             }
 
             removeJob(jobID)
-            
+
           }
         case None =>
           removeJob(jobID)
@@ -320,7 +331,7 @@ class JobManager(val flinkConfiguration: Configuration,
           jobExecution._1.getStateCheckpointerActor forward  msg
         case None =>
       }
-      
+
     case ScheduleOrUpdateConsumers(jobId, partitionId) =>
       currentJobs.get(jobId) match {
         case Some((executionGraph, _)) =>
@@ -343,7 +354,7 @@ class JobManager(val flinkConfiguration: Configuration,
       } catch {
         case t: Throwable =>
           log.error(t, "Could not process accumulator event of job {} received from {}.",
-            accumulatorEvent.getJobID, sender.path)
+            accumulatorEvent.getJobID, sender().path)
       }
 
     case RequestAccumulatorResults(jobID) =>
@@ -396,9 +407,10 @@ class JobManager(val flinkConfiguration: Configuration,
 
     case Heartbeat(instanceID, metricsReport) =>
       try {
+        log.debug("Received hearbeat message from {}", instanceID)
         instanceManager.reportHeartBeat(instanceID, metricsReport)
       } catch {
-        case t: Throwable => log.error(t, "Could not report heart beat from {}.", sender.path)
+        case t: Throwable => log.error(t, "Could not report heart beat from {}.", sender().path)
       }
 
     case RequestStackTrace(instanceID) =>
@@ -414,10 +426,10 @@ class JobManager(val flinkConfiguration: Configuration,
       }
 
     case RequestJobManagerStatus =>
-      sender ! JobManagerStatusAlive
+      sender() ! JobManagerStatusAlive
 
     case Disconnect(msg) =>
-      val taskManager = sender
+      val taskManager = sender()
 
       if (instanceManager.isRegistered(taskManager)) {
         log.info("Task manager {} wants to disconnect, because {}.", taskManager.path, msg)
@@ -474,7 +486,7 @@ class JobManager(val flinkConfiguration: Configuration,
         executionGraph = currentJobs.getOrElseUpdate(jobGraph.getJobID,
           (new ExecutionGraph(jobGraph.getJobID, jobGraph.getName,
             jobGraph.getJobConfiguration, timeout, jobGraph.getUserJarBlobKeys, userCodeLoader),
-            JobInfo(sender, System.currentTimeMillis())))._1
+            JobInfo(sender(), System.currentTimeMillis())))._1
 
         // configure the execution graph
         val jobNumberRetries = if (jobGraph.getNumberOfExecutionRetries >= 0) {
@@ -486,7 +498,7 @@ class JobManager(val flinkConfiguration: Configuration,
         executionGraph.setDelayBeforeRetrying(delayBetweenRetries)
         executionGraph.setScheduleMode(jobGraph.getScheduleMode)
         executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling)
-        
+
         executionGraph.setCheckpointingEnabled(jobGraph.isCheckpointingEnabled)
         executionGraph.setCheckpointingInterval(jobGraph.getCheckpointingInterval)
 
@@ -532,15 +544,15 @@ class JobManager(val flinkConfiguration: Configuration,
         }
 
         // give an actorContext
-        executionGraph.setParentContext(context);
-        
+        executionGraph.setParentContext(context)
+
         // get notified about job status changes
         executionGraph.registerJobStatusListener(self)
 
         if (listenToEvents) {
           // the sender wants to be notified about state changes
-          executionGraph.registerExecutionListener(sender)
-          executionGraph.registerJobStatusListener(sender)
+          executionGraph.registerExecutionListener(sender())
+          executionGraph.registerJobStatusListener(sender())
         }
 
         // done with submitting the job
@@ -769,9 +781,12 @@ object JobManager {
       if (executionMode == JobManagerMode.LOCAL) {
         LOG.info("Starting embedded TaskManager for JobManager's LOCAL execution mode")
 
-        val taskManagerActor = TaskManager.startTaskManagerActor(
-          configuration, jobManagerSystem, listeningAddress,
-          TaskManager.TASK_MANAGER_NAME, true, true, classOf[TaskManager])
+        val taskManagerActor = TaskManager.startTaskManagerComponentsAndActor(
+                        configuration, jobManagerSystem,
+                        listeningAddress,
+                        Some(TaskManager.TASK_MANAGER_NAME),
+                        Some(jobManager.path.toString),
+                        true, classOf[TaskManager])
 
         LOG.debug("Starting TaskManager process reaper")
         jobManagerSystem.actorOf(
@@ -965,6 +980,25 @@ object JobManager {
   def startJobManagerActors(configuration: Configuration,
                             actorSystem: ActorSystem): (ActorRef, ActorRef) = {
 
+    startJobManagerActors(configuration,actorSystem, Some(JOB_MANAGER_NAME), Some(ARCHIVE_NAME))
+  }
+  /**
+   * Starts the JobManager and job archiver based on the given configuration, in the
+   * given actor system.
+   *
+   * @param configuration The configuration for the JobManager
+   * @param actorSystem The actor system running the JobManager
+   * @param jobMangerActorName Optionally the name of the JobManager actor. If none is given,
+   *                          the actor will have the name generated by the actor system.
+   * @param archiverActorName Optionally the name of the archive actor. If none is given,
+   *                          the actor will have the name generated by the actor system.
+   * @return A tuple of references (JobManager Ref, Archiver Ref)
+   */
+  def startJobManagerActors(configuration: Configuration,
+                            actorSystem: ActorSystem,
+                            jobMangerActorName: Option[String],
+                            archiverActorName: Option[String]): (ActorRef, ActorRef) = {
+
     val (instanceManager, scheduler, libraryCacheManager, archiveProps, accumulatorManager,
       profilerProps, executionRetries, delayBetweenRetries,
       timeout, _) = createJobManagerComponents(configuration)
@@ -972,13 +1006,20 @@ object JobManager {
     val profiler: Option[ActorRef] =
                  profilerProps.map( props => actorSystem.actorOf(props, PROFILER_NAME) )
 
-    val archiver: ActorRef = actorSystem.actorOf(archiveProps, JobManager.ARCHIVE_NAME)
+    // start the archiver wither with the given name, or without (avoid name conflicts)
+    val archiver: ActorRef = archiverActorName match {
+      case Some(actorName) => actorSystem.actorOf(archiveProps, actorName)
+      case None => actorSystem.actorOf(archiveProps)
+    }
 
     val jobManagerProps = Props(classOf[JobManager], configuration, instanceManager, scheduler,
         libraryCacheManager, archiver, accumulatorManager, profiler, executionRetries,
         delayBetweenRetries, timeout)
 
-    val jobManager = startActor(jobManagerProps, actorSystem)
+    val jobManager: ActorRef = jobMangerActorName match {
+      case Some(actorName) => actorSystem.actorOf(jobManagerProps, actorName)
+      case None => actorSystem.actorOf(jobManagerProps)
+    }
 
     (jobManager, archiver)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala
index f42d08ab..48266e2 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala
@@ -18,19 +18,19 @@
 
 package org.apache.flink.runtime.jobmanager
 
-import java.lang.Long
+import java.lang.{Long => JLong}
 
 import akka.actor._
-import org.apache.flink.api.common.JobID
 import org.apache.flink.runtime.ActorLogMessages
-import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGraph, ExecutionVertex}
+import org.apache.flink.runtime.execution.ExecutionState
+import org.apache.flink.runtime.executiongraph.{ExecutionGraph, ExecutionVertex}
 import org.apache.flink.runtime.jobgraph.JobStatus._
 import org.apache.flink.runtime.jobgraph.JobVertexID
+import org.apache.flink.runtime.messages.CheckpointingMessages._
 import org.apache.flink.runtime.state.StateHandle
 
 import scala.collection.JavaConversions._
 import scala.collection.immutable.TreeMap
-import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.duration.{FiniteDuration, _}
 
 /**
@@ -63,15 +63,18 @@ import scala.concurrent.duration.{FiniteDuration, _}
  */
 
 class StreamCheckpointCoordinator(val executionGraph: ExecutionGraph,
-                         val vertices: Iterable[ExecutionVertex],
-                         var acks: Map[(JobVertexID,Int),List[Long]],
-                         var states: Map[(JobVertexID, Integer, Long), 
-                                 StateHandle],
-                         val interval: FiniteDuration,var curId: Long,var ackId: Long)
-        extends Actor with ActorLogMessages with ActorLogging {
-  
+                                  val vertices: Iterable[ExecutionVertex],
+                                  var acks: Map[(JobVertexID,Int),List[JLong]],
+                                  var states: Map[(JobVertexID, Integer, JLong), StateHandle],
+                                  val interval: FiniteDuration,
+                                  var curId: JLong,
+                                  var ackId: JLong)
+extends Actor with ActorLogMessages with ActorLogging {
+
+  implicit private val executor = context.dispatcher
+
   override def receiveWithLogMessages: Receive = {
-    
+
     case InitBarrierScheduler =>
       context.system.scheduler.schedule(interval,interval,self,BarrierTimeout)
       context.system.scheduler.schedule(2 * interval,2 * interval,self,CompactAndUpdate)
@@ -87,7 +90,7 @@ class StreamCheckpointCoordinator(val executionGraph: ExecutionGraph,
           curId += 1
           log.debug("Sending Barrier to vertices of Job " + executionGraph.getJobName)
           vertices.filter(v => v.getJobVertex.getJobVertex.isInputVertex &&
-                  v.getExecutionState == RUNNING).foreach(vertex
+                  v.getExecutionState == ExecutionState.RUNNING).foreach(vertex
           => vertex.getCurrentAssignedResource.getInstance.getTaskManager
                     ! BarrierReq(vertex.getCurrentExecutionAttempt.getAttemptId,curId))
         case _ =>
@@ -105,16 +108,17 @@ class StreamCheckpointCoordinator(val executionGraph: ExecutionGraph,
               acks += (jobVertexID,instanceID) -> (checkpointID :: acklist)
             case None =>
           }
-          log.debug(acks.toString)
+          log.debug(acks.toString())
       
     case CompactAndUpdate =>
-      val barrierCount = acks.values.foldLeft(TreeMap[Long,Int]().withDefaultValue(0))((dict,myList)
+      val barrierCount =
+        acks.values.foldLeft(TreeMap[JLong,Int]().withDefaultValue(0))((dict,myList)
       => myList.foldLeft(dict)((dict2,elem) => dict2.updated(elem,dict2(elem) + 1)))
       val keysToKeep = barrierCount.filter(_._2 == acks.size).keys
-      ackId = if(!keysToKeep.isEmpty) keysToKeep.max else ackId
+      ackId = if(keysToKeep.nonEmpty) keysToKeep.max else ackId
       acks.keys.foreach(x => acks = acks.updated(x,acks(x).filter(_ >= ackId)))
       states = states.filterKeys(_._3 >= ackId)
-      log.debug("Last global barrier is " + ackId)
+      log.debug("[FT-MONITOR] Last global barrier is " + ackId)
       executionGraph.loadOperatorStates(states)
       
   }
@@ -128,7 +132,7 @@ object StreamCheckpointCoordinator {
     val vertices: Iterable[ExecutionVertex] = getExecutionVertices(executionGraph)
     val monitor = context.system.actorOf(Props(new StreamCheckpointCoordinator(executionGraph,
       vertices,vertices.map(x => ((x.getJobVertex.getJobVertexId,x.getParallelSubtaskIndex),
-              List.empty[Long])).toMap, Map() ,interval,0L,-1L)))
+              List.empty[JLong])).toMap, Map() ,interval,0L,-1L)))
     monitor ! InitBarrierScheduler
     monitor
   }
@@ -145,14 +149,3 @@ case class BarrierTimeout()
 case class InitBarrierScheduler()
 
 case class CompactAndUpdate()
-
-case class BarrierReq(attemptID: ExecutionAttemptID,checkpointID: Long)
-
-case class BarrierAck(jobID: JobID,jobVertexID: JobVertexID,instanceID: Int,checkpointID: Long)
-
-case class StateBarrierAck(jobID: JobID, jobVertexID: JobVertexID, instanceID: Integer,
-                           checkpointID: Long, states: StateHandle)
-       
-
-
-

http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/CheckpointingMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/CheckpointingMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/CheckpointingMessages.scala
new file mode 100644
index 0000000..9f6f51a
--- /dev/null
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/CheckpointingMessages.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages
+
+import org.apache.flink.api.common.JobID
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
+import org.apache.flink.runtime.jobgraph.JobVertexID
+import org.apache.flink.runtime.state.StateHandle
+
+/**
+ * Actor messages specific to checkpoints (triggering, acknowledging,
+ * state transfer, ...)
+ */
+object CheckpointingMessages {
+
+  /**
+   * Abstract base trait for all checkpoint messages.
+   */
+  trait CheckpointingMessage
+
+  // --------------------------------------------------------------------------
+
+  case class BarrierReq(attemptID: ExecutionAttemptID,
+                        checkpointID: Long) extends CheckpointingMessage
+
+  case class BarrierAck(jobID: JobID,
+                        jobVertexID:JobVertexID,
+                        instanceID: Int,
+                        checkpointID: Long) extends CheckpointingMessage
+
+  case class StateBarrierAck(jobID: JobID,
+                             jobVertexID: JobVertexID,
+                             instanceID: Integer,
+                             checkpointID: java.lang.Long,
+                             states: StateHandle) extends CheckpointingMessage
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala
index 6785c31..6bc59a2 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala
@@ -64,7 +64,7 @@ object ExecutionGraphMessages {
   /**
    * Denotes the job state change of a job.
    *
-   * @param jobID identifying the correspong job
+   * @param jobID identifying the corresponding job
    * @param newJobStatus
    * @param timestamp
    * @param error

http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
index dab4671..73e20c0 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
@@ -25,7 +25,6 @@ import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGra
 import org.apache.flink.runtime.instance.{InstanceID, Instance}
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID
 import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus, JobVertexID}
-import org.apache.flink.runtime.taskmanager.TaskExecutionState
 
 import scala.collection.JavaConverters._
 
@@ -53,14 +52,6 @@ object JobManagerMessages {
   case class CancelJob(jobID: JobID)
 
   /**
-   * Denotes a state change of a task at the JobManager. The update success is acknowledged by a
-   * boolean value which is sent back to the sender.
-   *
-   * @param taskExecutionState
-   */
-  case class UpdateTaskExecutionState(taskExecutionState: TaskExecutionState)
-
-  /**
    * Requesting next input split for the
    * [[org.apache.flink.runtime.executiongraph.ExecutionJobVertex]]
    * of the job specified by [[jobID]]. The next input split is sent back to the sender as a
@@ -73,6 +64,14 @@ object JobManagerMessages {
   ExecutionAttemptID)
 
   /**
+   * Contains the next input split for a task. This message is a response to
+   * [[org.apache.flink.runtime.messages.JobManagerMessages.RequestNextInputSplit]].
+   *
+   * @param splitData
+   */
+  case class NextInputSplit(splitData: Array[Byte])
+
+  /**
    * Notifies the [[org.apache.flink.runtime.jobmanager.JobManager]] about available data for a
    * produced partition.
    * <p>

http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/Messages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/Messages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/Messages.scala
index 91c3b2d..ab8b8c2 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/Messages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/Messages.scala
@@ -18,17 +18,34 @@
 
 package org.apache.flink.runtime.messages
 
+/**
+ * Generic messages between JobManager, TaskManager, JobClient.
+ */
 object Messages {
 
   /**
    * Message to signal the successful reception of another message
    */
-  case object Acknowledge
+  case object Acknowledge {
+
+    /**
+     * Accessor for the case object instance, to simplify Java interoperability.
+     *
+     * @return The Acknowledge case object instance.
+     */
+    def get(): Acknowledge.type = this
+  }
 
   /**
-   * Signals that the JobManager/TaskManager shall disconnect from the sender
-   * (TaskManager/JobManager)
-   * @param reason
+   * Signals that the receiver (JobManager/TaskManager) shall disconnect the sender.
+   *
+   * The TaskManager may send this on shutdown to let the JobManager realize the TaskManager
+   * loss more quickly.
+   *
+   * The JobManager may send this message to its TaskManagers to let them clean up their
+   * tasks that depend on the JobManager and go into a clean state.
+   *
+   * @param reason The reason for disconnecting, to be displayed in log and error messages.
    */
   case class Disconnect(reason: String)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
index 1a3479a..3051d00 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
@@ -21,40 +21,67 @@ package org.apache.flink.runtime.messages
 import akka.actor.ActorRef
 import org.apache.flink.runtime.instance.{InstanceConnectionInfo, InstanceID, HardwareDescription}
 
+import scala.concurrent.duration.{Deadline, FiniteDuration}
+
+/**
+ * A set of messages from the between TaskManager and JobManager handle the
+ * registration of the TaskManager at the JobManager.
+ */
 object RegistrationMessages {
 
   /**
+   * Marker trait for registration messages.
+   */
+  trait RegistrationMessage
+
+  /**
+   * Triggers the TaskManager to attempt a registration at the JobManager.
+   *
+   * @param jobManagerAkkaURL The actor URL of the JobManager.
+   * @param timeout The timeout for the message. The next retry will double this timeout.
+   * @param deadline Optional deadline until when the registration must be completed.
+   * @param attempt The attempt number, for logging.
+   */
+  case class TriggerTaskManagerRegistration(jobManagerAkkaURL: String,
+                                            timeout: FiniteDuration,
+                                            deadline: Option[Deadline],
+                                            attempt: Int)
+    extends RegistrationMessage
+
+  /**
    * Registers a task manager at the job manager. A successful registration is acknowledged by
    * [[AcknowledgeRegistration]].
    *
-   * @param connectionInfo
-   * @param hardwareDescription
-   * @param numberOfSlots
+   * @param taskManager The TaskManager actor.
+   * @param connectionInfo The TaskManagers connection information.
+   * @param resources The TaskManagers resources.
+   * @param numberOfSlots The number of processing slots offered by the TaskManager.
    */
-  case class RegisterTaskManager(connectionInfo: InstanceConnectionInfo,
-                                 hardwareDescription: HardwareDescription,
+  case class RegisterTaskManager(taskManager: ActorRef,
+                                 connectionInfo: InstanceConnectionInfo,
+                                 resources: HardwareDescription,
                                  numberOfSlots: Int)
+    extends RegistrationMessage
 
   /**
    * Denotes the successful registration of a task manager at the job manager. This is the
    * response triggered by the [[RegisterTaskManager]] message.
    *
-   * @param instanceID
-   * @param blobPort
-   * @param profilerListener
+   * @param instanceID The instance ID under which the TaskManager is registered at the
+   *                   JobManager.
+   * @param blobPort The server port where the JobManager's BLOB service runs.
    */
-  case class AcknowledgeRegistration(instanceID: InstanceID, blobPort: Int,
-                                     profilerListener: Option[ActorRef])
+  case class AcknowledgeRegistration(jobManager: ActorRef, instanceID: InstanceID, blobPort: Int)
+    extends RegistrationMessage
 
   /**
    * Denotes that the TaskManager has already been registered at the JobManager.
    *
-   * @param instanceID
-   * @param blobPort
-   * @param profilerListener
+   * @param instanceID The instance ID under which the TaskManager is registered.
+   * @param blobPort The server port where the JobManager's BLOB service runs.
    */
-  case class AlreadyRegistered(instanceID: InstanceID, blobPort: Int,
-                                profilerListener: Option[ActorRef])
+  case class AlreadyRegistered(jobManager: ActorRef, instanceID: InstanceID, blobPort: Int)
+    extends RegistrationMessage
 
   /**
    * Denotes the unsuccessful registration of a task manager at the job manager. This is the
@@ -63,5 +90,5 @@ object RegistrationMessages {
    * @param reason Reason why the task manager registration was refused
    */
   case class RefuseRegistration(reason: String)
-
+    extends RegistrationMessage
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
index d27885b..c81830c 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
@@ -18,116 +18,67 @@
 
 package org.apache.flink.runtime.messages
 
-import org.apache.flink.core.io.InputSplit
-import org.apache.flink.runtime.deployment.{InputChannelDeploymentDescriptor, TaskDeploymentDescriptor}
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
 import org.apache.flink.runtime.instance.InstanceID
-import org.apache.flink.runtime.jobgraph.IntermediateDataSetID
 
+/**
+ * Miscellaneous actor messages exchanged with the TaskManager.
+ */
 object TaskManagerMessages {
 
   /**
-   * Cancels the task associated with [[attemptID]]. The result is sent back to the sender as a
-   * [[TaskOperationResult]] message.
-   *
-   * @param attemptID
-   */
-  case class CancelTask(attemptID: ExecutionAttemptID)
-
-  /**
-   * Submits a task to the task manager. The submission result is sent back to the sender as a
-   * [[TaskOperationResult]] message.
-   *
-   * @param tasks task deployment descriptor which contains the task relevant information
+   * Tells the task manager to send a heartbeat message to the job manager.
    */
-  case class SubmitTask(tasks: TaskDeploymentDescriptor)
+  case object SendHeartbeat {
 
-  /**
-   * Contains the next input split for a task. This message is a response to
-   * [[org.apache.flink.runtime.messages.JobManagerMessages.RequestNextInputSplit]].
-   *
-   * @param splitData
-   */
-  case class NextInputSplit(splitData: Array[Byte])
+    /**
+     * Accessor for the case object instance, to simplify Java interoperability.
+     * @return The SendHeartbeat case object instance.
+     */
+    def get() : SendHeartbeat.type = SendHeartbeat
+  }
 
   /**
-   * Unregisters the task identified by [[executionID]] from the task manager.
+   * Reports liveliness of the TaskManager instance with the given instance ID to the
+   * This message is sent to the job. This message reports the TaskManagers
+   * metrics, as a byte array.
    *
-   * @param executionID
+   * @param instanceID The instance ID of the reporting TaskManager.
+   * @param metricsReport utf-8 encoded JSON metrics report from the metricRegistry.
    */
-  case class UnregisterTask(executionID: ExecutionAttemptID)
+  case class Heartbeat(instanceID: InstanceID, metricsReport: Array[Byte])
 
-  /**
-   * Updates the reader of the task identified by
-   * [[executionID]] from the task manager.
-   */
-  sealed trait UpdateTask{
-    def executionID: ExecutionAttemptID
-  }
 
-  case class UpdateTaskSinglePartitionInfo(
-    executionID: ExecutionAttemptID,
-    resultId: IntermediateDataSetID,
-    partitionInfo: InputChannelDeploymentDescriptor)
-    extends UpdateTask
-
-  case class UpdateTaskMultiplePartitionInfos(
-    executionID: ExecutionAttemptID,
-    partitionInfos: Seq[(IntermediateDataSetID, InputChannelDeploymentDescriptor)])
-    extends UpdateTask
-
-  def createUpdateTaskMultiplePartitionInfos(
-    executionID: ExecutionAttemptID,
-    resultIDs: java.util.List[IntermediateDataSetID],
-    partitionInfos: java.util.List[InputChannelDeploymentDescriptor]):
-  UpdateTaskMultiplePartitionInfos = {
-    require(resultIDs.size() == partitionInfos.size(), "ResultIDs must have the same length as" +
-      "partitionInfos.")
-
-    import scala.collection.JavaConverters.asScalaBufferConverter
-    new UpdateTaskMultiplePartitionInfos(executionID,
-      resultIDs.asScala.zip(partitionInfos.asScala))
-  }
+  // --------------------------------------------------------------------------
+  //  Utility messages used for notifications during TaskManager startup
+  // --------------------------------------------------------------------------
 
   /**
-   * Fails all intermediate result partitions identified by [[executionID]] from the task manager.
-   *
-   * @param executionID
+   * Tells the TaskManager to send a stack trace of all threads to the sender.
+   * The response to this message is the [[StackTrace]] message.
    */
-  case class FailIntermediateResultPartitions(executionID: ExecutionAttemptID)
+  case object SendStackTrace {
 
-  /**
-   * Reports whether a task manager operation has been successful or not. This message will be
-   * sent to the sender as a response to [[SubmitTask]] and [[CancelTask]].
-   *
-   * @param executionID identifying the respective task
-   * @param success indicating whether the operation has been successful
-   * @param description
-   */
-  case class TaskOperationResult(executionID: ExecutionAttemptID, success: Boolean,
-                                 description: String = ""){
-    def this(executionID: ExecutionAttemptID, success: Boolean) = this(executionID, success, "")
+    /**
+     * Accessor for the case object instance, to simplify Java interoperability.
+     * @return The SendStackTrace case object instance.
+     */
+    def get() : SendStackTrace.type = SendStackTrace
   }
 
   /**
-   * Reports liveliness of an instance with [[instanceID]] to the
-   * [[org.apache.flink.runtime.instance.InstanceManager]]. This message is sent to the job
-   * manager which forwards it to the InstanceManager.
-   *
-   * @param instanceID
-   * @param metricsReport utf-8 encoded JSON report from the metricRegistry.
-   */
-  case class Heartbeat(instanceID: InstanceID, metricsReport: Array[Byte])
-
-  /**
-   * Sends StackTrace Message of an instance with [[instanceID]]. This message is a response to
-   * [[org.apache.flink.runtime.messages.TaskManagerMessages.SendStackTrace]].
+   * Communicates the stack trace of the TaskManager with the given ID.
+   * This message is the response to [[SendStackTrace]].
    *
-   * @param instanceID
-   * @param stackTrace
+   * @param instanceID The ID of the responding task manager.
+   * @param stackTrace The stack trace, as a string.
    */
   case class StackTrace(instanceID: InstanceID, stackTrace: String)
 
+
+  // --------------------------------------------------------------------------
+  //  Utility messages used for notifications during TaskManager startup
+  // --------------------------------------------------------------------------
+
   /**
    * Requests a notification from the task manager as soon as the task manager has been
    * registered at the job manager. Once the task manager is registered at the job manager a
@@ -141,55 +92,23 @@ object TaskManagerMessages {
    */
   case object RegisteredAtJobManager
 
-  /**
-   * Registers the sender as task manager at the job manager.
-   */
-  case object RegisterAtJobManager
-
-  /**
-   * Makes the task manager sending a heartbeat message to the job manager.
-   */
-  case object SendHeartbeat
 
-  /**
-   * Logs the current memory usage as debug level output.
-   */
-  case object LogMemoryUsage
+  // --------------------------------------------------------------------------
+  //  Utility getters for case objects to simplify access from Java
+  // --------------------------------------------------------------------------
 
   /**
-   * Makes the task manager sending a stack trace message to the sender.
+   * Accessor for the case object instance, to simplify Java interoperability.
+   * @return The NotifyWhenRegisteredAtJobManager case object instance.
    */
-  case object SendStackTrace
+  def getNotifyWhenRegisteredAtJobManagerMessage:
+            NotifyWhenRegisteredAtJobManager.type = NotifyWhenRegisteredAtJobManager
 
   /**
-   * Fail the specified task externally
-   *
-   * @param executionID identifying the task to fail
-   * @param cause reason for the external failure
+   * Accessor for the case object instance, to simplify Java interoperability.
+   * @return The RegisteredAtJobManager case object instance.
    */
-  case class FailTask(executionID: ExecutionAttemptID, cause: Throwable)
-  
-  // --------------------------------------------------------------------------
-  // Utility methods to allow simpler case object access from Java
-  // --------------------------------------------------------------------------
-  
-  def getNotifyWhenRegisteredAtJobManagerMessage : AnyRef = {
-    NotifyWhenRegisteredAtJobManager
-  }
-  
-  def getRegisteredAtJobManagerMessage : AnyRef = {
-    RegisteredAtJobManager
-  }
-  
-  def getRegisterAtJobManagerMessage : AnyRef = {
-    RegisterAtJobManager
-  }
+  def getRegisteredAtJobManagerMessage:
+            RegisteredAtJobManager.type = RegisteredAtJobManager
 
-  def getSendHeartbeatMessage : AnyRef = {
-    SendHeartbeat
-  }
-
-  def getLogMemoryUsageMessage : AnyRef = {
-    RegisteredAtJobManager
-  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskMessages.scala
new file mode 100644
index 0000000..c8c5726
--- /dev/null
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskMessages.scala
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages
+
+import org.apache.flink.runtime.deployment.{TaskDeploymentDescriptor, InputChannelDeploymentDescriptor}
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID
+import org.apache.flink.runtime.taskmanager.TaskExecutionState
+
+/**
+ * A set of messages that control the deployment and the state of Tasks executed
+ * on the TaskManager
+ */
+object TaskMessages {
+
+  /**
+   * Marker trait for task messages.
+   */
+  trait TaskMessage
+
+  // --------------------------------------------------------------------------
+  //  Starting and stopping Tasks
+  // --------------------------------------------------------------------------
+
+  /**
+   * Submits a task to the task manager. The result is to this message is a
+   * [[TaskOperationResult]] message.
+   *
+   * @param tasks Descriptor which contains the information to start the task.
+   */
+  case class SubmitTask(tasks: TaskDeploymentDescriptor)
+    extends TaskMessage
+
+  /**
+   * Cancels the task associated with [[attemptID]]. The result is sent back to the sender as a
+   * [[TaskOperationResult]] message.
+   *
+   * @param attemptID The task's execution attempt ID.
+   */
+  case class CancelTask(attemptID: ExecutionAttemptID)
+    extends TaskMessage
+
+  /**
+   * Triggers a fail of specified task from the outside (as opposed to the task throwing
+   * an exception itself) with the given exception as the cause.
+   *
+   * @param executionID The task's execution attempt ID.
+   * @param cause The reason for the external failure.
+   */
+  case class FailTask(executionID: ExecutionAttemptID, cause: Throwable)
+    extends TaskMessage
+
+  /**
+   * Unregister the task identified by [[executionID]] from the TaskManager.
+   * Sent to the TaskManager by futures and callbacks.
+   *
+   * @param executionID The task's execution attempt ID.
+   */
+  case class UnregisterTask(executionID: ExecutionAttemptID)
+    extends TaskMessage
+
+
+  // --------------------------------------------------------------------------
+  //  Updates to Intermediate Results
+  // --------------------------------------------------------------------------
+
+  /**
+   * Base class for messages that update the information about location of input partitions
+   */
+  abstract sealed class UpdatePartitionInfo extends TaskMessage {
+    def executionID: ExecutionAttemptID
+  }
+
+  /**
+   *
+   * @param executionID The task's execution attempt ID.
+   * @param resultId The input reader to update.
+   * @param partitionInfo The partition info update.
+   */
+  case class UpdateTaskSinglePartitionInfo(executionID: ExecutionAttemptID,
+                                           resultId: IntermediateDataSetID,
+                                           partitionInfo: InputChannelDeploymentDescriptor)
+    extends UpdatePartitionInfo
+
+  /**
+   *
+   * @param executionID The task's execution attempt ID.
+   * @param partitionInfos List of input gates with channel descriptors to update.
+   */
+  case class UpdateTaskMultiplePartitionInfos(
+                    executionID: ExecutionAttemptID,
+                    partitionInfos: Seq[(IntermediateDataSetID, InputChannelDeploymentDescriptor)])
+    extends UpdatePartitionInfo
+
+  /**
+   * Fails (and releases) all intermediate result partitions identified by
+   * [[executionID]] from the task manager.
+   *
+   * @param executionID The task's execution attempt ID.
+   */
+  case class FailIntermediateResultPartitions(executionID: ExecutionAttemptID)
+    extends TaskMessage
+
+
+  // --------------------------------------------------------------------------
+  //  Report Messages
+  // --------------------------------------------------------------------------
+
+  /**
+   * Denotes a state change of a task at the JobManager. The update success is acknowledged by a
+   * boolean value which is sent back to the sender.
+   *
+   * @param taskExecutionState The changed task state
+   */
+  case class UpdateTaskExecutionState(taskExecutionState: TaskExecutionState)
+    extends TaskMessage
+
+  /**
+   * Response message to updates in the task state. Send for example as a response to
+   *
+   *  - [[SubmitTask]]
+   *  - [[CancelTask]]
+   *
+   * @param executionID identifying the respective task
+   * @param success indicating whether the operation has been successful
+   * @param description Optional description for unsuccessful results.
+   */
+  case class TaskOperationResult(executionID: ExecutionAttemptID,
+                                 success: Boolean,
+                                 description: String)
+    extends TaskMessage
+  {
+    def this(executionID: ExecutionAttemptID, success: Boolean) = this(executionID, success, "")
+  }
+
+
+  // --------------------------------------------------------------------------
+  //  Utility Functions
+  // --------------------------------------------------------------------------
+
+  def createUpdateTaskMultiplePartitionInfos(
+                               executionID: ExecutionAttemptID,
+                               resultIDs: java.util.List[IntermediateDataSetID],
+                               partitionInfos: java.util.List[InputChannelDeploymentDescriptor]):
+  UpdateTaskMultiplePartitionInfos = {
+
+    require(resultIDs.size() == partitionInfos.size(),
+      "ResultIDs must have the same length as partitionInfos.")
+
+    import scala.collection.JavaConverters.asScalaBufferConverter
+
+    new UpdateTaskMultiplePartitionInfos(executionID,
+      resultIDs.asScala.zip(partitionInfos.asScala))
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e74521c1/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index cb79fbc..13e1ccd 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -100,8 +100,18 @@ class LocalFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem:
       TaskManager.TASK_MANAGER_NAME
     }
 
-    TaskManager.startTaskManagerActor(config, system, HOSTNAME, taskManagerActorName,
-      singleActorSystem, localExecution, classOf[TaskManager])
+    val jobManagerPath: Option[String] = if (singleActorSystem) {
+      Some(jobManagerActor.path.toString)
+    } else {
+      None
+    }
+
+    TaskManager.startTaskManagerComponentsAndActor(config, system,
+                                                   HOSTNAME, // network interface to bind to
+                                                   Some(taskManagerActorName), // actor name
+                                                   jobManagerPath, // job manager akka URL
+                                                   localExecution, // start network stack?
+                                                   classOf[TaskManager])
   }
 
   def getJobClient(): ActorRef = {