You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/10/21 12:22:08 UTC

[28/50] [abbrv] flink git commit: [FLINK-4738] [TaskManager] Port TaskManager logic to new Flip-6 TaskManager

http://git-wip-us.apache.org/repos/asf/flink/blob/52755860/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/CheckpointException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/CheckpointException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/CheckpointException.java
new file mode 100644
index 0000000..80f2aa0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/CheckpointException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.exceptions;
+
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+
+/** Signals a problem with checkpointing on the {@link TaskExecutor} side. */
+public class CheckpointException extends TaskManagerException {
+	private static final long serialVersionUID = 3366394086880327955L;
+
+	/** Creates the exception with a detail message and no cause. */
+	public CheckpointException(String message) {
+		super(message);
+	}
+
+	/** Creates the exception from its cause alone. */
+	public CheckpointException(Throwable cause) {
+		super(cause);
+	}
+
+	/** Creates the exception with a detail message and the cause that triggered it. */
+	public CheckpointException(String message, Throwable cause) {
+		super(message, cause);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/52755860/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/PartitionException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/PartitionException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/PartitionException.java
new file mode 100644
index 0000000..eecd0ae
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/PartitionException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.exceptions;
+
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+
+/** Signals a problem with the result partitions on the {@link TaskExecutor} side. */
+public class PartitionException extends TaskManagerException {
+	private static final long serialVersionUID = 6248696963418276618L;
+
+	/** Creates the exception with a detail message and no cause. */
+	public PartitionException(String message) {
+		super(message);
+	}
+
+	/** Creates the exception from its cause alone. */
+	public PartitionException(Throwable cause) {
+		super(cause);
+	}
+
+	/** Creates the exception with a detail message and the cause that triggered it. */
+	public PartitionException(String message, Throwable cause) {
+		super(message, cause);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/52755860/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/TaskException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/TaskException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/TaskException.java
new file mode 100644
index 0000000..a4a89c2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/TaskException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.exceptions;
+
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+
+/** Signals a task related problem on the {@link TaskExecutor}. */
+public class TaskException extends TaskManagerException {
+	private static final long serialVersionUID = 968001398103156856L;
+
+	/** Creates the exception with a detail message and no cause. */
+	public TaskException(String message) {
+		super(message);
+	}
+
+	/** Creates the exception from its cause alone. */
+	public TaskException(Throwable cause) {
+		super(cause);
+	}
+
+	/** Creates the exception with a detail message and the cause that triggered it. */
+	public TaskException(String message, Throwable cause) {
+		super(message, cause);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/52755860/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/TaskManagerException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/TaskManagerException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/TaskManagerException.java
new file mode 100644
index 0000000..62d186e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/TaskManagerException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.exceptions;
+
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+
+/** Base class of the checked exceptions thrown by the {@link TaskExecutor}. */
+public class TaskManagerException extends Exception {
+	private static final long serialVersionUID = -2997745772227694731L;
+
+	/** Creates the exception with a detail message and no cause. */
+	public TaskManagerException(String message) {
+		super(message);
+	}
+
+	/** Creates the exception from its cause alone. */
+	public TaskManagerException(Throwable cause) {
+		super(cause);
+	}
+
+	/** Creates the exception with a detail message and the cause that triggered it. */
+	public TaskManagerException(String message, Throwable cause) {
+		super(message, cause);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/52755860/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/TaskSubmissionException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/TaskSubmissionException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/TaskSubmissionException.java
new file mode 100644
index 0000000..23f7812
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/TaskSubmissionException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.exceptions;
+
+import org.apache.flink.runtime.taskmanager.Task;
+
+/** Signals a problem with the {@link Task} submission at the {@link org.apache.flink.runtime.taskexecutor.TaskExecutor}. */
+public class TaskSubmissionException extends TaskManagerException {
+	private static final long serialVersionUID = 4589813591317690486L;
+
+	/** Creates the exception with a detail message and no cause. */
+	public TaskSubmissionException(String message) {
+		super(message);
+	}
+
+	/** Creates the exception with a detail message and the cause that triggered it. */
+	public TaskSubmissionException(String message, Throwable cause) {
+		super(message, cause);
+	}
+
+	/** Creates the exception from its cause alone. */
+	public TaskSubmissionException(Throwable cause) {
+		super(cause);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/52755860/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
new file mode 100644
index 0000000..246c11d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.rpc;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorGateway;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.state.CheckpointStateHandles;
+import org.apache.flink.runtime.taskmanager.CheckpointResponder;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * {@link CheckpointResponder} that relays checkpoint acknowledgements and declines to a
+ * {@link CheckpointCoordinatorGateway}.
+ */
+public class RpcCheckpointResponder implements CheckpointResponder {
+
+	private final CheckpointCoordinatorGateway checkpointCoordinatorGateway;
+
+	public RpcCheckpointResponder(CheckpointCoordinatorGateway checkpointCoordinatorGateway) {
+		this.checkpointCoordinatorGateway = Preconditions.checkNotNull(checkpointCoordinatorGateway);
+	}
+
+	@Override
+	public void acknowledgeCheckpoint(
+			JobID jobID,
+			ExecutionAttemptID executionAttemptID,
+			CheckpointMetaData checkpointMetaData,
+			CheckpointStateHandles checkpointStateHandles) {
+		// Pure delegation: every argument is handed to the gateway unchanged.
+		this.checkpointCoordinatorGateway.acknowledgeCheckpoint(
+			jobID, executionAttemptID, checkpointMetaData, checkpointStateHandles);
+	}
+
+	@Override
+	public void declineCheckpoint(JobID jobID, ExecutionAttemptID executionAttemptID, CheckpointMetaData checkpoint) {
+		this.checkpointCoordinatorGateway.declineCheckpoint(jobID, executionAttemptID, checkpoint);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/52755860/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java
new file mode 100644
index 0000000..4850d63
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.rpc;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * {@link InputSplitProvider} that requests the next input split from a {@link JobMasterGateway}
+ * and waits on the returned future using the configured timeout.
+ */
+public class RpcInputSplitProvider implements InputSplitProvider {
+	private final JobMasterGateway jobMasterGateway;
+	private final JobID jobID;
+	private final JobVertexID jobVertexID;
+	private final ExecutionAttemptID executionAttemptID;
+	private final Time timeout;
+
+	public RpcInputSplitProvider(
+			JobMasterGateway jobMasterGateway,
+			JobID jobID,
+			JobVertexID jobVertexID,
+			ExecutionAttemptID executionAttemptID,
+			Time timeout) {
+		this.jobMasterGateway = Preconditions.checkNotNull(jobMasterGateway);
+		this.jobID = Preconditions.checkNotNull(jobID);
+		this.jobVertexID = Preconditions.checkNotNull(jobVertexID);
+		this.executionAttemptID = Preconditions.checkNotNull(executionAttemptID);
+		this.timeout = Preconditions.checkNotNull(timeout);
+	}
+
+	@Override
+	public InputSplit getNextInputSplit(ClassLoader userCodeClassLoader) throws InputSplitProviderException {
+		Preconditions.checkNotNull(userCodeClassLoader);
+		Future<SerializedInputSplit> pendingSplit = jobMasterGateway.requestNextInputSplit(jobVertexID, executionAttemptID);
+		try {
+			SerializedInputSplit response = pendingSplit.get(timeout.getSize(), timeout.getUnit());
+			// An empty response is reported to the caller as null.
+			if (response.isEmpty()) {
+				return null;
+			}
+			return InstantiationUtil.deserializeObject(response.getInputSplitData(), userCodeClassLoader);
+		} catch (Exception e) {
+			throw new InputSplitProviderException("Requesting the next input split failed.", e);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/52755860/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcKvStateRegistryListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcKvStateRegistryListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcKvStateRegistryListener.java
new file mode 100644
index 0000000..3692a71
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcKvStateRegistryListener.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.rpc;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.query.KvStateID;
+import org.apache.flink.runtime.query.KvStateRegistryGateway;
+import org.apache.flink.runtime.query.KvStateRegistryListener;
+import org.apache.flink.runtime.query.KvStateServerAddress;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * {@link KvStateRegistryListener} that forwards KvState registration and unregistration
+ * notifications to a {@link KvStateRegistryGateway}. Registrations additionally carry the
+ * {@link KvStateServerAddress} this listener was created with.
+ */
+public class RpcKvStateRegistryListener implements KvStateRegistryListener {
+
+	private final KvStateRegistryGateway kvStateRegistryGateway;
+	private final KvStateServerAddress kvStateServerAddress;
+
+	public RpcKvStateRegistryListener(
+			KvStateRegistryGateway kvStateRegistryGateway,
+			KvStateServerAddress kvStateServerAddress) {
+		this.kvStateRegistryGateway = Preconditions.checkNotNull(kvStateRegistryGateway);
+		this.kvStateServerAddress = Preconditions.checkNotNull(kvStateServerAddress);
+	}
+
+	@Override
+	public void notifyKvStateRegistered(
+			JobID jobId,
+			JobVertexID jobVertexId,
+			KeyGroupRange keyGroupRange,
+			String registrationName,
+			KvStateID kvStateId) {
+		// The server address is not a callback argument; it comes from this listener's own field.
+		this.kvStateRegistryGateway.notifyKvStateRegistered(
+			jobId,
+			jobVertexId,
+			keyGroupRange,
+			registrationName,
+			kvStateId,
+			this.kvStateServerAddress);
+	}
+
+	@Override
+	public void notifyKvStateUnregistered(
+			JobID jobId,
+			JobVertexID jobVertexId,
+			KeyGroupRange keyGroupRange,
+			String registrationName) {
+		this.kvStateRegistryGateway.notifyKvStateUnregistered(
+			jobId, jobVertexId, keyGroupRange, registrationName);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/52755860/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcPartitionStateChecker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcPartitionStateChecker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcPartitionStateChecker.java
new file mode 100644
index 0000000..ab111ad
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcPartitionStateChecker.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.rpc;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.network.PartitionState;
+import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.util.Preconditions;
+
+/** {@link PartitionStateChecker} that requests partition states from a {@link JobMasterGateway}. */
+public class RpcPartitionStateChecker implements PartitionStateChecker {
+
+	private final JobMasterGateway jobMasterGateway;
+
+	public RpcPartitionStateChecker(JobMasterGateway jobMasterGateway) {
+		this.jobMasterGateway = Preconditions.checkNotNull(jobMasterGateway);
+	}
+
+	@Override
+	public Future<PartitionState> requestPartitionState(
+			JobID jobId, ExecutionAttemptID executionId,
+			IntermediateDataSetID resultId, ResultPartitionID partitionId) {
+		// The job id is not passed on; the gateway call takes only the partition, execution
+		// and result identifiers.
+		return this.jobMasterGateway.requestPartitionState(partitionId, executionId, resultId);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/52755860/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcResultPartitionConsumableNotifier.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcResultPartitionConsumableNotifier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcResultPartitionConsumableNotifier.java
new file mode 100644
index 0000000..29ad3b6
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcResultPartitionConsumableNotifier.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.rpc;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.ApplyFunction;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.taskmanager.TaskActions;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Executor;
+
+/**
+ * {@link ResultPartitionConsumableNotifier} that asks a {@link JobMasterGateway} to schedule or
+ * update consumers and fails the task through its {@link TaskActions} if that request fails.
+ */
+public class RpcResultPartitionConsumableNotifier implements ResultPartitionConsumableNotifier {
+
+	private static final Logger LOG = LoggerFactory.getLogger(RpcResultPartitionConsumableNotifier.class);
+
+	private final JobMasterGateway jobMasterGateway;
+	private final Executor executor;
+	private final Time timeout;
+
+	public RpcResultPartitionConsumableNotifier(
+			JobMasterGateway jobMasterGateway, Executor executor, Time timeout) {
+		this.jobMasterGateway = Preconditions.checkNotNull(jobMasterGateway);
+		this.executor = Preconditions.checkNotNull(executor);
+		this.timeout = Preconditions.checkNotNull(timeout);
+	}
+
+	@Override
+	public void notifyPartitionConsumable(JobID jobId, ResultPartitionID partitionId, final TaskActions taskActions) {
+		jobMasterGateway.scheduleOrUpdateConsumers(partitionId, timeout).exceptionallyAsync(
+			new ApplyFunction<Throwable, Void>() {
+				@Override
+				public Void apply(Throwable failure) {
+					LOG.error("Could not schedule or update consumers at the JobManager.", failure);
+					taskActions.failExternally(new RuntimeException("Could not notify JobManager to schedule or update consumers.", failure));
+					return null;
+				}
+			}, executor);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/52755860/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/utils/TaskExecutorMetricsInitializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/utils/TaskExecutorMetricsInitializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/utils/TaskExecutorMetricsInitializer.java
new file mode 100644
index 0000000..1f8d5ed
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/utils/TaskExecutorMetricsInitializer.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.utils;
+
+import com.sun.management.OperatingSystemMXBean;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.management.AttributeNotFoundException;
+import javax.management.InstanceNotFoundException;
+import javax.management.MBeanException;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import javax.management.ReflectionException;
+import java.lang.management.ClassLoadingMXBean;
+import java.lang.management.GarbageCollectorMXBean;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.lang.management.ThreadMXBean;
+import java.util.List;
+
+/**
+ * Utility class to initialize {@link TaskExecutor} specific metrics.
+ */
+public class TaskExecutorMetricsInitializer {
+	private static final Logger LOG = LoggerFactory.getLogger(TaskExecutorMetricsInitializer.class);
+
+	/** Registers the network metrics and all JVM metrics below a "Status" sub-group. */
+	public static void instantiateStatusMetrics(MetricGroup taskManagerMetricGroup, NetworkEnvironment network) {
+		final MetricGroup statusGroup = taskManagerMetricGroup.addGroup("Status");
+
+		instantiateNetworkMetrics(statusGroup.addGroup("Network"), network);
+
+		// Every JVM metric category gets its own sub-group of Status.JVM.
+		final MetricGroup jvmGroup = statusGroup.addGroup("JVM");
+
+		instantiateClassLoaderMetrics(jvmGroup.addGroup("ClassLoader"));
+		instantiateGarbageCollectorMetrics(jvmGroup.addGroup("GarbageCollector"));
+		instantiateMemoryMetrics(jvmGroup.addGroup("Memory"));
+		instantiateThreadMetrics(jvmGroup.addGroup("Threads"));
+		instantiateCPUMetrics(jvmGroup.addGroup("CPU"));
+	}
+
+	/** Exposes the total and available memory segment counts of the network buffer pool as gauges. */
+	private static void instantiateNetworkMetrics(MetricGroup metrics, final NetworkEnvironment network) {
+		final Gauge<Long> totalSegments = new Gauge<Long>() {
+			@Override
+			public Long getValue() {
+				return (long) network.getNetworkBufferPool().getTotalNumberOfMemorySegments();
+			}
+		};
+		metrics.<Long, Gauge<Long>>gauge("TotalMemorySegments", totalSegments);
+		final Gauge<Long> availableSegments = new Gauge<Long>() {
+			@Override
+			public Long getValue() {
+				return (long) network.getNetworkBufferPool().getNumberOfAvailableMemorySegments();
+			}
+		};
+		metrics.<Long, Gauge<Long>>gauge("AvailableMemorySegments", availableSegments);
+	}
+
+	/** Registers gauges for the total loaded and the unloaded class counts of the {@link ClassLoadingMXBean}. */
+	private static void instantiateClassLoaderMetrics(MetricGroup metrics) {
+		final ClassLoadingMXBean classLoading = ManagementFactory.getClassLoadingMXBean();
+		metrics.<Long, Gauge<Long>>gauge("ClassesLoaded", new Gauge<Long>() {
+			@Override
+			public Long getValue() {
+				return classLoading.getTotalLoadedClassCount();
+			}
+		});
+
+		metrics.<Long, Gauge<Long>>gauge("ClassesUnloaded", new Gauge<Long>() {
+			@Override
+			public Long getValue() {
+				return classLoading.getUnloadedClassCount();
+			}
+		});
+	}
+
+	/** Registers a "Count" and a "Time" gauge for every garbage collector MXBean of this JVM. */
+	private static void instantiateGarbageCollectorMetrics(MetricGroup metrics) {
+		for (final GarbageCollectorMXBean collector : ManagementFactory.getGarbageCollectorMXBeans()) {
+			// Each collector gets its own sub-group, named after the collector.
+			final MetricGroup collectorGroup = metrics.addGroup(collector.getName());
+
+			collectorGroup.<Long, Gauge<Long>>gauge("Count", new Gauge<Long>() {
+				@Override
+				public Long getValue() {
+					return collector.getCollectionCount();
+				}
+			});
+
+			collectorGroup.<Long, Gauge<Long>>gauge("Time", new Gauge<Long>() {
+				@Override
+				public Long getValue() {
+					return collector.getCollectionTime();
+				}
+			});
+		}
+	}
+
+	private static void instantiateMemoryMetrics(MetricGroup metrics) { // registers "Heap", "NonHeap", "Direct" and "Mapped" sub-groups
+		final MemoryMXBean mxBean = ManagementFactory.getMemoryMXBean();
+
+		MetricGroup heap = metrics.addGroup("Heap");
+
+		heap.<Long, Gauge<Long>>gauge("Used", new Gauge<Long> () {
+			@Override
+			public Long getValue() {
+				return mxBean.getHeapMemoryUsage().getUsed(); // a fresh usage snapshot is requested on every poll
+			}
+		});
+		heap.<Long, Gauge<Long>>gauge("Committed", new Gauge<Long> () {
+			@Override
+			public Long getValue() {
+				return mxBean.getHeapMemoryUsage().getCommitted();
+			}
+		});
+		heap.<Long, Gauge<Long>>gauge("Max", new Gauge<Long> () {
+			@Override
+			public Long getValue() {
+				return mxBean.getHeapMemoryUsage().getMax(); // JDK: -1 when the maximum is undefined
+			}
+		});
+
+		MetricGroup nonHeap = metrics.addGroup("NonHeap");
+
+		nonHeap.<Long, Gauge<Long>>gauge("Used", new Gauge<Long> () {
+			@Override
+			public Long getValue() {
+				return mxBean.getNonHeapMemoryUsage().getUsed();
+			}
+		});
+		nonHeap.<Long, Gauge<Long>>gauge("Committed", new Gauge<Long> () {
+			@Override
+			public Long getValue() {
+				return mxBean.getNonHeapMemoryUsage().getCommitted();
+			}
+		});
+		nonHeap.<Long, Gauge<Long>>gauge("Max", new Gauge<Long> () {
+			@Override
+			public Long getValue() {
+				return mxBean.getNonHeapMemoryUsage().getMax(); // JDK: -1 when the maximum is undefined
+			}
+		});
+
+		final MBeanServer con = ManagementFactory.getPlatformMBeanServer(); // the buffer pools below are read as MBean attributes
+
+		final String directBufferPoolName = "java.nio:type=BufferPool,name=direct";
+
+		try {
+			final ObjectName directObjectName = new ObjectName(directBufferPoolName);
+
+			MetricGroup direct = metrics.addGroup("Direct"); // only created once the object name parsed successfully
+
+			direct.<Long, Gauge<Long>>gauge("Count", new TaskExecutorMetricsInitializer.AttributeGauge<>(con, directObjectName, "Count", -1L)); // -1L: value reported when the read fails
+			direct.<Long, Gauge<Long>>gauge("MemoryUsed", new TaskExecutorMetricsInitializer.AttributeGauge<>(con, directObjectName, "MemoryUsed", -1L));
+			direct.<Long, Gauge<Long>>gauge("TotalCapacity", new TaskExecutorMetricsInitializer.AttributeGauge<>(con, directObjectName, "TotalCapacity", -1L));
+		} catch (MalformedObjectNameException e) { // logged only; execution continues with the mapped pool
+			LOG.warn("Could not create object name {}.", directBufferPoolName, e);
+		}
+
+		final String mappedBufferPoolName = "java.nio:type=BufferPool,name=mapped";
+
+		try {
+			final ObjectName mappedObjectName = new ObjectName(mappedBufferPoolName);
+
+			MetricGroup mapped = metrics.addGroup("Mapped"); // only created once the object name parsed successfully
+
+			mapped.<Long, Gauge<Long>>gauge("Count", new TaskExecutorMetricsInitializer.AttributeGauge<>(con, mappedObjectName, "Count", -1L)); // -1L: value reported when the read fails
+			mapped.<Long, Gauge<Long>>gauge("MemoryUsed", new TaskExecutorMetricsInitializer.AttributeGauge<>(con, mappedObjectName, "MemoryUsed", -1L));
+			mapped.<Long, Gauge<Long>>gauge("TotalCapacity", new TaskExecutorMetricsInitializer.AttributeGauge<>(con, mappedObjectName, "TotalCapacity", -1L));
+		} catch (MalformedObjectNameException e) { // logged only; the method then returns normally
+			LOG.warn("Could not create object name {}.", mappedBufferPoolName, e);
+		}
+	}
+
+	private static void instantiateThreadMetrics(MetricGroup metrics) { // registers a single "Count" gauge on the given group
+		final ThreadMXBean mxBean = ManagementFactory.getThreadMXBean();
+
+		metrics.<Integer, Gauge<Integer>>gauge("Count", new Gauge<Integer> () {
+			@Override
+			public Integer getValue() {
+				return mxBean.getThreadCount(); // JDK: current number of live threads, daemon and non-daemon
+			}
+		});
+	}
+
+	private static void instantiateCPUMetrics(MetricGroup metrics) { // registers "Load" and "Time"; an Exception is logged, not rethrown
+		try {
+			final OperatingSystemMXBean mxBean = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean(); // NOTE(review): presumably the com.sun.management type named in the warning below - confirm import
+
+			metrics.<Double, Gauge<Double>>gauge("Load", new Gauge<Double> () {
+				@Override
+				public Double getValue() {
+					return mxBean.getProcessCpuLoad(); // queried from the bean on every poll
+				}
+			});
+			metrics.<Long, Gauge<Long>>gauge("Time", new Gauge<Long> () {
+				@Override
+				public Long getValue() {
+					return mxBean.getProcessCpuTime();
+				}
+			});
+		} catch (Exception e) { // e.g. the cast above failing; NOTE(review): an Error such as a missing class would not be caught here
+			LOG.warn("Cannot access com.sun.management.OperatingSystemMXBean.getProcessCpuLoad()" +
+				" - CPU load metrics will not be available.", e);
+		}
+	}
+
+	private static final class AttributeGauge<T> implements Gauge<T> { // gauge whose value is one attribute of one MBean
+		private final MBeanServer server;
+		private final ObjectName objectName;
+		private final String attributeName;
+		private final T errorValue; // returned by getValue() when the attribute cannot be read
+
+		private AttributeGauge(MBeanServer server, ObjectName objectName, String attributeName, T errorValue) {
+			this.server = Preconditions.checkNotNull(server);
+			this.objectName = Preconditions.checkNotNull(objectName);
+			this.attributeName = Preconditions.checkNotNull(attributeName);
+			this.errorValue = errorValue; // the only argument that is not null-checked
+		}
+
+		@SuppressWarnings("unchecked") // getAttribute returns Object; the cast to T is not verified here
+		@Override
+		public T getValue() {
+			try {
+				return (T) server.getAttribute(objectName, attributeName); // looked up on every call, nothing is cached
+			} catch (MBeanException | AttributeNotFoundException | InstanceNotFoundException | ReflectionException e) { // logged, then mapped to errorValue
+				LOG.warn("Could not read attribute {}.", attributeName, e);
+				return errorValue;
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/52755860/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskManagerActions.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskManagerActions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskManagerActions.java
new file mode 100644
index 0000000..b3a0cbb
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskManagerActions.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.messages.TaskManagerMessages;
+import org.apache.flink.runtime.messages.TaskMessages;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Implementation using {@link ActorGateway} to forward the messages.
+ */
+public class ActorGatewayTaskManagerActions implements TaskManagerActions {
+
+	private final ActorGateway actorGateway; // receives every message this class sends
+
+	public ActorGatewayTaskManagerActions(ActorGateway actorGateway) {
+		this.actorGateway = Preconditions.checkNotNull(actorGateway); // rejects a null gateway at construction time
+	}
+
+	@Override
+	public void notifyFinalState(ExecutionAttemptID executionAttemptID) {
+		actorGateway.tell(new TaskMessages.TaskInFinalState(executionAttemptID)); // sent via tell(); no response is read here
+	}
+
+	@Override
+	public void notifyFatalError(String message, Throwable cause) {
+		actorGateway.tell(new TaskManagerMessages.FatalError(message, cause)); // message and cause are passed through unchanged
+	}
+
+	@Override
+	public void failTask(ExecutionAttemptID executionAttemptID, Throwable cause) {
+		actorGateway.tell(new TaskMessages.FailTask(executionAttemptID, cause)); // sent via tell(); no response is read here
+	}
+
+	@Override
+	public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {
+		TaskMessages.UpdateTaskExecutionState actorMessage = new TaskMessages.UpdateTaskExecutionState(taskExecutionState); // wraps the state in an actor message
+
+		actorGateway.tell(actorMessage);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/52755860/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskManagerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskManagerConnection.java
deleted file mode 100644
index cddac55..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskManagerConnection.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskmanager;
-
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.messages.TaskManagerMessages;
-import org.apache.flink.runtime.messages.TaskMessages;
-import org.apache.flink.util.Preconditions;
-
-/**
- * Implementation using {@link ActorGateway} to forward the messages.
- */
-public class ActorGatewayTaskManagerConnection implements TaskManagerConnection {
-
-	private final ActorGateway actorGateway;
-
-	public ActorGatewayTaskManagerConnection(ActorGateway actorGateway) {
-		this.actorGateway = Preconditions.checkNotNull(actorGateway);
-	}
-
-	@Override
-	public void notifyFinalState(ExecutionAttemptID executionAttemptID) {
-		actorGateway.tell(new TaskMessages.TaskInFinalState(executionAttemptID));
-	}
-
-	@Override
-	public void notifyFatalError(String message, Throwable cause) {
-		actorGateway.tell(new TaskManagerMessages.FatalError(message, cause));
-	}
-
-	@Override
-	public void failTask(ExecutionAttemptID executionAttemptID, Throwable cause) {
-		actorGateway.tell(new TaskMessages.FailTask(executionAttemptID, cause));
-	}
-
-	@Override
-	public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {
-		TaskMessages.UpdateTaskExecutionState actorMessage = new TaskMessages.UpdateTaskExecutionState(taskExecutionState);
-
-		actorGateway.tell(actorMessage);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/52755860/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index bd522bd..37ac0a3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
@@ -126,6 +127,9 @@ public class Task implements Runnable, TaskActions {
 	/** The execution attempt of the parallel subtask */
 	private final ExecutionAttemptID executionId;
 
+	/** ID which identifies the slot in which the task is supposed to run */
+	private final AllocationID allocationID;
+
 	/** TaskInfo object for this task */
 	private final TaskInfo taskInfo;
 
@@ -171,7 +175,7 @@ public class Task implements Runnable, TaskActions {
 	private final Map<IntermediateDataSetID, SingleInputGate> inputGatesById;
 
 	/** Connection to the task manager */
-	private final TaskManagerConnection taskManagerConnection;
+	private final TaskManagerActions taskManagerActions;
 
 	/** Input split provider for the task */
 	private final InputSplitProvider inputSplitProvider;
@@ -246,7 +250,7 @@ public class Task implements Runnable, TaskActions {
 		IOManager ioManager,
 		NetworkEnvironment networkEnvironment,
 		BroadcastVariableManager bcVarManager,
-		TaskManagerConnection taskManagerConnection,
+		TaskManagerActions taskManagerActions,
 		InputSplitProvider inputSplitProvider,
 		CheckpointResponder checkpointResponder,
 		LibraryCacheManager libraryCache,
@@ -261,6 +265,7 @@ public class Task implements Runnable, TaskActions {
 		this.jobId = checkNotNull(tdd.getJobID());
 		this.vertexId = checkNotNull(tdd.getVertexID());
 		this.executionId  = checkNotNull(tdd.getExecutionId());
+		this.allocationID = checkNotNull(tdd.getAllocationID());
 		this.taskNameWithSubtask = taskInfo.getTaskNameWithSubtasks();
 		this.jobConfiguration = checkNotNull(tdd.getJobConfiguration());
 		this.taskConfiguration = checkNotNull(tdd.getTaskConfiguration());
@@ -281,7 +286,7 @@ public class Task implements Runnable, TaskActions {
 
 		this.inputSplitProvider = checkNotNull(inputSplitProvider);
 		this.checkpointResponder = checkNotNull(checkpointResponder);
-		this.taskManagerConnection = checkNotNull(taskManagerConnection);
+		this.taskManagerActions = checkNotNull(taskManagerActions);
 
 		this.libraryCache = checkNotNull(libraryCache);
 		this.fileCache = checkNotNull(fileCache);
@@ -365,6 +370,10 @@ public class Task implements Runnable, TaskActions {
 		return executionId;
 	}
 
+	public AllocationID getAllocationID() {
+		return allocationID;
+	}
+
 	public TaskInfo getTaskInfo() {
 		return taskInfo;
 	}
@@ -584,7 +593,7 @@ public class Task implements Runnable, TaskActions {
 
 			// notify everyone that we switched to running
 			notifyObservers(ExecutionState.RUNNING, null);
-			taskManagerConnection.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));
+			taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));
 
 			// make sure the user code classloader is accessible thread-locally
 			executingThread.setContextClassLoader(userCodeClassLoader);
@@ -777,11 +786,11 @@ public class Task implements Runnable, TaskActions {
 	}
 
 	private void notifyFinalState() {
-		taskManagerConnection.notifyFinalState(executionId);
+		taskManagerActions.notifyFinalState(executionId);
 	}
 
 	private void notifyFatalError(String message, Throwable cause) {
-		taskManagerConnection.notifyFatalError(message, cause);
+		taskManagerActions.notifyFatalError(message, cause);
 	}
 
 	// ----------------------------------------------------------------------------------------------------------------
@@ -807,7 +816,7 @@ public class Task implements Runnable, TaskActions {
 						((StoppableTask)Task.this.invokable).stop();
 					} catch(RuntimeException e) {
 						LOG.error("Stopping task " + taskNameWithSubtask + " failed.", e);
-						taskManagerConnection.failTask(executionId, e);
+						taskManagerActions.failTask(executionId, e);
 					}
 				}
 			};

http://git-wip-us.apache.org/repos/asf/flink/blob/52755860/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
index 60aadf5..877cc1e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
@@ -24,6 +24,8 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.util.SerializedThrowable;
 
+import java.io.Serializable;
+
 /**
  * This class represents an update about a task's execution state.
  *
@@ -34,7 +36,7 @@ import org.apache.flink.runtime.util.SerializedThrowable;
  * exception field transient and deserialized it lazily, with the
  * appropriate class loader.
  */
-public class TaskExecutionState implements java.io.Serializable {
+public class TaskExecutionState implements Serializable {
 
 	private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/52755860/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
index 60beae0..31c518a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.util.InstantiationUtil;
 
@@ -63,35 +64,45 @@ public class TaskInputSplitProvider implements InputSplitProvider {
 	}
 
 	@Override
-	public InputSplit getNextInputSplit(ClassLoader userCodeClassLoader) {
+	public InputSplit getNextInputSplit(ClassLoader userCodeClassLoader) throws InputSplitProviderException {
 		Preconditions.checkNotNull(userCodeClassLoader);
 
+		final Future<Object> response = jobManager.ask(
+			new JobManagerMessages.RequestNextInputSplit(jobID, vertexID, executionID),
+			timeout);
+
+		final Object result;
+
 		try {
-			final Future<Object> response = jobManager.ask(
-					new JobManagerMessages.RequestNextInputSplit(jobID, vertexID, executionID),
-					timeout);
+			result = Await.result(response, timeout);
+		} catch (Exception e) {
+			throw new InputSplitProviderException("Did not receive next input split from JobManager.", e);
+		}
 
-			final Object result = Await.result(response, timeout);
+		if(result instanceof JobManagerMessages.NextInputSplit){
+			final JobManagerMessages.NextInputSplit nextInputSplit =
+				(JobManagerMessages.NextInputSplit) result;
 
-			if(result instanceof JobManagerMessages.NextInputSplit){
-				final JobManagerMessages.NextInputSplit nextInputSplit =
-					(JobManagerMessages.NextInputSplit) result;
+			byte[] serializedData = nextInputSplit.splitData();
 
-				byte[] serializedData = nextInputSplit.splitData();
+			if(serializedData == null) {
+				return null;
+			} else {
+				final Object deserialized;
 
-				if(serializedData == null) {
-					return null;
-				} else {
-					Object deserialized = InstantiationUtil.deserializeObject(serializedData,
+				try {
+					deserialized = InstantiationUtil.deserializeObject(serializedData,
 						userCodeClassLoader);
-					return (InputSplit) deserialized;
+				} catch (Exception e) {
+					throw new InputSplitProviderException("Could not deserialize the serialized input split.", e);
 				}
-			} else {
-				throw new Exception("RequestNextInputSplit requires a response of type " +
-					"NextInputSplit. Instead response is of type " + result.getClass() + '.');
+
+				return (InputSplit) deserialized;
 			}
-		} catch (Exception e) {
-			throw new RuntimeException("Requesting the next InputSplit failed.", e);
+		} else {
+			throw new InputSplitProviderException("RequestNextInputSplit requires a response of type " +
+				"NextInputSplit. Instead response is of type " + result.getClass() + '.');
 		}
+
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/52755860/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerActions.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerActions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerActions.java
new file mode 100644
index 0000000..2f3a0cb
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerActions.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+
+/**
+ * Interface for the communication of the {@link Task} with the {@link TaskManager}.
+ */
+public interface TaskManagerActions {
+
+	/**
+	 * Notifies the task manager that the given task is in a final state.
+	 *
+	 * @param executionAttemptID Execution attempt ID of the task
+	 */
+	void notifyFinalState(ExecutionAttemptID executionAttemptID);
+
+	/**
+	 * Notifies the task manager about a fatal error that occurred in the task.
+	 *
+	 * @param message Message to report
+	 * @param cause Cause of the fatal error
+	 */
+	void notifyFatalError(String message, Throwable cause);
+
+	/**
+	 * Tells the task manager to fail the given task.
+	 *
+	 * @param executionAttemptID Execution attempt ID of the task to fail
+	 * @param cause Cause of the failure
+	 */
+	void failTask(ExecutionAttemptID executionAttemptID, Throwable cause);
+
+	/**
+	 * Notifies the task manager about the task execution state update.
+	 *
+	 * @param taskExecutionState Task execution state update
+	 */
+	void updateTaskExecutionState(TaskExecutionState taskExecutionState);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/52755860/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerConnection.java
deleted file mode 100644
index dc1b40f..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerConnection.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskmanager;
-
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-
-/**
- * Interface for the communication of the {@link Task} with the {@link TaskManager}.
- */
-public interface TaskManagerConnection {
-
-	/**
-	 * Notifies the task manager that the given task is in a final state.
-	 *
-	 * @param executionAttemptID Execution attempt ID of the task
-	 */
-	void notifyFinalState(ExecutionAttemptID executionAttemptID);
-
-	/**
-	 * Notifies the task manager about a fatal error occurred in the task.
-	 *
-	 * @param message Message to report
-	 * @param cause Cause of the fatal error
-	 */
-	void notifyFatalError(String message, Throwable cause);
-
-	/**
-	 * Tells the task manager to fail the given task.
-	 *
-	 * @param executionAttemptID Execution attempt ID of the task to fail
-	 * @param cause Cause of the failure
-	 */
-	void failTask(ExecutionAttemptID executionAttemptID, Throwable cause);
-
-	/**
-	 * Notifies the task manager about the task execution state update.
-	 *
-	 * @param taskExecutionState Task execution state update
-	 */
-	void updateTaskExecutionState(TaskExecutionState taskExecutionState);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/52755860/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 7501da7..7364ee0 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -25,7 +25,6 @@ import java.net.{InetAddress, InetSocketAddress}
 import java.util
 import java.util.UUID
 import java.util.concurrent.TimeUnit
-import javax.management.ObjectName
 
 import _root_.akka.actor._
 import _root_.akka.pattern.ask
@@ -38,7 +37,6 @@ import grizzled.slf4j.Logger
 import org.apache.commons.lang3.exception.ExceptionUtils
 import org.apache.flink.configuration._
 import org.apache.flink.core.fs.FileSystem
-import org.apache.flink.metrics.{MetricGroup, Gauge => FlinkGauge}
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
 import org.apache.flink.runtime.clusterframework.messages.StopCluster
 import org.apache.flink.runtime.clusterframework.types.ResourceID
@@ -60,7 +58,7 @@ import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, Leader
 import org.apache.flink.runtime.memory.MemoryManager
 import org.apache.flink.runtime.messages.Messages._
 import org.apache.flink.runtime.messages.RegistrationMessages._
-import org.apache.flink.runtime.messages.StackTraceSampleMessages.{ResponseStackTraceSampleFailure, ResponseStackTraceSampleSuccess, SampleTaskStackTrace, StackTraceSampleMessages, TriggerStackTraceSample}
+import org.apache.flink.runtime.messages.StackTraceSampleMessages._
 import org.apache.flink.runtime.messages.TaskManagerMessages._
 import org.apache.flink.runtime.messages.TaskMessages._
 import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, NotifyCheckpointComplete, TriggerCheckpoint}
@@ -69,7 +67,8 @@ import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup
 import org.apache.flink.runtime.process.ProcessReaper
 import org.apache.flink.runtime.security.SecurityContext.{FlinkSecuredRunner, SecurityConfiguration}
 import org.apache.flink.runtime.security.SecurityContext
-import org.apache.flink.runtime.taskexecutor.{TaskManagerConfiguration, TaskManagerServices, TaskManagerServicesConfiguration}
+import org.apache.flink.runtime.taskexecutor._
+import org.apache.flink.runtime.taskexecutor.utils.TaskExecutorMetricsInitializer
 import org.apache.flink.runtime.util._
 import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages}
 import org.apache.flink.util.NetUtils
@@ -151,7 +150,7 @@ class TaskManager(
   protected val bcVarManager = new BroadcastVariableManager()
 
   /** Handler for distributed files cached by this TaskManager */
-  protected val fileCache = new FileCache(config.getConfiguration())
+  protected val fileCache = new FileCache(config.getTmpDirPaths())
 
   /** Registry of metrics periodically transmitted to the JobManager */
   private val metricRegistry = TaskManager.createMetricsRegistry()
@@ -197,7 +196,7 @@ class TaskManager(
     CheckpointResponder,
     PartitionStateChecker,
     ResultPartitionConsumableNotifier,
-    TaskManagerConnection)] = None
+    TaskManagerActions)] = None
 
   // --------------------------------------------------------------------------
   //  Actor messages and life cycle
@@ -941,9 +940,9 @@ class TaskManager(
     val jobManagerGateway = new AkkaActorGateway(jobManager, leaderSessionID.orNull)
     val taskManagerGateway = new AkkaActorGateway(self, leaderSessionID.orNull)
 
-    val checkpointResponder = new ActorGatewayCheckpointResponder(jobManagerGateway);
+    val checkpointResponder = new ActorGatewayCheckpointResponder(jobManagerGateway)
 
-    val taskManagerConnection = new ActorGatewayTaskManagerConnection(taskManagerGateway)
+    val taskManagerConnection = new ActorGatewayTaskManagerActions(taskManagerGateway)
 
     val partitionStateChecker = new ActorGatewayPartitionStateChecker(
       jobManagerGateway,
@@ -999,7 +998,7 @@ class TaskManager(
     taskManagerMetricGroup = 
       new TaskManagerMetricGroup(metricsRegistry, this.runtimeInfo.getHostname, id.toString)
     
-    TaskManager.instantiateStatusMetrics(taskManagerMetricGroup, network)
+    TaskExecutorMetricsInitializer.instantiateStatusMetrics(taskManagerMetricGroup, network)
     
     // watch job manager to detect when it dies
     context.watch(jobManager)
@@ -2017,23 +2016,23 @@ object TaskManager {
 
     // Pre-processing steps for registering cpuLoad
     val osBean: OperatingSystemMXBean = ManagementFactory.getOperatingSystemMXBean()
-        
-    val fetchCPULoadMethod: Option[Method] = 
+
+    val fetchCPULoadMethod: Option[Method] =
       try {
         Class.forName("com.sun.management.OperatingSystemMXBean")
           .getMethods()
-          .find( _.getName() == "getProcessCpuLoad" )
+          .find(_.getName() == "getProcessCpuLoad")
       }
       catch {
         case t: Throwable =>
           LOG.warn("Cannot access com.sun.management.OperatingSystemMXBean.getProcessCpuLoad()" +
-            " - CPU load metrics will not be available.")
+                     " - CPU load metrics will not be available.")
           None
       }
 
     metricRegistry.register("cpuLoad", new Gauge[Double] {
       override def getValue: Double = {
-        try{
+        try {
           fetchCPULoadMethod.map(_.invoke(osBean).asInstanceOf[Double]).getOrElse(-1.0)
         }
         catch {
@@ -2045,146 +2044,4 @@ object TaskManager {
     })
     metricRegistry
   }
-
-  private def instantiateStatusMetrics(
-      taskManagerMetricGroup: MetricGroup,
-      network: NetworkEnvironment)
-    : Unit = {
-    val status = taskManagerMetricGroup
-      .addGroup("Status")
-
-    instantiateNetworkMetrics(status.addGroup("Network"), network)
-
-    val jvm = status
-      .addGroup("JVM")
-
-    instantiateClassLoaderMetrics(jvm.addGroup("ClassLoader"))
-    instantiateGarbageCollectorMetrics(jvm.addGroup("GarbageCollector"))
-    instantiateMemoryMetrics(jvm.addGroup("Memory"))
-    instantiateThreadMetrics(jvm.addGroup("Threads"))
-    instantiateCPUMetrics(jvm.addGroup("CPU"))
-  }
-
-  private def instantiateNetworkMetrics(
-        metrics: MetricGroup,
-        network: NetworkEnvironment)
-    : Unit = {
-    metrics.gauge[Long, FlinkGauge[Long]]("TotalMemorySegments", new FlinkGauge[Long] {
-      override def getValue: Long = network.getNetworkBufferPool.getTotalNumberOfMemorySegments
-    })
-    metrics.gauge[Long, FlinkGauge[Long]]("AvailableMemorySegments", new FlinkGauge[Long] {
-      override def getValue: Long = network.getNetworkBufferPool.getNumberOfAvailableMemorySegments
-    })
-  }
-
-  private def instantiateClassLoaderMetrics(metrics: MetricGroup) {
-    val mxBean = ManagementFactory.getClassLoadingMXBean
-
-    metrics.gauge[Long, FlinkGauge[Long]]("ClassesLoaded", new FlinkGauge[Long] {
-      override def getValue: Long = mxBean.getTotalLoadedClassCount
-    })
-    metrics.gauge[Long, FlinkGauge[Long]]("ClassesUnloaded", new FlinkGauge[Long] {
-      override def getValue: Long = mxBean.getUnloadedClassCount
-    })
-  }
-
-  private def instantiateGarbageCollectorMetrics(metrics: MetricGroup) {
-    val garbageCollectors = ManagementFactory.getGarbageCollectorMXBeans
-
-    for (garbageCollector <- garbageCollectors.asScala) {
-      val gcGroup = metrics.addGroup(garbageCollector.getName)
-      gcGroup.gauge[Long, FlinkGauge[Long]]("Count", new FlinkGauge[Long] {
-        override def getValue: Long = garbageCollector.getCollectionCount
-      })
-      gcGroup.gauge[Long, FlinkGauge[Long]]("Time", new FlinkGauge[Long] {
-        override def getValue: Long = garbageCollector.getCollectionTime
-      })
-    }
-  }
-
-  private def instantiateMemoryMetrics(metrics: MetricGroup) {
-    val mxBean = ManagementFactory.getMemoryMXBean
-    val heap = metrics.addGroup("Heap")
-    heap.gauge[Long, FlinkGauge[Long]]("Used", new FlinkGauge[Long] {
-      override def getValue: Long = mxBean.getHeapMemoryUsage.getUsed
-    })
-    heap.gauge[Long, FlinkGauge[Long]]("Committed", new FlinkGauge[Long] {
-      override def getValue: Long = mxBean.getHeapMemoryUsage.getCommitted
-    })
-    heap.gauge[Long, FlinkGauge[Long]]("Max", new FlinkGauge[Long] {
-      override def getValue: Long = mxBean.getHeapMemoryUsage.getMax
-    })
-
-    val nonHeap = metrics.addGroup("NonHeap")
-    nonHeap.gauge[Long, FlinkGauge[Long]]("Used", new FlinkGauge[Long] {
-      override def getValue: Long = mxBean.getNonHeapMemoryUsage.getUsed
-    })
-    nonHeap.gauge[Long, FlinkGauge[Long]]("Committed", new FlinkGauge[Long] {
-      override def getValue: Long = mxBean.getNonHeapMemoryUsage.getCommitted
-    })
-    nonHeap.gauge[Long, FlinkGauge[Long]]("Max", new FlinkGauge[Long] {
-      override def getValue: Long = mxBean.getNonHeapMemoryUsage.getMax
-    })
-
-    val con = ManagementFactory.getPlatformMBeanServer;
-
-    val directObjectName = new ObjectName("java.nio:type=BufferPool,name=direct")
-
-    val direct = metrics.addGroup("Direct")
-    direct.gauge[Long, FlinkGauge[Long]]("Count", new FlinkGauge[Long] {
-      override def getValue: Long = con
-        .getAttribute(directObjectName, "Count").asInstanceOf[Long]
-    })
-    direct.gauge[Long, FlinkGauge[Long]]("MemoryUsed", new FlinkGauge[Long] {
-      override def getValue: Long = con
-        .getAttribute(directObjectName, "MemoryUsed").asInstanceOf[Long]
-    })
-    direct.gauge[Long, FlinkGauge[Long]]("TotalCapacity", new FlinkGauge[Long] {
-      override def getValue: Long = con
-        .getAttribute(directObjectName, "TotalCapacity").asInstanceOf[Long]
-    })
-
-    val mappedObjectName = new ObjectName("java.nio:type=BufferPool,name=mapped")
-
-    val mapped = metrics.addGroup("Mapped")
-    mapped.gauge[Long, FlinkGauge[Long]]("Count", new FlinkGauge[Long] {
-      override def getValue: Long = con
-        .getAttribute(mappedObjectName, "Count").asInstanceOf[Long]
-    })
-    mapped.gauge[Long, FlinkGauge[Long]]("MemoryUsed", new FlinkGauge[Long] {
-      override def getValue: Long = con
-        .getAttribute(mappedObjectName, "MemoryUsed").asInstanceOf[Long]
-    })
-    mapped.gauge[Long, FlinkGauge[Long]]("TotalCapacity", new FlinkGauge[Long] {
-      override def getValue: Long = con
-        .getAttribute(mappedObjectName, "TotalCapacity").asInstanceOf[Long]
-    })
-  }
-
-  private def instantiateThreadMetrics(metrics: MetricGroup): Unit = {
-    val mxBean = ManagementFactory.getThreadMXBean
-
-    metrics.gauge[Int, FlinkGauge[Int]]("Count", new FlinkGauge[Int] {
-      override def getValue: Int = mxBean.getThreadCount
-    })
-  }
-
-  private def instantiateCPUMetrics(metrics: MetricGroup): Unit = {
-    try {
-      val mxBean = ManagementFactory.getOperatingSystemMXBean
-        .asInstanceOf[com.sun.management.OperatingSystemMXBean]
-
-      metrics.gauge[Double, FlinkGauge[Double]]("Load", new FlinkGauge[Double] {
-          override def getValue: Double = mxBean.getProcessCpuLoad
-        })
-      metrics.gauge[Long, FlinkGauge[Long]]("Time", new FlinkGauge[Long] {
-          override def getValue: Long = mxBean.getProcessCpuTime
-        })
-    }
-    catch {
-     case t: Throwable =>
-       LOG.warn("Cannot access com.sun.management.OperatingSystemMXBean.getProcessCpuLoad()" +
-        " - CPU load metrics will not be available.") 
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/52755860/flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDeleteValidationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDeleteValidationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDeleteValidationTest.java
index c369674..4db0d93 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDeleteValidationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDeleteValidationTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.filecache;
 import java.io.File;
 import java.util.concurrent.Future;
 
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
 import org.apache.flink.api.common.JobID;
@@ -62,8 +61,9 @@ public class FileCacheDeleteValidationTest {
 	
 	@Before
 	public void setup() {
+		String[] tmpDirectories = System.getProperty("java.io.tmpdir").split(",|" + File.pathSeparator);
 		try {
-			fileCache = new FileCache(new Configuration());
+			fileCache = new FileCache(tmpDirectories);
 		}
 		catch (Exception e) {
 			e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/52755860/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
index f8a0b6a..30dfef5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
@@ -42,7 +42,6 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/52755860/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 7710fa9..f5fe52c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -20,8 +20,11 @@ package org.apache.flink.runtime.taskexecutor;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.highavailability.NonHaServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -29,6 +32,7 @@ import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRegistered;
 import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRejected;
@@ -38,7 +42,6 @@ import org.apache.flink.runtime.rpc.TestingSerialRpcService;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.TestLogger;
 
-import org.hamcrest.Matchers;
 import org.junit.Test;
 
 import org.powermock.api.mockito.PowerMockito;
@@ -60,10 +63,14 @@ public class TaskExecutorTest extends TestLogger {
 			ResourceManagerGateway rmGateway = mock(ResourceManagerGateway.class);
 			TaskManagerConfiguration taskManagerServicesConfiguration = mock(TaskManagerConfiguration.class);
 			PowerMockito.when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1);
+			PowerMockito.when(taskManagerServicesConfiguration.getConfiguration()).thenReturn(new Configuration());
+			PowerMockito.when(taskManagerServicesConfiguration.getTmpDirPaths()).thenReturn(new String[1]);
+
 			rpc.registerGateway(resourceManagerAddress, rmGateway);
 
 			TaskManagerLocation taskManagerLocation = mock(TaskManagerLocation.class);
 			when(taskManagerLocation.getResourceID()).thenReturn(resourceID);
+			when(taskManagerLocation.getHostname()).thenReturn("foobar");
 
 			NonHaServices haServices = new NonHaServices(resourceManagerAddress);
 
@@ -76,6 +83,9 @@ public class TaskExecutorTest extends TestLogger {
 				mock(NetworkEnvironment.class),
 				haServices,
 				mock(MetricRegistry.class),
+				mock(TaskManagerMetricGroup.class),
+				mock(BroadcastVariableManager.class),
+				mock(FileCache.class),
 				mock(FatalErrorHandler.class));
 
 			taskManager.start();
@@ -113,9 +123,12 @@ public class TaskExecutorTest extends TestLogger {
 
 			TaskManagerConfiguration taskManagerServicesConfiguration = mock(TaskManagerConfiguration.class);
 			PowerMockito.when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1);
+			PowerMockito.when(taskManagerServicesConfiguration.getConfiguration()).thenReturn(new Configuration());
+			PowerMockito.when(taskManagerServicesConfiguration.getTmpDirPaths()).thenReturn(new String[1]);
 
 			TaskManagerLocation taskManagerLocation = mock(TaskManagerLocation.class);
 			when(taskManagerLocation.getResourceID()).thenReturn(resourceID);
+			when(taskManagerLocation.getHostname()).thenReturn("foobar");
 
 			TaskExecutor taskManager = new TaskExecutor(
 				taskManagerServicesConfiguration,
@@ -126,6 +139,9 @@ public class TaskExecutorTest extends TestLogger {
 				mock(NetworkEnvironment.class),
 				haServices,
 				mock(MetricRegistry.class),
+				mock(TaskManagerMetricGroup.class),
+				mock(BroadcastVariableManager.class),
+				mock(FileCache.class),
 				mock(FatalErrorHandler.class));
 
 			taskManager.start();
@@ -182,9 +198,12 @@ public class TaskExecutorTest extends TestLogger {
 
 			TaskManagerConfiguration taskManagerServicesConfiguration = mock(TaskManagerConfiguration.class);
 			PowerMockito.when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1);
+			PowerMockito.when(taskManagerServicesConfiguration.getConfiguration()).thenReturn(new Configuration());
+			PowerMockito.when(taskManagerServicesConfiguration.getTmpDirPaths()).thenReturn(new String[1]);
 
 			TaskManagerLocation taskManagerLocation = mock(TaskManagerLocation.class);
 			when(taskManagerLocation.getResourceID()).thenReturn(resourceID);
+			when(taskManagerLocation.getHostname()).thenReturn("foobar");
 
 			TaskExecutor taskManager = new TaskExecutor(
 				taskManagerServicesConfiguration,
@@ -195,6 +214,9 @@ public class TaskExecutorTest extends TestLogger {
 				mock(NetworkEnvironment.class),
 				haServices,
 				mock(MetricRegistry.class),
+				mock(TaskManagerMetricGroup.class),
+				mock(BroadcastVariableManager.class),
+				mock(FileCache.class),
 				mock(FatalErrorHandler.class));
 
 			taskManager.start();

http://git-wip-us.apache.org/repos/asf/flink/blob/52755860/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 7dd67ed..3a87d86 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -174,7 +174,7 @@ public class TaskAsyncCallTest {
 			mock(IOManager.class),
 			networkEnvironment,
 			mock(BroadcastVariableManager.class),
-			mock(TaskManagerConnection.class),
+			mock(TaskManagerActions.class),
 			mock(InputSplitProvider.class),
 			mock(CheckpointResponder.class),
 			libCache,

http://git-wip-us.apache.org/repos/asf/flink/blob/52755860/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProviderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProviderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProviderTest.java
index 642300d..777633d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProviderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProviderTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.instance.BaseTestingActorGateway;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.junit.Test;
@@ -36,7 +37,7 @@ import java.util.concurrent.TimeUnit;
 public class TaskInputSplitProviderTest {
 
 	@Test
-	public void testRequestNextInputSplitWithInvalidExecutionID() {
+	public void testRequestNextInputSplitWithInvalidExecutionID() throws InputSplitProviderException {
 
 		final JobID jobID = new JobID();
 		final JobVertexID vertexID = new JobVertexID();

http://git-wip-us.apache.org/repos/asf/flink/blob/52755860/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
index 9791cee..5d3eb3a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskmanager;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
@@ -68,6 +69,7 @@ public class TaskStopTest {
 		when(tddMock.getTaskConfiguration()).thenReturn(mock(Configuration.class));
 		when(tddMock.getSerializedExecutionConfig()).thenReturn(mock(SerializedValue.class));
 		when(tddMock.getInvokableClassName()).thenReturn("className");
+		when(tddMock.getAllocationID()).thenReturn(mock(AllocationID.class));
 
 		task = new Task(
 			tddMock,
@@ -75,7 +77,7 @@ public class TaskStopTest {
 			mock(IOManager.class),
 			mock(NetworkEnvironment.class),
 			mock(BroadcastVariableManager.class),
-			mock(TaskManagerConnection.class),
+			mock(TaskManagerActions.class),
 			mock(InputSplitProvider.class),
 			mock(CheckpointResponder.class),
 			mock(LibraryCacheManager.class),

http://git-wip-us.apache.org/repos/asf/flink/blob/52755860/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 9a13cde..fe618ff 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -97,7 +97,7 @@ public class TaskTest {
 	private ActorGateway listenerGateway;
 
 	private ActorGatewayTaskExecutionStateListener listener;
-	private ActorGatewayTaskManagerConnection taskManagerConnection;
+	private ActorGatewayTaskManagerActions taskManagerConnection;
 
 	private BlockingQueue<Object> taskManagerMessages;
 	private BlockingQueue<Object> jobManagerMessages;
@@ -113,7 +113,7 @@ public class TaskTest {
 		listenerGateway = new ForwardingActorGateway(listenerMessages);
 
 		listener = new ActorGatewayTaskExecutionStateListener(listenerGateway);
-		taskManagerConnection = new ActorGatewayTaskManagerConnection(taskManagerGateway);
+		taskManagerConnection = new ActorGatewayTaskManagerActions(taskManagerGateway);
 		
 		awaitLatch = new OneShotLatch();
 		triggerLatch = new OneShotLatch();

http://git-wip-us.apache.org/repos/asf/flink/blob/52755860/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java
index 343affe..c067ca7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 
 import java.util.Iterator;
@@ -146,7 +147,12 @@ public class InputFormatSourceFunction<OUT> extends RichParallelSourceFunction<O
 					return true;
 				}
 
-				InputSplit split = provider.getNextInputSplit(getRuntimeContext().getUserCodeClassLoader());
+				final InputSplit split;
+				try {
+					split = provider.getNextInputSplit(getRuntimeContext().getUserCodeClassLoader());
+				} catch (InputSplitProviderException e) {
+					throw new RuntimeException("Could not retrieve next input split.", e);
+				}
 
 				if (split != null) {
 					this.nextSplit = split;

http://git-wip-us.apache.org/repos/asf/flink/blob/52755860/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
index ee5a203..861665f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
@@ -48,7 +48,7 @@ import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.TaskStateHandles;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
 import org.apache.flink.runtime.taskmanager.Task;
-import org.apache.flink.runtime.taskmanager.TaskManagerConnection;
+import org.apache.flink.runtime.taskmanager.TaskManagerActions;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.streaming.api.TimeCharacteristic;
@@ -60,6 +60,7 @@ import org.apache.flink.util.SerializedValue;
 import org.junit.Test;
 
 import java.io.EOFException;
+import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
 import java.net.URL;
@@ -161,17 +162,19 @@ public class InterruptSensitiveRestoreTest {
 		when(networkEnvironment.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class)))
 				.thenReturn(mock(TaskKvStateRegistry.class));
 
+		String[] tmpDirectories = EnvironmentInformation.getTemporaryFileDirectory().split(",|" + File.pathSeparator);
+
 		return new Task(
 				tdd,
 				mock(MemoryManager.class),
 				mock(IOManager.class),
 				networkEnvironment,
 				mock(BroadcastVariableManager.class),
-				mock(TaskManagerConnection.class),
+				mock(TaskManagerActions.class),
 				mock(InputSplitProvider.class),
 				mock(CheckpointResponder.class),
 				new FallbackLibraryCacheManager(),
-				new FileCache(new Configuration()),
+				new FileCache(tmpDirectories),
 				new TaskManagerRuntimeInfo(
 						"localhost", new Configuration(), EnvironmentInformation.getTemporaryFileDirectory()),
 				new UnregisteredTaskMetricsGroup(),
@@ -274,4 +277,4 @@ public class InterruptSensitiveRestoreTest {
 			fail("should never be called");
 		}
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/52755860/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 94f6d5a..ee0839f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -51,7 +51,7 @@ import org.apache.flink.runtime.taskmanager.CheckpointResponder;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskExecutionStateListener;
-import org.apache.flink.runtime.taskmanager.TaskManagerConnection;
+import org.apache.flink.runtime.taskmanager.TaskManagerActions;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -238,7 +238,7 @@ public class StreamTaskTest {
 			mock(IOManager.class),
 			network,
 			mock(BroadcastVariableManager.class),
-			mock(TaskManagerConnection.class),
+			mock(TaskManagerActions.class),
 			mock(InputSplitProvider.class),
 			mock(CheckpointResponder.class),
 			libCache,