You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/12/14 15:33:38 UTC

[4/6] flink git commit: [FLINK-8088] Associate logical slots with the slot request id

[FLINK-8088] Associate logical slots with the slot request id

Before logical slots like the SimpleSlot and SharedSlot where associated to the
actually allocated slot via the AllocationID. This, however, was sub-optimal because
allocated slots can be re-used to fulfill also other slot requests (logical slots).
Therefore, we should bind the logical slots to the right id with the right lifecycle
which is the slot request id.

This closes #5089.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bc1c375a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bc1c375a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bc1c375a

Branch: refs/heads/master
Commit: bc1c375aa061f27a2fbc5a7688b06da70fed5d20
Parents: a569f38
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Nov 24 18:06:10 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Dec 14 14:32:25 2017 +0100

----------------------------------------------------------------------
 .../flink/runtime/instance/AllocatedSlot.java   |  66 +++++++++--
 .../apache/flink/runtime/instance/Instance.java |   9 +-
 .../flink/runtime/instance/LogicalSlot.java     |   8 ++
 .../flink/runtime/instance/SharedSlot.java      |   5 +
 .../flink/runtime/instance/SimpleSlot.java      |   6 +
 .../org/apache/flink/runtime/instance/Slot.java |   4 +-
 .../apache/flink/runtime/instance/SlotPool.java | 112 +++++++++++--------
 .../flink/runtime/instance/SlotPoolGateway.java |  14 +--
 .../flink/runtime/instance/SlotRequestID.java   |  34 ++++++
 .../jobmanager/slots/SimpleSlotContext.java     |  10 ++
 .../runtime/jobmanager/slots/SlotContext.java   |  13 ++-
 .../runtime/jobmanager/slots/SlotOwner.java     |   6 +-
 .../ExecutionGraphSchedulingTest.java           |   7 +-
 .../executiongraph/ExecutionGraphTestUtils.java |  10 +-
 .../runtime/executiongraph/ExecutionTest.java   |  11 +-
 .../ExecutionVertexDeploymentTest.java          |   6 +-
 .../ExecutionVertexLocalityTest.java            |   9 +-
 .../utils/SimpleSlotProvider.java               |  17 ++-
 .../runtime/instance/AllocatedSlotsTest.java    |   6 +-
 .../flink/runtime/instance/SlotPoolRpcTest.java |  10 +-
 .../flink/runtime/instance/SlotPoolTest.java    |  23 ++--
 .../runtime/instance/TestingLogicalSlot.java    |  16 ++-
 .../jobmanager/slots/DummySlotOwner.java        |   4 +-
 .../jobmanager/slots/TestingSlotOwner.java      |  12 +-
 24 files changed, 289 insertions(+), 129 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bc1c375a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
index 7036044..a3f98f1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.jobmanager.slots.SlotException;
 import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.Preconditions;
 
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -44,10 +45,10 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * an AllocatedSlot was allocated to the JobManager as soon as the TaskManager registered at the
  * JobManager. All slots had a default unknown resource profile. 
  */
-public class AllocatedSlot implements SlotContext {
+public class AllocatedSlot {
 
 	/** The ID under which the slot is allocated. Uniquely identifies the slot. */
-	private final AllocationID slotAllocationId;
+	private final AllocationID allocationId;
 
 	/** The location information of the TaskManager to which this slot belongs */
 	private final TaskManagerLocation taskManagerLocation;
@@ -68,13 +69,13 @@ public class AllocatedSlot implements SlotContext {
 	// ------------------------------------------------------------------------
 
 	public AllocatedSlot(
-			AllocationID slotAllocationId,
+			AllocationID allocationId,
 			TaskManagerLocation location,
 			int physicalSlotNumber,
 			ResourceProfile resourceProfile,
 			TaskManagerGateway taskManagerGateway,
 			SlotOwner slotOwner) {
-		this.slotAllocationId = checkNotNull(slotAllocationId);
+		this.allocationId = checkNotNull(allocationId);
 		this.taskManagerLocation = checkNotNull(location);
 		this.physicalSlotNumber = physicalSlotNumber;
 		this.resourceProfile = checkNotNull(resourceProfile);
@@ -92,7 +93,7 @@ public class AllocatedSlot implements SlotContext {
 	 * @return The ID under which the slot is allocated
 	 */
 	public AllocationID getAllocationId() {
-		return slotAllocationId;
+		return allocationId;
 	}
 
 	/**
@@ -182,12 +183,16 @@ public class AllocatedSlot implements SlotContext {
 	/**
 	 * Allocates a logical {@link SimpleSlot}.
 	 *
+	 * @param slotRequestId identifying the corresponding slot request
+	 * @param locality specifying the locality of the allocated slot
 	 * @return an allocated logical simple slot
 	 * @throws SlotException if we could not allocate a simple slot
 	 */
-	public SimpleSlot allocateSimpleSlot(Locality locality) throws SlotException {
+	public SimpleSlot allocateSimpleSlot(SlotRequestID slotRequestId, Locality locality) throws SlotException {
+		final AllocatedSlotContext allocatedSlotContext = new AllocatedSlotContext(
+			slotRequestId);
 
-		final SimpleSlot simpleSlot = new SimpleSlot(this, slotOwner, physicalSlotNumber);
+		final SimpleSlot simpleSlot = new SimpleSlot(allocatedSlotContext, slotOwner, physicalSlotNumber);
 
 		if (logicalSlotReference.compareAndSet(null, simpleSlot)) {
 			simpleSlot.setLocality(locality);
@@ -200,12 +205,16 @@ public class AllocatedSlot implements SlotContext {
 	/**
 	 * Allocates a logical {@link SharedSlot}.
 	 *
+	 * @param slotRequestId identifying the corresponding slot request
 	 * @param slotSharingGroupAssignment the slot sharing group to which the shared slot shall belong
 	 * @return an allocated logical shared slot
 	 * @throws SlotException if we could not allocate a shared slot
 	 */
-	public SharedSlot allocateSharedSlot(SlotSharingGroupAssignment slotSharingGroupAssignment) throws SlotException {
-		final SharedSlot sharedSlot = new SharedSlot(this, slotOwner, slotSharingGroupAssignment);
+	public SharedSlot allocateSharedSlot(SlotRequestID slotRequestId, SlotSharingGroupAssignment slotSharingGroupAssignment) throws SlotException {
+
+		final AllocatedSlotContext allocatedSlotContext = new AllocatedSlotContext(
+			slotRequestId);
+		final SharedSlot sharedSlot = new SharedSlot(allocatedSlotContext, slotOwner, slotSharingGroupAssignment);
 
 		if (logicalSlotReference.compareAndSet(null, sharedSlot)) {
 
@@ -236,6 +245,43 @@ public class AllocatedSlot implements SlotContext {
 
 	@Override
 	public String toString() {
-		return "AllocatedSlot " + slotAllocationId + " @ " + taskManagerLocation + " - " + physicalSlotNumber;
+		return "AllocatedSlot " + allocationId + " @ " + taskManagerLocation + " - " + physicalSlotNumber;
+	}
+
+	/**
+	 * Slot context for {@link AllocatedSlot}.
+	 */
+	private final class AllocatedSlotContext implements SlotContext {
+
+		private final SlotRequestID slotRequestId;
+
+		private AllocatedSlotContext(SlotRequestID slotRequestId) {
+			this.slotRequestId = Preconditions.checkNotNull(slotRequestId);
+		}
+
+		@Override
+		public SlotRequestID getSlotRequestId() {
+			return slotRequestId;
+		}
+
+		@Override
+		public AllocationID getAllocationId() {
+			return allocationId;
+		}
+
+		@Override
+		public TaskManagerLocation getTaskManagerLocation() {
+			return taskManagerLocation;
+		}
+
+		@Override
+		public int getPhysicalSlotNumber() {
+			return physicalSlotNumber;
+		}
+
+		@Override
+		public TaskManagerGateway getTaskManagerGateway() {
+			return taskManagerGateway;
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bc1c375a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
index 54c8971..44ee29d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
@@ -276,12 +276,15 @@ public class Instance implements SlotOwner {
 	 * <p>The method will transition the slot to the "released" state. If the slot is already in state
 	 * "released", this method will do nothing.</p>
 	 * 
-	 * @param slot The slot to return.
+	 * @param logicalSlot The slot to return.
 	 * @return Future which is completed with true, if the slot was returned, false if not.
 	 */
 	@Override
-	public CompletableFuture<Boolean> returnAllocatedSlot(Slot slot) {
-		checkNotNull(slot);
+	public CompletableFuture<Boolean> returnAllocatedSlot(LogicalSlot logicalSlot) {
+		checkNotNull(logicalSlot);
+		checkArgument(logicalSlot instanceof Slot);
+
+		final Slot slot = ((Slot) logicalSlot);
 		checkArgument(!slot.isAlive(), "slot is still alive");
 		checkArgument(slot.getOwner() == this, "slot belongs to the wrong TaskManager.");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bc1c375a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/LogicalSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/LogicalSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/LogicalSlot.java
index e663265..b3104ac 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/LogicalSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/LogicalSlot.java
@@ -93,6 +93,14 @@ public interface LogicalSlot {
 	AllocationID getAllocationId();
 
 	/**
+	 * Gets the slot request id uniquely identifying the request with which this
+	 * slot has been allocated.
+	 *
+	 * @return Unique id identifying the slot request with which this slot was allocated
+	 */
+	SlotRequestID getSlotRequestId();
+
+	/**
 	 * Payload for a logical slot.
 	 */
 	interface Payload {

http://git-wip-us.apache.org/repos/asf/flink/blob/bc1c375a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
index 8637159..8c9fe1a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
@@ -213,6 +213,11 @@ public class SharedSlot extends Slot implements LogicalSlot {
 		return getSlotContext().getAllocationId();
 	}
 
+	@Override
+	public SlotRequestID getSlotRequestId() {
+		return getSlotContext().getSlotRequestId();
+	}
+
 	/**
 	 * Gets the set of all slots allocated as sub-slots of this shared slot.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/bc1c375a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
index d397c08..e98832f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
@@ -97,6 +97,7 @@ public class SimpleSlot extends Slot implements LogicalSlot {
 			parent != null ?
 				parent.getSlotContext() :
 				new SimpleSlotContext(
+					NO_SLOT_REQUEST_ID,
 					NO_ALLOCATION_ID,
 					location,
 					slotNumber,
@@ -274,6 +275,11 @@ public class SimpleSlot extends Slot implements LogicalSlot {
 		return getSlotContext().getAllocationId();
 	}
 
+	@Override
+	public SlotRequestID getSlotRequestId() {
+		return getSlotContext().getSlotRequestId();
+	}
+
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/bc1c375a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
index 6262c9a..e82f075 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
@@ -61,7 +61,8 @@ public abstract class Slot {
 	private static final int RELEASED = 2;
 
 	// temporary placeholder for Slots that are not constructed from an AllocatedSlot (prior to FLIP-6)
-	protected static final AllocationID NO_ALLOCATION_ID = new AllocationID(0, 0);
+	protected static final AllocationID NO_ALLOCATION_ID = new AllocationID(0L, 0L);
+	protected static final SlotRequestID NO_SLOT_REQUEST_ID = new SlotRequestID(0L, 0L);
 
 	// ------------------------------------------------------------------------
 
@@ -111,6 +112,7 @@ public abstract class Slot {
 
 		// create a simple slot context
 		this.slotContext = new SimpleSlotContext(
+			NO_SLOT_REQUEST_ID,
 			NO_ALLOCATION_ID,
 			location,
 			slotNumber,

http://git-wip-us.apache.org/repos/asf/flink/blob/bc1c375a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index 2ccea75..a72f57b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -28,7 +28,6 @@ import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.jobmanager.scheduler.Locality;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
-import org.apache.flink.runtime.jobmanager.slots.SlotContext;
 import org.apache.flink.runtime.jobmanager.slots.SlotAndLocality;
 import org.apache.flink.runtime.jobmanager.slots.SlotException;
 import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
@@ -278,25 +277,31 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 	}
 
 	@Override
-	public void returnAllocatedSlot(SlotContext allocatedSlot) {
-		internalReturnAllocatedSlot(allocatedSlot.getAllocationId());
+	public void returnAllocatedSlot(SlotRequestID slotRequestId) {
+		final AllocatedSlot allocatedSlot = allocatedSlots.remove(slotRequestId);
+
+		if (allocatedSlot != null) {
+			internalReturnAllocatedSlot(allocatedSlot);
+		} else {
+			log.debug("There is no allocated slot with request id {}. Ignoring this request.", slotRequestId);
+		}
 	}
 
 	@Override
-	public CompletableFuture<Acknowledge> cancelSlotRequest(SlotRequestID requestId) {
-		final PendingRequest pendingRequest = removePendingRequest(requestId);
+	public CompletableFuture<Acknowledge> cancelSlotRequest(SlotRequestID slotRequestId) {
+		final PendingRequest pendingRequest = removePendingRequest(slotRequestId);
 
 		if (pendingRequest != null) {
-			failPendingRequest(pendingRequest, new CancellationException("Allocation with request id" + requestId + " cancelled."));
+			failPendingRequest(pendingRequest, new CancellationException("Allocation with request id" + slotRequestId + " cancelled."));
 		} else {
-			final AllocatedSlot allocatedSlot = allocatedSlots.get(requestId);
+			final AllocatedSlot allocatedSlot = allocatedSlots.get(slotRequestId);
 
 			if (allocatedSlot != null) {
-				LOG.info("Returning allocated slot {} because the corresponding allocation request {} was cancelled.", allocatedSlot, requestId);
+				LOG.info("Returning allocated slot {} because the corresponding allocation request {} was cancelled.", allocatedSlot, slotRequestId);
 				// TODO: Avoid having to send another message to do the slot releasing (e.g. introduce Slot#cancelExecution) and directly return slot
 				allocatedSlot.triggerLogicalSlotRelease();
 			} else {
-				LOG.debug("There was no slot allocation with {} to be cancelled.", requestId);
+				LOG.debug("There was no slot allocation with {} to be cancelled.", slotRequestId);
 			}
 		}
 
@@ -316,7 +321,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 
 			final SimpleSlot simpleSlot;
 			try {
-				simpleSlot = allocatedSlot.allocateSimpleSlot(slotFromPool.locality());
+				simpleSlot = allocatedSlot.allocateSimpleSlot(requestId, slotFromPool.locality());
 			} catch (SlotException e) {
 				availableSlots.add(allocatedSlot, clock.relativeTimeMillis());
 
@@ -335,9 +340,9 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 		return allocatedSlotFuture.thenApply(
 			(AllocatedSlot allocatedSlot) -> {
 				try {
-					return allocatedSlot.allocateSimpleSlot(Locality.UNKNOWN);
+					return allocatedSlot.allocateSimpleSlot(requestId, Locality.UNKNOWN);
 				} catch (SlotException e) {
-					returnAllocatedSlot(allocatedSlot);
+					internalReturnAllocatedSlot(allocatedSlot);
 
 					throw new CompletionException("Could not allocate a logical simple slot.", e);
 				}
@@ -495,31 +500,25 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 	 * Return the slot back to this pool without releasing it. It's mainly called by failed / cancelled tasks, and the
 	 * slot can be reused by other pending requests if the resource profile matches.n
 	 *
-	 * @param allocationId identifying the slot which is returned
+	 * @param allocatedSlot which shall be returned
 	 */
-	private void internalReturnAllocatedSlot(AllocationID allocationId) {
-		final AllocatedSlot allocatedSlot = allocatedSlots.remove(allocationId);
-
-		if (allocatedSlot != null) {
-			if (allocatedSlot.releaseLogicalSlot()) {
+	private void internalReturnAllocatedSlot(AllocatedSlot allocatedSlot) {
+		if (allocatedSlot.releaseLogicalSlot()) {
 
-				final PendingRequest pendingRequest = pollMatchingPendingRequest(allocatedSlot);
+			final PendingRequest pendingRequest = pollMatchingPendingRequest(allocatedSlot);
 
-				if (pendingRequest != null) {
-					LOG.debug("Fulfilling pending request [{}] early with returned slot [{}]",
-						pendingRequest.getSlotRequestId(), allocatedSlot.getAllocationId());
+			if (pendingRequest != null) {
+				LOG.debug("Fulfilling pending request [{}] early with returned slot [{}]",
+					pendingRequest.getSlotRequestId(), allocatedSlot.getAllocationId());
 
-					allocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot);
-					pendingRequest.getAllocatedSlotFuture().complete(allocatedSlot);
-				} else {
-					LOG.debug("Adding returned slot [{}] to available slots", allocatedSlot.getAllocationId());
-					availableSlots.add(allocatedSlot, clock.relativeTimeMillis());
-				}
+				allocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot);
+				pendingRequest.getAllocatedSlotFuture().complete(allocatedSlot);
 			} else {
-				LOG.debug("Failed to mark the logical slot of {} as released.", allocatedSlot);
+				LOG.debug("Adding returned slot [{}] to available slots", allocatedSlot.getAllocationId());
+				availableSlots.add(allocatedSlot, clock.relativeTimeMillis());
 			}
 		} else {
-			LOG.debug("Could not find allocated slot {}. Ignoring returning slot.", allocationId);
+			LOG.debug("Failed to mark the logical slot of {} as released.", allocatedSlot);
 		}
 	}
 
@@ -820,25 +819,48 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 		}
 
 		/**
-		 * Remove an allocation with slot.
+		 * Removes the allocated slot specified by the provided slot allocation id.
 		 *
-		 * @param slotId The ID of the slot to be removed
+		 * @param allocationID identifying the allocated slot to remove
+		 * @return The removed allocated slot or null.
 		 */
-		AllocatedSlot remove(final AllocationID slotId) {
-			AllocatedSlot allocatedSlot = allocatedSlotsById.removeKeyA(slotId);
+		@Nullable
+		AllocatedSlot remove(final AllocationID allocationID) {
+			AllocatedSlot allocatedSlot = allocatedSlotsById.removeKeyA(allocationID);
+
 			if (allocatedSlot != null) {
-				final ResourceID taskManagerId = allocatedSlot.getTaskManagerLocation().getResourceID();
-				Set<AllocatedSlot> slotsForTM = allocatedSlotsByTaskManager.get(taskManagerId);
+				removeAllocatedSlot(allocatedSlot);
+			}
 
-				slotsForTM.remove(allocatedSlot);
+			return allocatedSlot;
+		}
 
-				if (slotsForTM.isEmpty()) {
-					allocatedSlotsByTaskManager.remove(taskManagerId);
-				}
-				return allocatedSlot;
+		/**
+		 * Removes the allocated slot specified by the provided slot request id.
+		 *
+		 * @param slotRequestId identifying the allocated slot to remove
+		 * @return The removed allocated slot or null.
+		 */
+		@Nullable
+		AllocatedSlot remove(final SlotRequestID slotRequestId) {
+			final AllocatedSlot allocatedSlot = allocatedSlotsById.removeKeyB(slotRequestId);
+
+			if (allocatedSlot != null) {
+				removeAllocatedSlot(allocatedSlot);
 			}
-			else {
-				return null;
+
+			return allocatedSlot;
+		}
+
+		private void removeAllocatedSlot(final AllocatedSlot allocatedSlot) {
+			Preconditions.checkNotNull(allocatedSlot);
+			final ResourceID taskManagerId = allocatedSlot.getTaskManagerLocation().getResourceID();
+			Set<AllocatedSlot> slotsForTM = allocatedSlotsByTaskManager.get(taskManagerId);
+
+			slotsForTM.remove(allocatedSlot);
+
+			if (slotsForTM.isEmpty()) {
+				allocatedSlotsByTaskManager.remove(taskManagerId);
 			}
 		}
 
@@ -1106,8 +1128,8 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 		}
 
 		@Override
-		public CompletableFuture<Boolean> returnAllocatedSlot(Slot slot) {
-			gateway.returnAllocatedSlot(slot.getSlotContext());
+		public CompletableFuture<Boolean> returnAllocatedSlot(LogicalSlot slot) {
+			gateway.returnAllocatedSlot(slot.getSlotRequestId());
 			return CompletableFuture.completedFuture(true);
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bc1c375a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
index 71de054..103bc61 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
@@ -23,7 +23,6 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
-import org.apache.flink.runtime.jobmanager.slots.SlotContext;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
@@ -31,7 +30,6 @@ import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.util.AbstractID;
 
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
@@ -99,20 +97,14 @@ public interface SlotPoolGateway extends RpcGateway {
 			Iterable<TaskManagerLocation> locationPreferences,
 			@RpcTimeout Time timeout);
 
-	void returnAllocatedSlot(SlotContext slotInformation);
+	void returnAllocatedSlot(SlotRequestID slotRequestId);
 
 	/**
 	 * Cancel a slot allocation request.
 	 *
-	 * @param requestId identifying the slot allocation request
+	 * @param slotRequestId identifying the slot allocation request
 	 * @return Future acknowledge if the slot allocation has been cancelled
 	 */
-	CompletableFuture<Acknowledge> cancelSlotRequest(SlotRequestID requestId);
+	CompletableFuture<Acknowledge> cancelSlotRequest(SlotRequestID slotRequestId);
 
-	/**
-	 * Request ID identifying different slot requests.
-	 */
-	final class SlotRequestID extends AbstractID {
-		private static final long serialVersionUID = -6072105912250154283L;
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bc1c375a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotRequestID.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotRequestID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotRequestID.java
new file mode 100644
index 0000000..8e19944
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotRequestID.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import org.apache.flink.util.AbstractID;
+
+/**
+ * Request ID identifying different slot requests.
+ */
+public final class SlotRequestID extends AbstractID {
+    private static final long serialVersionUID = -6072105912250154283L;
+
+    public SlotRequestID(long lowerPart, long upperPart) {
+        super(lowerPart, upperPart);
+    }
+
+    public SlotRequestID() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bc1c375a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SimpleSlotContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SimpleSlotContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SimpleSlotContext.java
index 5dccc1f..a5b75d7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SimpleSlotContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SimpleSlotContext.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.jobmanager.slots;
 
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.instance.SlotRequestID;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.Preconditions;
 
@@ -27,6 +28,8 @@ import org.apache.flink.util.Preconditions;
  */
 public class SimpleSlotContext implements SlotContext {
 
+	private final SlotRequestID slotRequestId;
+
 	private final AllocationID allocationId;
 
 	private final TaskManagerLocation taskManagerLocation;
@@ -36,10 +39,12 @@ public class SimpleSlotContext implements SlotContext {
 	private final TaskManagerGateway taskManagerGateway;
 
 	public SimpleSlotContext(
+			SlotRequestID slotRequestId,
 			AllocationID allocationId,
 			TaskManagerLocation taskManagerLocation,
 			int physicalSlotNumber,
 			TaskManagerGateway taskManagerGateway) {
+		this.slotRequestId = Preconditions.checkNotNull(slotRequestId);
 		this.allocationId = Preconditions.checkNotNull(allocationId);
 		this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation);
 		this.physicalSlotNumber = physicalSlotNumber;
@@ -47,6 +52,11 @@ public class SimpleSlotContext implements SlotContext {
 	}
 
 	@Override
+	public SlotRequestID getSlotRequestId() {
+		return slotRequestId;
+	}
+
+	@Override
 	public AllocationID getAllocationId() {
 		return allocationId;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/bc1c375a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotContext.java
index d8a1aa4..1e0317a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotContext.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmanager.slots;
 
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.instance.Slot;
+import org.apache.flink.runtime.instance.SlotRequestID;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
 /**
@@ -30,9 +31,17 @@ import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 public interface SlotContext {
 
 	/**
-	 * Gets the ID under which the slot is allocated, which uniquely identifies the slot.
+	 * Gets the slot request id under which the slot has been requested. This id uniquely identifies the logical slot.
 	 *
-	 * @return The ID under which the slot is allocated
+	 * @return The id under which the slot has been requested
+	 */
+	SlotRequestID getSlotRequestId();
+
+	/**
+	 * Gets the id under which the slot has been allocated on the TaskManager. This id uniquely identifies the
+	 * physical slot.
+	 *
+	 * @return The id under whic teh slot has been allocated on the TaskManager
 	 */
 	AllocationID getAllocationId();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bc1c375a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotOwner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotOwner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotOwner.java
index cb4488d..bc1ced4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotOwner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotOwner.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.jobmanager.slots;
 
-import org.apache.flink.runtime.instance.Slot;
+import org.apache.flink.runtime.instance.LogicalSlot;
 
 import java.util.concurrent.CompletableFuture;
 
@@ -30,8 +30,8 @@ public interface SlotOwner {
 	/**
 	 * Return the given slot to the slot owner.
 	 *
-	 * @param slot to return
+	 * @param logicalSlot to return
 	 * @return Future which is completed with true if the slot could be returned, otherwise with false
 	 */
-	CompletableFuture<Boolean> returnAllocatedSlot(Slot slot);
+	CompletableFuture<Boolean> returnAllocatedSlot(LogicalSlot logicalSlot);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bc1c375a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
index 586f51b..18e6cf1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
@@ -30,8 +30,8 @@ import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.instance.LogicalSlot;
 import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.runtime.instance.Slot;
 import org.apache.flink.runtime.instance.SlotProvider;
+import org.apache.flink.runtime.instance.SlotRequestID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -284,7 +284,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 		final BlockingQueue<AllocationID> returnedSlots = new ArrayBlockingQueue<>(parallelism);
 		final TestingSlotOwner slotOwner = new TestingSlotOwner();
 		slotOwner.setReturnAllocatedSlotConsumer(
-			(Slot slot) -> returnedSlots.offer(slot.getSlotContext().getAllocationId()));
+			(LogicalSlot logicalSlot) -> returnedSlots.offer(logicalSlot.getAllocationId()));
 
 		final SimpleSlot[] sourceSlots = new SimpleSlot[parallelism];
 		final SimpleSlot[] targetSlots = new SimpleSlot[parallelism];
@@ -365,7 +365,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 		final BlockingQueue<AllocationID> returnedSlots = new ArrayBlockingQueue<>(2);
 		final TestingSlotOwner slotOwner = new TestingSlotOwner();
 		slotOwner.setReturnAllocatedSlotConsumer(
-			(Slot slot) -> returnedSlots.offer(slot.getSlotContext().getAllocationId()));
+			(LogicalSlot logicalSlot) -> returnedSlots.offer(logicalSlot.getAllocationId()));
 
 		final TaskManagerGateway taskManager = mock(TaskManagerGateway.class);
 		final SimpleSlot[] slots = new SimpleSlot[parallelism];
@@ -448,6 +448,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 				ResourceID.generate(), InetAddress.getLoopbackAddress(), 12345);
 
 		SimpleSlotContext slot = new SimpleSlotContext(
+			new SlotRequestID(),
 			new AllocationID(),
 			location,
 			0,

http://git-wip-us.apache.org/repos/asf/flink/blob/bc1c375a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 06ffaa0..c97329f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -42,6 +42,7 @@ import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.SlotProvider;
+import org.apache.flink.runtime.instance.SlotRequestID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -244,10 +245,11 @@ public class ExecutionGraphTestUtils {
 				ResourceID.generate(), InetAddress.getLoopbackAddress(), 6572);
 
 		final SimpleSlotContext allocatedSlot = new SimpleSlotContext(
-				new AllocationID(),
-				location,
-				0,
-				gateway);
+			new SlotRequestID(),
+			new AllocationID(),
+			location,
+			0,
+			gateway);
 
 		return new SimpleSlot(
 			allocatedSlot,

http://git-wip-us.apache.org/repos/asf/flink/blob/bc1c375a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
index 71d6f51..e3fd0df 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.instance.LogicalSlot;
 import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.runtime.instance.Slot;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
@@ -306,7 +305,7 @@ public class ExecutionTest extends TestLogger {
 
 		Execution currentExecutionAttempt = executionVertex.getCurrentExecutionAttempt();
 
-		CompletableFuture<Slot> returnedSlotFuture = slotOwner.getReturnedSlotFuture();
+		CompletableFuture<LogicalSlot> returnedSlotFuture = slotOwner.getReturnedSlotFuture();
 		CompletableFuture<?> terminationFuture = executionVertex.cancel();
 
 		// run canceling in a separate thread to allow an interleaving between termination
@@ -334,15 +333,15 @@ public class ExecutionTest extends TestLogger {
 	 */
 	private static final class SingleSlotTestingSlotOwner implements SlotOwner {
 
-		final CompletableFuture<Slot> returnedSlot = new CompletableFuture<>();
+		final CompletableFuture<LogicalSlot> returnedSlot = new CompletableFuture<>();
 
-		public CompletableFuture<Slot> getReturnedSlotFuture() {
+		public CompletableFuture<LogicalSlot> getReturnedSlotFuture() {
 			return returnedSlot;
 		}
 
 		@Override
-		public CompletableFuture<Boolean> returnAllocatedSlot(Slot slot) {
-			return CompletableFuture.completedFuture(returnedSlot.complete(slot));
+		public CompletableFuture<Boolean> returnAllocatedSlot(LogicalSlot logicalSlot) {
+			return CompletableFuture.completedFuture(returnedSlot.complete(logicalSlot));
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bc1c375a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
index 7f97d12..63cebf3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
@@ -32,7 +32,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
-import org.apache.flink.runtime.instance.AllocatedSlot;
+import org.apache.flink.runtime.jobmanager.slots.SlotContext;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
 import org.apache.flink.util.TestLogger;
@@ -371,8 +371,8 @@ public class ExecutionVertexDeploymentTest extends TestLogger {
 		result.getPartitions()[0].addConsumerGroup();
 		result.getPartitions()[0].addConsumer(mockEdge, 0);
 
-		AllocatedSlot allocatedSlot = mock(AllocatedSlot.class);
-		when(allocatedSlot.getAllocationId()).thenReturn(new AllocationID());
+		SlotContext slotContext = mock(SlotContext.class);
+		when(slotContext.getAllocationId()).thenReturn(new AllocationID());
 
 		LogicalSlot slot = mock(LogicalSlot.class);
 		when(slot.getAllocationId()).thenReturn(new AllocationID());

http://git-wip-us.apache.org/repos/asf/flink/blob/bc1c375a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
index 98f7259..bffbb6a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
@@ -31,13 +31,14 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.SlotProvider;
+import org.apache.flink.runtime.instance.SlotRequestID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.slots.SlotContext;
 import org.apache.flink.runtime.jobmanager.slots.SimpleSlotContext;
+import org.apache.flink.runtime.jobmanager.slots.SlotContext;
 import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -234,7 +235,11 @@ public class ExecutionVertexLocalityTest extends TestLogger {
 		//  - exposing test methods in the ExecutionVertex leads to undesirable setters 
 
 		SlotContext slot = new SimpleSlotContext(
-				new AllocationID(), location, 0, mock(TaskManagerGateway.class));
+			new SlotRequestID(),
+			new AllocationID(),
+			location,
+			0,
+			mock(TaskManagerGateway.class));
 
 		SimpleSlot simpleSlot = new SimpleSlot(slot, mock(SlotOwner.class), 0);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bc1c375a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
index 9a19d24..82953d6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.instance.LogicalSlot;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.Slot;
 import org.apache.flink.runtime.instance.SlotProvider;
+import org.apache.flink.runtime.instance.SlotRequestID;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmanager.slots.SlotContext;
@@ -33,6 +34,7 @@ import org.apache.flink.runtime.jobmanager.slots.SimpleSlotContext;
 import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.Preconditions;
 
 import java.net.InetAddress;
 import java.util.ArrayDeque;
@@ -61,10 +63,11 @@ public class SimpleSlotProvider implements SlotProvider, SlotOwner {
 
 		for (int i = 0; i < numSlots; i++) {
 			SimpleSlotContext as = new SimpleSlotContext(
-					new AllocationID(),
-					new TaskManagerLocation(ResourceID.generate(), InetAddress.getLoopbackAddress(), 10000 + i),
-					0,
-					taskManagerGateway);
+				new SlotRequestID(),
+				new AllocationID(),
+				new TaskManagerLocation(ResourceID.generate(), InetAddress.getLoopbackAddress(), 10000 + i),
+				0,
+				taskManagerGateway);
 			slots.add(as);
 		}
 	}
@@ -94,7 +97,11 @@ public class SimpleSlotProvider implements SlotProvider, SlotOwner {
 	}
 
 	@Override
-	public CompletableFuture<Boolean> returnAllocatedSlot(Slot slot) {
+	public CompletableFuture<Boolean> returnAllocatedSlot(LogicalSlot logicalSlot) {
+		Preconditions.checkArgument(logicalSlot instanceof Slot);
+
+		final Slot slot = ((Slot) logicalSlot);
+
 		synchronized (slots) {
 			slots.add(slot.getSlotContext());
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/bc1c375a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java
index bc396c1..223d43c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java
@@ -41,7 +41,7 @@ public class AllocatedSlotsTest extends TestLogger {
 		SlotPool.AllocatedSlots allocatedSlots = new SlotPool.AllocatedSlots();
 
 		final AllocationID allocation1 = new AllocationID();
-		final SlotPoolGateway.SlotRequestID slotRequestID = new SlotPoolGateway.SlotRequestID();
+		final SlotRequestID slotRequestID = new SlotRequestID();
 		final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
 		final ResourceID resource1 = taskManagerLocation.getResourceID();
 		final AllocatedSlot slot1 = createSlot(allocation1, taskManagerLocation);
@@ -56,7 +56,7 @@ public class AllocatedSlotsTest extends TestLogger {
 		assertEquals(1, allocatedSlots.size());
 
 		final AllocationID allocation2 = new AllocationID();
-		final SlotPoolGateway.SlotRequestID slotRequestID2 = new SlotPoolGateway.SlotRequestID();
+		final SlotRequestID slotRequestID2 = new SlotRequestID();
 		final AllocatedSlot slot2 = createSlot(allocation2, taskManagerLocation);
 
 		allocatedSlots.add(slotRequestID2, slot2);
@@ -71,7 +71,7 @@ public class AllocatedSlotsTest extends TestLogger {
 		assertEquals(2, allocatedSlots.size());
 
 		final AllocationID allocation3 = new AllocationID();
-		final SlotPoolGateway.SlotRequestID slotRequestID3 = new SlotPoolGateway.SlotRequestID();
+		final SlotRequestID slotRequestID3 = new SlotRequestID();
 		final TaskManagerLocation taskManagerLocation2 = new LocalTaskManagerLocation();
 		final ResourceID resource2 = taskManagerLocation2.getResourceID();
 		final AllocatedSlot slot3 = createSlot(allocation3, taskManagerLocation2);

http://git-wip-us.apache.org/repos/asf/flink/blob/bc1c375a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
index 5d82f47..60e1d34 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
@@ -111,7 +111,7 @@ public class SlotPoolRpcTest extends TestLogger {
 			pool.start(JobMasterId.generate(), "foobar");
 
 			CompletableFuture<LogicalSlot> future = pool.allocateSlot(
-				new SlotPoolGateway.SlotRequestID(),
+				new SlotRequestID(),
 				new ScheduledUnit(SchedulerTestUtils.getDummyTask()),
 				DEFAULT_TESTING_PROFILE,
 				Collections.emptyList(),
@@ -144,7 +144,7 @@ public class SlotPoolRpcTest extends TestLogger {
 			pool.start(JobMasterId.generate(), "foobar");
 			SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class);
 
-			SlotPoolGateway.SlotRequestID requestId = new SlotPoolGateway.SlotRequestID();
+			SlotRequestID requestId = new SlotRequestID();
 			CompletableFuture<LogicalSlot> future = slotPoolGateway.allocateSlot(
 				requestId,
 				new ScheduledUnit(SchedulerTestUtils.getDummyTask()),
@@ -188,7 +188,7 @@ public class SlotPoolRpcTest extends TestLogger {
 			ResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
 			pool.connectToResourceManager(resourceManagerGateway);
 
-			SlotPoolGateway.SlotRequestID requestId = new SlotPoolGateway.SlotRequestID();
+			SlotRequestID requestId = new SlotRequestID();
 			CompletableFuture<LogicalSlot> future = slotPoolGateway.allocateSlot(
 				requestId,
 				new ScheduledUnit(SchedulerTestUtils.getDummyTask()),
@@ -239,7 +239,7 @@ public class SlotPoolRpcTest extends TestLogger {
 
 			pool.connectToResourceManager(resourceManagerGateway);
 
-			SlotPoolGateway.SlotRequestID requestId = new SlotPoolGateway.SlotRequestID();
+			SlotRequestID requestId = new SlotRequestID();
 			CompletableFuture<LogicalSlot> future = slotPoolGateway.allocateSlot(
 				requestId,
 				new ScheduledUnit(SchedulerTestUtils.getDummyTask()),
@@ -295,7 +295,7 @@ public class SlotPoolRpcTest extends TestLogger {
 			TestingUtils.infiniteTime(),
 			TestingUtils.infiniteTime());
 
-		final CompletableFuture<SlotPoolGateway.SlotRequestID> cancelFuture = new CompletableFuture<>();
+		final CompletableFuture<SlotRequestID> cancelFuture = new CompletableFuture<>();
 
 		pool.setCancelSlotAllocationConsumer(
 			slotRequestID -> cancelFuture.complete(slotRequestID));

http://git-wip-us.apache.org/repos/asf/flink/blob/bc1c375a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
index 9d90a12..ec20f6b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
@@ -25,7 +25,6 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
-import org.apache.flink.runtime.jobmanager.slots.SlotContext;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.messages.Acknowledge;
@@ -103,7 +102,7 @@ public class SlotPoolTest extends TestLogger {
 			SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
 			slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID());
 
-			SlotPoolGateway.SlotRequestID requestId = new SlotPoolGateway.SlotRequestID();
+			SlotRequestID requestId = new SlotRequestID();
 			CompletableFuture<LogicalSlot> future = slotPoolGateway.allocateSlot(requestId, mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
 			assertFalse(future.isDone());
 
@@ -137,8 +136,8 @@ public class SlotPoolTest extends TestLogger {
 			SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
 			slotPool.registerTaskManager(taskManagerLocation.getResourceID());
 
-			CompletableFuture<LogicalSlot> future1 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
-			CompletableFuture<LogicalSlot> future2 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
+			CompletableFuture<LogicalSlot> future1 = slotPoolGateway.allocateSlot(new SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
+			CompletableFuture<LogicalSlot> future2 = slotPoolGateway.allocateSlot(new SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
 
 			assertFalse(future1.isDone());
 			assertFalse(future2.isDone());
@@ -187,7 +186,7 @@ public class SlotPoolTest extends TestLogger {
 			SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
 			slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID());
 
-			CompletableFuture<LogicalSlot> future1 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
+			CompletableFuture<LogicalSlot> future1 = slotPoolGateway.allocateSlot(new SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
 			assertFalse(future1.isDone());
 
 			ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
@@ -208,7 +207,7 @@ public class SlotPoolTest extends TestLogger {
 			// return this slot to pool
 			slot1.releaseSlot();
 
-			CompletableFuture<LogicalSlot> future2 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
+			CompletableFuture<LogicalSlot> future2 = slotPoolGateway.allocateSlot(new SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
 
 			// second allocation fulfilled by previous slot returning
 			LogicalSlot slot2 = future2.get(1, TimeUnit.SECONDS);
@@ -233,7 +232,7 @@ public class SlotPoolTest extends TestLogger {
 			SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
 			slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID());
 
-			CompletableFuture<LogicalSlot> future = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
+			CompletableFuture<LogicalSlot> future = slotPoolGateway.allocateSlot(new SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
 			assertFalse(future.isDone());
 
 			ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
@@ -284,8 +283,8 @@ public class SlotPoolTest extends TestLogger {
 
 		final SlotPool slotPool = new SlotPool(rpcService, jobId) {
 			@Override
-			public void returnAllocatedSlot(SlotContext allocatedSlot) {
-				super.returnAllocatedSlot(allocatedSlot);
+			public void returnAllocatedSlot(SlotRequestID slotRequestId) {
+				super.returnAllocatedSlot(slotRequestId);
 
 				slotReturnFuture.complete(true);
 			}
@@ -295,14 +294,14 @@ public class SlotPoolTest extends TestLogger {
 			SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
 			slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID());
 
-			CompletableFuture<LogicalSlot> future1 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
+			CompletableFuture<LogicalSlot> future1 = slotPoolGateway.allocateSlot(new SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
 
 			ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
 			verify(resourceManagerGateway, Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(JobMasterId.class), slotRequestArgumentCaptor.capture(), any(Time.class));
 
 			final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue();
 
-			CompletableFuture<LogicalSlot> future2 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
+			CompletableFuture<LogicalSlot> future2 = slotPoolGateway.allocateSlot(new SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
 
 			final SlotOffer slotOffer = new SlotOffer(
 				slotRequest.getAllocationId(),
@@ -357,7 +356,7 @@ public class SlotPoolTest extends TestLogger {
 			slotPoolGateway.connectToResourceManager(resourceManagerGateway);
 
 			CompletableFuture<LogicalSlot> slotFuture = slotPoolGateway.allocateSlot(
-				new SlotPoolGateway.SlotRequestID(),
+				new SlotRequestID(),
 				scheduledUnit,
 				ResourceProfile.UNKNOWN,
 				Collections.emptyList(),

http://git-wip-us.apache.org/repos/asf/flink/blob/bc1c375a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/TestingLogicalSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/TestingLogicalSlot.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/TestingLogicalSlot.java
index 925933d..2066017 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/TestingLogicalSlot.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/TestingLogicalSlot.java
@@ -44,27 +44,32 @@ public class TestingLogicalSlot implements LogicalSlot {
 	private final int slotNumber;
 
 	private final CompletableFuture<?> releaseFuture = new CompletableFuture<>();
-
+	
 	private final AllocationID allocationId;
 
+	private final SlotRequestID slotRequestId;
+
 	public TestingLogicalSlot() {
 		this(
 			new LocalTaskManagerLocation(),
 			new SimpleAckingTaskManagerGateway(),
 			0,
-			new AllocationID());
+			new AllocationID(),
+			new SlotRequestID());
 	}
 
 	public TestingLogicalSlot(
 			TaskManagerLocation taskManagerLocation,
 			TaskManagerGateway taskManagerGateway,
 			int slotNumber,
-			AllocationID allocationId) {
+			AllocationID allocationId,
+			SlotRequestID slotRequestId) {
 		this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation);
 		this.taskManagerGateway = Preconditions.checkNotNull(taskManagerGateway);
 		this.payloadReference = new AtomicReference<>();
 		this.slotNumber = slotNumber;
 		this.allocationId = Preconditions.checkNotNull(allocationId);
+		this.slotRequestId = Preconditions.checkNotNull(slotRequestId);
 	}
 
 	@Override
@@ -109,4 +114,9 @@ public class TestingLogicalSlot implements LogicalSlot {
 	public AllocationID getAllocationId() {
 		return allocationId;
 	}
+
+	@Override
+	public SlotRequestID getSlotRequestId() {
+		return slotRequestId;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bc1c375a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/DummySlotOwner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/DummySlotOwner.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/DummySlotOwner.java
index 6894542..6d17ad0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/DummySlotOwner.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/DummySlotOwner.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.jobmanager.slots;
 
-import org.apache.flink.runtime.instance.Slot;
+import org.apache.flink.runtime.instance.LogicalSlot;
 
 import java.util.concurrent.CompletableFuture;
 
@@ -27,7 +27,7 @@ import java.util.concurrent.CompletableFuture;
  */
 public class DummySlotOwner implements SlotOwner {
 	@Override
-	public CompletableFuture<Boolean> returnAllocatedSlot(Slot slot) {
+	public CompletableFuture<Boolean> returnAllocatedSlot(LogicalSlot logicalSlot) {
 		return CompletableFuture.completedFuture(false);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bc1c375a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/TestingSlotOwner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/TestingSlotOwner.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/TestingSlotOwner.java
index 7c124ef..e7f9485 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/TestingSlotOwner.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/slots/TestingSlotOwner.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.jobmanager.slots;
 
-import org.apache.flink.runtime.instance.Slot;
+import org.apache.flink.runtime.instance.LogicalSlot;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
@@ -28,18 +28,18 @@ import java.util.function.Consumer;
  */
 public class TestingSlotOwner implements SlotOwner {
 
-	private volatile Consumer<Slot> returnAllocatedSlotConsumer;
+	private volatile Consumer<LogicalSlot> returnAllocatedSlotConsumer;
 
-	public void setReturnAllocatedSlotConsumer(Consumer<Slot> returnAllocatedSlotConsumer) {
+	public void setReturnAllocatedSlotConsumer(Consumer<LogicalSlot> returnAllocatedSlotConsumer) {
 		this.returnAllocatedSlotConsumer = returnAllocatedSlotConsumer;
 	}
 
 	@Override
-	public CompletableFuture<Boolean> returnAllocatedSlot(Slot slot) {
-		final Consumer<Slot> currentReturnAllocatedSlotConsumer = this.returnAllocatedSlotConsumer;
+	public CompletableFuture<Boolean> returnAllocatedSlot(LogicalSlot logicalSlot) {
+		final Consumer<LogicalSlot> currentReturnAllocatedSlotConsumer = this.returnAllocatedSlotConsumer;
 
 		if (currentReturnAllocatedSlotConsumer != null) {
-			currentReturnAllocatedSlotConsumer.accept(slot);
+			currentReturnAllocatedSlotConsumer.accept(logicalSlot);
 		}
 
 		return CompletableFuture.completedFuture(true);