Posted to commits@flink.apache.org by tr...@apache.org on 2016/10/11 15:23:08 UTC

[2/2] flink git commit: [FLINK-4738] [TaskManager] Port TaskManager logic to new Flip-6 TaskManager

[FLINK-4738] [TaskManager] Port TaskManager logic to new Flip-6 TaskManager

The ported logic contains the task lifecycle management methods, JobManager association and
setup of TaskManager components.

Introduce Rpc implementations for TaskManager components

Implement metrics setup

Move more TaskManager components out of the constructor to make TaskExecutor more testable

Add RpcMethod annotation to TaskExecutor#confirmCheckpoint

This closes #2594.
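
For context before the diff: the commit builds on the Flip-6 RPC abstraction, in which a component exposes a gateway interface (remote callers see its methods as Future-returning proxies, e.g. JobMasterGateway below) and implements each call as an @RpcMethod-annotated method on an RpcEndpoint (e.g. JobMaster below). The following is a minimal illustrative sketch of that pattern only; the Echo* names are made up and are not part of this commit.

import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcMethod;
import org.apache.flink.runtime.rpc.RpcService;

// Gateway: the view a remote caller gets; calls complete asynchronously as Futures.
interface EchoGateway extends RpcGateway {
	Future<String> echo(String message);
}

// Endpoint: the component itself; each @RpcMethod implements one gateway call
// and is executed in the endpoint's main thread.
class EchoEndpoint extends RpcEndpoint<EchoGateway> {

	EchoEndpoint(RpcService rpcService) {
		super(rpcService);
	}

	@RpcMethod
	public String echo(String message) {
		return message;
	}
}

A caller would typically obtain an EchoGateway proxy from the RpcService and invoke echo without blocking; the TaskExecutorGateway and JobMasterGateway changes below follow this pattern.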


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8c656d92
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8c656d92
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8c656d92

Branch: refs/heads/flip-6
Commit: 8c656d9252fd01ea82c75ff1ea830b110f68f8a9
Parents: f863d16
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Sep 28 14:39:51 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Oct 11 17:22:34 2016 +0200

----------------------------------------------------------------------
 .../CheckpointCoordinatorGateway.java           |  43 ++
 .../deployment/TaskDeploymentDescriptor.java    |   9 +
 .../runtime/executiongraph/PartitionInfo.java   |  47 ++
 .../flink/runtime/filecache/FileCache.java      |  17 +-
 .../jobgraph/tasks/InputSplitProvider.java      |   3 +-
 .../tasks/InputSplitProviderException.java      |  36 ++
 .../jobmaster/ExecutionGraphException.java      |  41 ++
 .../runtime/jobmaster/JobManagerException.java  |  39 ++
 .../flink/runtime/jobmaster/JobMaster.java      |  50 +-
 .../runtime/jobmaster/JobMasterGateway.java     |  30 +-
 .../jobmaster/MiniClusterJobDispatcher.java     |   2 +-
 .../runtime/jobmaster/SerializedInputSplit.java |  39 ++
 .../jobmaster/message/NextInputSplit.java       |  39 --
 .../flink/runtime/operators/DataSourceTask.java |  12 +-
 .../runtime/query/KvStateRegistryGateway.java   |  57 ++
 .../taskexecutor/JobManagerConnection.java      |  91 +++
 .../runtime/taskexecutor/TaskExecutor.java      | 627 +++++++++++++++++--
 .../taskexecutor/TaskExecutorGateway.java       |  80 ++-
 .../taskexecutor/TaskManagerConfiguration.java  |   3 +-
 .../runtime/taskexecutor/TaskManagerRunner.java |   3 +
 .../taskexecutor/TaskManagerServices.java       |  51 +-
 .../flink/runtime/taskexecutor/TaskSlot.java    |  73 +++
 .../runtime/taskexecutor/TaskSlotMapping.java   |  44 ++
 .../exceptions/CheckpointException.java         |  41 ++
 .../exceptions/PartitionException.java          |  41 ++
 .../taskexecutor/exceptions/TaskException.java  |  41 ++
 .../exceptions/TaskManagerException.java        |  41 ++
 .../exceptions/TaskSubmissionException.java     |  41 ++
 .../rpc/RpcCheckpointResponder.java             |  71 +++
 .../taskexecutor/rpc/RpcInputSplitProvider.java |  73 +++
 .../rpc/RpcKvStateRegistryListener.java         |  73 +++
 .../rpc/RpcPartitionStateChecker.java           |  48 ++
 .../RpcResultPartitionConsumableNotifier.java   |  67 ++
 .../utils/TaskExecutorMetricsInitializer.java   | 257 ++++++++
 .../ActorGatewayTaskManagerActions.java         |  59 ++
 .../ActorGatewayTaskManagerConnection.java      |  59 --
 .../apache/flink/runtime/taskmanager/Task.java  |  23 +-
 .../runtime/taskmanager/TaskExecutionState.java |   4 +-
 .../taskmanager/TaskInputSplitProvider.java     |  49 +-
 .../runtime/taskmanager/TaskManagerActions.java |  57 ++
 .../taskmanager/TaskManagerConnection.java      |  57 --
 .../flink/runtime/taskmanager/TaskManager.scala | 169 +----
 .../FileCacheDeleteValidationTest.java          |   4 +-
 .../jobmaster/JobManagerRunnerMockTest.java     |   1 -
 .../runtime/taskexecutor/TaskExecutorTest.java  |  24 +-
 .../runtime/taskmanager/TaskAsyncCallTest.java  |   2 +-
 .../taskmanager/TaskInputSplitProviderTest.java |   3 +-
 .../flink/runtime/taskmanager/TaskStopTest.java |   4 +-
 .../flink/runtime/taskmanager/TaskTest.java     |   4 +-
 .../source/InputFormatSourceFunction.java       |   8 +-
 .../tasks/InterruptSensitiveRestoreTest.java    |  11 +-
 .../streaming/runtime/tasks/StreamTaskTest.java |   4 +-
 52 files changed, 2340 insertions(+), 432 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8c656d92/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java
new file mode 100644
index 0000000..e448ebc
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.state.CheckpointStateHandles;
+
+public interface CheckpointCoordinatorGateway extends RpcGateway {
+
+	void acknowledgeCheckpoint(
+		JobID jobID,
+		ExecutionAttemptID executionAttemptID,
+		long checkpointID,
+		CheckpointStateHandles checkpointStateHandles,
+		long synchronousDurationMillis,
+		long asynchronousDurationMillis,
+		long bytesBufferedInAlignment,
+		long alignmentDurationNanos);
+
+	void declineCheckpoint(
+		JobID jobID,
+		ExecutionAttemptID executionAttemptID,
+		long checkpointID,
+		long checkpointTimestamp);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8c656d92/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
index 7bbdb2a..b1ac665 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.state.ChainedStateHandle;
@@ -57,6 +58,9 @@ public final class TaskDeploymentDescriptor implements Serializable {
 	/** The ID referencing the attempt to execute the task. */
 	private final ExecutionAttemptID executionId;
 
+	/** The allocation ID of the slot in which the task shall be run */
+	private final AllocationID allocationID;
+
 	/** The task's name. */
 	private final String taskName;
 
@@ -158,6 +162,7 @@ public final class TaskDeploymentDescriptor implements Serializable {
 		this.operatorState = operatorState;
 		this.keyGroupState = keyGroupState;
 		this.partitionableOperatorState = partitionableOperatorStateHandles;
+		this.allocationID = new AllocationID();
 	}
 
 	public TaskDeploymentDescriptor(
@@ -322,6 +327,10 @@ public final class TaskDeploymentDescriptor implements Serializable {
 		return requiredClasspaths;
 	}
 
+	public AllocationID getAllocationID() {
+		return allocationID;
+	}
+
 	@Override
 	public String toString() {
 		return String.format("TaskDeploymentDescriptor [job id: %s, job vertex id: %s, " +

http://git-wip-us.apache.org/repos/asf/flink/blob/8c656d92/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/PartitionInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/PartitionInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/PartitionInfo.java
new file mode 100644
index 0000000..1a79a99
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/PartitionInfo.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Contains information where to find a partition. The partition is defined by the
+ * {@link IntermediateDataSetID} and the partition location is specified by
+ * {@link InputChannelDeploymentDescriptor}.
+ */
+public class PartitionInfo {
+
+	private final IntermediateDataSetID intermediateDataSetID;
+	private final InputChannelDeploymentDescriptor inputChannelDeploymentDescriptor;
+
+	public PartitionInfo(IntermediateDataSetID intermediateResultPartitionID, InputChannelDeploymentDescriptor inputChannelDeploymentDescriptor) {
+		this.intermediateDataSetID = Preconditions.checkNotNull(intermediateResultPartitionID);
+		this.inputChannelDeploymentDescriptor = Preconditions.checkNotNull(inputChannelDeploymentDescriptor);
+	}
+
+	public IntermediateDataSetID getIntermediateDataSetID() {
+		return intermediateDataSetID;
+	}
+
+	public InputChannelDeploymentDescriptor getInputChannelDeploymentDescriptor() {
+		return inputChannelDeploymentDescriptor;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8c656d92/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
index b5bdcaf..a07f1a0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
@@ -33,8 +33,6 @@ import java.util.concurrent.TimeUnit;
 import org.apache.commons.io.FileUtils;
 import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
 import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileStatus;
@@ -44,6 +42,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.util.IOUtils;
 
+import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -71,17 +70,15 @@ public class FileCache {
 
 	// ------------------------------------------------------------------------
 
-	public FileCache(Configuration config) throws IOException {
-		
-		String tempDirs = config.getString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
-				ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH);
+	public FileCache(String[] tempDirectories) throws IOException {
 
-		String[] directories = tempDirs.split(",|" + File.pathSeparator);
-		storageDirectories = new File[directories.length];
+		Preconditions.checkNotNull(tempDirectories);
 
-		for (int i = 0; i < directories.length; i++) {
+		storageDirectories = new File[tempDirectories.length];
+
+		for (int i = 0; i < tempDirectories.length; i++) {
 			String cacheDirName = "flink-dist-cache-" + UUID.randomUUID().toString();
-			storageDirectories[i] = new File(directories[i], cacheDirName);
+			storageDirectories[i] = new File(tempDirectories[i], cacheDirName);
 			String path = storageDirectories[i].getAbsolutePath();
 
 			if (storageDirectories[i].mkdirs()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8c656d92/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/InputSplitProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/InputSplitProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/InputSplitProvider.java
index e0cde17..464b13f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/InputSplitProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/InputSplitProvider.java
@@ -34,6 +34,7 @@ public interface InputSplitProvider {
 	 * @param userCodeClassLoader used to deserialize input splits
 	 * @return the next input split to be consumed by the calling task or <code>null</code> if the
 	 *         task shall not consume any further input splits.
+	 * @throws InputSplitProviderException if fetching the next input split fails
 	 */
-	InputSplit getNextInputSplit(ClassLoader userCodeClassLoader);
+	InputSplit getNextInputSplit(ClassLoader userCodeClassLoader) throws InputSplitProviderException;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8c656d92/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/InputSplitProviderException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/InputSplitProviderException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/InputSplitProviderException.java
new file mode 100644
index 0000000..ac73c6f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/InputSplitProviderException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobgraph.tasks;
+
+public class InputSplitProviderException extends Exception {
+
+	private static final long serialVersionUID = -8043190713983651548L;
+
+	public InputSplitProviderException(String message) {
+		super(message);
+	}
+
+	public InputSplitProviderException(String message, Throwable cause) {
+		super(message, cause);
+	}
+
+	public InputSplitProviderException(Throwable cause) {
+		super(cause);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8c656d92/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ExecutionGraphException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ExecutionGraphException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ExecutionGraphException.java
new file mode 100644
index 0000000..7c35f3d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ExecutionGraphException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+
+/**
+ * Exceptions thrown by operations on the {@link ExecutionGraph} by the {@link JobMaster}.
+ */
+public class ExecutionGraphException extends JobManagerException {
+
+	private static final long serialVersionUID = -5439002256464886357L;
+
+	public ExecutionGraphException(String message) {
+		super(message);
+	}
+
+	public ExecutionGraphException(String message, Throwable cause) {
+		super(message, cause);
+	}
+
+	public ExecutionGraphException(Throwable cause) {
+		super(cause);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8c656d92/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerException.java
new file mode 100644
index 0000000..bc2759d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+/**
+ * Base exception thrown by the {@link JobMaster}.
+ */
+public class JobManagerException extends Exception {
+
+	private static final long serialVersionUID = -7290962952242188064L;
+
+	public JobManagerException(final String message) {
+		super(message);
+	}
+
+	public JobManagerException(final String message, Throwable cause) {
+		super(message, cause);
+	}
+
+	public JobManagerException(Throwable cause) {
+		super(cause);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8c656d92/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index e67a167..8f3a342 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.checkpoint.stats.SimpleCheckpointStatsTracker;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.client.JobSubmissionException;
 import org.apache.flink.runtime.client.SerializedJobExecutionResult;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
@@ -60,9 +61,9 @@ import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
 import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
-import org.apache.flink.runtime.jobmaster.message.NextInputSplit;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.registration.RegisteredRpcConnection;
 import org.apache.flink.runtime.registration.RegistrationResponse;
@@ -71,6 +72,7 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.state.CheckpointStateHandles;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.util.SerializedThrowable;
 import org.apache.flink.util.InstantiationUtil;
@@ -507,12 +509,18 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	 * @return Acknowledge the task execution state update
 	 */
 	@RpcMethod
-	public boolean updateTaskExecutionState(final TaskExecutionState taskExecutionState) {
+	public Acknowledge updateTaskExecutionState(final TaskExecutionState taskExecutionState) throws ExecutionGraphException {
 		if (taskExecutionState == null) {
-			return false;
+			throw new NullPointerException("TaskExecutionState must not be null.");
+		}
+
+		if (executionGraph.updateState(taskExecutionState)) {
+			return Acknowledge.get();
 		} else {
-			return executionGraph.updateState(taskExecutionState);
+			throw new ExecutionGraphException("The execution attempt " +
+				taskExecutionState.getID() + " was not found.");
 		}
+
 	}
 
 	//----------------------------------------------------------------------------------------------
@@ -531,7 +539,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	}
 
 	@RpcMethod
-	public NextInputSplit requestNextInputSplit(
+	public SerializedInputSplit requestNextInputSplit(
 		final JobVertexID vertexID,
 		final ExecutionAttemptID executionAttempt) throws Exception
 	{
@@ -569,7 +577,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 
 		try {
 			final byte[] serializedInputSplit = InstantiationUtil.serializeObject(nextInputSplit);
-			return new NextInputSplit(serializedInputSplit);
+			return new SerializedInputSplit(serializedInputSplit);
 		} catch (Exception ex) {
 			log.error("Could not serialize the next input split of class {}.", nextInputSplit.getClass(), ex);
 			IOException reason = new IOException("Could not serialize the next input split of class " +
@@ -591,8 +599,36 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	}
 
 	@RpcMethod
-	public void scheduleOrUpdateConsumers(final ResultPartitionID partitionID) {
+	public Acknowledge scheduleOrUpdateConsumers(final ResultPartitionID partitionID) {
 		executionGraph.scheduleOrUpdateConsumers(partitionID);
+		return Acknowledge.get();
+	}
+
+	@RpcMethod
+	public void disconnectTaskManager(final ResourceID resourceID) {
+		throw new UnsupportedOperationException();
+	}
+
+	@RpcMethod
+	public void acknowledgeCheckpoint(
+		JobID jobID,
+		ExecutionAttemptID executionAttemptID,
+		long checkpointID,
+		CheckpointStateHandles checkpointStateHandles,
+		long synchronousDurationMillis,
+		long asynchronousDurationMillis,
+		long bytesBufferedInAlignment,
+		long alignmentDurationNanos) {
+		throw new UnsupportedOperationException();
+	}
+
+	@RpcMethod
+	public void declineCheckpoint(
+		JobID jobID,
+		ExecutionAttemptID executionAttemptID,
+		long checkpointID,
+		long checkpointTimestamp) {
+		throw new UnsupportedOperationException();
 	}
 
 	//----------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/8c656d92/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index 686a3f3..e3e57d4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.runtime.jobmaster;
 
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorGateway;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
@@ -27,8 +30,8 @@ import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmaster.message.NextInputSplit;
-import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 
 import java.util.UUID;
@@ -36,7 +39,7 @@ import java.util.UUID;
 /**
  * {@link JobMaster} rpc gateway interface
  */
-public interface JobMasterGateway extends RpcGateway {
+public interface JobMasterGateway extends CheckpointCoordinatorGateway {
 
 	/**
 	 * Starting the job under the given leader session ID.
@@ -57,20 +60,19 @@ public interface JobMasterGateway extends RpcGateway {
 	 * @param taskExecutionState New task execution state for a given task
 	 * @return Future flag of the task execution state update result
 	 */
-	Future<Boolean> updateTaskExecutionState(TaskExecutionState taskExecutionState);
+	Future<Acknowledge> updateTaskExecutionState(TaskExecutionState taskExecutionState);
 
 	/**
 	 * Requesting next input split for the {@link ExecutionJobVertex}. The next input split is sent back to the sender
-	 * as a {@link NextInputSplit} message.
+	 * as a {@link SerializedInputSplit} message.
 	 *
 	 * @param vertexID         The job vertex id
 	 * @param executionAttempt The execution attempt id
 	 * @return The future of the input split. If there is no further input split, will return an empty object.
-	 * @throws Exception if some error occurred or information mismatch.
 	 */
-	Future<NextInputSplit> requestNextInputSplit(
+	Future<SerializedInputSplit> requestNextInputSplit(
 		final JobVertexID vertexID,
-		final ExecutionAttemptID executionAttempt) throws Exception;
+		final ExecutionAttemptID executionAttempt);
 
 	/**
 	 * Requests the current state of the partition.
@@ -96,6 +98,16 @@ public interface JobMasterGateway extends RpcGateway {
 	 * The JobManager then can decide when to schedule the partition consumers of the given session.
 	 *
 	 * @param partitionID The partition which has already produced data
+	 * @param timeout before the rpc call fails
+	 * @return Future acknowledge of the schedule or update operation
 	 */
-	void scheduleOrUpdateConsumers(final ResultPartitionID partitionID);
+	Future<Acknowledge> scheduleOrUpdateConsumers(final ResultPartitionID partitionID, @RpcTimeout final Time timeout);
+
+	/**
+	 * Disconnects the given {@link org.apache.flink.runtime.taskexecutor.TaskExecutor} from the
+	 * {@link JobMaster}.
+	 *
+	 * @param resourceID identifying the TaskManager to disconnect
+	 */
+	void disconnectTaskManager(ResourceID resourceID);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8c656d92/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java
index 792bfd5..e8fb5bb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java
@@ -356,7 +356,7 @@ public class MiniClusterJobDispatcher {
 			final Throwable runnerException = this.runnerException;
 			final JobExecutionResult result = this.result;
 
-			// (1) we check if teh job terminated with an exception
+			// (1) we check if the job terminated with an exception
 			// (2) we check whether the job completed successfully
 			// (3) we check if we have exceptions from the JobManagers. the job may still have
 			//     completed successfully in that case, if multiple JobMasters were running

http://git-wip-us.apache.org/repos/asf/flink/blob/8c656d92/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SerializedInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SerializedInputSplit.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SerializedInputSplit.java
new file mode 100644
index 0000000..bfdc65a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SerializedInputSplit.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import java.io.Serializable;
+
+public class SerializedInputSplit implements Serializable {
+	private static final long serialVersionUID = -2063021844254152064L;
+
+	private final byte[] inputSplitData;
+
+	public SerializedInputSplit(byte[] inputSplitData) {
+		this.inputSplitData = inputSplitData;
+	}
+
+	public byte[] getInputSplitData() {
+		return inputSplitData;
+	}
+
+	public boolean isEmpty() {
+		return inputSplitData == null || inputSplitData.length == 0;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8c656d92/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/NextInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/NextInputSplit.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/NextInputSplit.java
deleted file mode 100644
index fe511ed..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/NextInputSplit.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmaster.message;
-
-import java.io.Serializable;
-
-/**
- * Contains the next input split for a task.
- */
-public class NextInputSplit implements Serializable {
-
-	private static final long serialVersionUID = -1355784074565856240L;
-
-	private final byte[] splitData;
-
-	public NextInputSplit(final byte[] splitData) {
-		this.splitData = splitData;
-	}
-
-	public byte[] getSplitData() {
-		return splitData;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8c656d92/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
index c062bf8..1c751fd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException;
 import org.apache.flink.runtime.operators.chaining.ChainedDriver;
 import org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException;
 import org.apache.flink.runtime.operators.util.DistributedRuntimeUDFContext;
@@ -332,9 +333,14 @@ public class DataSourceTask<OT> extends AbstractInvokable {
 				if (nextSplit != null) {
 					return true;
 				}
-				
-				InputSplit split = provider.getNextInputSplit(getUserCodeClassLoader());
-				
+
+				final InputSplit split;
+				try {
+					split = provider.getNextInputSplit(getUserCodeClassLoader());
+				} catch (InputSplitProviderException e) {
+					throw new RuntimeException("Could not retrieve next input split.", e);
+				}
+
 				if (split != null) {
 					this.nextSplit = split;
 					return true;

http://git-wip-us.apache.org/repos/asf/flink/blob/8c656d92/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistryGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistryGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistryGateway.java
new file mode 100644
index 0000000..d285074
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistryGateway.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.state.KeyGroupRange;
+
+public interface KvStateRegistryGateway extends RpcGateway {
+	/**
+	 * Notifies the listener about a registered KvState instance.
+	 *
+	 * @param jobId            Job ID the KvState instance belongs to
+	 * @param jobVertexId      JobVertexID the KvState instance belongs to
+	 * @param keyGroupRange    Key group range the KvState instance belongs to
+	 * @param registrationName Name under which the KvState is registered
+	 * @param kvStateId        ID of the KvState instance
+	 */
+	void notifyKvStateRegistered(
+		JobID jobId,
+		JobVertexID jobVertexId,
+		KeyGroupRange keyGroupRange,
+		String registrationName,
+		KvStateID kvStateId,
+		KvStateServerAddress kvStateServerAddress);
+
+	/**
+	 * Notifies the listener about an unregistered KvState instance.
+	 *
+	 * @param jobId            Job ID the KvState instance belongs to
+	 * @param jobVertexId      JobVertexID the KvState instance belongs to
+	 * @param keyGroupRange    Key group range the KvState instance belongs to
+	 * @param registrationName Name under which the KvState is registered
+	 */
+	void notifyKvStateUnregistered(
+		JobID jobId,
+		JobVertexID jobVertexId,
+		KeyGroupRange keyGroupRange,
+		String registrationName);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8c656d92/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java
new file mode 100644
index 0000000..ef62ef1
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
+import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.taskmanager.CheckpointResponder;
+import org.apache.flink.runtime.taskmanager.TaskManagerActions;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Container class for JobManager specific communication utils used by the {@link TaskExecutor}.
+ */
+public class JobManagerConnection {
+
+	// Gateway to the job master
+	private final JobMasterGateway jobMasterGateway;
+
+	// Task manager actions with respect to the connected job manager
+	private final TaskManagerActions taskManagerActions;
+
+	// Checkpoint responder for the specific job manager
+	private final CheckpointResponder checkpointResponder;
+
+	// Library cache manager connected to the specific job manager
+	private final LibraryCacheManager libraryCacheManager;
+
+	// Result partition consumable notifier for the specific job manager
+	private final ResultPartitionConsumableNotifier resultPartitionConsumableNotifier;
+
+	// Partition state checker for the specific job manager
+	private final PartitionStateChecker partitionStateChecker;
+
+	public JobManagerConnection(
+		JobMasterGateway jobMasterGateway,
+		TaskManagerActions taskManagerActions,
+		CheckpointResponder checkpointResponder,
+		LibraryCacheManager libraryCacheManager,
+		ResultPartitionConsumableNotifier resultPartitionConsumableNotifier,
+		PartitionStateChecker partitionStateChecker) {
+
+		this.jobMasterGateway = Preconditions.checkNotNull(jobMasterGateway);
+		this.taskManagerActions = Preconditions.checkNotNull(taskManagerActions);
+		this.checkpointResponder = Preconditions.checkNotNull(checkpointResponder);
+		this.libraryCacheManager = Preconditions.checkNotNull(libraryCacheManager);
+		this.resultPartitionConsumableNotifier = Preconditions.checkNotNull(resultPartitionConsumableNotifier);
+		this.partitionStateChecker = Preconditions.checkNotNull(partitionStateChecker);
+	}
+
+	public JobMasterGateway getJobManagerGateway() {
+		return jobMasterGateway;
+	}
+
+	public TaskManagerActions getTaskManagerActions() {
+		return taskManagerActions;
+	}
+
+	public CheckpointResponder getCheckpointResponder() {
+		return checkpointResponder;
+	}
+
+	public LibraryCacheManager getLibraryCacheManager() {
+		return libraryCacheManager;
+	}
+
+	public ResultPartitionConsumableNotifier getResultPartitionConsumableNotifier() {
+		return resultPartitionConsumableNotifier;
+	}
+
+	public PartitionStateChecker getPartitionStateChecker() {
+		return partitionStateChecker;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8c656d92/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index c0041a3..35b639b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -18,15 +18,48 @@
 
 package org.apache.flink.runtime.taskexecutor;
 
+import org.apache.flink.configuration.UnmodifiableConfiguration;
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
+import org.apache.flink.runtime.blob.BlobCache;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.concurrent.ApplyFunction;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.PartitionInfo;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRegistered;
 import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRejected;
+import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.taskexecutor.exceptions.CheckpointException;
+import org.apache.flink.runtime.taskexecutor.exceptions.PartitionException;
+import org.apache.flink.runtime.taskexecutor.exceptions.TaskException;
+import org.apache.flink.runtime.taskexecutor.exceptions.TaskSubmissionException;
+import org.apache.flink.runtime.taskexecutor.rpc.RpcCheckpointResponder;
+import org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider;
+import org.apache.flink.runtime.taskexecutor.rpc.RpcPartitionStateChecker;
+import org.apache.flink.runtime.taskexecutor.rpc.RpcResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.taskmanager.CheckpointResponder;
+import org.apache.flink.runtime.taskmanager.Task;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.taskmanager.TaskManagerActions;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -38,11 +71,17 @@ import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcService;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+import org.apache.flink.util.Preconditions;
 
 import java.util.HashSet;
 import java.util.Set;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
 import java.util.UUID;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -50,12 +89,10 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * TaskExecutor implementation. The task executor is responsible for the execution of multiple
- * {@link org.apache.flink.runtime.taskmanager.Task}.
+ * {@link Task}.
  */
 public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 
-	private static final Logger LOG = LoggerFactory.getLogger(TaskExecutor.class);
-
 	/** The connection information of this task manager */
 	private final TaskManagerLocation taskManagerLocation;
 
@@ -77,19 +114,38 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 	/** The metric registry in the task manager */
 	private final MetricRegistry metricRegistry;
 
-	/** The number of slots in the task manager, should be 1 for YARN */
-	private final int numberOfSlots;
-
 	/** The fatal error handler to use in case of a fatal error */
 	private final FatalErrorHandler fatalErrorHandler;
 
+	private final TaskManagerMetricGroup taskManagerMetricGroup;
+
+	private final BroadcastVariableManager broadcastVariableManager;
+	
 	/** Slots which have become available but haven't been confirmed by the RM */
 	private final Set<SlotID> unconfirmedFreeSlots;
 
+
+	private final FileCache fileCache;
+
+	// TODO: Try to get rid of it
+	private final TaskManagerRuntimeInfo taskManagerRuntimeInfo;
+
 	// --------- resource manager --------
 
 	private TaskExecutorToResourceManagerConnection resourceManagerConnection;
 
+	// --------- job manager connections -----------
+
+	private Map<ResourceID, JobManagerConnection> jobManagerConnections;
+
+	// --------- Slot allocation table --------
+
+	private Map<AllocationID, TaskSlot> taskSlots;
+
+	// --------- Task slot mappings --------
+
+	private Map<ExecutionAttemptID, TaskSlotMapping> taskSlotMappings;
+
 	// ------------------------------------------------------------------------
 
 	public TaskExecutor(
@@ -101,6 +157,9 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		NetworkEnvironment networkEnvironment,
 		HighAvailabilityServices haServices,
 		MetricRegistry metricRegistry,
+		TaskManagerMetricGroup taskManagerMetricGroup,
+		BroadcastVariableManager broadcastVariableManager,
+		FileCache fileCache,
 		FatalErrorHandler fatalErrorHandler) {
 
 		super(rpcService);
@@ -115,10 +174,19 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		this.haServices = checkNotNull(haServices);
 		this.metricRegistry = checkNotNull(metricRegistry);
 		this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
+		this.taskManagerMetricGroup = checkNotNull(taskManagerMetricGroup);
+		this.broadcastVariableManager = checkNotNull(broadcastVariableManager);
+		this.fileCache = checkNotNull(fileCache);
+		this.taskManagerRuntimeInfo = new TaskManagerRuntimeInfo(
+			taskManagerLocation.getHostname(),
+			new UnmodifiableConfiguration(taskManagerConfiguration.getConfiguration()),
+			taskManagerConfiguration.getTmpDirPaths());
 
-		this.numberOfSlots =  taskManagerConfiguration.getNumberSlots();
+		this.jobManagerConnections = new HashMap<>(4);
 
 		this.unconfirmedFreeSlots = new HashSet<>();
+		this.taskSlots = new HashMap<>(taskManagerConfiguration.getNumberSlots());
+		this.taskSlotMappings = new HashMap<>(taskManagerConfiguration.getNumberSlots() * 2);
 	}
 
 	// ------------------------------------------------------------------------
@@ -137,12 +205,436 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		}
 	}
 
+	/**
+	 * Called to shut down the TaskManager. The method closes all TaskManager services.
+	 */
+	@Override
+	public void shutDown() {
+		log.info("Stopping TaskManager {}.", getAddress());
+
+		if (resourceManagerConnection.isConnected()) {
+			try {
+				resourceManagerConnection.close();
+			} catch (Exception e) {
+				log.error("Could not cleanly close the ResourceManager connection.", e);
+			}
+		}
+
+		try {
+			ioManager.shutdown();
+		} catch (Exception e) {
+			log.error("IOManager did not shut down properly.", e);
+		}
+
+		try {
+			memoryManager.shutdown();
+		} catch (Exception e) {
+			log.error("MemoryManager did not shut down properly.", e);
+		}
+
+		try {
+			networkEnvironment.shutdown();
+		} catch (Exception e) {
+			log.error("Network environment did not shut down properly.", e);
+		}
+
+		try {
+			fileCache.shutdown();
+		} catch (Exception e) {
+			log.error("File cache did not shut down properly.", e);
+		}
+
+		try {
+			metricRegistry.shutdown();
+		} catch (Exception e) {
+			log.error("MetricRegistry did not shut down properly.", e);
+		}
+
+		log.info("Stopped TaskManager {}.", getAddress());
+	}
+
+	// ========================================================================
+	//  RPC methods
+	// ========================================================================
+
+	// ----------------------------------------------------------------------
+	// Task lifecycle RPCs
+	// ----------------------------------------------------------------------
+
+	@RpcMethod
+	public Acknowledge submitTask(TaskDeploymentDescriptor tdd, ResourceID jobManagerID) throws TaskSubmissionException {
+
+		JobManagerConnection jobManagerConnection = getJobManagerConnection(jobManagerID);
+
+		if (jobManagerConnection == null) {
+			final String message = "Could not submit task because JobManager " + jobManagerID +
+				" was not associated.";
+
+			log.debug(message);
+			throw new TaskSubmissionException(message);
+		}
+
+		TaskSlot taskSlot = taskSlots.get(tdd.getAllocationID());
+
+		if (taskSlot == null) {
+			final String message = "No task slot allocated for allocation ID " + tdd.getAllocationID() + '.';
+			log.debug(message);
+			throw new TaskSubmissionException(message);
+		}
+
+		TaskMetricGroup taskMetricGroup = taskManagerMetricGroup.addTaskForJob(tdd);
+
+		InputSplitProvider inputSplitProvider = new RpcInputSplitProvider(
+			jobManagerConnection.getJobManagerGateway(),
+			tdd.getJobID(),
+			tdd.getVertexID(),
+			tdd.getExecutionId(),
+			taskManagerConfiguration.getTimeout());
+
+		TaskManagerActions taskManagerActions = jobManagerConnection.getTaskManagerActions();
+		CheckpointResponder checkpointResponder = jobManagerConnection.getCheckpointResponder();
+		LibraryCacheManager libraryCache = jobManagerConnection.getLibraryCacheManager();
+		ResultPartitionConsumableNotifier resultPartitionConsumableNotifier = jobManagerConnection.getResultPartitionConsumableNotifier();
+		PartitionStateChecker partitionStateChecker = jobManagerConnection.getPartitionStateChecker();
+
+		Task task = new Task(
+			tdd,
+			memoryManager,
+			ioManager,
+			networkEnvironment,
+			broadcastVariableManager,
+			taskManagerActions,
+			inputSplitProvider,
+			checkpointResponder,
+			libraryCache,
+			fileCache,
+			taskManagerRuntimeInfo,
+			taskMetricGroup,
+			resultPartitionConsumableNotifier,
+			partitionStateChecker,
+			getRpcService().getExecutor());
+
+		log.info("Received task {}.", task.getTaskInfo().getTaskNameWithSubtasks());
+
+		if(taskSlot.add(task)) {
+			TaskSlotMapping taskSlotMapping = new TaskSlotMapping(task, taskSlot);
+
+			taskSlotMappings.put(task.getExecutionId(), taskSlotMapping);
+			task.startTaskThread();
+
+			return Acknowledge.get();
+		} else {
+			final String message = "TaskManager already contains a task for id " +
+				task.getExecutionId() + '.';
+
+			log.debug(message);
+			throw new TaskSubmissionException(message);
+		}
+	}
+
+	@RpcMethod
+	public Acknowledge cancelTask(ExecutionAttemptID executionAttemptID) throws TaskException {
+		final Task task = getTask(executionAttemptID);
+
+		if (task != null) {
+			try {
+				task.cancelExecution();
+				return Acknowledge.get();
+			} catch (Throwable t) {
+				throw new TaskException("Cannot cancel task for execution " + executionAttemptID + '.', t);
+			}
+		} else {
+			final String message = "Cannot find task to stop for execution " + executionAttemptID + '.';
+
+			log.debug(message);
+			throw new TaskException(message);
+		}
+	}
+
+	@RpcMethod
+	public Acknowledge stopTask(ExecutionAttemptID executionAttemptID) throws TaskException {
+		final Task task = getTask(executionAttemptID);
+
+		if (task != null) {
+			try {
+				task.stopExecution();
+				return Acknowledge.get();
+			} catch (Throwable t) {
+				throw new TaskException("Cannot stop task for execution " + executionAttemptID + '.', t);
+			}
+		} else {
+			final String message = "Cannot find task to stop for execution " + executionAttemptID + '.';
+
+			log.debug(message);
+			throw new TaskException(message);
+		}
+	}
+
+	// ----------------------------------------------------------------------
+	// Partition lifecycle RPCs
+	// ----------------------------------------------------------------------
+
+	@RpcMethod
+	public Acknowledge updatePartitions(final ExecutionAttemptID executionAttemptID, Collection<PartitionInfo> partitionInfos) throws PartitionException {
+		final Task task = getTask(executionAttemptID);
+
+		if (task != null) {
+			for (final PartitionInfo partitionInfo: partitionInfos) {
+				IntermediateDataSetID intermediateResultPartitionID = partitionInfo.getIntermediateDataSetID();
+
+				final SingleInputGate singleInputGate = task.getInputGateById(intermediateResultPartitionID);
+
+				if (singleInputGate != null) {
+					// Run asynchronously because it might be blocking
+					getRpcService().execute(new Runnable() {
+						@Override
+						public void run() {
+							try {
+								singleInputGate.updateInputChannel(partitionInfo.getInputChannelDeploymentDescriptor());
+							} catch (IOException | InterruptedException e) {
+								log.error("Could not update input data location for task {}. Trying to fail task.", task.getTaskInfo().getTaskName(), e);
+
+								try {
+									task.failExternally(e);
+								} catch (RuntimeException re) {
+									// TODO: Check whether we need this or make the exception in failExternally checked
+									log.error("Failed canceling task with execution ID {} after task update failure.", executionAttemptID, re);
+								}
+							}
+						}
+					});
+				} else {
+					throw new PartitionException("No reader with ID " +
+						intermediateResultPartitionID + " for task " + executionAttemptID +
+						" was found.");
+				}
+			}
+
+			return Acknowledge.get();
+		} else {
+			log.debug("Discard update for input partitions of task {}. Task is no longer running.", executionAttemptID);
+			return Acknowledge.get();
+		}
+	}
+
+	@RpcMethod
+	public void failPartition(ExecutionAttemptID executionAttemptID) {
+		log.info("Discarding the results produced by task execution {}.", executionAttemptID);
+
+		try {
+			networkEnvironment.getResultPartitionManager().releasePartitionsProducedBy(executionAttemptID);
+		} catch (Throwable t) {
+			// TODO: Do we still need this catch branch?
+			onFatalError(t);
+		}
+
+		// TODO: Maybe it's better to return an Acknowledge here to notify the JM about the success/failure with an Exception
+	}
+
+	// ----------------------------------------------------------------------
+	// Checkpointing RPCs
+	// ----------------------------------------------------------------------
+
+	@RpcMethod
+	public Acknowledge triggerCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp) throws CheckpointException {
+		log.debug("Trigger checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID);
+
+		final Task task = getTask(executionAttemptID);
+
+		if (task != null) {
+			task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp);
+
+			return Acknowledge.get();
+		} else {
+			final String message = "TaskManager received a checkpoint request for unknown task " + executionAttemptID + '.';
+
+			log.debug(message);
+			throw new CheckpointException(message);
+		}
+	}
+
+	@RpcMethod
+	public Acknowledge confirmCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp) throws CheckpointException {
+		log.debug("Confirm checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID);
+
+		final Task task = getTask(executionAttemptID);
+
+		if (task != null) {
+			task.notifyCheckpointComplete(checkpointId);
+
+			return Acknowledge.get();
+		} else {
+			final String message = "TaskManager received a checkpoint confirmation for unknown task " + executionAttemptID + '.';
+
+			log.debug(message);
+			throw new CheckpointException(message);
+		}
+	}
+
+	/**
+	 * Requests a slot from the TaskManager.
+	 *
+	 * @param slotID slot id for the request
+	 * @param allocationID allocation id for the request
+	 * @param resourceManagerLeaderID current leader id of the ResourceManager
+	 * @return answer to the slot request
+	 */
+	@RpcMethod
+	public TMSlotRequestReply requestSlot(SlotID slotID, AllocationID allocationID, UUID resourceManagerLeaderID) {
+		if (!resourceManagerConnection.getTargetLeaderId().equals(resourceManagerLeaderID)) {
+			return new TMSlotRequestRejected(
+				resourceManagerConnection.getRegistrationId(), getResourceID(), allocationID);
+		}
+		if (unconfirmedFreeSlots.contains(slotID)) {
+			// check if request has not been blacklisted because the notification of a free slot
+			// has not been confirmed by the ResourceManager
+			return new TMSlotRequestRejected(
+				resourceManagerConnection.getRegistrationId(), getResourceID(), allocationID);
+		}
+		return new TMSlotRequestRegistered(new InstanceID(), ResourceID.generate(), allocationID);
+
+	}
+
 	// ------------------------------------------------------------------------
-	//  RPC methods - ResourceManager related
+	//  Internal methods
 	// ------------------------------------------------------------------------
 
-	@RpcMethod
-	public void notifyOfNewResourceManagerLeader(String newLeaderAddress, UUID newLeaderId) {
+	private JobManagerConnection getJobManagerConnection(ResourceID jobManagerID) {
+		return jobManagerConnections.get(jobManagerID);
+	}
+
+	private Task getTask(ExecutionAttemptID executionAttemptID) {
+		TaskSlotMapping taskSlotMapping = taskSlotMappings.get(executionAttemptID);
+
+		if (taskSlotMapping != null) {
+			return taskSlotMapping.getTask();
+		} else {
+			return null;
+		}
+	}
+
+	private Task removeTask(ExecutionAttemptID executionAttemptID) {
+		TaskSlotMapping taskSlotMapping = taskSlotMappings.remove(executionAttemptID);
+
+		if (taskSlotMapping != null) {
+			final Task task = taskSlotMapping.getTask();
+			final TaskSlot taskSlot = taskSlotMapping.getTaskSlot();
+
+			taskSlot.remove(task);
+
+			return task;
+		} else {
+			return null;
+		}
+	}
+
+	private Iterable<Task> getAllTasks() {
+		// create a new iterator for every traversal so that the Iterable can be iterated more than once
+		return new Iterable<Task>() {
+			@Override
+			public Iterator<Task> iterator() {
+				final Iterator<TaskSlotMapping> taskEntryIterator = taskSlotMappings.values().iterator();
+
+				return new Iterator<Task>() {
+					@Override
+					public boolean hasNext() {
+						return taskEntryIterator.hasNext();
+					}
+
+					@Override
+					public Task next() {
+						return taskEntryIterator.next().getTask();
+					}
+
+					@Override
+					public void remove() {
+						taskEntryIterator.remove();
+					}
+				};
+			}
+		};
+	}
+
+	private void clearTasks() {
+		taskSlotMappings.clear();
+
+		for (TaskSlot taskSlot: taskSlots.values()) {
+			taskSlot.clear();
+		}
+	}
+
+	private void failTask(final ExecutionAttemptID executionAttemptID, final Throwable cause) {
+		final Task task = getTask(executionAttemptID);
+
+		if (task != null) {
+			try {
+				task.failExternally(cause);
+			} catch (Throwable t) {
+				log.error("Could not fail task {}.", executionAttemptID, t);
+			}
+		} else {
+			log.debug("Cannot find task to fail for execution {}.", executionAttemptID);
+		}
+	}
+
+	private void cancelAndClearAllTasks(Throwable cause) {
+		log.info("Cancelling all computations and discarding all cached data.");
+
+		Iterable<Task> tasks = getAllTasks();
+
+		for (Task task: tasks) {
+			task.failExternally(cause);
+		}
+
+		clearTasks();
+	}
+
+	private void updateTaskExecutionState(final JobMasterGateway jobMasterGateway, final TaskExecutionState taskExecutionState) {
+		final ExecutionAttemptID executionAttemptID = taskExecutionState.getID();
+
+		Future<Acknowledge> futureAcknowledge = jobMasterGateway.updateTaskExecutionState(taskExecutionState);
+
+		futureAcknowledge.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
+			@Override
+			public Void apply(Throwable value) {
+				failTask(executionAttemptID, value);
+
+				return null;
+			}
+		}, getMainThreadExecutor());
+	}
+
+	private void unregisterTaskAndNotifyFinalState(final JobMasterGateway jobMasterGateway, ExecutionAttemptID executionAttemptID) {
+		Task task = removeTask(executionAttemptID);
+
+		if (task != null) {
+			if (!task.getExecutionState().isTerminal()) {
+				try {
+					task.failExternally(new IllegalStateException("Task is being removed from the TaskManager."));
+				} catch (Exception e) {
+					log.error("Could not properly fail task.", e);
+				}
+			}
+
+			log.info("Un-registering task and sending final execution state {} to JobManager for task {} {}.",
+				task.getExecutionState(), task.getTaskInfo().getTaskName(), task.getExecutionId());
+
+			AccumulatorSnapshot accumulatorSnapshot = task.getAccumulatorRegistry().getSnapshot();
+
+			updateTaskExecutionState(
+				jobMasterGateway,
+				new TaskExecutionState(
+					task.getJobID(),
+					task.getExecutionId(),
+					task.getExecutionState(),
+					task.getFailureCause(),
+					accumulatorSnapshot));
+		} else {
+			log.error("Cannot find task with ID {} to unregister.", executionAttemptID);
+		}
+	}
+
+	private void notifyOfNewResourceManagerLeader(String newLeaderAddress, UUID newLeaderId) {
 		if (resourceManagerConnection != null) {
 			if (newLeaderAddress != null) {
 				// the resource manager switched to a new leader
@@ -178,28 +670,46 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		}
 	}
 
-	/**
-	 * Requests a slot from the TaskManager
-	 *
-	 * @param slotID Slot id for the request
-	 * @param allocationID id for the request
-	 * @param resourceManagerLeaderID current leader id of the ResourceManager
-	 * @return answer to the slot request
-	 */
-	@RpcMethod
-	public TMSlotRequestReply requestSlot(SlotID slotID, AllocationID allocationID, UUID resourceManagerLeaderID) {
-		if (!resourceManagerConnection.getTargetLeaderId().equals(resourceManagerLeaderID)) {
-			return new TMSlotRequestRejected(
-				resourceManagerConnection.getRegistrationId(), getResourceID(), allocationID);
-		}
-		if (unconfirmedFreeSlots.contains(slotID)) {
-			// check if request has not been blacklisted because the notification of a free slot
-			// has not been confirmed by the ResourceManager
-			return new TMSlotRequestRejected(
-				resourceManagerConnection.getRegistrationId(), getResourceID(), allocationID);
-		}
-		return new TMSlotRequestRegistered(new InstanceID(), ResourceID.generate(), allocationID);
+	private JobManagerConnection associateWithJobManager(JobMasterGateway jobMasterGateway, int blobPort) {
+		Preconditions.checkNotNull(jobMasterGateway);
+		Preconditions.checkArgument(blobPort > 0 && blobPort <= 65535, "Blob port is out of range.");
 
+		TaskManagerActions taskManagerActions = new TaskManagerActionsImpl(jobMasterGateway);
+
+		CheckpointResponder checkpointResponder = new RpcCheckpointResponder(jobMasterGateway);
+
+		InetSocketAddress address = new InetSocketAddress(jobMasterGateway.getAddress(), blobPort);
+
+		BlobCache blobCache = new BlobCache(address, taskManagerConfiguration.getConfiguration());
+
+		LibraryCacheManager libraryCacheManager = new BlobLibraryCacheManager(
+			blobCache,
+			taskManagerConfiguration.getCleanupInterval());
+
+		ResultPartitionConsumableNotifier resultPartitionConsumableNotifier = new RpcResultPartitionConsumableNotifier(
+			jobMasterGateway,
+			getRpcService().getExecutor(),
+			taskManagerConfiguration.getTimeout());
+
+		PartitionStateChecker partitionStateChecker = new RpcPartitionStateChecker(jobMasterGateway);
+
+		return new JobManagerConnection(
+			jobMasterGateway,
+			taskManagerActions,
+			checkpointResponder,
+			libraryCacheManager,
+			resultPartitionConsumableNotifier,
+			partitionStateChecker);
+	}
+
+	private void disassociateFromJobManager(JobManagerConnection jobManagerConnection) throws IOException {
+		if (jobManagerConnection != null) {
+			JobMasterGateway jobManagerGateway = jobManagerConnection.getJobManagerGateway();
+
+			jobManagerGateway.disconnectTaskManager(getResourceID());
+
+			jobManagerConnection.getLibraryCacheManager().shutdown();
+		}
 	}
 
 	// ------------------------------------------------------------------------
@@ -237,8 +747,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 	 * @param t The exception describing the fatal error
 	 */
 	void onFatalError(Throwable t) {
-		// to be determined, probably delegate to a fatal error handler that 
-		// would either log (mini cluster) ot kill the process (yarn, mesos, ...)
+		log.error("Fatal error occurred.", t);
 		fatalErrorHandler.onFatalError(t);
 	}
 
@@ -266,8 +775,13 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 	private class ResourceManagerLeaderListener implements LeaderRetrievalListener {
 
 		@Override
-		public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
-			getSelf().notifyOfNewResourceManagerLeader(leaderAddress, leaderSessionID);
+		public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
+			runAsync(new Runnable() {
+				@Override
+				public void run() {
+					notifyOfNewResourceManagerLeader(leaderAddress, leaderSessionID);
+				}
+			});
 		}
 
 		@Override
@@ -276,4 +790,43 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		}
 	}
 
+	private class TaskManagerActionsImpl implements TaskManagerActions {
+		private final JobMasterGateway jobMasterGateway;
+
+		private TaskManagerActionsImpl(JobMasterGateway jobMasterGateway) {
+			this.jobMasterGateway = Preconditions.checkNotNull(jobMasterGateway);
+		}
+
+		@Override
+		public void notifyFinalState(final ExecutionAttemptID executionAttemptID) {
+			runAsync(new Runnable() {
+				@Override
+				public void run() {
+					unregisterTaskAndNotifyFinalState(jobMasterGateway, executionAttemptID);
+				}
+			});
+		}
+
+		@Override
+		public void notifyFatalError(String message, Throwable cause) {
+			log.error(message, cause);
+			fatalErrorHandler.onFatalError(cause);
+		}
+
+		@Override
+		public void failTask(final ExecutionAttemptID executionAttemptID, final Throwable cause) {
+			runAsync(new Runnable() {
+				@Override
+				public void run() {
+					TaskExecutor.this.failTask(executionAttemptID, cause);
+				}
+			});
+		}
+
+		@Override
+		public void updateTaskExecutionState(final TaskExecutionState taskExecutionState) {
+			TaskExecutor.this.updateTaskExecutionState(jobMasterGateway, taskExecutionState);
+		}
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8c656d92/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
index 2360b53..f062b96 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -21,11 +21,18 @@ package org.apache.flink.runtime.taskexecutor;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.PartitionInfo;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
+import org.apache.flink.runtime.taskmanager.Task;
 
+import java.util.Collection;
 import java.util.UUID;
 
 /**
@@ -33,12 +40,6 @@ import java.util.UUID;
  */
 public interface TaskExecutorGateway extends RpcGateway {
 
-	// ------------------------------------------------------------------------
-	//  ResourceManager handlers
-	// ------------------------------------------------------------------------
-
-	void notifyOfNewResourceManagerLeader(String address, UUID resourceManagerLeaderId);
-
 	/**
 	 * Requests a slot from the TaskManager
 	 *
@@ -52,4 +53,71 @@ public interface TaskExecutorGateway extends RpcGateway {
 		AllocationID allocationID,
 		UUID resourceManagerLeaderID,
 		@RpcTimeout Time timeout);
+
+	/**
+	 * Submit a {@link Task} to the {@link TaskExecutor}.
+	 *
+	 * @param tdd describing the task to submit
+	 * @param jobManagerID identifying the submitting JobManager
+	 * @param timeout of the submit operation
+	 * @return Future acknowledge of the successful operation
+	 */
+	Future<Acknowledge> submitTask(
+		TaskDeploymentDescriptor tdd,
+		ResourceID jobManagerID,
+		@RpcTimeout Time timeout);
+
+	/**
+	 * Update the task with the locations from which the given partitions can be retrieved.
+	 *
+	 * @param executionAttemptID identifying the task
+	 * @param partitionInfos telling where the partitions can be retrieved from
+	 * @return Future acknowledge if the partitions have been successfully updated
+	 */
+	Future<Acknowledge> updatePartitions(ExecutionAttemptID executionAttemptID, Collection<PartitionInfo> partitionInfos);
+
+	/**
+	 * Fail all intermediate result partitions of the given task.
+	 *
+	 * @param executionAttemptID identifying the task
+	 */
+	void failPartition(ExecutionAttemptID executionAttemptID);
+
+	/**
+	 * Trigger the checkpoint for the given task. The checkpoint is identified by the checkpoint ID
+	 * and the checkpoint timestamp.
+	 *
+	 * @param executionAttemptID identifying the task
+	 * @param checkpointID unique id for the checkpoint
+	 * @param checkpointTimestamp is the timestamp when the checkpoint has been initiated
+	 * @return Future acknowledge if the checkpoint has been successfully triggered
+	 */
+	Future<Acknowledge> triggerCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointID, long checkpointTimestamp);
+
+	/**
+	 * Confirm a checkpoint for the given task. The checkpoint is identified by the checkpoint ID
+	 * and the checkpoint timestamp.
+	 *
+	 * @param executionAttemptID identifying the task
+	 * @param checkpointId unique id for the checkpoint
+	 * @param checkpointTimestamp is the timestamp when the checkpoint has been initiated
+	 * @return Future acknowledge if the checkpoint has been successfully confirmed
+	 */
+	Future<Acknowledge> confirmCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp);
+
+	/**
+	 * Stop the given task.
+	 *
+	 * @param executionAttemptID identifying the task
+	 * @return Future acknowledge if the task is successfully stopped
+	 */
+	Future<Acknowledge> stopTask(ExecutionAttemptID executionAttemptID);
+
+	/**
+	 * Cancel the given task.
+	 *
+	 * @param executionAttemptID identifying the task
+	 * @return Future acknowledge if the task is successfully canceled
+	 */
+	Future<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID);
 }
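For illustration only (not part of this commit): a JobManager-side caller holding a TaskExecutorGateway could submit a task and react to a failed submission roughly as sketched below. The names taskExecutorGateway, tdd, jobManagerResourceID, timeout, executor and log are assumed to exist in the caller's context.

	Future<Acknowledge> submissionFuture = taskExecutorGateway.submitTask(tdd, jobManagerResourceID, timeout);

	submissionFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
		@Override
		public Void apply(Throwable failure) {
			// hypothetical caller-side handling, e.g. marking the execution attempt as failed
			log.error("Task submission to the TaskExecutor failed.", failure);
			return null;
		}
	}, executor);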

http://git-wip-us.apache.org/repos/asf/flink/blob/8c656d92/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
index f58af77..bce3dc3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.taskexecutor;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
@@ -71,7 +72,7 @@ public class TaskManagerConfiguration {
 		this.maxRegistrationPause = Preconditions.checkNotNull(maxRegistrationPause);
 		this.refusedRegistrationPause = Preconditions.checkNotNull(refusedRegistrationPause);
 		this.cleanupInterval = Preconditions.checkNotNull(cleanupInterval);
-		this.configuration = Preconditions.checkNotNull(configuration);
+		this.configuration = new UnmodifiableConfiguration(Preconditions.checkNotNull(configuration));
 	}
 
 	public int getNumberSlots() {

http://git-wip-us.apache.org/repos/asf/flink/blob/8c656d92/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 8ac0ddd..bb66655 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -95,6 +95,9 @@ public class TaskManagerRunner implements FatalErrorHandler {
 			taskManagerServices.getNetworkEnvironment(),
 			highAvailabilityServices,
 			taskManagerServices.getMetricRegistry(),
+			taskManagerServices.getTaskManagerMetricGroup(),
+			taskManagerServices.getBroadcastVariableManager(),
+			taskManagerServices.getFileCache(),
 			this);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8c656d92/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index ff7f7d5..e264a1c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -19,7 +19,9 @@
 package org.apache.flink.runtime.taskexecutor;
 
 import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.ConnectionManager;
@@ -32,9 +34,11 @@ import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats;
 import org.apache.flink.runtime.query.netty.KvStateServer;
+import org.apache.flink.runtime.taskexecutor.utils.TaskExecutorMetricsInitializer;
 import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.util.EnvironmentInformation;
@@ -58,19 +62,28 @@ public class TaskManagerServices {
 	private final IOManager ioManager;
 	private final NetworkEnvironment networkEnvironment;
 	private final MetricRegistry metricRegistry;
+	private final TaskManagerMetricGroup taskManagerMetricGroup;
+	private final BroadcastVariableManager broadcastVariableManager;
+	private final FileCache fileCache;
 
 	private TaskManagerServices(
 		TaskManagerLocation taskManagerLocation,
 		MemoryManager memoryManager,
 		IOManager ioManager,
 		NetworkEnvironment networkEnvironment,
-		MetricRegistry metricRegistry) {
+		MetricRegistry metricRegistry,
+		TaskManagerMetricGroup taskManagerMetricGroup,
+		BroadcastVariableManager broadcastVariableManager,
+		FileCache fileCache) {
 
 		this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation);
 		this.memoryManager = Preconditions.checkNotNull(memoryManager);
 		this.ioManager = Preconditions.checkNotNull(ioManager);
 		this.networkEnvironment = Preconditions.checkNotNull(networkEnvironment);
 		this.metricRegistry = Preconditions.checkNotNull(metricRegistry);
+		this.taskManagerMetricGroup = Preconditions.checkNotNull(taskManagerMetricGroup);
+		this.broadcastVariableManager = Preconditions.checkNotNull(broadcastVariableManager);
+		this.fileCache = Preconditions.checkNotNull(fileCache);
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -97,6 +110,18 @@ public class TaskManagerServices {
 		return metricRegistry;
 	}
 
+	public TaskManagerMetricGroup getTaskManagerMetricGroup() {
+		return taskManagerMetricGroup;
+	}
+
+	public BroadcastVariableManager getBroadcastVariableManager() {
+		return broadcastVariableManager;
+	}
+
+	public FileCache getFileCache() {
+		return fileCache;
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Static factory methods for task manager services
 	// --------------------------------------------------------------------------------------------
@@ -128,9 +153,29 @@ public class TaskManagerServices {
 		// start the I/O manager, it will create some temp directories.
 		final IOManager ioManager = new IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths());
 
-		MetricRegistry metricsRegistry = new MetricRegistry(taskManagerServicesConfiguration.getMetricRegistryConfiguration());
+		final MetricRegistry metricRegistry = new MetricRegistry(taskManagerServicesConfiguration.getMetricRegistryConfiguration());
+
+		final TaskManagerMetricGroup taskManagerMetricGroup = new TaskManagerMetricGroup(
+			metricRegistry,
+			taskManagerLocation.getHostname(),
+			taskManagerLocation.getResourceID().toString());
+
+		// Initialize the TM metrics
+		TaskExecutorMetricsInitializer.instantiateStatusMetrics(taskManagerMetricGroup, network);
+
+		final BroadcastVariableManager broadcastVariableManager = new BroadcastVariableManager();
+
+		final FileCache fileCache = new FileCache(taskManagerServicesConfiguration.getTmpDirPaths());
 
-		return new TaskManagerServices(taskManagerLocation, memoryManager, ioManager, network, metricsRegistry);
+		return new TaskManagerServices(
+			taskManagerLocation,
+			memoryManager,
+			ioManager,
+			network,
+			metricRegistry,
+			taskManagerMetricGroup,
+			broadcastVariableManager,
+			fileCache);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/8c656d92/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskSlot.java
new file mode 100644
index 0000000..4fc1d66
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskSlot.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.taskmanager.Task;
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Container for multiple {@link Task} instances belonging to the same slot.
+ */
+public class TaskSlot {
+	private final AllocationID allocationID;
+	private final ResourceID resourceID;
+	private final Map<ExecutionAttemptID, Task> tasks;
+
+	public TaskSlot(AllocationID allocationID, ResourceID resourceID) {
+		this.allocationID = Preconditions.checkNotNull(allocationID);
+		this.resourceID = Preconditions.checkNotNull(resourceID);
+		tasks = new HashMap<>(4);
+	}
+
+	public AllocationID getAllocationID() {
+		return allocationID;
+	}
+
+	public ResourceID getResourceID() {
+		return resourceID;
+	}
+
+	public boolean add(Task task) {
+		// sanity check: the task must belong to the allocation represented by this slot
+		Preconditions.checkArgument(allocationID.equals(task.getAllocationID()));
+
+		if (tasks.containsKey(task.getExecutionId())) {
+			// a task is already registered for this execution attempt
+			return false;
+		} else {
+			tasks.put(task.getExecutionId(), task);
+			return true;
+		}
+	}
+
+	public Task remove(Task task) {
+		return tasks.remove(task.getExecutionId());
+	}
+
+	public void clear() {
+		tasks.clear();
+	}
+}
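For illustration only (not part of this commit), a minimal sketch of the TaskSlot semantics, assuming task1 and task2 are Task instances whose allocation ID matches the slot and whose execution attempt IDs differ:

	TaskSlot slot = new TaskSlot(allocationID, resourceID);

	slot.add(task1);    // true: first task registered under this execution attempt ID
	slot.add(task1);    // false: a task with this execution attempt ID is already present
	slot.add(task2);    // true: tasks with different execution attempt IDs can share the slot

	slot.remove(task1); // returns the removed task (or null if it was not registered)
	slot.clear();       // drops all remaining tasks from the slot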

http://git-wip-us.apache.org/repos/asf/flink/blob/8c656d92/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskSlotMapping.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskSlotMapping.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskSlotMapping.java
new file mode 100644
index 0000000..e67fd52
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskSlotMapping.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.runtime.taskmanager.Task;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Mapping between a {@link Task} and its {@link TaskSlot}.
+ */
+public class TaskSlotMapping {
+
+	private final Task task;
+	private final TaskSlot taskSlot;
+
+	public TaskSlotMapping(Task task, TaskSlot taskSlot) {
+		this.task = Preconditions.checkNotNull(task);
+		this.taskSlot = Preconditions.checkNotNull(taskSlot);
+	}
+
+	public Task getTask() {
+		return task;
+	}
+
+	public TaskSlot getTaskSlot() {
+		return taskSlot;
+	}
+}