You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/12/14 15:33:37 UTC

[3/6] flink git commit: [FLINK-8087] Decouple Slot from AllocatedSlot

[FLINK-8087] Decouple Slot from AllocatedSlot

This commit introduces the SlotContext which is an abstraction for the SimpleSlot
to obtain the relevant slot information to do the communication with the
TaskManager without relying on the AllocatedSlot which is now only used by the
SlotPool.

This closes #5088.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a569f38f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a569f38f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a569f38f

Branch: refs/heads/master
Commit: a569f38f16186518b53461842d37b09fb1df45e9
Parents: 627bcda
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Nov 24 18:03:49 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Dec 14 14:32:24 2017 +0100

----------------------------------------------------------------------
 .../flink/runtime/instance/AllocatedSlot.java   | 241 ++++++++++++++
 .../apache/flink/runtime/instance/Instance.java |  27 +-
 .../flink/runtime/instance/SharedSlot.java      | 106 +++---
 .../flink/runtime/instance/SimpleSlot.java      |  60 ++--
 .../org/apache/flink/runtime/instance/Slot.java |  51 ++-
 .../apache/flink/runtime/instance/SlotPool.java | 323 ++++++++++---------
 .../flink/runtime/instance/SlotPoolGateway.java |  18 +-
 .../runtime/jobmanager/scheduler/Scheduler.java |   6 +-
 .../runtime/jobmanager/slots/AllocatedSlot.java | 169 ----------
 .../jobmanager/slots/SimpleSlotContext.java     |  68 ++++
 .../jobmanager/slots/SlotAndLocality.java       |   1 +
 .../runtime/jobmanager/slots/SlotContext.java   |  61 ++++
 .../runtime/jobmanager/slots/SlotException.java |  40 +++
 .../flink/runtime/jobmaster/JobMaster.java      |  24 +-
 .../runtime/jobmaster/JobMasterGateway.java     |   2 +-
 .../ExecutionGraphDeploymentTest.java           |  13 +-
 .../ExecutionGraphSchedulingTest.java           |  14 +-
 .../executiongraph/ExecutionGraphStopTest.java  |  10 +-
 .../executiongraph/ExecutionGraphTestUtils.java |  14 +-
 .../runtime/executiongraph/ExecutionTest.java   |   8 +-
 .../ExecutionVertexCancelTest.java              |  44 +--
 .../ExecutionVertexDeploymentTest.java          |  18 +-
 .../ExecutionVertexLocalityTest.java            |   8 +-
 .../ExecutionVertexSchedulingTest.java          |   6 +-
 .../utils/SimpleSlotProvider.java               |  14 +-
 .../runtime/instance/AllocatedSlotsTest.java    |  75 +++--
 .../runtime/instance/AvailableSlotsTest.java    |  33 +-
 .../flink/runtime/instance/InstanceTest.java    |  39 ++-
 .../flink/runtime/instance/SharedSlotsTest.java |  34 +-
 .../flink/runtime/instance/SimpleSlotTest.java  |   5 +-
 .../flink/runtime/instance/SlotPoolRpcTest.java |  30 +-
 .../flink/runtime/instance/SlotPoolTest.java    | 116 +++----
 .../SlotSharingGroupAssignmentTest.java         |   3 -
 .../scheduler/CoLocationConstraintTest.java     |   8 +-
 .../jobmanager/slots/DummySlotOwner.java        |  33 ++
 .../taskexecutor/TaskExecutorITCase.java        |   6 +-
 .../runtime/taskexecutor/TaskExecutorTest.java  |  10 +-
 37 files changed, 1031 insertions(+), 707 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
new file mode 100644
index 0000000..7036044
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmanager.slots.SlotContext;
+import org.apache.flink.runtime.jobmanager.slots.SlotException;
+import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The {@code AllocatedSlot} represents a slot that the JobManager allocated from a TaskManager.
+ * It represents a slice of allocated resources from the TaskManager.
+ * 
+ * <p>To allocate an {@code AllocatedSlot}, the requests a slot from the ResourceManager. The
+ * ResourceManager picks (or starts) a TaskManager that will then allocate the slot to the
+ * JobManager and notify the JobManager.
+ * 
+ * <p>Note: Prior to the resource management changes introduced in (Flink Improvement Proposal 6),
+ * an AllocatedSlot was allocated to the JobManager as soon as the TaskManager registered at the
+ * JobManager. All slots had a default unknown resource profile. 
+ */
+public class AllocatedSlot implements SlotContext {
+
+	/** The ID under which the slot is allocated. Uniquely identifies the slot. */
+	private final AllocationID slotAllocationId;
+
+	/** The location information of the TaskManager to which this slot belongs */
+	private final TaskManagerLocation taskManagerLocation;
+
+	/** The resource profile of the slot provides */
+	private final ResourceProfile resourceProfile;
+
+	/** RPC gateway to call the TaskManager that holds this slot */
+	private final TaskManagerGateway taskManagerGateway;
+
+	/** The number of the slot on the TaskManager to which slot belongs. Purely informational. */
+	private final int physicalSlotNumber;
+
+	private final SlotOwner slotOwner;
+
+	private final AtomicReference<LogicalSlot> logicalSlotReference;
+
+	// ------------------------------------------------------------------------
+
+	public AllocatedSlot(
+			AllocationID slotAllocationId,
+			TaskManagerLocation location,
+			int physicalSlotNumber,
+			ResourceProfile resourceProfile,
+			TaskManagerGateway taskManagerGateway,
+			SlotOwner slotOwner) {
+		this.slotAllocationId = checkNotNull(slotAllocationId);
+		this.taskManagerLocation = checkNotNull(location);
+		this.physicalSlotNumber = physicalSlotNumber;
+		this.resourceProfile = checkNotNull(resourceProfile);
+		this.taskManagerGateway = checkNotNull(taskManagerGateway);
+		this.slotOwner = checkNotNull(slotOwner);
+
+		logicalSlotReference = new AtomicReference<>(null);
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Gets the ID under which the slot is allocated, which uniquely identifies the slot.
+	 * 
+	 * @return The ID under which the slot is allocated
+	 */
+	public AllocationID getAllocationId() {
+		return slotAllocationId;
+	}
+
+	/**
+	 * Gets the ID of the TaskManager on which this slot was allocated.
+	 * 
+	 * <p>This is equivalent to {@link #getTaskManagerLocation()#getTaskManagerId()}.
+	 * 
+	 * @return This slot's TaskManager's ID.
+	 */
+	public ResourceID getTaskManagerId() {
+		return getTaskManagerLocation().getResourceID();
+	}
+
+	/**
+	 * Gets the number of the slot.
+	 *
+	 * @return The number of the slot on the TaskManager.
+	 */
+	public int getPhysicalSlotNumber() {
+		return physicalSlotNumber;
+	}
+
+	/**
+	 * Gets the resource profile of the slot.
+	 *
+	 * @return The resource profile of the slot.
+	 */
+	public ResourceProfile getResourceProfile() {
+		return resourceProfile;
+	}
+
+	/**
+	 * Gets the location info of the TaskManager that offers this slot.
+	 *
+	 * @return The location info of the TaskManager that offers this slot
+	 */
+	public TaskManagerLocation getTaskManagerLocation() {
+		return taskManagerLocation;
+	}
+
+	/**
+	 * Gets the actor gateway that can be used to send messages to the TaskManager.
+	 * <p>
+	 * This method should be removed once the new interface-based RPC abstraction is in place
+	 *
+	 * @return The actor gateway that can be used to send messages to the TaskManager.
+	 */
+	public TaskManagerGateway getTaskManagerGateway() {
+		return taskManagerGateway;
+	}
+
+	/**
+	 * Triggers the release of the logical slot.
+	 */
+	public void triggerLogicalSlotRelease() {
+		final LogicalSlot logicalSlot = logicalSlotReference.get();
+
+		if (logicalSlot != null) {
+			logicalSlot.releaseSlot();
+		}
+	}
+
+	/**
+	 * Releases the logical slot.
+	 *
+	 * @return true if the logical slot could be released, false otherwise.
+	 */
+	public boolean releaseLogicalSlot() {
+		final LogicalSlot logicalSlot = logicalSlotReference.get();
+
+		if (logicalSlot != null) {
+			if (logicalSlot instanceof Slot) {
+				final Slot slot = (Slot) logicalSlot;
+				if (slot.markReleased()) {
+					logicalSlotReference.set(null);
+					return true;
+				}
+			} else {
+				throw new RuntimeException("Unsupported logical slot type encountered " + logicalSlot.getClass());
+			}
+
+		}
+
+		return false;
+	}
+
+	/**
+	 * Allocates a logical {@link SimpleSlot}.
+	 *
+	 * @return an allocated logical simple slot
+	 * @throws SlotException if we could not allocate a simple slot
+	 */
+	public SimpleSlot allocateSimpleSlot(Locality locality) throws SlotException {
+
+		final SimpleSlot simpleSlot = new SimpleSlot(this, slotOwner, physicalSlotNumber);
+
+		if (logicalSlotReference.compareAndSet(null, simpleSlot)) {
+			simpleSlot.setLocality(locality);
+			return simpleSlot;
+		} else {
+			throw new SlotException("Could not allocate logical simple slot because the allocated slot is already used.");
+		}
+	}
+
+	/**
+	 * Allocates a logical {@link SharedSlot}.
+	 *
+	 * @param slotSharingGroupAssignment the slot sharing group to which the shared slot shall belong
+	 * @return an allocated logical shared slot
+	 * @throws SlotException if we could not allocate a shared slot
+	 */
+	public SharedSlot allocateSharedSlot(SlotSharingGroupAssignment slotSharingGroupAssignment) throws SlotException {
+		final SharedSlot sharedSlot = new SharedSlot(this, slotOwner, slotSharingGroupAssignment);
+
+		if (logicalSlotReference.compareAndSet(null, sharedSlot)) {
+
+
+			return sharedSlot;
+		} else {
+			throw new SlotException("Could not allocate logical shared slot because the allocated slot is already used.");
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * This always returns a reference hash code.
+	 */
+	@Override
+	public final int hashCode() {
+		return super.hashCode();
+	}
+
+	/**
+	 * This always checks based on reference equality.
+	 */
+	@Override
+	public final boolean equals(Object obj) {
+		return this == obj;
+	}
+
+	@Override
+	public String toString() {
+		return "AllocatedSlot " + slotAllocationId + " @ " + taskManagerLocation + " - " + physicalSlotNumber;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
index d099f6a..54c8971 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.instance;
 
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotAvailabilityListener;
 import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
@@ -210,19 +209,13 @@ public class Instance implements SlotOwner {
 	 * Allocates a simple slot on this TaskManager instance. This method returns {@code null}, if no slot
 	 * is available at the moment.
 	 *
-	 * @param jobID The ID of the job that the slot is allocated for.
-	 *
 	 * @return A simple slot that represents a task slot on this TaskManager instance, or null, if the
 	 *         TaskManager instance has no more slots available.
 	 *
 	 * @throws InstanceDiedException Thrown if the instance is no longer alive by the time the
 	 *                               slot is allocated. 
 	 */
-	public SimpleSlot allocateSimpleSlot(JobID jobID) throws InstanceDiedException {
-		if (jobID == null) {
-			throw new IllegalArgumentException();
-		}
-
+	public SimpleSlot allocateSimpleSlot() throws InstanceDiedException {
 		synchronized (instanceLock) {
 			if (isDead) {
 				throw new InstanceDiedException(this);
@@ -233,7 +226,7 @@ public class Instance implements SlotOwner {
 				return null;
 			}
 			else {
-				SimpleSlot slot = new SimpleSlot(jobID, this, location, nextSlot, taskManagerGateway);
+				SimpleSlot slot = new SimpleSlot(this, location, nextSlot, taskManagerGateway);
 				allocatedSlots.add(slot);
 				return slot;
 			}
@@ -244,7 +237,6 @@ public class Instance implements SlotOwner {
 	 * Allocates a shared slot on this TaskManager instance. This method returns {@code null}, if no slot
 	 * is available at the moment. The shared slot will be managed by the given  SlotSharingGroupAssignment.
 	 *
-	 * @param jobID The ID of the job that the slot is allocated for.
 	 * @param sharingGroupAssignment The assignment group that manages this shared slot.
 	 *
 	 * @return A shared slot that represents a task slot on this TaskManager instance and can hold other
@@ -252,13 +244,8 @@ public class Instance implements SlotOwner {
 	 *
 	 * @throws InstanceDiedException Thrown if the instance is no longer alive by the time the slot is allocated. 
 	 */
-	public SharedSlot allocateSharedSlot(JobID jobID, SlotSharingGroupAssignment sharingGroupAssignment)
-			throws InstanceDiedException
-	{
-		// the slot needs to be in the returned to taskManager state
-		if (jobID == null) {
-			throw new IllegalArgumentException();
-		}
+	public SharedSlot allocateSharedSlot(SlotSharingGroupAssignment sharingGroupAssignment)
+			throws InstanceDiedException {
 
 		synchronized (instanceLock) {
 			if (isDead) {
@@ -271,7 +258,11 @@ public class Instance implements SlotOwner {
 			}
 			else {
 				SharedSlot slot = new SharedSlot(
-						jobID, this, location, nextSlot, taskManagerGateway, sharingGroupAssignment);
+					this,
+					location,
+					nextSlot,
+					taskManagerGateway,
+					sharingGroupAssignment);
 				allocatedSlots.add(slot);
 				return slot;
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
index 2ce4fc3..8637159 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
@@ -18,17 +18,20 @@
 
 package org.apache.flink.runtime.instance;
 
-import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.jobmanager.slots.SlotContext;
 import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.AbstractID;
-import org.apache.flink.api.common.JobID;
+import org.apache.flink.util.FlinkException;
 
 import javax.annotation.Nullable;
+
 import java.util.ConcurrentModificationException;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -44,7 +47,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * passed through a {@link SlotSharingGroupAssignment} object which is responsible for
  * synchronization.
  */
-public class SharedSlot extends Slot {
+public class SharedSlot extends Slot implements LogicalSlot {
 
 	/** The assignment group os shared slots that manages the availability and release of the slots */
 	private final SlotSharingGroupAssignment assignmentGroup;
@@ -52,6 +55,8 @@ public class SharedSlot extends Slot {
 	/** The set os sub-slots allocated from this shared slot */
 	private final Set<Slot> subSlots;
 
+	private final CompletableFuture<?> cancellationFuture = new CompletableFuture<>();
+
 	// ------------------------------------------------------------------------
 	//  Old Constructors (prior FLIP-6)
 	// ------------------------------------------------------------------------
@@ -60,7 +65,6 @@ public class SharedSlot extends Slot {
 	 * Creates a new shared slot that has no parent (is a root slot) and does not belong to any task group.
 	 * This constructor is used to create a slot directly from an instance. 
 	 *
-	 * @param jobID The ID of the job that the slot is created for.
 	 * @param owner The component from which this slot is allocated.
 	 * @param location The location info of the TaskManager where the slot was allocated from
 	 * @param slotNumber The number of the slot.
@@ -68,18 +72,17 @@ public class SharedSlot extends Slot {
 	 * @param assignmentGroup The assignment group that this shared slot belongs to.
 	 */
 	public SharedSlot(
-			JobID jobID, SlotOwner owner, TaskManagerLocation location, int slotNumber,
+			SlotOwner owner, TaskManagerLocation location, int slotNumber,
 			TaskManagerGateway taskManagerGateway,
 			SlotSharingGroupAssignment assignmentGroup) {
 
-		this(jobID, owner, location, slotNumber, taskManagerGateway, assignmentGroup, null, null);
+		this(owner, location, slotNumber, taskManagerGateway, assignmentGroup, null, null);
 	}
 
 	/**
 	 * Creates a new shared slot that has is a sub-slot of the given parent shared slot, and that belongs
 	 * to the given task group.
 	 *
-	 * @param jobID The ID of the job that the slot is created for.
 	 * @param owner The component from which this slot is allocated.
 	 * @param location The location info of the TaskManager where the slot was allocated from
 	 * @param slotNumber The number of the slot.
@@ -89,7 +92,6 @@ public class SharedSlot extends Slot {
 	 * @param groupId The assignment group of this slot.
 	 */
 	public SharedSlot(
-			JobID jobID,
 			SlotOwner owner,
 			TaskManagerLocation location,
 			int slotNumber,
@@ -98,7 +100,7 @@ public class SharedSlot extends Slot {
 			@Nullable SharedSlot parent,
 			@Nullable AbstractID groupId) {
 
-		super(jobID, owner, location, slotNumber, taskManagerGateway, parent, groupId);
+		super(owner, location, slotNumber, taskManagerGateway, parent, groupId);
 
 		this.assignmentGroup = checkNotNull(assignmentGroup);
 		this.subSlots = new HashSet<Slot>();
@@ -112,38 +114,23 @@ public class SharedSlot extends Slot {
 	 * Creates a new shared slot that has no parent (is a root slot) and does not belong to any task group.
 	 * This constructor is used to create a slot directly from an instance.
 	 * 
-	 * @param allocatedSlot The allocated slot that this slot represents.
+	 * @param slotContext The slot context of this shared slot
 	 * @param owner The component from which this slot is allocated.
 	 * @param assignmentGroup The assignment group that this shared slot belongs to.
 	 */
-	public SharedSlot(AllocatedSlot allocatedSlot, SlotOwner owner, SlotSharingGroupAssignment assignmentGroup) {
-		this(allocatedSlot, owner, allocatedSlot.getSlotNumber(), assignmentGroup, null, null);
-	}
-
-	/**
-	 * Creates a new shared slot that is a sub-slot of the given parent shared slot, and that belongs
-	 * to the given task group.
-	 * 
-	 * @param parent The parent slot of this slot.
-	 * @param owner The component from which this slot is allocated.
-	 * @param slotNumber The number of the slot.
-	 * @param assignmentGroup The assignment group that this shared slot belongs to.
-	 * @param groupId The assignment group of this slot.
-	 */
-	public SharedSlot(
-			SharedSlot parent, SlotOwner owner, int slotNumber,
-			SlotSharingGroupAssignment assignmentGroup,
-			AbstractID groupId) {
-
-		this(parent.getAllocatedSlot(), owner, slotNumber, assignmentGroup, parent, groupId);
+	public SharedSlot(SlotContext slotContext, SlotOwner owner, SlotSharingGroupAssignment assignmentGroup) {
+		this(slotContext, owner, slotContext.getPhysicalSlotNumber(), assignmentGroup, null, null);
 	}
 
 	private SharedSlot(
-			AllocatedSlot allocatedSlot, SlotOwner owner, int slotNumber,
+			SlotContext slotInformation,
+			SlotOwner owner,
+			int slotNumber,
 			SlotSharingGroupAssignment assignmentGroup,
-			@Nullable SharedSlot parent, @Nullable AbstractID groupId) {
+			@Nullable SharedSlot parent,
+			@Nullable AbstractID groupId) {
 
-		super(allocatedSlot, owner, slotNumber, parent, groupId);
+		super(slotInformation, owner, slotNumber, parent, groupId);
 
 		this.assignmentGroup = checkNotNull(assignmentGroup);
 		this.subSlots = new HashSet<Slot>();
@@ -186,14 +173,44 @@ public class SharedSlot extends Slot {
 	public boolean hasChildren() {
 		return subSlots.size() > 0;
 	}
-	
+
 	@Override
-	public void releaseInstanceSlot() {
+	public boolean tryAssignPayload(Payload payload) {
+		throw new UnsupportedOperationException("Cannot assign an execution attempt id to a shared slot.");
+	}
+
+	@Nullable
+	@Override
+	public Payload getPayload() {
+		return null;
+	}
+
+	@Override
+	public CompletableFuture<?> releaseSlot() {
+		cancellationFuture.completeExceptionally(new FlinkException("Shared slot " + this + " is being released."));
+
 		assignmentGroup.releaseSharedSlot(this);
-		
+
 		if (!(isReleased() && subSlots.isEmpty())) {
 			throw new IllegalStateException("Bug: SharedSlot is not empty and released after call to releaseSlot()");
 		}
+
+		return CompletableFuture.completedFuture(null);
+	}
+
+	@Override
+	public void releaseInstanceSlot() {
+		releaseSlot();
+	}
+
+	@Override
+	public int getPhysicalSlotNumber() {
+		return getRootSlotNumber();
+	}
+
+	@Override
+	public AllocationID getAllocationId() {
+		return getSlotContext().getAllocationId();
 	}
 
 	/**
@@ -222,8 +239,12 @@ public class SharedSlot extends Slot {
 	SimpleSlot allocateSubSlot(AbstractID groupId) {
 		if (isAlive()) {
 			SimpleSlot slot = new SimpleSlot(
-					getJobID(), getOwner(), getTaskManagerLocation(), subSlots.size(), 
-					getTaskManagerGateway(), this, groupId);
+				getOwner(),
+				getTaskManagerLocation(),
+				subSlots.size(),
+				getTaskManagerGateway(),
+				this,
+				groupId);
 			subSlots.add(slot);
 			return slot;
 		}
@@ -244,8 +265,13 @@ public class SharedSlot extends Slot {
 	SharedSlot allocateSharedSlot(AbstractID groupId){
 		if (isAlive()) {
 			SharedSlot slot = new SharedSlot(
-					getJobID(), getOwner(), getTaskManagerLocation(), subSlots.size(), 
-					getTaskManagerGateway(), assignmentGroup, this, groupId);
+				getOwner(),
+				getTaskManagerLocation(),
+				subSlots.size(),
+				getTaskManagerGateway(),
+				assignmentGroup,
+				this,
+				groupId);
 			subSlots.add(slot);
 			return slot;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
index 0c9e11c..d397c08 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
@@ -18,11 +18,10 @@
 
 package org.apache.flink.runtime.instance;
 
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.jobmanager.scheduler.Locality;
-import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
+import org.apache.flink.runtime.jobmanager.slots.SimpleSlotContext;
+import org.apache.flink.runtime.jobmanager.slots.SlotContext;
 import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -64,23 +63,21 @@ public class SimpleSlot extends Slot implements LogicalSlot {
 	/**
 	 * Creates a new simple slot that stands alone and does not belong to shared slot.
 	 * 
-	 * @param jobID The ID of the job that the slot is allocated for.
 	 * @param owner The component from which this slot is allocated.
 	 * @param location The location info of the TaskManager where the slot was allocated from
 	 * @param slotNumber The number of the task slot on the instance.
 	 * @param taskManagerGateway The gateway to communicate with the TaskManager of this slot
 	 */
 	public SimpleSlot(
-			JobID jobID, SlotOwner owner, TaskManagerLocation location, int slotNumber,
+			SlotOwner owner, TaskManagerLocation location, int slotNumber,
 			TaskManagerGateway taskManagerGateway) {
-		this(jobID, owner, location, slotNumber, taskManagerGateway, null, null);
+		this(owner, location, slotNumber, taskManagerGateway, null, null);
 	}
 
 	/**
 	 * Creates a new simple slot that belongs to the given shared slot and
 	 * is identified by the given ID.
 	 *
-	 * @param jobID The ID of the job that the slot is allocated for.
 	 * @param owner The component from which this slot is allocated.
 	 * @param location The location info of the TaskManager where the slot was allocated from
 	 * @param slotNumber The number of the simple slot in its parent shared slot.
@@ -89,15 +86,25 @@ public class SimpleSlot extends Slot implements LogicalSlot {
 	 * @param groupID The ID that identifies the group that the slot belongs to.
 	 */
 	public SimpleSlot(
-			JobID jobID, SlotOwner owner, TaskManagerLocation location, int slotNumber,
+			SlotOwner owner,
+			TaskManagerLocation location,
+			int slotNumber,
 			TaskManagerGateway taskManagerGateway,
-			@Nullable SharedSlot parent, @Nullable AbstractID groupID) {
-
-		super(parent != null ?
-				parent.getAllocatedSlot() :
-				new AllocatedSlot(NO_ALLOCATION_ID, jobID, location, slotNumber,
-						ResourceProfile.UNKNOWN, taskManagerGateway),
-				owner, slotNumber, parent, groupID);
+			@Nullable SharedSlot parent,
+			@Nullable AbstractID groupID) {
+
+		super(
+			parent != null ?
+				parent.getSlotContext() :
+				new SimpleSlotContext(
+					NO_ALLOCATION_ID,
+					location,
+					slotNumber,
+					taskManagerGateway),
+			owner,
+			slotNumber,
+			parent,
+			groupID);
 	}
 
 	// ------------------------------------------------------------------------
@@ -107,12 +114,11 @@ public class SimpleSlot extends Slot implements LogicalSlot {
 	/**
 	 * Creates a new simple slot that stands alone and does not belong to shared slot.
 	 *
-	 * @param allocatedSlot The allocated slot that this slot represents.
+	 * @param slotContext The slot context of this simple slot
 	 * @param owner The component from which this slot is allocated.
-	 * @param slotNumber The number of the task slot on the instance.
 	 */
-	public SimpleSlot(AllocatedSlot allocatedSlot, SlotOwner owner, int slotNumber) {
-		this(allocatedSlot, owner, slotNumber, null, null);
+	public SimpleSlot(SlotContext slotContext, SlotOwner owner, int slotNumber) {
+		this(slotContext, owner, slotNumber, null, null);
 	}
 
 	/**
@@ -121,27 +127,29 @@ public class SimpleSlot extends Slot implements LogicalSlot {
 	 *
 	 * @param parent The parent shared slot.
 	 * @param owner The component from which this slot is allocated.
-	 * @param slotNumber The number of the simple slot in its parent shared slot.
 	 * @param groupID The ID that identifies the group that the slot belongs to.
 	 */
 	public SimpleSlot(SharedSlot parent, SlotOwner owner, int slotNumber, AbstractID groupID) {
-		this(parent.getAllocatedSlot(), owner, slotNumber, parent, groupID);
+		this(parent.getSlotContext(), owner, slotNumber, parent, groupID);
 	}
 	
 	/**
 	 * Creates a new simple slot that belongs to the given shared slot and
 	 * is identified by the given ID..
 	 *
-	 * @param allocatedSlot The allocated slot that this slot represents.
+	 * @param slotContext The slot context of this simple slot
 	 * @param owner The component from which this slot is allocated.
 	 * @param slotNumber The number of the simple slot in its parent shared slot.
 	 * @param parent The parent shared slot.
 	 * @param groupID The ID that identifies the group that the slot belongs to.
 	 */
 	private SimpleSlot(
-			AllocatedSlot allocatedSlot, SlotOwner owner, int slotNumber,
-			@Nullable SharedSlot parent, @Nullable AbstractID groupID) {
-		super(allocatedSlot, owner, slotNumber, parent, groupID);
+			SlotContext slotContext,
+			SlotOwner owner,
+			int slotNumber,
+			@Nullable SharedSlot parent,
+			@Nullable AbstractID groupID) {
+		super(slotContext, owner, slotNumber, parent, groupID);
 	}
 
 	// ------------------------------------------------------------------------
@@ -263,7 +271,7 @@ public class SimpleSlot extends Slot implements LogicalSlot {
 
 	@Override
 	public AllocationID getAllocationId() {
-		return getAllocatedSlot().getSlotAllocationId();
+		return getSlotContext().getAllocationId();
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
index 804682b..6262c9a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
@@ -18,11 +18,10 @@
 
 package org.apache.flink.runtime.instance;
 
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
+import org.apache.flink.runtime.jobmanager.slots.SimpleSlotContext;
+import org.apache.flink.runtime.jobmanager.slots.SlotContext;
 import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -66,8 +65,8 @@ public abstract class Slot {
 
 	// ------------------------------------------------------------------------
 
-	/** The allocated slot that this slot represents. */
-	private final AllocatedSlot allocatedSlot;
+	/** Context of this logical slot. */
+	private final SlotContext slotContext;
 
 	/** The owner of this slot - the slot was taken from that owner and must be disposed to it */
 	private final SlotOwner owner;
@@ -80,7 +79,6 @@ public abstract class Slot {
 	@Nullable
 	private final AbstractID groupID;
 
-	/** The number of the slot on which the task is deployed */
 	private final int slotNumber;
 
 	/** The state of the vertex, only atomically updated */
@@ -93,7 +91,6 @@ public abstract class Slot {
 	 * 
 	 * <p>This is the old way of constructing slots, prior to the FLIP-6 resource management refactoring.
 	 * 
-	 * @param jobID The ID of the job that this slot is allocated for.
 	 * @param owner The component from which this slot is allocated.
 	 * @param location The location info of the TaskManager where the slot was allocated from
 	 * @param slotNumber The number of this slot.
@@ -103,7 +100,6 @@ public abstract class Slot {
 	 *                if the slot does not belong to any task group.   
 	 */
 	protected Slot(
-			JobID jobID,
 			SlotOwner owner,
 			TaskManagerLocation location,
 			int slotNumber,
@@ -113,12 +109,11 @@ public abstract class Slot {
 
 		checkArgument(slotNumber >= 0);
 
-		this.allocatedSlot = new AllocatedSlot(
+		// create a simple slot context
+		this.slotContext = new SimpleSlotContext(
 			NO_ALLOCATION_ID,
-			jobID,
 			location,
 			slotNumber,
-			ResourceProfile.UNKNOWN,
 			taskManagerGateway);
 
 		this.owner = checkNotNull(owner);
@@ -130,7 +125,7 @@ public abstract class Slot {
 	/**
 	 * Base constructor for slots.
 	 *
-	 * @param allocatedSlot The allocated slot that this slot represents.
+	 * @param slotContext The slot context of this slot.
 	 * @param owner The component from which this slot is allocated.
 	 * @param slotNumber The number of this slot.
 	 * @param parent The parent slot that contains this slot. May be null, if this slot is the root.
@@ -138,12 +133,13 @@ public abstract class Slot {
 	 *                if the slot does not belong to any task group.   
 	 */
 	protected Slot(
-			AllocatedSlot allocatedSlot, SlotOwner owner, int slotNumber,
-			@Nullable SharedSlot parent, @Nullable AbstractID groupID) {
-
-		checkArgument(slotNumber >= 0);
+			SlotContext slotContext,
+			SlotOwner owner,
+			int slotNumber,
+			@Nullable SharedSlot parent,
+			@Nullable AbstractID groupID) {
 
-		this.allocatedSlot = checkNotNull(allocatedSlot);
+		this.slotContext = checkNotNull(slotContext);
 		this.owner = checkNotNull(owner);
 		this.parent = parent; // may be null
 		this.groupID = groupID; // may be null
@@ -157,17 +153,8 @@ public abstract class Slot {
 	 * 
 	 * @return This slot's allocated slot.
 	 */
-	public AllocatedSlot getAllocatedSlot() {
-		return allocatedSlot;
-	}
-
-	/**
-	 * Returns the ID of the job this allocated slot belongs to.
-	 *
-	 * @return the ID of the job this allocated slot belongs to
-	 */
-	public JobID getJobID() {
-		return allocatedSlot.getJobID();
+	public SlotContext getSlotContext() {
+		return slotContext;
 	}
 
 	/**
@@ -176,7 +163,7 @@ public abstract class Slot {
 	 * @return The ID of the TaskManager that offers this slot
 	 */
 	public ResourceID getTaskManagerID() {
-		return allocatedSlot.getTaskManagerLocation().getResourceID();
+		return slotContext.getTaskManagerLocation().getResourceID();
 	}
 
 	/**
@@ -185,7 +172,7 @@ public abstract class Slot {
 	 * @return The location info of the TaskManager that offers this slot
 	 */
 	public TaskManagerLocation getTaskManagerLocation() {
-		return allocatedSlot.getTaskManagerLocation();
+		return slotContext.getTaskManagerLocation();
 	}
 
 	/**
@@ -196,7 +183,7 @@ public abstract class Slot {
 	 * @return The actor gateway that can be used to send messages to the TaskManager.
 	 */
 	public TaskManagerGateway getTaskManagerGateway() {
-		return allocatedSlot.getTaskManagerGateway();
+		return slotContext.getTaskManagerGateway();
 	}
 
 	/**
@@ -373,7 +360,7 @@ public abstract class Slot {
 	}
 
 	protected String hierarchy() {
-		return (getParent() != null ? getParent().hierarchy() : "") + '(' + slotNumber + ')';
+		return (getParent() != null ? getParent().hierarchy() : "") + '(' + getSlotNumber() + ')';
 	}
 
 	private static String getStateName(int state) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index 771d690..2ccea75 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.instance;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
@@ -29,9 +28,11 @@ import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.jobmanager.scheduler.Locality;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
-import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
+import org.apache.flink.runtime.jobmanager.slots.SlotContext;
 import org.apache.flink.runtime.jobmanager.slots.SlotAndLocality;
+import org.apache.flink.runtime.jobmanager.slots.SlotException;
 import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
@@ -59,11 +60,11 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -249,7 +250,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 
 		// work on all slots waiting for this connection
 		for (PendingRequest pendingRequest : waitingForResourceManager.values()) {
-			requestSlotFromResourceManager(pendingRequest);
+			requestSlotFromResourceManager(resourceManagerGateway, pendingRequest);
 		}
 
 		// all sent off
@@ -277,24 +278,23 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 	}
 
 	@Override
-	public void returnAllocatedSlot(Slot slot) {
-		internalReturnAllocatedSlot(slot);
+	public void returnAllocatedSlot(SlotContext allocatedSlot) {
+		internalReturnAllocatedSlot(allocatedSlot.getAllocationId());
 	}
 
 	@Override
-	public CompletableFuture<Acknowledge> cancelSlotAllocation(SlotRequestID requestId) {
+	public CompletableFuture<Acknowledge> cancelSlotRequest(SlotRequestID requestId) {
 		final PendingRequest pendingRequest = removePendingRequest(requestId);
 
 		if (pendingRequest != null) {
 			failPendingRequest(pendingRequest, new CancellationException("Allocation with request id" + requestId + " cancelled."));
 		} else {
-			final Slot slot = allocatedSlots.get(requestId);
+			final AllocatedSlot allocatedSlot = allocatedSlots.get(requestId);
 
-			if (slot != null) {
-				LOG.info("Returning allocated slot {} because the corresponding allocation request {} was cancelled.", slot, requestId);
-				if (slot.markCancelled()) {
-					internalReturnAllocatedSlot(slot);
-				}
+			if (allocatedSlot != null) {
+				LOG.info("Returning allocated slot {} because the corresponding allocation request {} was cancelled.", allocatedSlot, requestId);
+				// TODO: Avoid having to send another message to do the slot releasing (e.g. introduce Slot#cancelExecution) and directly return slot
+				allocatedSlot.triggerLogicalSlotRelease();
 			} else {
 				LOG.debug("There was no slot allocation with {} to be cancelled.", requestId);
 			}
@@ -312,24 +312,36 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 		// (1) do we have a slot available already?
 		SlotAndLocality slotFromPool = availableSlots.poll(resources, locationPreferences);
 		if (slotFromPool != null) {
-			SimpleSlot slot = createSimpleSlot(slotFromPool.slot(), slotFromPool.locality());
-			allocatedSlots.add(requestId, slot);
-			return CompletableFuture.completedFuture(slot);
-		}
+			final AllocatedSlot allocatedSlot = slotFromPool.slot();
 
-		// the request will be completed by a future
-		final CompletableFuture<LogicalSlot> future = new CompletableFuture<>();
+			final SimpleSlot simpleSlot;
+			try {
+				simpleSlot = allocatedSlot.allocateSimpleSlot(slotFromPool.locality());
+			} catch (SlotException e) {
+				availableSlots.add(allocatedSlot, clock.relativeTimeMillis());
 
-		// (2) need to request a slot
-		if (resourceManagerGateway == null) {
-			// no slot available, and no resource manager connection
-			stashRequestWaitingForResourceManager(requestId, resources, future);
-		} else {
-			// we have a resource manager connection, so let's ask it for more resources
-			requestSlotFromResourceManager(new PendingRequest(requestId, future, resources));
+				return FutureUtils.completedExceptionally(e);
+			}
+
+			allocatedSlots.add(requestId, allocatedSlot);
+			return CompletableFuture.completedFuture(simpleSlot);
 		}
 
-		return future;
+		// we have to request a new allocated slot
+		CompletableFuture<AllocatedSlot> allocatedSlotFuture = requestSlot(
+			requestId,
+			resources);
+
+		return allocatedSlotFuture.thenApply(
+			(AllocatedSlot allocatedSlot) -> {
+				try {
+					return allocatedSlot.allocateSimpleSlot(Locality.UNKNOWN);
+				} catch (SlotException e) {
+					returnAllocatedSlot(allocatedSlot);
+
+					throw new CompletionException("Could not allocate a logical simple slot.", e);
+				}
+			});
 	}
 
 	/**
@@ -354,16 +366,37 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 		}
 	}
 
+	private CompletableFuture<AllocatedSlot> requestSlot(
+		SlotRequestID slotRequestId,
+		ResourceProfile resourceProfile) {
+
+		final PendingRequest pendingRequest = new PendingRequest(
+			slotRequestId,
+			resourceProfile);
+
+		if (resourceManagerGateway == null) {
+			stashRequestWaitingForResourceManager(pendingRequest);
+		} else {
+			requestSlotFromResourceManager(resourceManagerGateway, pendingRequest);
+		}
+
+		return pendingRequest.getAllocatedSlotFuture();
+	}
+
 	private void requestSlotFromResourceManager(
+			final ResourceManagerGateway resourceManagerGateway,
 			final PendingRequest pendingRequest) {
 
+		Preconditions.checkNotNull(resourceManagerGateway);
+		Preconditions.checkNotNull(pendingRequest);
+
 		LOG.info("Requesting slot with profile {} from resource manager (request = {}).", pendingRequest.getResourceProfile(), pendingRequest.getSlotRequestId());
 
 		final AllocationID allocationId = new AllocationID();
 
 		pendingRequests.put(pendingRequest.getSlotRequestId(), allocationId, pendingRequest);
 
-		pendingRequest.getFuture().whenComplete(
+		pendingRequest.getAllocatedSlotFuture().whenComplete(
 			(value, throwable) -> {
 				if (throwable != null) {
 					resourceManagerGateway.cancelSlotRequest(allocationId);
@@ -405,7 +438,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 	private void slotRequestToResourceManagerFailed(SlotRequestID slotRequestID, Throwable failure) {
 		PendingRequest request = pendingRequests.removeKeyA(slotRequestID);
 		if (request != null) {
-			request.getFuture().completeExceptionally(new NoResourceAvailableException(
+			request.getAllocatedSlotFuture().completeExceptionally(new NoResourceAvailableException(
 					"No pooled slot available and request to ResourceManager for new slot failed", failure));
 		} else {
 			if (LOG.isDebugEnabled()) {
@@ -425,25 +458,22 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 		Preconditions.checkNotNull(pendingRequest);
 		Preconditions.checkNotNull(e);
 
-		if (!pendingRequest.getFuture().isDone()) {
-			pendingRequest.getFuture().completeExceptionally(e);
+		if (!pendingRequest.getAllocatedSlotFuture().isDone()) {
+			pendingRequest.getAllocatedSlotFuture().completeExceptionally(e);
 		}
 	}
 
-	private void stashRequestWaitingForResourceManager(
-			final SlotRequestID requestId,
-			final ResourceProfile resources,
-			final CompletableFuture<LogicalSlot> future) {
+	private void stashRequestWaitingForResourceManager(final PendingRequest pendingRequest) {
 
 		LOG.info("Cannot serve slot request, no ResourceManager connected. " +
-				"Adding as pending request {}",  requestId);
+				"Adding as pending request {}",  pendingRequest.getSlotRequestId());
 
-		waitingForResourceManager.put(requestId, new PendingRequest(requestId, future, resources));
+		waitingForResourceManager.put(pendingRequest.getSlotRequestId(), pendingRequest);
 
 		scheduleRunAsync(new Runnable() {
 			@Override
 			public void run() {
-				checkTimeoutRequestWaitingForResourceManager(requestId);
+				checkTimeoutRequestWaitingForResourceManager(pendingRequest.getSlotRequestId());
 			}
 		}, resourceManagerRequestsTimeout);
 	}
@@ -465,38 +495,31 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 	 * Return the slot back to this pool without releasing it. It's mainly called by failed / cancelled tasks, and the
 	 * slot can be reused by other pending requests if the resource profile matches.n
 	 *
-	 * @param slot The slot needs to be returned
+	 * @param allocationId identifying the slot which is returned
 	 */
-	private void internalReturnAllocatedSlot(Slot slot) {
-		checkNotNull(slot);
-		checkArgument(!slot.isAlive(), "slot is still alive");
-		checkArgument(slot.getOwner() == providerAndOwner, "slot belongs to the wrong pool.");
-
-		// markReleased() is an atomic check-and-set operation, so that the slot is guaranteed
-		// to be returned only once
-		if (slot.markReleased()) {
-			if (allocatedSlots.remove(slot)) {
-				// this slot allocation is still valid, use the slot to fulfill another request
-				// or make it available again
-				final AllocatedSlot taskManagerSlot = slot.getAllocatedSlot();
-				final PendingRequest pendingRequest = pollMatchingPendingRequest(taskManagerSlot);
-	
+	private void internalReturnAllocatedSlot(AllocationID allocationId) {
+		final AllocatedSlot allocatedSlot = allocatedSlots.remove(allocationId);
+
+		if (allocatedSlot != null) {
+			if (allocatedSlot.releaseLogicalSlot()) {
+
+				final PendingRequest pendingRequest = pollMatchingPendingRequest(allocatedSlot);
+
 				if (pendingRequest != null) {
 					LOG.debug("Fulfilling pending request [{}] early with returned slot [{}]",
-							pendingRequest.getSlotRequestId(), taskManagerSlot.getSlotAllocationId());
+						pendingRequest.getSlotRequestId(), allocatedSlot.getAllocationId());
 
-					SimpleSlot newSlot = createSimpleSlot(taskManagerSlot, Locality.UNKNOWN);
-					allocatedSlots.add(pendingRequest.getSlotRequestId(), newSlot);
-					pendingRequest.getFuture().complete(newSlot);
+					allocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot);
+					pendingRequest.getAllocatedSlotFuture().complete(allocatedSlot);
+				} else {
+					LOG.debug("Adding returned slot [{}] to available slots", allocatedSlot.getAllocationId());
+					availableSlots.add(allocatedSlot, clock.relativeTimeMillis());
 				}
-				else {
-					LOG.debug("Adding returned slot [{}] to available slots", taskManagerSlot.getSlotAllocationId());
-					availableSlots.add(taskManagerSlot, clock.relativeTimeMillis());
-				}
-			}
-			else {
-				LOG.debug("Returned slot's allocation has been failed. Dropping slot.");
+			} else {
+				LOG.debug("Failed to mark the logical slot of {} as released.", allocatedSlot);
 			}
+		} else {
+			LOG.debug("Could not find allocated slot {}. Ignoring returning slot.", allocationId);
 		}
 	}
 
@@ -524,19 +547,26 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 	}
 
 	@Override
-	public CompletableFuture<Collection<SlotOffer>> offerSlots(Collection<Tuple2<AllocatedSlot, SlotOffer>> offers) {
+	public CompletableFuture<Collection<SlotOffer>> offerSlots(
+			TaskManagerLocation taskManagerLocation,
+			TaskManagerGateway taskManagerGateway,
+			Collection<SlotOffer> offers) {
 		validateRunsInMainThread();
 
 		List<CompletableFuture<Optional<SlotOffer>>> acceptedSlotOffers = offers.stream().map(
 			offer -> {
-				CompletableFuture<Optional<SlotOffer>> acceptedSlotOffer = offerSlot(offer.f0).thenApply(
-					(acceptedSlot) -> {
-						if (acceptedSlot) {
-							return Optional.of(offer.f1);
-						} else {
-							return Optional.empty();
-						}
-					});
+				CompletableFuture<Optional<SlotOffer>> acceptedSlotOffer = offerSlot(
+					taskManagerLocation,
+					taskManagerGateway,
+					offer)
+					.thenApply(
+						(acceptedSlot) -> {
+							if (acceptedSlot) {
+								return Optional.of(offer);
+							} else {
+								return Optional.empty();
+							}
+						});
 
 				return acceptedSlotOffer;
 			}
@@ -564,20 +594,25 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 	 * we issued. Slot offering may be rejected if we find something mismatching or there is actually no pending
 	 * request waiting for this slot (maybe fulfilled by some other returned slot).
 	 *
-	 * @param slot The offered slot
+	 * @param taskManagerLocation location from where the offer comes from
+	 * @param taskManagerGateway TaskManager gateway
+	 * @param slotOffer the offered slot
 	 * @return True if we accept the offering
 	 */
 	@Override
-	public CompletableFuture<Boolean> offerSlot(final AllocatedSlot slot) {
+	public CompletableFuture<Boolean> offerSlot(
+			final TaskManagerLocation taskManagerLocation,
+			final TaskManagerGateway taskManagerGateway,
+			final SlotOffer slotOffer) {
 		validateRunsInMainThread();
 
 		// check if this TaskManager is valid
-		final ResourceID resourceID = slot.getTaskManagerId();
-		final AllocationID allocationID = slot.getSlotAllocationId();
+		final ResourceID resourceID = taskManagerLocation.getResourceID();
+		final AllocationID allocationID = slotOffer.getAllocationId();
 
 		if (!registeredTaskManagers.contains(resourceID)) {
 			LOG.debug("Received outdated slot offering [{}] from unregistered TaskManager: {}",
-					slot.getSlotAllocationId(), slot);
+					slotOffer.getAllocationId(), taskManagerLocation);
 			return CompletableFuture.completedFuture(false);
 		}
 
@@ -590,19 +625,26 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 			return CompletableFuture.completedFuture(true);
 		}
 
+		final AllocatedSlot allocatedSlot = new AllocatedSlot(
+			slotOffer.getAllocationId(),
+			taskManagerLocation,
+			slotOffer.getSlotIndex(),
+			slotOffer.getResourceProfile(),
+			taskManagerGateway,
+			providerAndOwner);
+
 		// check whether we have request waiting for this slot
 		PendingRequest pendingRequest = pendingRequests.removeKeyB(allocationID);
 		if (pendingRequest != null) {
 			// we were waiting for this!
-			SimpleSlot resultSlot = createSimpleSlot(slot, Locality.UNKNOWN);
-			pendingRequest.getFuture().complete(resultSlot);
-			allocatedSlots.add(pendingRequest.getSlotRequestId(), resultSlot);
+			allocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot);
+			pendingRequest.getAllocatedSlotFuture().complete(allocatedSlot);
 		}
 		else {
 			// we were actually not waiting for this:
 			//   - could be that this request had been fulfilled
 			//   - we are receiving the slots from TaskManagers after becoming leaders
-			availableSlots.add(slot, clock.relativeTimeMillis());
+			availableSlots.add(allocatedSlot, clock.relativeTimeMillis());
 		}
 
 		// we accepted the request in any case. slot will be released after it idled for
@@ -639,11 +681,11 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 			LOG.debug("Failed available slot [{}] with ", allocationID, cause);
 		}
 		else {
-			Slot slot = allocatedSlots.remove(allocationID);
-			if (slot != null) {
+			AllocatedSlot allocatedSlot = allocatedSlots.remove(allocationID);
+			if (allocatedSlot != null) {
 				// release the slot.
 				// since it is not in 'allocatedSlots' any more, it will be dropped o return'
-				slot.releaseInstanceSlot();
+				allocatedSlot.triggerLogicalSlotRelease();
 			}
 			else {
 				LOG.debug("Outdated request to fail slot [{}] with ", allocationID, cause);
@@ -681,9 +723,11 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 		if (registeredTaskManagers.remove(resourceID)) {
 			availableSlots.removeAllForTaskManager(resourceID);
 
-			final Set<Slot> allocatedSlotsForResource = allocatedSlots.removeSlotsForTaskManager(resourceID);
-			for (Slot slot : allocatedSlotsForResource) {
-				slot.releaseInstanceSlot();
+			final Set<AllocatedSlot> allocatedSlotsForResource = allocatedSlots.removeSlotsForTaskManager(resourceID);
+			for (AllocatedSlot allocatedSlot : allocatedSlotsForResource) {
+				allocatedSlot.triggerLogicalSlotRelease();
+				// TODO: This is a work-around to mark the logical slot as released. We should split up the internalReturnSlot method to not poll pending requests
+				allocatedSlot.releaseLogicalSlot();
 			}
 		}
 
@@ -691,18 +735,6 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 	}
 
 	// ------------------------------------------------------------------------
-	//  Utilities
-	// ------------------------------------------------------------------------
-
-	private SimpleSlot createSimpleSlot(AllocatedSlot slot, Locality locality) {
-		SimpleSlot result = new SimpleSlot(slot, providerAndOwner, slot.getSlotNumber());
-		if (locality != null) {
-			result.setLocality(locality);
-		}
-		return result;
-	}
-
-	// ------------------------------------------------------------------------
 	//  Methods for tests
 	// ------------------------------------------------------------------------
 
@@ -736,10 +768,10 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 	static class AllocatedSlots {
 
 		/** All allocated slots organized by TaskManager's id */
-		private final Map<ResourceID, Set<Slot>> allocatedSlotsByTaskManager;
+		private final Map<ResourceID, Set<AllocatedSlot>> allocatedSlotsByTaskManager;
 
 		/** All allocated slots organized by AllocationID */
-		private final DualKeyMap<AllocationID, SlotRequestID, Slot> allocatedSlotsById;
+		private final DualKeyMap<AllocationID, SlotRequestID, AllocatedSlot> allocatedSlotsById;
 
 		AllocatedSlots() {
 			this.allocatedSlotsByTaskManager = new HashMap<>(16);
@@ -749,18 +781,18 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 		/**
 		 * Adds a new slot to this collection.
 		 *
-		 * @param slot The allocated slot
+		 * @param allocatedSlot The allocated slot
 		 */
-		void add(SlotRequestID slotRequestId, Slot slot) {
-			allocatedSlotsById.put(slot.getAllocatedSlot().getSlotAllocationId(), slotRequestId, slot);
-
-			final ResourceID resourceID = slot.getTaskManagerID();
-			Set<Slot> slotsForTaskManager = allocatedSlotsByTaskManager.get(resourceID);
-			if (slotsForTaskManager == null) {
-				slotsForTaskManager = new HashSet<>();
-				allocatedSlotsByTaskManager.put(resourceID, slotsForTaskManager);
-			}
-			slotsForTaskManager.add(slot);
+		void add(SlotRequestID slotRequestId, AllocatedSlot allocatedSlot) {
+			allocatedSlotsById.put(allocatedSlot.getAllocationId(), slotRequestId, allocatedSlot);
+
+			final ResourceID resourceID = allocatedSlot.getTaskManagerLocation().getResourceID();
+
+			Set<AllocatedSlot> slotsForTaskManager = allocatedSlotsByTaskManager.computeIfAbsent(
+				resourceID,
+				resourceId -> new HashSet<>(4));
+
+			slotsForTaskManager.add(allocatedSlot);
 		}
 
 		/**
@@ -769,11 +801,11 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 		 * @param allocationID The allocation id
 		 * @return The allocated slot, null if we can't find a match
 		 */
-		Slot get(final AllocationID allocationID) {
+		AllocatedSlot get(final AllocationID allocationID) {
 			return allocatedSlotsById.getKeyA(allocationID);
 		}
 
-		Slot get(final SlotRequestID slotRequestId) {
+		AllocatedSlot get(final SlotRequestID slotRequestId) {
 			return allocatedSlotsById.getKeyB(slotRequestId);
 		}
 
@@ -790,27 +822,20 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 		/**
 		 * Remove an allocation with slot.
 		 *
-		 * @param slot The slot needs to be removed
-		 */
-		boolean remove(final Slot slot) {
-			return remove(slot.getAllocatedSlot().getSlotAllocationId()) != null;
-		}
-
-		/**
-		 * Remove an allocation with slot.
-		 *
 		 * @param slotId The ID of the slot to be removed
 		 */
-		Slot remove(final AllocationID slotId) {
-			Slot slot = allocatedSlotsById.removeKeyA(slotId);
-			if (slot != null) {
-				final ResourceID taskManagerId = slot.getTaskManagerID();
-				Set<Slot> slotsForTM = allocatedSlotsByTaskManager.get(taskManagerId);
-				slotsForTM.remove(slot);
+		AllocatedSlot remove(final AllocationID slotId) {
+			AllocatedSlot allocatedSlot = allocatedSlotsById.removeKeyA(slotId);
+			if (allocatedSlot != null) {
+				final ResourceID taskManagerId = allocatedSlot.getTaskManagerLocation().getResourceID();
+				Set<AllocatedSlot> slotsForTM = allocatedSlotsByTaskManager.get(taskManagerId);
+
+				slotsForTM.remove(allocatedSlot);
+
 				if (slotsForTM.isEmpty()) {
 					allocatedSlotsByTaskManager.remove(taskManagerId);
 				}
-				return slot;
+				return allocatedSlot;
 			}
 			else {
 				return null;
@@ -823,11 +848,11 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 		 * @param resourceID The id of the TaskManager
 		 * @return Set of slots which are allocated from the same TaskManager
 		 */
-		Set<Slot> removeSlotsForTaskManager(final ResourceID resourceID) {
-			Set<Slot> slotsForTaskManager = allocatedSlotsByTaskManager.remove(resourceID);
+		Set<AllocatedSlot> removeSlotsForTaskManager(final ResourceID resourceID) {
+			Set<AllocatedSlot> slotsForTaskManager = allocatedSlotsByTaskManager.remove(resourceID);
 			if (slotsForTaskManager != null) {
-				for (Slot slot : slotsForTaskManager) {
-					allocatedSlotsById.removeKeyA(slot.getAllocatedSlot().getSlotAllocationId());
+				for (AllocatedSlot allocatedSlot : slotsForTaskManager) {
+					allocatedSlotsById.removeKeyA(allocatedSlot.getAllocationId());
 				}
 				return slotsForTaskManager;
 			}
@@ -852,7 +877,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 		}
 
 		@VisibleForTesting
-		Set<Slot> getSlotsForTaskManager(ResourceID resourceId) {
+		Set<AllocatedSlot> getSlotsForTaskManager(ResourceID resourceId) {
 			if (allocatedSlotsByTaskManager.containsKey(resourceId)) {
 				return allocatedSlotsByTaskManager.get(resourceId);
 			} else {
@@ -892,7 +917,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 			checkNotNull(slot);
 
 			SlotAndTimestamp previous = availableSlots.put(
-					slot.getSlotAllocationId(), new SlotAndTimestamp(slot, timestamp));
+					slot.getAllocationId(), new SlotAndTimestamp(slot, timestamp));
 
 			if (previous == null) {
 				final ResourceID resourceID = slot.getTaskManagerLocation().getResourceID();
@@ -951,7 +976,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 					if (onTaskManager != null) {
 						for (AllocatedSlot candidate : onTaskManager) {
 							if (candidate.getResourceProfile().isMatching(resourceProfile)) {
-								remove(candidate.getSlotAllocationId());
+								remove(candidate.getAllocationId());
 								return new SlotAndLocality(candidate, Locality.LOCAL);
 							}
 						}
@@ -964,7 +989,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 					if (onHost != null) {
 						for (AllocatedSlot candidate : onHost) {
 							if (candidate.getResourceProfile().isMatching(resourceProfile)) {
-								remove(candidate.getSlotAllocationId());
+								remove(candidate.getAllocationId());
 								return new SlotAndLocality(candidate, Locality.HOST_LOCAL);
 							}
 						}
@@ -977,7 +1002,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 				final AllocatedSlot slot = candidate.slot();
 
 				if (slot.getResourceProfile().isMatching(resourceProfile)) {
-					remove(slot.getSlotAllocationId());
+					remove(slot.getAllocationId());
 					return new SlotAndLocality(
 							slot, hadLocationPreference ? Locality.NON_LOCAL : Locality.UNCONSTRAINED);
 				}
@@ -1002,7 +1027,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 
 				// remove from the base set and the by-host view
 				for (AllocatedSlot slot : slotsForTm) {
-					availableSlots.remove(slot.getSlotAllocationId());
+					availableSlots.remove(slot.getAllocationId());
 					slotsForHost.remove(slot);
 				}
 
@@ -1082,7 +1107,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 
 		@Override
 		public CompletableFuture<Boolean> returnAllocatedSlot(Slot slot) {
-			gateway.returnAllocatedSlot(slot);
+			gateway.returnAllocatedSlot(slot.getSlotContext());
 			return CompletableFuture.completedFuture(true);
 		}
 
@@ -1097,7 +1122,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 			slotFuture.whenComplete(
 				(LogicalSlot slot, Throwable failure) -> {
 					if (failure != null) {
-						gateway.cancelSlotAllocation(requestId);
+						gateway.cancelSlotRequest(requestId);
 					}
 			});
 			return slotFuture;
@@ -1113,25 +1138,25 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 
 		private final SlotRequestID slotRequestId;
 
-		private final CompletableFuture<LogicalSlot> future;
-
 		private final ResourceProfile resourceProfile;
 
+		private final CompletableFuture<AllocatedSlot> allocatedSlotFuture;
+
 		PendingRequest(
 				SlotRequestID slotRequestId,
-				CompletableFuture<LogicalSlot> future,
 				ResourceProfile resourceProfile) {
 			this.slotRequestId = Preconditions.checkNotNull(slotRequestId);
-			this.future = Preconditions.checkNotNull(future);
 			this.resourceProfile = Preconditions.checkNotNull(resourceProfile);
+
+			allocatedSlotFuture = new CompletableFuture<>();
 		}
 
 		public SlotRequestID getSlotRequestId() {
 			return slotRequestId;
 		}
 
-		public CompletableFuture<LogicalSlot> getFuture() {
-			return future;
+		public CompletableFuture<AllocatedSlot> getAllocatedSlotFuture() {
+			return allocatedSlotFuture;
 		}
 
 		public ResourceProfile getResourceProfile() {

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
index ad2a6a6..71de054 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
@@ -19,12 +19,12 @@
 package org.apache.flink.runtime.instance;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
-import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
+import org.apache.flink.runtime.jobmanager.slots.SlotContext;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rpc.RpcGateway;
@@ -76,9 +76,15 @@ public interface SlotPoolGateway extends RpcGateway {
 
 	CompletableFuture<Acknowledge> releaseTaskManager(ResourceID resourceID);
 
-	CompletableFuture<Boolean> offerSlot(AllocatedSlot slot);
+	CompletableFuture<Boolean> offerSlot(
+		TaskManagerLocation taskManagerLocation,
+		TaskManagerGateway taskManagerGateway,
+		SlotOffer slotOffer);
 
-	CompletableFuture<Collection<SlotOffer>> offerSlots(Collection<Tuple2<AllocatedSlot, SlotOffer>> offers);
+	CompletableFuture<Collection<SlotOffer>> offerSlots(
+		TaskManagerLocation taskManagerLocation,
+		TaskManagerGateway taskManagerGateway,
+		Collection<SlotOffer> offers);
 	
 	void failAllocation(AllocationID allocationID, Exception cause);
 
@@ -93,7 +99,7 @@ public interface SlotPoolGateway extends RpcGateway {
 			Iterable<TaskManagerLocation> locationPreferences,
 			@RpcTimeout Time timeout);
 
-	void returnAllocatedSlot(Slot slot);
+	void returnAllocatedSlot(SlotContext slotInformation);
 
 	/**
 	 * Cancel a slot allocation request.
@@ -101,7 +107,7 @@ public interface SlotPoolGateway extends RpcGateway {
 	 * @param requestId identifying the slot allocation request
 	 * @return Future acknowledge if the slot allocation has been cancelled
 	 */
-	CompletableFuture<Acknowledge> cancelSlotAllocation(SlotRequestID requestId);
+	CompletableFuture<Acknowledge> cancelSlotRequest(SlotRequestID requestId);
 
 	/**
 	 * Request ID identifying different slot requests.

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index 8857be7..a3c38e0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -364,7 +364,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
 			Locality locality = instanceLocalityPair.getRight();
 
 			try {
-				SimpleSlot slot = instanceToUse.allocateSimpleSlot(vertex.getJobId());
+				SimpleSlot slot = instanceToUse.allocateSimpleSlot();
 				
 				// if the instance has further available slots, re-add it to the set of available resources.
 				if (instanceToUse.hasResourcesAvailable()) {
@@ -426,7 +426,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
 				JobVertexID groupID = vertex.getJobvertexId();
 				
 				// allocate a shared slot from the instance
-				SharedSlot sharedSlot = instanceToUse.allocateSharedSlot(vertex.getJobId(), groupAssignment);
+				SharedSlot sharedSlot = instanceToUse.allocateSharedSlot(groupAssignment);
 
 				// if the instance has further available slots, re-add it to the set of available resources.
 				if (instanceToUse.hasResourcesAvailable()) {
@@ -562,7 +562,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
 				ExecutionVertex vertex = task.getTaskToExecute().getVertex();
 				
 				try {
-					SimpleSlot newSlot = instance.allocateSimpleSlot(vertex.getJobId());
+					SimpleSlot newSlot = instance.allocateSimpleSlot();
 					if (newSlot != null) {
 						
 						// success, remove from the task queue and notify the future

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
deleted file mode 100644
index 4910862..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager.slots;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * The {@code AllocatedSlot} represents a slot that the JobManager allocated from a TaskManager.
- * It represents a slice of allocated resources from the TaskManager.
- * 
- * <p>To allocate an {@code AllocatedSlot}, the requests a slot from the ResourceManager. The
- * ResourceManager picks (or starts) a TaskManager that will then allocate the slot to the
- * JobManager and notify the JobManager.
- * 
- * <p>Note: Prior to the resource management changes introduced in (Flink Improvement Proposal 6),
- * an AllocatedSlot was allocated to the JobManager as soon as the TaskManager registered at the
- * JobManager. All slots had a default unknown resource profile. 
- */
-public class AllocatedSlot {
-
-	/** The ID under which the slot is allocated. Uniquely identifies the slot. */
-	private final AllocationID slotAllocationId;
-
-	/** The ID of the job this slot is allocated for */
-	private final JobID jobID;
-
-	/** The location information of the TaskManager to which this slot belongs */
-	private final TaskManagerLocation taskManagerLocation;
-
-	/** The resource profile of the slot provides */
-	private final ResourceProfile resourceProfile;
-
-	/** RPC gateway to call the TaskManager that holds this slot */
-	private final TaskManagerGateway taskManagerGateway;
-
-	/** The number of the slot on the TaskManager to which slot belongs. Purely informational. */
-	private final int slotNumber;
-
-	// ------------------------------------------------------------------------
-
-	public AllocatedSlot(
-			AllocationID slotAllocationId,
-			JobID jobID,
-			TaskManagerLocation location,
-			int slotNumber,
-			ResourceProfile resourceProfile,
-			TaskManagerGateway taskManagerGateway) {
-		this.slotAllocationId = checkNotNull(slotAllocationId);
-		this.jobID = checkNotNull(jobID);
-		this.taskManagerLocation = checkNotNull(location);
-		this.slotNumber = slotNumber;
-		this.resourceProfile = checkNotNull(resourceProfile);
-		this.taskManagerGateway = checkNotNull(taskManagerGateway);
-	}
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Gets the ID under which the slot is allocated, which uniquely identifies the slot.
-	 * 
-	 * @return The ID under which the slot is allocated
-	 */
-	public AllocationID getSlotAllocationId() {
-		return slotAllocationId;
-	}
-
-	/**
-	 * Gets the ID of the TaskManager on which this slot was allocated.
-	 * 
-	 * <p>This is equivalent to {@link #getTaskManagerLocation()#getTaskManagerId()}.
-	 * 
-	 * @return This slot's TaskManager's ID.
-	 */
-	public ResourceID getTaskManagerId() {
-		return getTaskManagerLocation().getResourceID();
-	}
-
-	/**
-	 * Returns the ID of the job this allocated slot belongs to.
-	 *
-	 * @return the ID of the job this allocated slot belongs to
-	 */
-	public JobID getJobID() {
-		return jobID;
-	}
-
-	/**
-	 * Gets the number of the slot.
-	 *
-	 * @return The number of the slot on the TaskManager.
-	 */
-	public int getSlotNumber() {
-		return slotNumber;
-	}
-
-	/**
-	 * Gets the resource profile of the slot.
-	 *
-	 * @return The resource profile of the slot.
-	 */
-	public ResourceProfile getResourceProfile() {
-		return resourceProfile;
-	}
-
-	/**
-	 * Gets the location info of the TaskManager that offers this slot.
-	 *
-	 * @return The location info of the TaskManager that offers this slot
-	 */
-	public TaskManagerLocation getTaskManagerLocation() {
-		return taskManagerLocation;
-	}
-
-	/**
-	 * Gets the actor gateway that can be used to send messages to the TaskManager.
-	 * <p>
-	 * This method should be removed once the new interface-based RPC abstraction is in place
-	 *
-	 * @return The actor gateway that can be used to send messages to the TaskManager.
-	 */
-	public TaskManagerGateway getTaskManagerGateway() {
-		return taskManagerGateway;
-	}
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * This always returns a reference hash code.
-	 */
-	@Override
-	public final int hashCode() {
-		return super.hashCode();
-	}
-
-	/**
-	 * This always checks based on reference equality.
-	 */
-	@Override
-	public final boolean equals(Object obj) {
-		return this == obj;
-	}
-
-	@Override
-	public String toString() {
-		return "AllocatedSlot " + slotAllocationId + " @ " + taskManagerLocation + " - " + slotNumber;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SimpleSlotContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SimpleSlotContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SimpleSlotContext.java
new file mode 100644
index 0000000..5dccc1f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SimpleSlotContext.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.slots;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Simple implementation of the {@link SlotContext} interface for the legacy code.
+ */
+public class SimpleSlotContext implements SlotContext {
+
+	private final AllocationID allocationId;
+
+	private final TaskManagerLocation taskManagerLocation;
+
+	private final int physicalSlotNumber;
+
+	private final TaskManagerGateway taskManagerGateway;
+
+	public SimpleSlotContext(
+			AllocationID allocationId,
+			TaskManagerLocation taskManagerLocation,
+			int physicalSlotNumber,
+			TaskManagerGateway taskManagerGateway) {
+		this.allocationId = Preconditions.checkNotNull(allocationId);
+		this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation);
+		this.physicalSlotNumber = physicalSlotNumber;
+		this.taskManagerGateway = Preconditions.checkNotNull(taskManagerGateway);
+	}
+
+	@Override
+	public AllocationID getAllocationId() {
+		return allocationId;
+	}
+
+	@Override
+	public TaskManagerLocation getTaskManagerLocation() {
+		return taskManagerLocation;
+	}
+
+	@Override
+	public int getPhysicalSlotNumber() {
+		return physicalSlotNumber;
+	}
+
+	@Override
+	public TaskManagerGateway getTaskManagerGateway() {
+		return taskManagerGateway;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotAndLocality.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotAndLocality.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotAndLocality.java
index 3fe5346..5ae057d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotAndLocality.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotAndLocality.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.jobmanager.slots;
 
+import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.jobmanager.scheduler.Locality;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotContext.java
new file mode 100644
index 0000000..d8a1aa4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotContext.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.slots;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.instance.Slot;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+/**
+ * Interface for the context of a logical {@link Slot}. This context contains information
+ * about the underlying allocated slot and how to communicate with the TaskManager on which
+ * it was allocated.
+ */
+public interface SlotContext {
+
+	/**
+	 * Gets the ID under which the slot is allocated, which uniquely identifies the slot.
+	 *
+	 * @return The ID under which the slot is allocated
+	 */
+	AllocationID getAllocationId();
+
+	/**
+	 * Gets the location info of the TaskManager that offers this slot.
+	 *
+	 * @return The location info of the TaskManager that offers this slot
+	 */
+	TaskManagerLocation getTaskManagerLocation();
+
+	/**
+	 * Gets the number of the slot.
+	 *
+	 * @return The number of the slot on the TaskManager.
+	 */
+	int getPhysicalSlotNumber();
+
+	/**
+	 * Gets the actor gateway that can be used to send messages to the TaskManager.
+	 * <p>
+	 * This method should be removed once the new interface-based RPC abstraction is in place
+	 *
+	 * @return The gateway that can be used to send messages to the TaskManager.
+	 */
+	TaskManagerGateway getTaskManagerGateway();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotException.java
new file mode 100644
index 0000000..48e7e25
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.slots;
+
+import org.apache.flink.util.FlinkException;
+
+/**
+ * Base class for slot related exceptions.
+ */
+public class SlotException extends FlinkException {
+	private static final long serialVersionUID = -8009227041400667546L;
+
+	public SlotException(String message) {
+		super(message);
+	}
+
+	public SlotException(Throwable cause) {
+		super(cause);
+	}
+
+	public SlotException(String message, Throwable cause) {
+		super(message, cause);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 687b6d1..324557f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -65,7 +65,6 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
-import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
 import org.apache.flink.runtime.jobmaster.message.ClassloadingProps;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -109,7 +108,6 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -649,7 +647,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 	@Override
 	public CompletableFuture<Collection<SlotOffer>> offerSlots(
 			final ResourceID taskManagerId,
-			final Iterable<SlotOffer> slots,
+			final Collection<SlotOffer> slots,
 			final Time timeout) {
 
 		Tuple2<TaskManagerLocation, TaskExecutorGateway> taskManager = registeredTaskManagers.get(taskManagerId);
@@ -658,27 +656,15 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 			return FutureUtils.completedExceptionally(new Exception("Unknown TaskManager " + taskManagerId));
 		}
 
-		final JobID jid = jobGraph.getJobID();
 		final TaskManagerLocation taskManagerLocation = taskManager.f0;
 		final TaskExecutorGateway taskExecutorGateway = taskManager.f1;
 
-		final ArrayList<Tuple2<AllocatedSlot, SlotOffer>> slotsAndOffers = new ArrayList<>();
-
 		final RpcTaskManagerGateway rpcTaskManagerGateway = new RpcTaskManagerGateway(taskExecutorGateway, getFencingToken());
 
-		for (SlotOffer slotOffer : slots) {
-			final AllocatedSlot slot = new AllocatedSlot(
-				slotOffer.getAllocationId(),
-				jid,
-				taskManagerLocation,
-				slotOffer.getSlotIndex(),
-				slotOffer.getResourceProfile(),
-				rpcTaskManagerGateway);
-
-			slotsAndOffers.add(new Tuple2<>(slot, slotOffer));
-		}
-
-		return slotPoolGateway.offerSlots(slotsAndOffers);
+		return slotPoolGateway.offerSlots(
+			taskManagerLocation,
+			rpcTaskManagerGateway,
+			slots);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index ad906c2..09d995e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -195,7 +195,7 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway, FencedRp
 	 */
 	CompletableFuture<Collection<SlotOffer>> offerSlots(
 			final ResourceID taskManagerId,
-			final Iterable<SlotOffer> slots,
+			final Collection<SlotOffer> slots,
 			@RpcTimeout final Time timeout);
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/a569f38f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index b489478..16da8e6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -184,7 +184,7 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
 
 			final Instance instance = getInstance(new ActorTaskManagerGateway(instanceGateway));
 
-			final SimpleSlot slot = instance.allocateSimpleSlot(jobId);
+			final SimpleSlot slot = instance.allocateSimpleSlot();
 
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
 
@@ -631,13 +631,13 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
 
 		final TaskManagerLocation localTaskManagerLocation = new LocalTaskManagerLocation();
 
-		final SimpleSlot sourceSlot1 = createSlot(executionGraph.getJobID(), localTaskManagerLocation, 0);
+		final SimpleSlot sourceSlot1 = createSlot(localTaskManagerLocation, 0);
 
-		final SimpleSlot sourceSlot2 = createSlot(executionGraph.getJobID(), localTaskManagerLocation, 1);
+		final SimpleSlot sourceSlot2 = createSlot(localTaskManagerLocation, 1);
 
-		final SimpleSlot sinkSlot1 = createSlot(executionGraph.getJobID(), localTaskManagerLocation, 0);
+		final SimpleSlot sinkSlot1 = createSlot(localTaskManagerLocation, 0);
 
-		final SimpleSlot sinkSlot2 = createSlot(executionGraph.getJobID(), localTaskManagerLocation, 1);
+		final SimpleSlot sinkSlot2 = createSlot(localTaskManagerLocation, 1);
 
 		slotFutures.get(sourceVertexId)[0].complete(sourceSlot1);
 		slotFutures.get(sourceVertexId)[1].complete(sourceSlot2);
@@ -654,9 +654,8 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
 		}
 	}
 
-	private SimpleSlot createSlot(JobID jobId, TaskManagerLocation taskManagerLocation, int index) {
+	private SimpleSlot createSlot(TaskManagerLocation taskManagerLocation, int index) {
 		return new SimpleSlot(
-			jobId,
 			mock(SlotOwner.class),
 			taskManagerLocation,
 			index,