You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/12/14 08:07:54 UTC

[2/3] flink git commit: [FLINK-8078] Introduce LogicalSlot interface

[FLINK-8078] Introduce LogicalSlot interface

The LogicalSlot interface decouples the task deployment from the actual
slot implementation, which at the moment is one of Slot, SimpleSlot and SharedSlot.
This is a helpful step towards introducing a different slot implementation for
FLIP-6.

This closes #5086.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bb9c64b1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bb9c64b1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bb9c64b1

Branch: refs/heads/master
Commit: bb9c64b1222a5e9568cf93186a6420bebcb306f9
Parents: 0d55164
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Nov 14 23:50:52 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Dec 13 22:37:32 2017 +0100

----------------------------------------------------------------------
 .../InputChannelDeploymentDescriptor.java       |   9 +-
 .../flink/runtime/executiongraph/Execution.java |  62 ++---
 .../runtime/executiongraph/ExecutionVertex.java |  15 +-
 .../flink/runtime/instance/LogicalSlot.java     |  93 ++++++++
 .../flink/runtime/instance/SimpleSlot.java      |  16 +-
 .../apache/flink/runtime/instance/SlotPool.java |  20 +-
 .../flink/runtime/instance/SlotPoolGateway.java |   2 +-
 .../flink/runtime/instance/SlotProvider.java    |   2 +-
 .../runtime/jobmanager/scheduler/Scheduler.java |  13 +-
 .../flink/runtime/jobmaster/JobMaster.java      |   4 +-
 .../InputChannelDeploymentDescriptorTest.java   |  19 +-
 .../ExecutionGraphDeploymentTest.java           |   5 +-
 .../ExecutionGraphMetricsTest.java              |  53 +----
 .../ExecutionGraphSchedulingTest.java           |  17 +-
 .../runtime/executiongraph/ExecutionTest.java   |   3 +-
 .../ExecutionVertexDeploymentTest.java          |  10 +-
 .../executiongraph/ProgrammedSlotProvider.java  |  18 +-
 .../utils/SimpleSlotProvider.java               |   3 +-
 .../flink/runtime/instance/SharedSlotsTest.java |   3 +-
 .../flink/runtime/instance/SimpleSlotTest.java  |  16 +-
 .../flink/runtime/instance/SlotPoolRpcTest.java |  10 +-
 .../flink/runtime/instance/SlotPoolTest.java    |  48 ++--
 .../runtime/instance/TestingLogicalSlot.java    | 114 ++++++++++
 .../PartialConsumePipelinedResultTest.java      |   4 +-
 .../ScheduleOrUpdateConsumersTest.java          |   3 +-
 .../ScheduleWithCoLocationHintTest.java         | 178 +++++++--------
 .../scheduler/SchedulerIsolatedTasksTest.java   |  85 ++++---
 .../scheduler/SchedulerSlotSharingTest.java     | 226 +++++++++----------
 28 files changed, 614 insertions(+), 437 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bb9c64b1/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
index fe1c599..8d76207 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
@@ -24,7 +24,7 @@ import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionEdge;
 import org.apache.flink.runtime.executiongraph.ExecutionGraphException;
 import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
-import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.instance.LogicalSlot;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
@@ -85,10 +85,9 @@ public class InputChannelDeploymentDescriptor implements Serializable {
 	 */
 	public static InputChannelDeploymentDescriptor[] fromEdges(
 			ExecutionEdge[] edges,
-			SimpleSlot consumerSlot,
+			ResourceID consumerResourceId,
 			boolean allowLazyDeployment) throws ExecutionGraphException {
 
-		final ResourceID consumerTaskManager = consumerSlot.getTaskManagerID();
 		final InputChannelDeploymentDescriptor[] icdd = new InputChannelDeploymentDescriptor[edges.length];
 
 		// Each edge is connected to a different result partition
@@ -97,7 +96,7 @@ public class InputChannelDeploymentDescriptor implements Serializable {
 			final Execution producer = consumedPartition.getProducer().getCurrentExecutionAttempt();
 
 			final ExecutionState producerState = producer.getState();
-			final SimpleSlot producerSlot = producer.getAssignedResource();
+			final LogicalSlot producerSlot = producer.getAssignedResource();
 
 			final ResultPartitionLocation partitionLocation;
 
@@ -111,7 +110,7 @@ public class InputChannelDeploymentDescriptor implements Serializable {
 				final TaskManagerLocation partitionTaskManagerLocation = producerSlot.getTaskManagerLocation();
 				final ResourceID partitionTaskManager = partitionTaskManagerLocation.getResourceID();
 
-				if (partitionTaskManager.equals(consumerTaskManager)) {
+				if (partitionTaskManager.equals(consumerResourceId)) {
 					// Consuming task is deployed to the same TaskManager as the partition => local
 					partitionLocation = ResultPartitionLocation.createLocal();
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/bb9c64b1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 38c3821..00a452d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -33,7 +33,7 @@ import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescript
 import org.apache.flink.runtime.deployment.ResultPartitionLocation;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.instance.LogicalSlot;
 import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -98,9 +98,9 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	private static final AtomicReferenceFieldUpdater<Execution, ExecutionState> STATE_UPDATER =
 			AtomicReferenceFieldUpdater.newUpdater(Execution.class, ExecutionState.class, "state");
 
-	private static final AtomicReferenceFieldUpdater<Execution, SimpleSlot> ASSIGNED_SLOT_UPDATER = AtomicReferenceFieldUpdater.newUpdater(
+	private static final AtomicReferenceFieldUpdater<Execution, LogicalSlot> ASSIGNED_SLOT_UPDATER = AtomicReferenceFieldUpdater.newUpdater(
 		Execution.class,
-		SimpleSlot.class,
+		LogicalSlot.class,
 		"assignedResource");
 
 	private static final Logger LOG = ExecutionGraph.LOG;
@@ -141,7 +141,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 
 	private volatile ExecutionState state = CREATED;
 
-	private volatile SimpleSlot assignedResource;
+	private volatile LogicalSlot assignedResource;
 
 	private volatile Throwable failureCause;          // once assigned, never changes
 
@@ -240,7 +240,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 		return taskManagerLocationFuture;
 	}
 
-	public SimpleSlot getAssignedResource() {
+	public LogicalSlot getAssignedResource() {
 		return assignedResource;
 	}
 
@@ -248,21 +248,21 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	 * Tries to assign the given slot to the execution. The assignment works only if the
 	 * Execution is in state SCHEDULED. Returns true, if the resource could be assigned.
 	 *
-	 * @param slot to assign to this execution
+	 * @param logicalSlot to assign to this execution
 	 * @return true if the slot could be assigned to the execution, otherwise false
 	 */
 	@VisibleForTesting
-	boolean tryAssignResource(final SimpleSlot slot) {
-		checkNotNull(slot);
+	boolean tryAssignResource(final LogicalSlot logicalSlot) {
+		checkNotNull(logicalSlot);
 
 		// only allow to set the assigned resource in state SCHEDULED or CREATED
 		// note: we also accept resource assignment when being in state CREATED for testing purposes
 		if (state == SCHEDULED || state == CREATED) {
-			if (ASSIGNED_SLOT_UPDATER.compareAndSet(this, null, slot)) {
+			if (ASSIGNED_SLOT_UPDATER.compareAndSet(this, null, logicalSlot)) {
 				// check for concurrent modification (e.g. cancelling call)
 				if (state == SCHEDULED || state == CREATED) {
 					checkState(!taskManagerLocationFuture.isDone(), "The TaskManagerLocationFuture should not be set if we haven't assigned a resource yet.");
-					taskManagerLocationFuture.complete(slot.getTaskManagerLocation());
+					taskManagerLocationFuture.complete(logicalSlot.getTaskManagerLocation());
 
 					return true;
 				} else {
@@ -283,7 +283,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	@Override
 	public TaskManagerLocation getAssignedResourceLocation() {
 		// returns non-null only when a location is already assigned
-		final SimpleSlot currentAssignedResource = assignedResource;
+		final LogicalSlot currentAssignedResource = assignedResource;
 		return currentAssignedResource != null ? currentAssignedResource.getTaskManagerLocation() : null;
 	}
 
@@ -442,14 +442,14 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 							queued,
 							preferredLocations))
 				.thenApply(
-					(SimpleSlot slot) -> {
-						if (tryAssignResource(slot)) {
+					(LogicalSlot logicalSlot) -> {
+						if (tryAssignResource(logicalSlot)) {
 							return this;
 						} else {
 							// release the slot
-							slot.releaseSlot();
+							logicalSlot.releaseSlot();
 
-							throw new CompletionException(new FlinkException("Could not assign slot " + slot + " to execution " + this + " because it has already been assigned "));
+							throw new CompletionException(new FlinkException("Could not assign slot " + logicalSlot + " to execution " + this + " because it has already been assigned "));
 						}
 					});
 		}
@@ -465,7 +465,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	 * @throws JobException if the execution cannot be deployed to the assigned resource
 	 */
 	public void deploy() throws JobException {
-		final SimpleSlot slot  = assignedResource;
+		final LogicalSlot slot  = assignedResource;
 
 		checkNotNull(slot, "In order to deploy the execution we first have to assign a resource via tryAssignResource.");
 
@@ -493,7 +493,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 
 		try {
 			// good, we are allowed to deploy
-			if (!slot.setExecutedVertex(this)) {
+			if (!slot.setExecution(this)) {
 				throw new JobException("Could not assign the ExecutionVertex to the slot " + slot);
 			}
 
@@ -545,7 +545,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	 * Sends stop RPC call.
 	 */
 	public void stop() {
-		final SimpleSlot slot = assignedResource;
+		final LogicalSlot slot = assignedResource;
 
 		if (slot != null) {
 			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
@@ -608,7 +608,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 					try {
 						vertex.getExecutionGraph().deregisterExecution(this);
 
-						final SimpleSlot slot = assignedResource;
+						final LogicalSlot slot = assignedResource;
 
 						if (slot != null) {
 							slot.releaseSlot();
@@ -691,7 +691,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 			// ----------------------------------------------------------------
 			else {
 				if (consumerState == RUNNING) {
-					final SimpleSlot consumerSlot = consumer.getAssignedResource();
+					final LogicalSlot consumerSlot = consumer.getAssignedResource();
 
 					if (consumerSlot == null) {
 						// The consumer has been reset concurrently
@@ -702,7 +702,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 							.getCurrentAssignedResource().getTaskManagerLocation();
 					final ResourceID partitionTaskManager = partitionTaskManagerLocation.getResourceID();
 					
-					final ResourceID consumerTaskManager = consumerSlot.getTaskManagerID();
+					final ResourceID consumerTaskManager = consumerSlot.getTaskManagerLocation().getResourceID();
 
 					final ResultPartitionID partitionId = new ResultPartitionID(partition.getPartitionId(), attemptId);
 					
@@ -778,7 +778,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 			int maxStrackTraceDepth,
 			Time timeout) {
 
-		final SimpleSlot slot = assignedResource;
+		final LogicalSlot slot = assignedResource;
 
 		if (slot != null) {
 			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
@@ -802,7 +802,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	 * @param timestamp of the completed checkpoint
 	 */
 	public void notifyCheckpointComplete(long checkpointId, long timestamp) {
-		final SimpleSlot slot = assignedResource;
+		final LogicalSlot slot = assignedResource;
 
 		if (slot != null) {
 			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
@@ -822,7 +822,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	 * @param checkpointOptions of the checkpoint to trigger
 	 */
 	public void triggerCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
-		final SimpleSlot slot = assignedResource;
+		final LogicalSlot slot = assignedResource;
 
 		if (slot != null) {
 			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
@@ -880,7 +880,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 
 						updateAccumulatorsAndMetrics(userAccumulators, metrics);
 
-						final SimpleSlot slot = assignedResource;
+						final LogicalSlot slot = assignedResource;
 
 						if (slot != null) {
 							slot.releaseSlot();
@@ -938,7 +938,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 
 				if (transitionState(current, CANCELED)) {
 					try {
-						final SimpleSlot slot = assignedResource;
+						final LogicalSlot slot = assignedResource;
 
 						if (slot != null) {
 							slot.releaseSlot();
@@ -1035,7 +1035,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 				updateAccumulatorsAndMetrics(userAccumulators, metrics);
 
 				try {
-					final SimpleSlot slot = assignedResource;
+					final LogicalSlot slot = assignedResource;
 					if (slot != null) {
 						slot.releaseSlot();
 					}
@@ -1119,7 +1119,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	 * The sending is tried up to NUM_CANCEL_CALL_TRIES times.
 	 */
 	private void sendCancelRpcCall() {
-		final SimpleSlot slot = assignedResource;
+		final LogicalSlot slot = assignedResource;
 
 		if (slot != null) {
 			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
@@ -1140,7 +1140,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	}
 
 	private void sendFailIntermediateResultPartitionsRpcCall() {
-		final SimpleSlot slot = assignedResource;
+		final LogicalSlot slot = assignedResource;
 
 		if (slot != null) {
 			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
@@ -1158,7 +1158,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	private void sendUpdatePartitionInfoRpcCall(
 			final Iterable<PartitionInfo> partitionInfos) {
 
-		final SimpleSlot slot = assignedResource;
+		final LogicalSlot slot = assignedResource;
 
 		if (slot != null) {
 			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
@@ -1318,7 +1318,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	
 	@Override
 	public String toString() {
-		final SimpleSlot slot = assignedResource;
+		final LogicalSlot slot = assignedResource;
 
 		return String.format("Attempt #%d (%s) @ %s - [%s]", attemptNumber, vertex.getTaskNameWithSubtaskIndex(),
 				(slot == null ? "(unassigned)" : slot), state);

http://git-wip-us.apache.org/repos/asf/flink/blob/bb9c64b1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 6d45d06..c2c986f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescript
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.instance.LogicalSlot;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -272,7 +273,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 		return currentExecution.getTaskManagerLocationFuture();
 	}
 
-	public SimpleSlot getCurrentAssignedResource() {
+	public LogicalSlot getCurrentAssignedResource() {
 		return currentExecution.getAssignedResource();
 	}
 
@@ -744,7 +745,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 	 */
 	TaskDeploymentDescriptor createDeploymentDescriptor(
 			ExecutionAttemptID executionId,
-			SimpleSlot targetSlot,
+			LogicalSlot targetSlot,
 			TaskStateSnapshot taskStateHandles,
 			int attemptNumber) throws ExecutionGraphException {
 		
@@ -779,8 +780,10 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 		
 		
 		for (ExecutionEdge[] edges : inputEdges) {
-			InputChannelDeploymentDescriptor[] partitions = InputChannelDeploymentDescriptor
-					.fromEdges(edges, targetSlot, lazyScheduling);
+			InputChannelDeploymentDescriptor[] partitions = InputChannelDeploymentDescriptor.fromEdges(
+				edges,
+				targetSlot.getTaskManagerLocation().getResourceID(),
+				lazyScheduling);
 
 			// If the produced partition has multiple consumers registered, we
 			// need to request the one matching our sub task index.
@@ -829,10 +832,10 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 			serializedJobInformation,
 			serializedTaskInformation,
 			executionId,
-			targetSlot.getAllocatedSlot().getSlotAllocationId(),
+			targetSlot.getAllocationId(),
 			subTaskIndex,
 			attemptNumber,
-			targetSlot.getRoot().getSlotNumber(),
+			targetSlot.getPhysicalSlotNumber(),
 			taskStateHandles,
 			producedPartitions,
 			consumedPartitions);

http://git-wip-us.apache.org/repos/asf/flink/blob/bb9c64b1/flink-runtime/src/main/java/org/apache/flink/runtime/instance/LogicalSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/LogicalSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/LogicalSlot.java
new file mode 100644
index 0000000..3ebe107
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/LogicalSlot.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+/**
+ * A logical slot represents a resource on a TaskManager into
+ * which a single task can be deployed.
+ */
+public interface LogicalSlot {
+
+	/**
+	 * Return the TaskManager location of this slot.
+	 *
+	 * @return TaskManager location of this slot
+	 */
+	TaskManagerLocation getTaskManagerLocation();
+
+	/**
+	 * Return the TaskManager gateway to talk to the TaskManager.
+	 *
+	 * @return TaskManager gateway to talk to the TaskManager
+	 */
+	TaskManagerGateway getTaskManagerGateway();
+
+	/**
+	 * True if the slot is still alive.
+	 *
+	 * @return True if the slot is still alive, otherwise false
+	 */
+	boolean isAlive();
+
+	/**
+	 * True if the slot is canceled.
+	 *
+	 * @return True if the slot is canceled, otherwise false
+	 */
+	boolean isCanceled();
+
+	/**
+	 * True if the slot is released.
+	 *
+	 * @return True if the slot is released, otherwise false
+	 */
+	boolean isReleased();
+
+	/**
+	 * Sets the execution for this slot.
+	 *
+	 * @param execution to set for this slot
+	 * @return true if the execution could be set for this slot, otherwise false
+	 */
+	boolean setExecution(Execution execution);
+
+	/**
+	 * Releases this slot.
+	 */
+	void releaseSlot();
+
+	/**
+	 * Gets the slot number on the TaskManager.
+	 *
+	 * @return slot number
+	 */
+	int getPhysicalSlotNumber();
+
+	/**
+	 * Gets the allocation id of this slot.
+	 *
+	 * @return allocation id of this slot
+	 */
+	AllocationID getAllocationId();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bb9c64b1/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
index 8c7ec01..9591028 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.instance;
 
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.api.common.JobID;
@@ -37,7 +38,7 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
  * <p>If this slot is part of a {@link SharedSlot}, then the parent attribute will point to that shared slot.
  * If not, then the parent attribute is null.
  */
-public class SimpleSlot extends Slot {
+public class SimpleSlot extends Slot implements LogicalSlot {
 
 	/** The updater used to atomically swap in the execution */
 	private static final AtomicReferenceFieldUpdater<SimpleSlot, Execution> VERTEX_UPDATER =
@@ -163,7 +164,8 @@ public class SimpleSlot extends Slot {
 	 * @param executedVertex The vertex to assign to this slot.
 	 * @return True, if the vertex was assigned, false, otherwise.
 	 */
-	public boolean setExecutedVertex(Execution executedVertex) {
+	@Override
+	public boolean setExecution(Execution executedVertex) {
 		if (executedVertex == null) {
 			throw new NullPointerException();
 		}
@@ -231,6 +233,16 @@ public class SimpleSlot extends Slot {
 		}
 	}
 
+	@Override
+	public int getPhysicalSlotNumber() {
+		return getRootSlotNumber();
+	}
+
+	@Override
+	public AllocationID getAllocationId() {
+		return getAllocatedSlot().getSlotAllocationId();
+	}
+
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/bb9c64b1/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index 7e98e11..66af865 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -266,7 +266,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 	// ------------------------------------------------------------------------
 
 	@Override
-	public CompletableFuture<SimpleSlot> allocateSlot(
+	public CompletableFuture<LogicalSlot> allocateSlot(
 			SlotRequestID requestId,
 			ScheduledUnit task,
 			ResourceProfile resources,
@@ -303,7 +303,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 		return CompletableFuture.completedFuture(Acknowledge.get());
 	}
 
-	CompletableFuture<SimpleSlot> internalAllocateSlot(
+	CompletableFuture<LogicalSlot> internalAllocateSlot(
 			SlotRequestID requestId,
 			ScheduledUnit task,
 			ResourceProfile resources,
@@ -318,7 +318,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 		}
 
 		// the request will be completed by a future
-		final CompletableFuture<SimpleSlot> future = new CompletableFuture<>();
+		final CompletableFuture<LogicalSlot> future = new CompletableFuture<>();
 
 		// (2) need to request a slot
 		if (resourceManagerGateway == null) {
@@ -433,7 +433,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 	private void stashRequestWaitingForResourceManager(
 			final SlotRequestID requestId,
 			final ResourceProfile resources,
-			final CompletableFuture<SimpleSlot> future) {
+			final CompletableFuture<LogicalSlot> future) {
 
 		LOG.info("Cannot serve slot request, no ResourceManager connected. " +
 				"Adding as pending request {}",  requestId);
@@ -1087,15 +1087,15 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 		}
 
 		@Override
-		public CompletableFuture<SimpleSlot> allocateSlot(
+		public CompletableFuture<LogicalSlot> allocateSlot(
 				ScheduledUnit task,
 				boolean allowQueued,
 				Collection<TaskManagerLocation> preferredLocations) {
 
 			final SlotRequestID requestId = new SlotRequestID();
-			CompletableFuture<SimpleSlot> slotFuture = gateway.allocateSlot(requestId, task, ResourceProfile.UNKNOWN, preferredLocations, timeout);
+			CompletableFuture<LogicalSlot> slotFuture = gateway.allocateSlot(requestId, task, ResourceProfile.UNKNOWN, preferredLocations, timeout);
 			slotFuture.whenComplete(
-				(SimpleSlot slot, Throwable failure) -> {
+				(LogicalSlot slot, Throwable failure) -> {
 					if (failure != null) {
 						gateway.cancelSlotAllocation(requestId);
 					}
@@ -1113,13 +1113,13 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 
 		private final SlotRequestID slotRequestId;
 
-		private final CompletableFuture<SimpleSlot> future;
+		private final CompletableFuture<LogicalSlot> future;
 
 		private final ResourceProfile resourceProfile;
 
 		PendingRequest(
 				SlotRequestID slotRequestId,
-				CompletableFuture<SimpleSlot> future,
+				CompletableFuture<LogicalSlot> future,
 				ResourceProfile resourceProfile) {
 			this.slotRequestId = Preconditions.checkNotNull(slotRequestId);
 			this.future = Preconditions.checkNotNull(future);
@@ -1130,7 +1130,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 			return slotRequestId;
 		}
 
-		public CompletableFuture<SimpleSlot> getFuture() {
+		public CompletableFuture<LogicalSlot> getFuture() {
 			return future;
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bb9c64b1/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
index bf520f5..ad2a6a6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
@@ -86,7 +86,7 @@ public interface SlotPoolGateway extends RpcGateway {
 	//  allocating and disposing slots
 	// ------------------------------------------------------------------------
 
-	CompletableFuture<SimpleSlot> allocateSlot(
+	CompletableFuture<LogicalSlot> allocateSlot(
 			SlotRequestID requestId,
 			ScheduledUnit task,
 			ResourceProfile resources,

http://git-wip-us.apache.org/repos/asf/flink/blob/bb9c64b1/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java
index ef988b4..98427c2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java
@@ -45,7 +45,7 @@ public interface SlotProvider {
 	 * @param preferredLocations preferred locations for the slot allocation
 	 * @return The future of the allocation
 	 */
-	CompletableFuture<SimpleSlot> allocateSlot(
+	CompletableFuture<LogicalSlot> allocateSlot(
 		ScheduledUnit task,
 		boolean allowQueued,
 		Collection<TaskManagerLocation> preferredLocations);

http://git-wip-us.apache.org/repos/asf/flink/blob/bb9c64b1/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index 1995c12..2715146 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceDiedException;
 import org.apache.flink.runtime.instance.InstanceListener;
+import org.apache.flink.runtime.instance.LogicalSlot;
 import org.apache.flink.runtime.instance.SharedSlot;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.SlotProvider;
@@ -133,7 +134,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
 
 
 	@Override
-	public CompletableFuture<SimpleSlot> allocateSlot(
+	public CompletableFuture<LogicalSlot> allocateSlot(
 			ScheduledUnit task,
 			boolean allowQueued,
 			Collection<TaskManagerLocation> preferredLocations) {
@@ -146,7 +147,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
 			}
 			else if (ret instanceof CompletableFuture) {
 				@SuppressWarnings("unchecked")
-				CompletableFuture<SimpleSlot> typed = (CompletableFuture<SimpleSlot>) ret;
+				CompletableFuture<LogicalSlot> typed = (CompletableFuture<LogicalSlot>) ret;
 				return typed;
 			}
 			else {
@@ -321,7 +322,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
 				else {
 					// no resource available now, so queue the request
 					if (queueIfNoResource) {
-						CompletableFuture<SimpleSlot> future = new CompletableFuture<>();
+						CompletableFuture<LogicalSlot> future = new CompletableFuture<>();
 						this.taskQueue.add(new QueuedTask(task, future));
 						return future;
 					}
@@ -837,10 +838,10 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
 		
 		private final ScheduledUnit task;
 		
-		private final CompletableFuture<SimpleSlot> future;
+		private final CompletableFuture<LogicalSlot> future;
 		
 		
-		public QueuedTask(ScheduledUnit task, CompletableFuture<SimpleSlot> future) {
+		public QueuedTask(ScheduledUnit task, CompletableFuture<LogicalSlot> future) {
 			this.task = task;
 			this.future = future;
 		}
@@ -849,7 +850,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
 			return task;
 		}
 
-		public CompletableFuture<SimpleSlot> getFuture() {
+		public CompletableFuture<LogicalSlot> getFuture() {
 			return future;
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/bb9c64b1/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 1d4eb6a..687b6d1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -55,7 +55,7 @@ import org.apache.flink.runtime.heartbeat.HeartbeatManager;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.instance.Slot;
+import org.apache.flink.runtime.instance.LogicalSlot;
 import org.apache.flink.runtime.instance.SlotPool;
 import org.apache.flink.runtime.instance.SlotPoolGateway;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -444,7 +444,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 			return FutureUtils.completedExceptionally(new Exception("No InputSplitAssigner for vertex ID " + vertexID));
 		}
 
-		final Slot slot = execution.getAssignedResource();
+		final LogicalSlot slot = execution.getAssignedResource();
 		final int taskId = execution.getVertex().getParallelSubtaskIndex();
 		final String host = slot != null ? slot.getTaskManagerLocation().getHostname() : null;
 		final InputSplit nextInputSplit = splitAssigner.getNextInputSplit(host, taskId);

http://git-wip-us.apache.org/repos/asf/flink/blob/bb9c64b1/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java
index 27e8af3..fc2c06f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java
@@ -27,7 +27,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionGraphException;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.executiongraph.IntermediateResult;
 import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
-import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.instance.LogicalSlot;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
@@ -60,7 +60,7 @@ public class InputChannelDeploymentDescriptorTest {
 
 		ResourceID consumerResourceId = ResourceID.generate();
 		ExecutionVertex consumer = mock(ExecutionVertex.class);
-		SimpleSlot consumerSlot = mockSlot(consumerResourceId);
+		LogicalSlot consumerSlot = mockSlot(consumerResourceId);
 
 		// Local and remote channel are only allowed for certain execution
 		// states.
@@ -86,7 +86,7 @@ public class InputChannelDeploymentDescriptorTest {
 
 			InputChannelDeploymentDescriptor[] desc = InputChannelDeploymentDescriptor.fromEdges(
 				new ExecutionEdge[]{localEdge, remoteEdge, unknownEdge},
-				consumerSlot,
+				consumerSlot.getTaskManagerLocation().getResourceID(),
 				allowLazyDeployment);
 
 			assertEquals(3, desc.length);
@@ -124,7 +124,7 @@ public class InputChannelDeploymentDescriptorTest {
 	public void testUnknownChannelWithoutLazyDeploymentThrows() throws Exception {
 		ResourceID consumerResourceId = ResourceID.generate();
 		ExecutionVertex consumer = mock(ExecutionVertex.class);
-		SimpleSlot consumerSlot = mockSlot(consumerResourceId);
+		LogicalSlot consumerSlot = mockSlot(consumerResourceId);
 
 		// Unknown partition
 		ExecutionVertex unknownProducer = mockExecutionVertex(ExecutionState.CREATED, null); // no assigned resource
@@ -137,7 +137,7 @@ public class InputChannelDeploymentDescriptorTest {
 
 		InputChannelDeploymentDescriptor[] desc = InputChannelDeploymentDescriptor.fromEdges(
 			new ExecutionEdge[]{unknownEdge},
-			consumerSlot,
+			consumerSlot.getTaskManagerLocation().getResourceID(),
 			allowLazyDeployment);
 
 		assertEquals(1, desc.length);
@@ -152,7 +152,7 @@ public class InputChannelDeploymentDescriptorTest {
 
 			InputChannelDeploymentDescriptor.fromEdges(
 				new ExecutionEdge[]{unknownEdge},
-				consumerSlot,
+				consumerSlot.getTaskManagerLocation().getResourceID(),
 				allowLazyDeployment);
 
 			fail("Did not throw expected ExecutionGraphException");
@@ -162,10 +162,9 @@ public class InputChannelDeploymentDescriptorTest {
 
 	// ------------------------------------------------------------------------
 
-	private static SimpleSlot mockSlot(ResourceID resourceId) {
-		SimpleSlot slot = mock(SimpleSlot.class);
+	private static LogicalSlot mockSlot(ResourceID resourceId) {
+		LogicalSlot slot = mock(LogicalSlot.class);
 		when(slot.getTaskManagerLocation()).thenReturn(new TaskManagerLocation(resourceId, InetAddress.getLoopbackAddress(), 5000));
-		when(slot.getTaskManagerID()).thenReturn(resourceId);
 
 		return slot;
 	}
@@ -178,7 +177,7 @@ public class InputChannelDeploymentDescriptorTest {
 		when(exec.getAttemptId()).thenReturn(new ExecutionAttemptID());
 
 		if (resourceId != null) {
-			SimpleSlot slot = mockSlot(resourceId);
+			LogicalSlot slot = mockSlot(resourceId);
 			when(exec.getAssignedResource()).thenReturn(slot);
 			when(vertex.getCurrentAssignedResource()).thenReturn(slot);
 		} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/bb9c64b1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index de91d78..b489478 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -40,6 +40,7 @@ import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.LogicalSlot;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
@@ -587,10 +588,10 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
 
 		sinkVertex.connectNewDataSetAsInput(sourceVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
 
-		final Map<JobVertexID, CompletableFuture<SimpleSlot>[]> slotFutures = new HashMap<>(2);
+		final Map<JobVertexID, CompletableFuture<LogicalSlot>[]> slotFutures = new HashMap<>(2);
 
 		for (JobVertexID jobVertexID : Arrays.asList(sourceVertexId, sinkVertexId)) {
-			CompletableFuture<SimpleSlot>[] slotFutureArray = new CompletableFuture[parallelism];
+			CompletableFuture<LogicalSlot>[] slotFutureArray = new CompletableFuture[parallelism];
 
 			for (int i = 0; i < parallelism; i++) {
 				slotFutureArray[i] = new CompletableFuture<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/bb9c64b1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index d3cec30..92c7c61 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -18,38 +18,28 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.JobException;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
-import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.SuppressRestartsException;
 import org.apache.flink.runtime.executiongraph.metrics.RestartTimeGauge;
 import org.apache.flink.runtime.executiongraph.restart.RestartCallback;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
-import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.runtime.instance.Slot;
+import org.apache.flink.runtime.instance.LogicalSlot;
+import org.apache.flink.runtime.instance.TestingLogicalSlot;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
-import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
-import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
-import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
-import org.mockito.Matchers;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -87,40 +77,9 @@ public class ExecutionGraphMetricsTest extends TestLogger {
 			Time timeout = Time.seconds(10L);
 			Scheduler scheduler = mock(Scheduler.class);
 
-			ResourceID taskManagerId = ResourceID.generate();
-
-			TaskManagerLocation taskManagerLocation = mock(TaskManagerLocation.class);
-			when(taskManagerLocation.getResourceID()).thenReturn(taskManagerId);
-			when(taskManagerLocation.getHostname()).thenReturn("localhost");
-
-			TaskManagerGateway taskManagerGateway = mock(TaskManagerGateway.class);
-
-			Instance instance = mock(Instance.class);
-			when(instance.getTaskManagerLocation()).thenReturn(taskManagerLocation);
-			when(instance.getTaskManagerID()).thenReturn(taskManagerId);
-			when(instance.getTaskManagerGateway()).thenReturn(taskManagerGateway);
-
-			Slot rootSlot = mock(Slot.class);
-
-			AllocatedSlot mockAllocatedSlot = mock(AllocatedSlot.class);
-			when(mockAllocatedSlot.getSlotAllocationId()).thenReturn(new AllocationID());
-
-			SimpleSlot simpleSlot = mock(SimpleSlot.class);
-			when(simpleSlot.isAlive()).thenReturn(true);
-			when(simpleSlot.getTaskManagerLocation()).thenReturn(taskManagerLocation);
-			when(simpleSlot.getTaskManagerID()).thenReturn(taskManagerId);
-			when(simpleSlot.getTaskManagerGateway()).thenReturn(taskManagerGateway);
-			when(simpleSlot.setExecutedVertex(Matchers.any(Execution.class))).thenReturn(true);
-			when(simpleSlot.getRoot()).thenReturn(rootSlot);
-			when(simpleSlot.getAllocatedSlot()).thenReturn(mockAllocatedSlot);
-
-			CompletableFuture<SimpleSlot> future = new CompletableFuture<>();
-			future.complete(simpleSlot);
-			when(scheduler.allocateSlot(any(ScheduledUnit.class), anyBoolean(), any(Collection.class))).thenReturn(future);
-
-			when(rootSlot.getSlotNumber()).thenReturn(0);
-
-			when(taskManagerGateway.submitTask(any(TaskDeploymentDescriptor.class), any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
+			CompletableFuture<LogicalSlot> slotFuture1 = CompletableFuture.completedFuture(new TestingLogicalSlot());
+			CompletableFuture<LogicalSlot> slotFuture2 = CompletableFuture.completedFuture(new TestingLogicalSlot());
+			when(scheduler.allocateSlot(any(ScheduledUnit.class), anyBoolean(), any(Collection.class))).thenReturn(slotFuture1, slotFuture2);
 
 			TestingRestartStrategy testingRestartStrategy = new TestingRestartStrategy();
 
@@ -130,7 +89,7 @@ public class ExecutionGraphMetricsTest extends TestLogger {
 				jobGraph.getJobID(),
 				jobGraph.getName(),
 				jobConfig,
-				new SerializedValue<ExecutionConfig>(null),
+				new SerializedValue<>(null),
 				timeout,
 				testingRestartStrategy,
 				scheduler);

http://git-wip-us.apache.org/repos/asf/flink/blob/bb9c64b1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
index 90136a6..71ca3a5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.instance.LogicalSlot;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.Slot;
 import org.apache.flink.runtime.instance.SlotProvider;
@@ -109,8 +110,8 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 		final JobID jobId = new JobID();
 		final JobGraph jobGraph = new JobGraph(jobId, "test", sourceVertex, targetVertex);
 
-		final CompletableFuture<SimpleSlot> sourceFuture = new CompletableFuture<>();
-		final CompletableFuture<SimpleSlot> targetFuture = new CompletableFuture<>();
+		final CompletableFuture<LogicalSlot> sourceFuture = new CompletableFuture<>();
+		final CompletableFuture<LogicalSlot> targetFuture = new CompletableFuture<>();
 
 		ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(parallelism);
 		slotProvider.addSlot(sourceVertex.getID(), 0, sourceFuture);
@@ -177,9 +178,9 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 		final JobGraph jobGraph = new JobGraph(jobId, "test", sourceVertex, targetVertex);
 
 		@SuppressWarnings({"unchecked", "rawtypes"})
-		final CompletableFuture<SimpleSlot>[] sourceFutures = new CompletableFuture[parallelism];
+		final CompletableFuture<LogicalSlot>[] sourceFutures = new CompletableFuture[parallelism];
 		@SuppressWarnings({"unchecked", "rawtypes"})
-		final CompletableFuture<SimpleSlot>[] targetFutures = new CompletableFuture[parallelism];
+		final CompletableFuture<LogicalSlot>[] targetFutures = new CompletableFuture[parallelism];
 
 		//
 		//  Create the slots, futures, and the slot provider
@@ -283,9 +284,9 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 		final SimpleSlot[] targetSlots = new SimpleSlot[parallelism];
 
 		@SuppressWarnings({"unchecked", "rawtypes"})
-		final CompletableFuture<SimpleSlot>[] sourceFutures = new CompletableFuture[parallelism];
+		final CompletableFuture<LogicalSlot>[] sourceFutures = new CompletableFuture[parallelism];
 		@SuppressWarnings({"unchecked", "rawtypes"})
-		final CompletableFuture<SimpleSlot>[] targetFutures = new CompletableFuture[parallelism];
+		final CompletableFuture<LogicalSlot>[] targetFutures = new CompletableFuture[parallelism];
 
 		for (int i = 0; i < parallelism; i++) {
 			sourceSlots[i] = createSlot(taskManager, jobId, slotOwner);
@@ -358,7 +359,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 		final TaskManagerGateway taskManager = mock(TaskManagerGateway.class);
 		final SimpleSlot[] slots = new SimpleSlot[parallelism];
 		@SuppressWarnings({"unchecked", "rawtypes"})
-		final CompletableFuture<SimpleSlot>[] slotFutures = new CompletableFuture[parallelism];
+		final CompletableFuture<LogicalSlot>[] slotFutures = new CompletableFuture[parallelism];
 
 		for (int i = 0; i < parallelism; i++) {
 			slots[i] = createSlot(taskManager, jobId, slotOwner);
@@ -392,7 +393,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 		//  verify that no deployments have happened
 		verify(taskManager, times(0)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class));
 
-		for (CompletableFuture<SimpleSlot> future : slotFutures) {
+		for (CompletableFuture<LogicalSlot> future : slotFutures) {
 			if (future.isDone()) {
 				assertTrue(future.get().isCanceled());
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/bb9c64b1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
index fa845cf..43a6432 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.instance.LogicalSlot;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.Slot;
 import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -62,7 +63,7 @@ public class ExecutionTest extends TestLogger {
 		final JobVertex jobVertex = new JobVertex("Test vertex", jobVertexId);
 		jobVertex.setInvokableClass(NoOpInvokable.class);
 
-		final CompletableFuture<SimpleSlot> slotFuture = new CompletableFuture<>();
+		final CompletableFuture<LogicalSlot> slotFuture = new CompletableFuture<>();
 		final ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(1);
 		slotProvider.addSlot(jobVertexId, 0, slotFuture);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bb9c64b1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
index cf08687..973c7d4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
@@ -25,8 +25,8 @@ import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.LogicalSlot;
 import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.runtime.instance.Slot;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -374,12 +374,8 @@ public class ExecutionVertexDeploymentTest extends TestLogger {
 		AllocatedSlot allocatedSlot = mock(AllocatedSlot.class);
 		when(allocatedSlot.getSlotAllocationId()).thenReturn(new AllocationID());
 
-		Slot root = mock(Slot.class);
-		when(root.getSlotNumber()).thenReturn(1);
-		SimpleSlot slot = mock(SimpleSlot.class);
-		when(slot.getRoot()).thenReturn(root);
-		when(slot.getAllocatedSlot()).thenReturn(allocatedSlot);
-		when(root.getAllocatedSlot()).thenReturn(allocatedSlot);
+		LogicalSlot slot = mock(LogicalSlot.class);
+		when(slot.getAllocationId()).thenReturn(new AllocationID());
 
 		for (ScheduleMode mode : ScheduleMode.values()) {
 			vertex.getExecutionGraph().setScheduleMode(mode);

http://git-wip-us.apache.org/repos/asf/flink/blob/bb9c64b1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java
index 5d7fa1f..24affad 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.instance.LogicalSlot;
 import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
@@ -38,7 +38,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 class ProgrammedSlotProvider implements SlotProvider {
 
-	private final Map<JobVertexID, CompletableFuture<SimpleSlot>[]> slotFutures = new HashMap<>();
+	private final Map<JobVertexID, CompletableFuture<LogicalSlot>[]> slotFutures = new HashMap<>();
 
 	private final Map<JobVertexID, CompletableFuture<Boolean>[]> slotFutureRequested = new HashMap<>();
 
@@ -49,17 +49,17 @@ class ProgrammedSlotProvider implements SlotProvider {
 		this.parallelism = parallelism;
 	}
 
-	public void addSlot(JobVertexID vertex, int subtaskIndex, CompletableFuture<SimpleSlot> future) {
+	public void addSlot(JobVertexID vertex, int subtaskIndex, CompletableFuture<LogicalSlot> future) {
 		checkNotNull(vertex);
 		checkNotNull(future);
 		checkArgument(subtaskIndex >= 0 && subtaskIndex < parallelism);
 
-		CompletableFuture<SimpleSlot>[] futures = slotFutures.get(vertex);
+		CompletableFuture<LogicalSlot>[] futures = slotFutures.get(vertex);
 		CompletableFuture<Boolean>[] requestedFutures = slotFutureRequested.get(vertex);
 
 		if (futures == null) {
 			@SuppressWarnings("unchecked")
-			CompletableFuture<SimpleSlot>[] newArray = (CompletableFuture<SimpleSlot>[]) new CompletableFuture<?>[parallelism];
+			CompletableFuture<LogicalSlot>[] newArray = (CompletableFuture<LogicalSlot>[]) new CompletableFuture<?>[parallelism];
 			futures = newArray;
 			slotFutures.put(vertex, futures);
 
@@ -71,7 +71,7 @@ class ProgrammedSlotProvider implements SlotProvider {
 		requestedFutures[subtaskIndex] = new CompletableFuture<>();
 	}
 
-	public void addSlots(JobVertexID vertex, CompletableFuture<SimpleSlot>[] futures) {
+	public void addSlots(JobVertexID vertex, CompletableFuture<LogicalSlot>[] futures) {
 		checkNotNull(vertex);
 		checkNotNull(futures);
 		checkArgument(futures.length == parallelism);
@@ -92,16 +92,16 @@ class ProgrammedSlotProvider implements SlotProvider {
 	}
 
 	@Override
-	public CompletableFuture<SimpleSlot> allocateSlot(
+	public CompletableFuture<LogicalSlot> allocateSlot(
 			ScheduledUnit task,
 			boolean allowQueued,
 			Collection<TaskManagerLocation> preferredLocations) {
 		JobVertexID vertexId = task.getTaskToExecute().getVertex().getJobvertexId();
 		int subtask = task.getTaskToExecute().getParallelSubtaskIndex();
 
-		CompletableFuture<SimpleSlot>[] forTask = slotFutures.get(vertexId);
+		CompletableFuture<LogicalSlot>[] forTask = slotFutures.get(vertexId);
 		if (forTask != null) {
-			CompletableFuture<SimpleSlot> future = forTask[subtask];
+			CompletableFuture<LogicalSlot> future = forTask[subtask];
 			if (future != null) {
 				slotFutureRequested.get(vertexId)[subtask].complete(true);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bb9c64b1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
index a2323bf..3d28983 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.instance.LogicalSlot;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.Slot;
 import org.apache.flink.runtime.instance.SlotProvider;
@@ -71,7 +72,7 @@ public class SimpleSlotProvider implements SlotProvider, SlotOwner {
 	}
 
 	@Override
-	public CompletableFuture<SimpleSlot> allocateSlot(
+	public CompletableFuture<LogicalSlot> allocateSlot(
 			ScheduledUnit task,
 			boolean allowQueued,
 			Collection<TaskManagerLocation> preferredLocations) {

http://git-wip-us.apache.org/repos/asf/flink/blob/bb9c64b1/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java
index 0edef5e..4a6bf75 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
@@ -38,7 +39,7 @@ import static org.junit.Assert.*;
 /**
  * Tests for the allocation, properties, and release of shared slots.
  */
-public class SharedSlotsTest {
+public class SharedSlotsTest extends TestLogger {
 
 	private static final Iterable<TaskManagerLocation> NO_LOCATION = Collections.emptySet();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bb9c64b1/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
index fd1c17b..db71210 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
@@ -29,11 +29,13 @@ import org.apache.flink.api.common.JobID;
 
 import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.TestLogger;
+
 import org.junit.Test;
 
 import org.mockito.Matchers;
 
-public class SimpleSlotTest {
+public class SimpleSlotTest extends TestLogger {
 
 	@Test
 	public void testStateTransitions() {
@@ -81,11 +83,11 @@ public class SimpleSlotTest {
 			{
 				SimpleSlot slot = getSlot();
 
-				assertTrue(slot.setExecutedVertex(ev));
+				assertTrue(slot.setExecution(ev));
 				assertEquals(ev, slot.getExecutedVertex());
 
 				// try to add another one
-				assertFalse(slot.setExecutedVertex(ev_2));
+				assertFalse(slot.setExecution(ev_2));
 				assertEquals(ev, slot.getExecutedVertex());
 			}
 
@@ -94,7 +96,7 @@ public class SimpleSlotTest {
 				SimpleSlot slot = getSlot();
 				assertTrue(slot.markCancelled());
 
-				assertFalse(slot.setExecutedVertex(ev));
+				assertFalse(slot.setExecution(ev));
 				assertNull(slot.getExecutedVertex());
 			}
 
@@ -104,7 +106,7 @@ public class SimpleSlotTest {
 				assertTrue(slot.markCancelled());
 				assertTrue(slot.markReleased());
 
-				assertFalse(slot.setExecutedVertex(ev));
+				assertFalse(slot.setExecution(ev));
 				assertNull(slot.getExecutedVertex());
 			}
 			
@@ -113,7 +115,7 @@ public class SimpleSlotTest {
 				SimpleSlot slot = getSlot();
 				slot.releaseSlot();
 
-				assertFalse(slot.setExecutedVertex(ev));
+				assertFalse(slot.setExecution(ev));
 				assertNull(slot.getExecutedVertex());
 			}
 		}
@@ -129,7 +131,7 @@ public class SimpleSlotTest {
 			Execution ev = mock(Execution.class);
 
 			SimpleSlot slot = getSlot();
-			assertTrue(slot.setExecutedVertex(ev));
+			assertTrue(slot.setExecution(ev));
 			assertEquals(ev, slot.getExecutedVertex());
 
 			slot.releaseSlot();

http://git-wip-us.apache.org/repos/asf/flink/blob/bb9c64b1/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
index ca5d826..8875e00 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
@@ -107,7 +107,7 @@ public class SlotPoolRpcTest extends TestLogger {
 		try {
 			pool.start(JobMasterId.generate(), "foobar");
 
-			CompletableFuture<SimpleSlot> future = pool.allocateSlot(
+			CompletableFuture<LogicalSlot> future = pool.allocateSlot(
 				new SlotPoolGateway.SlotRequestID(),
 				new ScheduledUnit(SchedulerTestUtils.getDummyTask()),
 				DEFAULT_TESTING_PROFILE,
@@ -142,7 +142,7 @@ public class SlotPoolRpcTest extends TestLogger {
 			SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class);
 
 			SlotPoolGateway.SlotRequestID requestId = new SlotPoolGateway.SlotRequestID();
-			CompletableFuture<SimpleSlot> future = slotPoolGateway.allocateSlot(
+			CompletableFuture<LogicalSlot> future = slotPoolGateway.allocateSlot(
 				requestId,
 				new ScheduledUnit(SchedulerTestUtils.getDummyTask()),
 				DEFAULT_TESTING_PROFILE,
@@ -186,7 +186,7 @@ public class SlotPoolRpcTest extends TestLogger {
 			pool.connectToResourceManager(resourceManagerGateway);
 
 			SlotPoolGateway.SlotRequestID requestId = new SlotPoolGateway.SlotRequestID();
-			CompletableFuture<SimpleSlot> future = slotPoolGateway.allocateSlot(
+			CompletableFuture<LogicalSlot> future = slotPoolGateway.allocateSlot(
 				requestId,
 				new ScheduledUnit(SchedulerTestUtils.getDummyTask()),
 				DEFAULT_TESTING_PROFILE,
@@ -237,7 +237,7 @@ public class SlotPoolRpcTest extends TestLogger {
 			pool.connectToResourceManager(resourceManagerGateway);
 
 			SlotPoolGateway.SlotRequestID requestId = new SlotPoolGateway.SlotRequestID();
-			CompletableFuture<SimpleSlot> future = slotPoolGateway.allocateSlot(
+			CompletableFuture<LogicalSlot> future = slotPoolGateway.allocateSlot(
 				requestId,
 				new ScheduledUnit(SchedulerTestUtils.getDummyTask()),
 				DEFAULT_TESTING_PROFILE,
@@ -300,7 +300,7 @@ public class SlotPoolRpcTest extends TestLogger {
 			ScheduledUnit mockScheduledUnit = new ScheduledUnit(SchedulerTestUtils.getDummyTask());
 
 			// test the pending request is clear when timed out
-			CompletableFuture<SimpleSlot> future = pool.getSlotProvider().allocateSlot(
+			CompletableFuture<LogicalSlot> future = pool.getSlotProvider().allocateSlot(
 				mockScheduledUnit,
 				true,
 				Collections.emptyList());

http://git-wip-us.apache.org/repos/asf/flink/blob/bb9c64b1/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
index c307373..271bc2a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
@@ -96,7 +96,7 @@ public class SlotPoolTest extends TestLogger {
 			slotPoolGateway.registerTaskManager(resourceID);
 
 			SlotPoolGateway.SlotRequestID requestId = new SlotPoolGateway.SlotRequestID();
-			CompletableFuture<SimpleSlot> future = slotPoolGateway.allocateSlot(requestId, mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
+			CompletableFuture<LogicalSlot> future = slotPoolGateway.allocateSlot(requestId, mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
 			assertFalse(future.isDone());
 
 			ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
@@ -107,13 +107,11 @@ public class SlotPoolTest extends TestLogger {
 			AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
 			assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
 
-			SimpleSlot slot = future.get(1, TimeUnit.SECONDS);
+			LogicalSlot slot = future.get(1, TimeUnit.SECONDS);
 			assertTrue(future.isDone());
 			assertTrue(slot.isAlive());
-			assertEquals(resourceID, slot.getTaskManagerID());
-			assertEquals(jobId, slot.getJobID());
-			assertEquals(slotPool.getSlotOwner(), slot.getOwner());
-			assertEquals(slotPool.getAllocatedSlots().get(slot.getAllocatedSlot().getSlotAllocationId()), slot);
+			assertEquals(resourceID, slot.getTaskManagerLocation().getResourceID());
+			assertEquals(slotPool.getAllocatedSlots().get(slot.getAllocationId()), slot);
 		} finally {
 			slotPool.shutDown();
 		}
@@ -129,8 +127,8 @@ public class SlotPoolTest extends TestLogger {
 			ResourceID resourceID = new ResourceID("resource");
 			slotPool.registerTaskManager(resourceID);
 
-			CompletableFuture<SimpleSlot> future1 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
-			CompletableFuture<SimpleSlot> future2 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
+			CompletableFuture<LogicalSlot> future1 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
+			CompletableFuture<LogicalSlot> future2 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
 
 			assertFalse(future1.isDone());
 			assertFalse(future2.isDone());
@@ -144,7 +142,7 @@ public class SlotPoolTest extends TestLogger {
 			AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequests.get(0).getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
 			assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
 
-			SimpleSlot slot1 = future1.get(1, TimeUnit.SECONDS);
+			LogicalSlot slot1 = future1.get(1, TimeUnit.SECONDS);
 			assertTrue(future1.isDone());
 			assertFalse(future2.isDone());
 
@@ -152,15 +150,15 @@ public class SlotPoolTest extends TestLogger {
 			slot1.releaseSlot();
 
 			// second allocation fulfilled by previous slot returning
-			SimpleSlot slot2 = future2.get(1, TimeUnit.SECONDS);
+			LogicalSlot slot2 = future2.get(1, TimeUnit.SECONDS);
 			assertTrue(future2.isDone());
 
 			assertNotEquals(slot1, slot2);
 			assertTrue(slot1.isReleased());
 			assertTrue(slot2.isAlive());
-			assertEquals(slot1.getTaskManagerID(), slot2.getTaskManagerID());
-			assertEquals(slot1.getSlotNumber(), slot2.getSlotNumber());
-			assertEquals(slotPool.getAllocatedSlots().get(slot1.getAllocatedSlot().getSlotAllocationId()), slot2);
+			assertEquals(slot1.getTaskManagerLocation(), slot2.getTaskManagerLocation());
+			assertEquals(slot1.getPhysicalSlotNumber(), slot2.getPhysicalSlotNumber());
+			assertEquals(slotPool.getAllocatedSlots().get(slot1.getAllocationId()), slot2);
 		} finally {
 			slotPool.shutDown();
 		}
@@ -176,7 +174,7 @@ public class SlotPoolTest extends TestLogger {
 			ResourceID resourceID = new ResourceID("resource");
 			slotPoolGateway.registerTaskManager(resourceID);
 
-			CompletableFuture<SimpleSlot> future1 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
+			CompletableFuture<LogicalSlot> future1 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
 			assertFalse(future1.isDone());
 
 			ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
@@ -187,23 +185,23 @@ public class SlotPoolTest extends TestLogger {
 			AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
 			assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
 
-			SimpleSlot slot1 = future1.get(1, TimeUnit.SECONDS);
+			LogicalSlot slot1 = future1.get(1, TimeUnit.SECONDS);
 			assertTrue(future1.isDone());
 
 			// return this slot to pool
 			slot1.releaseSlot();
 
-			CompletableFuture<SimpleSlot> future2 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
+			CompletableFuture<LogicalSlot> future2 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
 
 			// second allocation fulfilled by previous slot returning
-			SimpleSlot slot2 = future2.get(1, TimeUnit.SECONDS);
+			LogicalSlot slot2 = future2.get(1, TimeUnit.SECONDS);
 			assertTrue(future2.isDone());
 
 			assertNotEquals(slot1, slot2);
 			assertTrue(slot1.isReleased());
 			assertTrue(slot2.isAlive());
-			assertEquals(slot1.getTaskManagerID(), slot2.getTaskManagerID());
-			assertEquals(slot1.getSlotNumber(), slot2.getSlotNumber());
+			assertEquals(slot1.getTaskManagerLocation(), slot2.getTaskManagerLocation());
+			assertEquals(slot1.getPhysicalSlotNumber(), slot2.getPhysicalSlotNumber());
 		} finally {
 			slotPool.shutDown();
 		}
@@ -219,7 +217,7 @@ public class SlotPoolTest extends TestLogger {
 			ResourceID resourceID = new ResourceID("resource");
 			slotPoolGateway.registerTaskManager(resourceID);
 
-			CompletableFuture<SimpleSlot> future = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
+			CompletableFuture<LogicalSlot> future = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
 			assertFalse(future.isDone());
 
 			ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
@@ -240,7 +238,7 @@ public class SlotPoolTest extends TestLogger {
 
 			// accepted slot
 			assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
-			SimpleSlot slot = future.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+			LogicalSlot slot = future.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 			assertTrue(slot.isAlive());
 
 			// duplicated offer with using slot
@@ -275,19 +273,19 @@ public class SlotPoolTest extends TestLogger {
 			ResourceID resourceID = new ResourceID("resource");
 			slotPoolGateway.registerTaskManager(resourceID);
 
-			CompletableFuture<SimpleSlot> future1 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
+			CompletableFuture<LogicalSlot> future1 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
 
 			ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
 			verify(resourceManagerGateway, Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(JobMasterId.class), slotRequestArgumentCaptor.capture(), any(Time.class));
 
 			final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue();
 
-			CompletableFuture<SimpleSlot> future2 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
+			CompletableFuture<LogicalSlot> future2 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
 
 			AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
 			assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
 
-			SimpleSlot slot1 = future1.get(1, TimeUnit.SECONDS);
+			LogicalSlot slot1 = future1.get(1, TimeUnit.SECONDS);
 			assertTrue(future1.isDone());
 			assertFalse(future2.isDone());
 
@@ -332,7 +330,7 @@ public class SlotPoolTest extends TestLogger {
 
 			slotPoolGateway.connectToResourceManager(resourceManagerGateway);
 
-			CompletableFuture<SimpleSlot> slotFuture = slotPoolGateway.allocateSlot(
+			CompletableFuture<LogicalSlot> slotFuture = slotPoolGateway.allocateSlot(
 				new SlotPoolGateway.SlotRequestID(),
 				scheduledUnit,
 				ResourceProfile.UNKNOWN,

http://git-wip-us.apache.org/repos/asf/flink/blob/bb9c64b1/flink-runtime/src/test/java/org/apache/flink/runtime/instance/TestingLogicalSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/TestingLogicalSlot.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/TestingLogicalSlot.java
new file mode 100644
index 0000000..36a47b7
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/TestingLogicalSlot.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.Preconditions;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Minimal {@link LogicalSlot} for tests: it reports alive until {@link #releaseSlot()} is called.
+ */
+public class TestingLogicalSlot implements LogicalSlot {
+
+	private final TaskManagerLocation taskManagerLocation;
+
+	private final TaskManagerGateway taskManagerGateway;
+
+	private final CompletableFuture<?> releaseFuture; // completed by releaseSlot(); backs isAlive/isCanceled/isReleased
+
+	private final AtomicReference<Execution> executionReference; // starts out empty; written only by setExecution
+
+	private final int slotNumber;
+
+	private final AllocationID allocationId;
+
+	public TestingLogicalSlot() { // defaults: new LocalTaskManagerLocation, new SimpleAckingTaskManagerGateway, slot number 0, new AllocationID
+		this(
+			new LocalTaskManagerLocation(),
+			new SimpleAckingTaskManagerGateway(),
+			0,
+			new AllocationID());
+	}
+
+	public TestingLogicalSlot(
+			TaskManagerLocation taskManagerLocation,
+			TaskManagerGateway taskManagerGateway,
+			int slotNumber,
+			AllocationID allocationId) {
+		this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation);
+		this.taskManagerGateway = Preconditions.checkNotNull(taskManagerGateway);
+		this.releaseFuture = new CompletableFuture<>(); // not yet completed, so the slot starts out alive
+		this.executionReference = new AtomicReference<>();
+		this.slotNumber = slotNumber;
+		this.allocationId = Preconditions.checkNotNull(allocationId);
+	}
+
+	@Override
+	public TaskManagerLocation getTaskManagerLocation() {
+		return taskManagerLocation;
+	}
+
+	@Override
+	public TaskManagerGateway getTaskManagerGateway() {
+		return taskManagerGateway;
+	}
+
+	@Override
+	public boolean isAlive() {
+		return !releaseFuture.isDone(); // alive exactly until releaseSlot() has been called
+	}
+
+	@Override
+	public boolean isCanceled() {
+		return releaseFuture.isDone(); // same condition as isReleased(): this test slot has no separate canceled state
+	}
+
+	@Override
+	public boolean isReleased() {
+		return releaseFuture.isDone();
+	}
+
+	@Override
+	public boolean setExecution(Execution execution) {
+		return executionReference.compareAndSet(null, execution); // succeeds only while no execution is stored
+	}
+
+	@Override
+	public void releaseSlot() {
+		releaseFuture.complete(null); // repeated calls are no-ops because the future is already completed
+	}
+
+	@Override
+	public int getPhysicalSlotNumber() {
+		return slotNumber;
+	}
+
+	@Override
+	public AllocationID getAllocationId() {
+		return allocationId;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bb9c64b1/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
index 0346e48..f57726c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
@@ -32,11 +32,13 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-public class PartialConsumePipelinedResultTest {
+public class PartialConsumePipelinedResultTest extends TestLogger {
 
 	// Test configuration
 	private final static int NUMBER_OF_TMS = 1;

http://git-wip-us.apache.org/repos/asf/flink/blob/bb9c64b1/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
index 9c781ec1..d861455 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.jobmanager.SlotCountExceedingParallelismTest;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.types.IntValue;
+import org.apache.flink.util.TestLogger;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
 
@@ -39,7 +40,7 @@ import java.util.List;
 
 import static org.apache.flink.runtime.jobmanager.SlotCountExceedingParallelismTest.SubtaskIndexReceiver.CONFIG_KEY;
 
-public class ScheduleOrUpdateConsumersTest {
+public class ScheduleOrUpdateConsumersTest extends TestLogger {
 
 	private final static int NUMBER_OF_TMS = 2;
 	private final static int NUMBER_OF_SLOTS_PER_TM = 2;