You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/12/14 16:42:24 UTC

[07/11] flink git commit: [FLINK-7956] [flip6] Add support for queued scheduling with slot sharing to SlotPool

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
new file mode 100644
index 0000000..91ffa8d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
@@ -0,0 +1,740 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.AbstractCollection;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Manager which is responsible for slot sharing. Slot sharing allows to run different
+ * tasks in the same slot and to realize co-location constraints.
+ *
+ * <p>The SlotSharingManager allows to create a hierarchy of {@link TaskSlot} such that
+ * every {@link TaskSlot} is uniquely identified by a {@link SlotRequestId} identifying
+ * the request for the TaskSlot and a {@link AbstractID} identifying the task or the
+ * co-location constraint running in this slot.
+ *
+ * <p>The {@link TaskSlot} hierarchy is implemented by {@link MultiTaskSlot} and
+ * {@link SingleTaskSlot}. The former class represents inner nodes which can contain
+ * a number of other {@link TaskSlot} and the latter class represents the leaf nodes.
+ * The hierarchy starts with a root {@link MultiTaskSlot} which has a future
+ * {@link SlotContext} assigned. The {@link SlotContext} represents the allocated slot
+ * on the TaskExecutor in which all slots of this hierarchy run. A {@link MultiTaskSlot}
+ * can be assigned multiple {@link SingleTaskSlot} or {@link MultiTaskSlot} if and only if
+ * the task slot does not yet contain another child with the same {@link AbstractID} identifying
+ * the actual task or the co-location constraint.
+ *
+ * <p>Normal slot sharing is represented by a root {@link MultiTaskSlot} which contains a set
+ * of {@link SingleTaskSlot} on the second layer. Each {@link SingleTaskSlot} represents a different
+ * task.
+ *
+ * <p>Co-location constraints are modeled by adding a {@link MultiTaskSlot} to the root node. The co-location
+ * constraint is uniquely identified by a {@link AbstractID} such that we cannot add a second co-located
+ * {@link MultiTaskSlot} to the same root node. Now all co-located tasks will be added to the co-located
+ * multi task slot.
+ */
+public class SlotSharingManager {
+
+	/** Lock for the internal data structures. */
+	private final Object lock = new Object();
+
+	private final SlotSharingGroupId slotSharingGroupId;
+
+	/** Actions to release allocated slots after a complete multi task slot hierarchy has been released. */
+	private final AllocatedSlotActions allocatedSlotActions;
+
+	/** Owner of the slots to which to return them when they are released from the outside. */
+	private final SlotOwner slotOwner;
+
+	private final Map<SlotRequestId, TaskSlot> allTaskSlots;
+
+	/** Root nodes which have not been completed because the allocated slot is still pending. */
+	@GuardedBy("lock")
+	private final Map<SlotRequestId, MultiTaskSlot> unresolvedRootSlots;
+
+	/** Root nodes which have been completed (the underlying allocated slot has been assigned). */
+	@GuardedBy("lock")
+	private final Map<TaskManagerLocation, Set<MultiTaskSlot>> resolvedRootSlots;
+
+	/**
+	 * Creates a slot sharing manager for the given slot sharing group. All bookkeeping
+	 * structures start out empty.
+	 *
+	 * @param slotSharingGroupId id of the slot sharing group, handed to the logical slots created by this manager
+	 * @param allocatedSlotActions actions used to release the underlying allocated slot of a root slot
+	 * @param slotOwner owner which is handed to the logical slots created by this manager
+	 */
+	SlotSharingManager(
+			SlotSharingGroupId slotSharingGroupId,
+			AllocatedSlotActions allocatedSlotActions,
+			SlotOwner slotOwner) {
+		// bookkeeping: every task slot by request id, plus the root slots split by resolution state
+		this.allTaskSlots = new HashMap<>(16);
+		this.unresolvedRootSlots = new HashMap<>(16);
+		this.resolvedRootSlots = new HashMap<>(16);
+
+		// none of the collaborators may be null
+		this.slotSharingGroupId = Preconditions.checkNotNull(slotSharingGroupId);
+		this.allocatedSlotActions = Preconditions.checkNotNull(allocatedSlotActions);
+		this.slotOwner = Preconditions.checkNotNull(slotOwner);
+	}
+
+	/**
+	 * Checks whether this manager currently tracks any task slot.
+	 *
+	 * @return true if no task slot (root, inner or leaf) is registered, otherwise false
+	 */
+	public boolean isEmpty() {
+		return allTaskSlots.isEmpty();
+	}
+
+	/**
+	 * Checks whether a task slot is registered under the given slot request id.
+	 *
+	 * @param slotRequestId identifying the task slot to look for
+	 * @return true if a task slot with the given slot request id is registered, otherwise false
+	 */
+	public boolean contains(SlotRequestId slotRequestId) {
+		return allTaskSlots.containsKey(slotRequestId);
+	}
+
+	/**
+	 * Looks up the task slot registered under the given slot request id.
+	 *
+	 * @param slotRequestId identifying the task slot to return
+	 * @return the registered task slot or null if there is none for the given slot request id
+	 */
+	@Nullable
+	TaskSlot getTaskSlot(SlotRequestId slotRequestId) {
+		return allTaskSlots.get(slotRequestId);
+	}
+
+	/**
+	 * Creates and registers a new root {@link MultiTaskSlot}. The root slot is initially tracked as
+	 * unresolved. Once the given {@link SlotContext} future completes with a value, the root slot
+	 * is moved to the resolved root slots of the slot's {@link TaskManagerLocation}. If the future
+	 * completes without a value, the root slot is released.
+	 *
+	 * @param slotRequestId of the root slot
+	 * @param slotContextFuture with which we create the root slot
+	 * @param allocatedSlotRequestId slot request id of the underlying allocated slot which can be used
+	 *                               to cancel the pending slot request or release the allocated slot
+	 * @return New root slot
+	 */
+	MultiTaskSlot createRootSlot(
+			SlotRequestId slotRequestId,
+			CompletableFuture<? extends SlotContext> slotContextFuture,
+			SlotRequestId allocatedSlotRequestId) {
+		final MultiTaskSlot rootMultiTaskSlot = new MultiTaskSlot(
+			slotRequestId,
+			slotContextFuture,
+			allocatedSlotRequestId);
+
+		// make the root slot discoverable by its request id ...
+		allTaskSlots.put(slotRequestId, rootMultiTaskSlot);
+
+		// ... and track it as unresolved until its slot context is known
+		synchronized (lock) {
+			unresolvedRootSlots.put(slotRequestId, rootMultiTaskSlot);
+		}
+
+		slotContextFuture.whenComplete(
+			(SlotContext slotContext, Throwable throwable) -> {
+				if (slotContext == null) {
+					// no slot context available --> give up the root slot
+					// (the MultiTaskSlot constructor registers its own failure callback as well)
+					rootMultiTaskSlot.release(throwable);
+					return;
+				}
+
+				synchronized (lock) {
+					final MultiTaskSlot resolvedRootNode = unresolvedRootSlots.remove(slotRequestId);
+
+					// the root slot may have been removed in the meantime, then there is nothing to resolve
+					if (resolvedRootNode != null) {
+						resolvedRootSlots
+							.computeIfAbsent(
+								slotContext.getTaskManagerLocation(),
+								ignored -> new HashSet<>(4))
+							.add(resolvedRootNode);
+					}
+				}
+			});
+
+		return rootMultiTaskSlot;
+	}
+
+	/**
+	 * Gets a resolved root slot which does not yet contain the given groupId. If location
+	 * preferences are given, they are taken into account when selecting the slot; otherwise
+	 * any resolved root slot without the groupId qualifies.
+	 *
+	 * @param groupId which the returned slot must not contain
+	 * @param locationPreferences specifying which locations are preferred
+	 * @return the resolved root slot and its locality wrt to the specified location preferences
+	 * 		or null if there was no root slot which did not contain the given groupId
+	 */
+	@Nullable
+	MultiTaskSlotLocality getResolvedRootSlot(AbstractID groupId, Collection<TaskManagerLocation> locationPreferences) {
+		Preconditions.checkNotNull(locationPreferences);
+
+		if (locationPreferences.isEmpty()) {
+			return getResolvedRootSlotWithoutLocationPreferences(groupId);
+		}
+
+		return getResolvedRootSlotWithLocationPreferences(groupId, locationPreferences);
+	}
+
+	/**
+	 * Gets a resolved root slot which does not yet contain the given groupId. The method will try to
+	 * find a slot of a TaskManager contained in the collection of preferred locations. If there is no such slot
+	 * with free capacities available, then the method will look for slots of TaskManager which run on the same
+	 * machine as the TaskManager in the collection of preferred locations. If there is no such slot, then any slot
+	 * with free capacities is returned. If there is no such slot, then null is returned.
+	 *
+	 * <p>The hostname of every preferred location takes part in the host-local lookup, regardless of
+	 * whether that exact TaskManager currently has resolved root slots.
+	 *
+	 * @param groupId which the returned slot must not contain
+	 * @param locationPreferences specifying which locations are preferred
+	 * @return the resolved root slot and its locality wrt to the specified location preferences
+	 * 		or null if there was no root slot which did not contain the given groupId
+	 */
+	@Nullable
+	private MultiTaskSlotLocality getResolvedRootSlotWithLocationPreferences(AbstractID groupId, Collection<TaskManagerLocation> locationPreferences) {
+		Preconditions.checkNotNull(groupId);
+		Preconditions.checkNotNull(locationPreferences);
+		final Set<String> hostnameSet = new HashSet<>(16);
+		MultiTaskSlot nonLocalMultiTaskSlot = null;
+
+		synchronized (lock) {
+			// first pass: look for a slot on exactly one of the preferred TaskManagers
+			for (TaskManagerLocation locationPreference : locationPreferences) {
+				if (locationPreference == null) {
+					// a null preference cannot match anything
+					continue;
+				}
+
+				final Set<MultiTaskSlot> multiTaskSlots = resolvedRootSlots.get(locationPreference);
+
+				if (multiTaskSlots != null) {
+					for (MultiTaskSlot multiTaskSlot : multiTaskSlots) {
+						if (!multiTaskSlot.contains(groupId)) {
+							return MultiTaskSlotLocality.of(multiTaskSlot, Locality.LOCAL);
+						}
+					}
+				}
+
+				// remember the host even if the preferred TaskManager itself has no resolved root
+				// slot, because another TaskManager on the same machine might have one
+				hostnameSet.add(locationPreference.getHostname());
+			}
+
+			// second pass: prefer slots on the same host, remember the first non-local candidate as fallback
+			for (Map.Entry<TaskManagerLocation, Set<MultiTaskSlot>> taskManagerLocationSetEntry : resolvedRootSlots.entrySet()) {
+				if (hostnameSet.contains(taskManagerLocationSetEntry.getKey().getHostname())) {
+					for (MultiTaskSlot multiTaskSlot : taskManagerLocationSetEntry.getValue()) {
+						if (!multiTaskSlot.contains(groupId)) {
+							return MultiTaskSlotLocality.of(multiTaskSlot, Locality.HOST_LOCAL);
+						}
+					}
+				} else if (nonLocalMultiTaskSlot == null) {
+					for (MultiTaskSlot multiTaskSlot : taskManagerLocationSetEntry.getValue()) {
+						if (!multiTaskSlot.contains(groupId)) {
+							nonLocalMultiTaskSlot = multiTaskSlot;
+							// one fallback candidate is enough
+							break;
+						}
+					}
+				}
+			}
+		}
+
+		if (nonLocalMultiTaskSlot != null) {
+			return MultiTaskSlotLocality.of(nonLocalMultiTaskSlot, Locality.NON_LOCAL);
+		} else {
+			return null;
+		}
+	}
+
+	/**
+	 * Picks the first resolved root slot, across all TaskManager locations, which does not yet
+	 * contain the given groupId. No location preferences are taken into account.
+	 *
+	 * @param groupId which the returned slot must not contain
+	 * @return the resolved slot with {@link Locality#UNCONSTRAINED} or null if there was no root slot
+	 * 		with free capacities
+	 */
+	@Nullable
+	private MultiTaskSlotLocality getResolvedRootSlotWithoutLocationPreferences(AbstractID groupId) {
+		Preconditions.checkNotNull(groupId);
+
+		synchronized (lock) {
+			for (Set<MultiTaskSlot> rootSlotsAtLocation : resolvedRootSlots.values()) {
+				for (MultiTaskSlot candidate : rootSlotsAtLocation) {
+					if (candidate.contains(groupId)) {
+						// already occupied for this group --> try the next one
+						continue;
+					}
+
+					return MultiTaskSlotLocality.of(candidate, Locality.UNCONSTRAINED);
+				}
+			}
+		}
+
+		return null;
+	}
+
+	/**
+	 * Picks the first unresolved root slot which does not yet contain the given groupId. A root
+	 * slot is unresolved as long as its underlying allocated slot has not been allocated yet.
+	 *
+	 * @param groupId which the returned slot must not contain
+	 * @return the unresolved slot or null if there was no root slot with free capacities
+	 */
+	@Nullable
+	MultiTaskSlot getUnresolvedRootSlot(AbstractID groupId) {
+		synchronized (lock) {
+			return unresolvedRootSlots.values()
+				.stream()
+				.filter(candidate -> !candidate.contains(groupId))
+				.findFirst()
+				.orElse(null);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	// Inner classes: TaskSlot hierarchy and helper classes
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Immutable pair of a {@link MultiTaskSlot} and the {@link Locality} with which it was selected.
+	 */
+	static final class MultiTaskSlotLocality {
+		private final MultiTaskSlot multiTaskSlot;
+
+		private final Locality locality;
+
+		/**
+		 * Static factory equivalent to calling the constructor.
+		 *
+		 * @param multiTaskSlot selected multi task slot, must not be null
+		 * @param locality of the selected slot, must not be null
+		 * @return new pair of the given slot and locality
+		 */
+		public static MultiTaskSlotLocality of(MultiTaskSlot multiTaskSlot, Locality locality) {
+			return new MultiTaskSlotLocality(multiTaskSlot, locality);
+		}
+
+		MultiTaskSlotLocality(MultiTaskSlot multiTaskSlot, Locality locality) {
+			this.multiTaskSlot = Preconditions.checkNotNull(multiTaskSlot);
+			this.locality = Preconditions.checkNotNull(locality);
+		}
+
+		public Locality getLocality() {
+			return locality;
+		}
+
+		MultiTaskSlot getMultiTaskSlot() {
+			return multiTaskSlot;
+		}
+	}
+
+	/**
+	 * Common base of all nodes in the task slot hierarchy. Every node carries the slot request id
+	 * under which it was requested and, except for root slots, a group id.
+	 */
+	public abstract static class TaskSlot {
+		/** Slot request id associated with this task slot; never null. */
+		private final SlotRequestId slotRequestId;
+
+		/** Group id of this task slot; root slots are created without one. */
+		@Nullable
+		private final AbstractID groupId;
+
+		TaskSlot(SlotRequestId slotRequestId, @Nullable AbstractID groupId) {
+			this.groupId = groupId;
+			this.slotRequestId = Preconditions.checkNotNull(slotRequestId);
+		}
+
+		@Nullable
+		public AbstractID getGroupId() {
+			return groupId;
+		}
+
+		public SlotRequestId getSlotRequestId() {
+			return slotRequestId;
+		}
+
+		/**
+		 * Tells whether this task slot is registered under the given groupId.
+		 *
+		 * @param groupId which to check whether it is contained
+		 * @return true if the task slot contains the given groupId, otherwise false
+		 */
+		public boolean contains(AbstractID groupId) {
+			final AbstractID ownGroupId = this.groupId;
+
+			// null-safe comparison since root slots have no group id
+			return Objects.equals(ownGroupId, groupId);
+		}
+
+		/**
+		 * Releases this task slot.
+		 *
+		 * @param cause for the release
+		 * @return true if the slot could be released, otherwise false
+		 */
+		public abstract boolean release(Throwable cause);
+	}
+
+	/**
+	 * {@link TaskSlot} implementation which can have multiple other task slots assigned as children.
+	 *
+	 * <p>Children are registered under their group id. A root slot (parent is null) additionally
+	 * carries the slot request id of the underlying allocated slot, which is passed to
+	 * {@code AllocatedSlotActions#releaseSlot} once the root slot has been released.
+	 */
+	public final class MultiTaskSlot extends TaskSlot implements AllocatedSlot.Payload {
+
+		// children of this node, keyed by the group id under which they were allocated
+		private final Map<AbstractID, TaskSlot> children;
+
+		// the root node has its parent set to null
+		@Nullable
+		private final MultiTaskSlot parent;
+
+		// underlying allocated slot
+		private final CompletableFuture<? extends SlotContext> slotContextFuture;
+
+		// slot request id of the allocated slot
+		@Nullable
+		private final SlotRequestId allocatedSlotRequestId;
+
+		// true if we are currently releasing our children
+		private boolean releasingChildren;
+
+		/**
+		 * Creates a nested (non-root) multi task slot. The slot context future is taken
+		 * from the given parent and no allocated slot request id is set.
+		 *
+		 * @param slotRequestId of the new multi task slot
+		 * @param groupId of the new multi task slot
+		 * @param parent of the new multi task slot, must not be null
+		 */
+		private MultiTaskSlot(
+				SlotRequestId slotRequestId,
+				AbstractID groupId,
+				MultiTaskSlot parent) {
+			this(
+				slotRequestId,
+				groupId,
+				Preconditions.checkNotNull(parent),
+				parent.getSlotContextFuture(),
+				null);
+		}
+
+		/**
+		 * Creates a root multi task slot, i.e. a slot without group id and without parent.
+		 *
+		 * @param slotRequestId of the root slot
+		 * @param slotContextFuture future of the underlying allocated slot
+		 * @param allocatedSlotRequestId slot request id of the underlying allocated slot
+		 */
+		private MultiTaskSlot(
+				SlotRequestId slotRequestId,
+				CompletableFuture<? extends SlotContext> slotContextFuture,
+				SlotRequestId allocatedSlotRequestId) {
+			this(
+				slotRequestId,
+				null,
+				null,
+				slotContextFuture,
+				allocatedSlotRequestId);
+		}
+
+		/**
+		 * Common constructor. Besides initializing the fields, it registers a callback on the slot
+		 * context future which releases this slot if the future completes exceptionally.
+		 *
+		 * @param slotRequestId of this multi task slot
+		 * @param groupId of this multi task slot; null for root slots
+		 * @param parent of this multi task slot; null for root slots
+		 * @param slotContextFuture future of the underlying allocated slot, must not be null
+		 * @param allocatedSlotRequestId slot request id of the underlying allocated slot; null for nested slots
+		 */
+		private MultiTaskSlot(
+				SlotRequestId slotRequestId,
+				@Nullable AbstractID groupId,
+				MultiTaskSlot parent,
+				CompletableFuture<? extends SlotContext> slotContextFuture,
+				SlotRequestId allocatedSlotRequestId) {
+			super(slotRequestId, groupId);
+
+			this.parent = parent;
+			this.slotContextFuture = Preconditions.checkNotNull(slotContextFuture);
+			this.allocatedSlotRequestId = allocatedSlotRequestId;
+
+			this.children = new HashMap<>(16);
+			this.releasingChildren = false;
+
+			// a failed slot allocation releases this node (and thereby its children)
+			slotContextFuture.whenComplete(
+				(SlotContext ignored, Throwable throwable) -> {
+					if (throwable != null) {
+						release(throwable);
+					}
+				});
+		}
+
+		CompletableFuture<? extends SlotContext> getSlotContextFuture() {
+			return slotContextFuture;
+		}
+
+		/**
+		 * Allocates a MultiTaskSlot and registers it under the given groupId at
+		 * this MultiTaskSlot.
+		 *
+		 * @param slotRequestId of the new multi task slot
+		 * @param groupId under which the new multi task slot is registered
+		 * @return the newly allocated MultiTaskSlot
+		 * @throws IllegalStateException presumably, via {@code Preconditions.checkState}, if this
+		 *                               slot's own group id equals the given groupId
+		 */
+		MultiTaskSlot allocateMultiTaskSlot(SlotRequestId slotRequestId, AbstractID groupId) {
+			// NOTE(review): super.contains only compares against this slot's own group id; an existing
+			// child registered under the same groupId would be replaced by children.put below --
+			// confirm that callers check contains(groupId) beforehand
+			Preconditions.checkState(!super.contains(groupId));
+
+			final MultiTaskSlot inner = new MultiTaskSlot(
+				slotRequestId,
+				groupId,
+				this);
+
+			children.put(groupId, inner);
+
+			// register the newly allocated slot also at the SlotSharingManager
+			allTaskSlots.put(slotRequestId, inner);
+
+			return inner;
+		}
+
+		/**
+		 * Allocates a {@link SingleTaskSlot} and registers it under the given groupId at
+		 * this MultiTaskSlot.
+		 *
+		 * @param slotRequestId of the new single task slot
+		 * @param groupId under which the new single task slot is registered
+		 * @param locality of the allocation
+		 * @return the newly allocated {@link SingleTaskSlot}
+		 * @throws IllegalStateException presumably, via {@code Preconditions.checkState}, if this
+		 *                               slot's own group id equals the given groupId
+		 */
+		SingleTaskSlot allocateSingleTaskSlot(
+				SlotRequestId slotRequestId,
+				AbstractID groupId,
+				Locality locality) {
+			// NOTE(review): same precondition as in allocateMultiTaskSlot -- children are not checked here
+			Preconditions.checkState(!super.contains(groupId));
+
+			final SingleTaskSlot leaf = new SingleTaskSlot(
+				slotRequestId,
+				groupId,
+				this,
+				locality);
+
+			children.put(groupId, leaf);
+
+			// register the newly allocated slot also at the SlotSharingManager
+			allTaskSlots.put(slotRequestId, leaf);
+
+			return leaf;
+		}
+
+		/**
+		 * Checks whether this slot or any of its children contains the given groupId.
+		 *
+		 * @param groupId which to check whether it is contained
+		 * @return true if this or any of its children contains the given groupId, otherwise false
+		 */
+		@Override
+		public boolean contains(AbstractID groupId) {
+			if (super.contains(groupId)) {
+				return true;
+			} else {
+				// recurse into the children; nested MultiTaskSlots check their own children in turn
+				for (TaskSlot taskSlot : children.values()) {
+					if (taskSlot.contains(groupId)) {
+						return true;
+					}
+				}
+
+				return false;
+			}
+		}
+
+		/**
+		 * Releases all children of this multi task slot and, if no child remains afterwards,
+		 * this slot itself. A nested slot removes itself from its parent; a root slot is removed
+		 * from the manager's bookkeeping and its underlying allocated slot is released via the
+		 * {@code AllocatedSlotActions}.
+		 *
+		 * @param cause for the release
+		 * @return true if no children remained and this slot was released, otherwise false
+		 */
+		@Override
+		public boolean release(Throwable cause) {
+			// while set, releaseChild is a no-op so that children calling back into this node
+			// do not modify the children map while it is being iterated
+			releasingChildren = true;
+
+			// first release all children and remove them if they could be released immediately
+			children.values().removeIf(node -> {
+				boolean release = node.release(cause);
+
+				if (release) {
+					allTaskSlots.remove(node.getSlotRequestId());
+				}
+
+				return release;
+			});
+
+			releasingChildren = false;
+
+			if (children.isEmpty()) {
+				if (parent != null) {
+					// we remove ourselves from our parent if we no longer have children
+					parent.releaseChild(getGroupId());
+				} else {
+					// we are the root node --> remove the root node from the list of task slots
+					allTaskSlots.remove(getSlotRequestId());
+
+					if (!slotContextFuture.isDone() || slotContextFuture.isCompletedExceptionally()) {
+						synchronized (lock) {
+							// the root node should still be unresolved
+							unresolvedRootSlots.remove(getSlotRequestId());
+						}
+					} else {
+						// the root node should be resolved --> we can access the slot context
+						final SlotContext slotContext = slotContextFuture.getNow(null);
+
+						if (slotContext != null) {
+							synchronized (lock) {
+								final Set<MultiTaskSlot> multiTaskSlots = resolvedRootSlots.get(slotContext.getTaskManagerLocation());
+
+								if (multiTaskSlots != null) {
+									multiTaskSlots.remove(this);
+
+									// drop the location entry once its last root slot is gone
+									if (multiTaskSlots.isEmpty()) {
+										resolvedRootSlots.remove(slotContext.getTaskManagerLocation());
+									}
+								}
+							}
+						}
+					}
+
+					// release the underlying allocated slot
+					allocatedSlotActions.releaseSlot(allocatedSlotRequestId, null, cause);
+				}
+
+				return true;
+			} else {
+				return false;
+			}
+		}
+
+		/**
+		 * Releases the child with the given childGroupId. If this leaves the multi task slot
+		 * without children, the multi task slot itself is released. The call is ignored while
+		 * this slot is in the middle of releasing its children.
+		 *
+		 * @param childGroupId identifying the child to release
+		 */
+		private void releaseChild(AbstractID childGroupId) {
+			if (!releasingChildren) {
+				TaskSlot child = children.remove(childGroupId);
+
+				if (child != null) {
+					allTaskSlots.remove(child.getSlotRequestId());
+				}
+
+				if (children.isEmpty()) {
+					release(new FlinkException("Release multi task slot because all children have been released."));
+				}
+			}
+		}
+	}
+
+	/**
+	 * {@link TaskSlot} implementation which harbours a {@link LogicalSlot}. The {@link SingleTaskSlot}
+	 * cannot have any children assigned.
+	 */
+	public final class SingleTaskSlot extends TaskSlot {
+		// multi task slot under which this leaf is registered; never null
+		private final MultiTaskSlot parent;
+
+		// future containing a LogicalSlot which is completed once the underlying SlotContext future is completed
+		private final CompletableFuture<LogicalSlot> logicalSlotFuture;
+
+		/**
+		 * Creates a leaf below the given parent. The logical slot future is derived from the
+		 * parent's slot context future and yields a {@code SingleLogicalSlot} once the slot
+		 * context is available.
+		 *
+		 * @param slotRequestId of the single task slot
+		 * @param groupId of the single task slot
+		 * @param parent multi task slot under which this slot is registered, must not be null
+		 * @param locality of the allocation, must not be null
+		 */
+		private SingleTaskSlot(
+				SlotRequestId slotRequestId,
+				AbstractID groupId,
+				MultiTaskSlot parent,
+				Locality locality) {
+			super(slotRequestId, groupId);
+
+			this.parent = Preconditions.checkNotNull(parent);
+
+			Preconditions.checkNotNull(locality);
+			logicalSlotFuture = parent.getSlotContextFuture()
+				.thenApply(
+					(SlotContext slotContext) ->
+						new SingleLogicalSlot(
+							slotRequestId,
+							slotContext,
+							slotSharingGroupId,
+							locality,
+							slotOwner));
+		}
+
+		CompletableFuture<LogicalSlot> getLogicalSlotFuture() {
+			return logicalSlotFuture;
+		}
+
+		/**
+		 * Releases this single task slot. A still pending logical slot future is failed with the
+		 * given cause. If a logical slot has already been created and is alive, it is asked to
+		 * release itself. Unless that release is considered pending, this slot removes itself
+		 * from its parent.
+		 *
+		 * @param cause for the release
+		 * @return true if the slot was released right away, false if the release of the logical slot
+		 * 		is considered pending
+		 */
+		@Override
+		public boolean release(Throwable cause) {
+			// only has an effect if the logical slot future has not been completed yet
+			logicalSlotFuture.completeExceptionally(cause);
+
+			boolean pendingLogicalSlotRelease = false;
+
+			if (logicalSlotFuture.isDone() && !logicalSlotFuture.isCompletedExceptionally()) {
+				// we have a single task slot which we first have to release
+				final LogicalSlot logicalSlot = logicalSlotFuture.getNow(null);
+
+				if ((logicalSlot != null) && (logicalSlot.isAlive())) {
+					// NOTE(review): the flag is named "pending" but is set to isDone() of the release
+					// future, which looks inverted -- confirm against the LogicalSlot#releaseSlot contract
+					pendingLogicalSlotRelease = logicalSlot.releaseSlot(cause).isDone();
+				}
+			}
+
+			if (!pendingLogicalSlotRelease) {
+				parent.releaseChild(getGroupId());
+			}
+
+			return !pendingLogicalSlotRelease;
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	// Methods and classes for testing
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Returns a collection of all resolved root slots. The returned collection is a view which
+	 * reads the resolved root slots of all TaskManager locations each time it is iterated or
+	 * its size is queried.
+	 *
+	 * @return Collection of all resolved root slots
+	 */
+	@VisibleForTesting
+	public Collection<MultiTaskSlot> getResolvedRootSlots() {
+		return new ResolvedRootSlotValues();
+	}
+
+	/**
+	 * Returns the unresolved root slots, i.e. the root slots whose slot context future has not
+	 * been completed with a value yet.
+	 *
+	 * <p>NOTE(review): the returned collection is the live {@code values()} view of the internal
+	 * map; the lock is only held while obtaining the view, not while callers use it.
+	 *
+	 * @return Collection of all unresolved root slots
+	 */
+	@VisibleForTesting
+	Collection<MultiTaskSlot> getUnresolvedRootSlots() {
+		synchronized (lock) {
+			return unresolvedRootSlots.values();
+		}
+	}
+
+	/**
+	 * Read-only collection view over the resolved {@link MultiTaskSlot} root slots of all
+	 * TaskManager locations.
+	 */
+	private final class ResolvedRootSlotValues extends AbstractCollection<MultiTaskSlot> {
+
+		@Override
+		public Iterator<MultiTaskSlot> iterator() {
+			synchronized (lock) {
+				// the iterator is created (and positioned on the first set) while holding the lock
+				final Iterator<Set<MultiTaskSlot>> perLocationIterator = resolvedRootSlots.values().iterator();
+
+				return new ResolvedRootSlotIterator(perLocationIterator);
+			}
+		}
+
+		@Override
+		public int size() {
+			synchronized (lock) {
+				// sum up the number of root slots over all locations
+				return resolvedRootSlots.values()
+					.stream()
+					.mapToInt(Set::size)
+					.sum();
+			}
+		}
+	}
+
+	/**
+	 * Iterator which flattens an iterator over sets of resolved {@link MultiTaskSlot} root slots
+	 * into a single sequence of root slots.
+	 */
+	private static final class ResolvedRootSlotIterator implements Iterator<MultiTaskSlot> {
+		/** Iterator over the per-location sets. */
+		private final Iterator<Set<MultiTaskSlot>> baseIterator;
+
+		/** Iterator over the set which is currently being traversed. */
+		private Iterator<MultiTaskSlot> currentIterator;
+
+		private ResolvedRootSlotIterator(Iterator<Set<MultiTaskSlot>> baseIterator) {
+			this.baseIterator = Preconditions.checkNotNull(baseIterator);
+
+			// start with the first set if there is one, otherwise with nothing
+			this.currentIterator = baseIterator.hasNext()
+				? baseIterator.next().iterator()
+				: Collections.<MultiTaskSlot>emptyIterator();
+		}
+
+		@Override
+		public boolean hasNext() {
+			progressToNextElement();
+
+			return currentIterator.hasNext();
+		}
+
+		@Override
+		public MultiTaskSlot next() {
+			progressToNextElement();
+
+			return currentIterator.next();
+		}
+
+		/**
+		 * Skips over exhausted (or empty) sets until an element is available or the base
+		 * iterator has no more sets.
+		 */
+		private void progressToNextElement() {
+			while (!currentIterator.hasNext() && baseIterator.hasNext()) {
+				currentIterator = baseIterator.next().iterator();
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
index 7e85167..e98efc2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
@@ -31,7 +31,7 @@ import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
-import org.apache.flink.runtime.instance.SlotProvider;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java
index fc2c06f..6aa36b7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java
@@ -27,10 +27,10 @@ import org.apache.flink.runtime.executiongraph.ExecutionGraphException;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.executiongraph.IntermediateResult;
 import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
-import org.apache.flink.runtime.instance.LogicalSlot;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
index 0d7c8e6..7b9d9aa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
@@ -36,7 +36,7 @@ import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
-import org.apache.flink.runtime.instance.SlotProvider;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 16da8e6..e869625 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -40,7 +40,7 @@ import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.instance.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
@@ -54,7 +54,7 @@ import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
-import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
 import org.apache.flink.runtime.operators.BatchTask;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index 92c7c61..caf89e8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -27,8 +27,8 @@ import org.apache.flink.runtime.execution.SuppressRestartsException;
 import org.apache.flink.runtime.executiongraph.metrics.RestartTimeGauge;
 import org.apache.flink.runtime.executiongraph.restart.RestartCallback;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
-import org.apache.flink.runtime.instance.LogicalSlot;
-import org.apache.flink.runtime.instance.TestingLogicalSlot;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 80df852..2245a8c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -41,7 +41,7 @@ import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
 import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.runtime.instance.SlotProvider;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
index 18e6cf1..f75cb4b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
@@ -28,18 +28,17 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
-import org.apache.flink.runtime.instance.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.runtime.instance.SlotProvider;
-import org.apache.flink.runtime.instance.SlotRequestID;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
-import org.apache.flink.runtime.jobmanager.slots.SimpleSlotContext;
-import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
+import org.apache.flink.runtime.instance.SimpleSlotContext;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.jobmanager.slots.TestingSlotOwner;
 import org.apache.flink.runtime.messages.Acknowledge;
@@ -448,7 +447,6 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 				ResourceID.generate(), InetAddress.getLoopbackAddress(), 12345);
 
 		SimpleSlotContext slot = new SimpleSlotContext(
-			new SlotRequestID(),
 			new AllocationID(),
 			location,
 			0,

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
index 65a52bc..b5a29c3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
@@ -25,7 +25,7 @@ import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy
 import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
 import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
-import org.apache.flink.runtime.instance.SlotProvider;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index c97329f..b1ee3cc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -41,16 +41,15 @@ import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.runtime.instance.SlotProvider;
-import org.apache.flink.runtime.instance.SlotRequestID;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
-import org.apache.flink.runtime.jobmanager.slots.SimpleSlotContext;
-import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
+import org.apache.flink.runtime.instance.SimpleSlotContext;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.TaskMessages.CancelTask;
@@ -245,7 +244,6 @@ public class ExecutionGraphTestUtils {
 				ResourceID.generate(), InetAddress.getLoopbackAddress(), 6572);
 
 		final SimpleSlotContext allocatedSlot = new SimpleSlotContext(
-			new SlotRequestID(),
 			new AllocationID(),
 			location,
 			0,

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
index e3fd0df..46dfd41 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
@@ -22,13 +22,13 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
-import org.apache.flink.runtime.instance.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils;
-import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testingUtils.TestingUtils;

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
index 63cebf3..d91380e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
@@ -25,14 +25,14 @@ import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.instance.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
-import org.apache.flink.runtime.jobmanager.slots.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotContext;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
 import org.apache.flink.util.TestLogger;

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
index bffbb6a..274df59 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
@@ -30,16 +30,15 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
 import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.runtime.instance.SlotProvider;
-import org.apache.flink.runtime.instance.SlotRequestID;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.slots.SimpleSlotContext;
-import org.apache.flink.runtime.jobmanager.slots.SlotContext;
-import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
+import org.apache.flink.runtime.instance.SimpleSlotContext;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -235,7 +234,6 @@ public class ExecutionVertexLocalityTest extends TestLogger {
 		//  - exposing test methods in the ExecutionVertex leads to undesirable setters 
 
 		SlotContext slot = new SimpleSlotContext(
-			new SlotRequestID(),
 			new AllocationID(),
 			location,
 			0,

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
index 9310912..25e1207 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
@@ -59,7 +59,7 @@ public class ExecutionVertexSchedulingTest {
 			final Instance instance = getInstance(new ActorTaskManagerGateway(DummyActorGateway.INSTANCE));
 			final SimpleSlot slot = instance.allocateSimpleSlot();
 			
-			slot.releaseInstanceSlot();
+			slot.releaseSlot();
 			assertTrue(slot.isReleased());
 
 			Scheduler scheduler = mock(Scheduler.class);
@@ -91,7 +91,7 @@ public class ExecutionVertexSchedulingTest {
 			final Instance instance = getInstance(new ActorTaskManagerGateway(DummyActorGateway.INSTANCE));
 			final SimpleSlot slot = instance.allocateSimpleSlot();
 
-			slot.releaseInstanceSlot();
+			slot.releaseSlot();
 			assertTrue(slot.isReleased());
 
 			final CompletableFuture<SimpleSlot> future = new CompletableFuture<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
index 4d53e67..c411393 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
@@ -31,7 +31,7 @@ import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
 import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.instance.SlotProvider;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobStatus;

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
index 32ccad1..4725296 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
@@ -36,7 +36,7 @@ import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
-import org.apache.flink.runtime.instance.SlotProvider;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LegacyJobVertexIdTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LegacyJobVertexIdTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LegacyJobVertexIdTest.java
index b5a67fd..49a6dce 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LegacyJobVertexIdTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LegacyJobVertexIdTest.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
-import org.apache.flink.runtime.instance.SlotProvider;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java
index 656c372..06647cc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java
@@ -30,7 +30,7 @@ import org.apache.flink.runtime.executiongraph.failover.RestartPipelinedRegionSt
 import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
-import org.apache.flink.runtime.instance.SlotProvider;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java
index 24affad..f44626d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import org.apache.flink.runtime.instance.LogicalSlot;
-import org.apache.flink.runtime.instance.SlotProvider;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/PipelinedFailoverRegionBuildingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/PipelinedFailoverRegionBuildingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/PipelinedFailoverRegionBuildingTest.java
index 4709bce..f94959d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/PipelinedFailoverRegionBuildingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/PipelinedFailoverRegionBuildingTest.java
@@ -30,7 +30,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
-import org.apache.flink.runtime.instance.SlotProvider;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
index 82953d6..bffdab6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
@@ -22,16 +22,15 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.instance.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.Slot;
-import org.apache.flink.runtime.instance.SlotProvider;
-import org.apache.flink.runtime.instance.SlotRequestID;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
-import org.apache.flink.runtime.jobmanager.slots.SlotContext;
-import org.apache.flink.runtime.jobmanager.slots.SimpleSlotContext;
-import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
+import org.apache.flink.runtime.instance.SimpleSlotContext;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.Preconditions;
@@ -63,7 +62,6 @@ public class SimpleSlotProvider implements SlotProvider, SlotOwner {
 
 		for (int i = 0; i < numSlots; i++) {
 			SimpleSlotContext as = new SimpleSlotContext(
-				new SlotRequestID(),
 				new AllocationID(),
 				new TaskManagerLocation(ResourceID.generate(), InetAddress.getLoopbackAddress(), 10000 + i),
 				0,

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java
deleted file mode 100644
index 223d43c..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
-import org.apache.flink.runtime.jobmanager.slots.DummySlotOwner;
-import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-public class AllocatedSlotsTest extends TestLogger {
-
-	@Test
-	public void testOperations() throws Exception {
-		SlotPool.AllocatedSlots allocatedSlots = new SlotPool.AllocatedSlots();
-
-		final AllocationID allocation1 = new AllocationID();
-		final SlotRequestID slotRequestID = new SlotRequestID();
-		final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
-		final ResourceID resource1 = taskManagerLocation.getResourceID();
-		final AllocatedSlot slot1 = createSlot(allocation1, taskManagerLocation);
-
-		allocatedSlots.add(slotRequestID, slot1);
-
-		assertTrue(allocatedSlots.contains(slot1.getAllocationId()));
-		assertTrue(allocatedSlots.containResource(resource1));
-
-		assertEquals(slot1, allocatedSlots.get(allocation1));
-		assertEquals(1, allocatedSlots.getSlotsForTaskManager(resource1).size());
-		assertEquals(1, allocatedSlots.size());
-
-		final AllocationID allocation2 = new AllocationID();
-		final SlotRequestID slotRequestID2 = new SlotRequestID();
-		final AllocatedSlot slot2 = createSlot(allocation2, taskManagerLocation);
-
-		allocatedSlots.add(slotRequestID2, slot2);
-
-		assertTrue(allocatedSlots.contains(slot1.getAllocationId()));
-		assertTrue(allocatedSlots.contains(slot2.getAllocationId()));
-		assertTrue(allocatedSlots.containResource(resource1));
-
-		assertEquals(slot1, allocatedSlots.get(allocation1));
-		assertEquals(slot2, allocatedSlots.get(allocation2));
-		assertEquals(2, allocatedSlots.getSlotsForTaskManager(resource1).size());
-		assertEquals(2, allocatedSlots.size());
-
-		final AllocationID allocation3 = new AllocationID();
-		final SlotRequestID slotRequestID3 = new SlotRequestID();
-		final TaskManagerLocation taskManagerLocation2 = new LocalTaskManagerLocation();
-		final ResourceID resource2 = taskManagerLocation2.getResourceID();
-		final AllocatedSlot slot3 = createSlot(allocation3, taskManagerLocation2);
-
-		allocatedSlots.add(slotRequestID3, slot3);
-
-		assertTrue(allocatedSlots.contains(slot1.getAllocationId()));
-		assertTrue(allocatedSlots.contains(slot2.getAllocationId()));
-		assertTrue(allocatedSlots.contains(slot3.getAllocationId()));
-		assertTrue(allocatedSlots.containResource(resource1));
-		assertTrue(allocatedSlots.containResource(resource2));
-
-		assertEquals(slot1, allocatedSlots.get(allocation1));
-		assertEquals(slot2, allocatedSlots.get(allocation2));
-		assertEquals(slot3, allocatedSlots.get(allocation3));
-		assertEquals(2, allocatedSlots.getSlotsForTaskManager(resource1).size());
-		assertEquals(1, allocatedSlots.getSlotsForTaskManager(resource2).size());
-		assertEquals(3, allocatedSlots.size());
-
-		allocatedSlots.remove(slot2.getAllocationId());
-
-		assertTrue(allocatedSlots.contains(slot1.getAllocationId()));
-		assertFalse(allocatedSlots.contains(slot2.getAllocationId()));
-		assertTrue(allocatedSlots.contains(slot3.getAllocationId()));
-		assertTrue(allocatedSlots.containResource(resource1));
-		assertTrue(allocatedSlots.containResource(resource2));
-
-		assertEquals(slot1, allocatedSlots.get(allocation1));
-		assertNull(allocatedSlots.get(allocation2));
-		assertEquals(slot3, allocatedSlots.get(allocation3));
-		assertEquals(1, allocatedSlots.getSlotsForTaskManager(resource1).size());
-		assertEquals(1, allocatedSlots.getSlotsForTaskManager(resource2).size());
-		assertEquals(2, allocatedSlots.size());
-
-		allocatedSlots.remove(slot1.getAllocationId());
-
-		assertFalse(allocatedSlots.contains(slot1.getAllocationId()));
-		assertFalse(allocatedSlots.contains(slot2.getAllocationId()));
-		assertTrue(allocatedSlots.contains(slot3.getAllocationId()));
-		assertFalse(allocatedSlots.containResource(resource1));
-		assertTrue(allocatedSlots.containResource(resource2));
-
-		assertNull(allocatedSlots.get(allocation1));
-		assertNull(allocatedSlots.get(allocation2));
-		assertEquals(slot3, allocatedSlots.get(allocation3));
-		assertEquals(0, allocatedSlots.getSlotsForTaskManager(resource1).size());
-		assertEquals(1, allocatedSlots.getSlotsForTaskManager(resource2).size());
-		assertEquals(1, allocatedSlots.size());
-
-		allocatedSlots.remove(slot3.getAllocationId());
-
-		assertFalse(allocatedSlots.contains(slot1.getAllocationId()));
-		assertFalse(allocatedSlots.contains(slot2.getAllocationId()));
-		assertFalse(allocatedSlots.contains(slot3.getAllocationId()));
-		assertFalse(allocatedSlots.containResource(resource1));
-		assertFalse(allocatedSlots.containResource(resource2));
-
-		assertNull(allocatedSlots.get(allocation1));
-		assertNull(allocatedSlots.get(allocation2));
-		assertNull(allocatedSlots.get(allocation3));
-		assertEquals(0, allocatedSlots.getSlotsForTaskManager(resource1).size());
-		assertEquals(0, allocatedSlots.getSlotsForTaskManager(resource2).size());
-		assertEquals(0, allocatedSlots.size());
-	}
-
-	private AllocatedSlot createSlot(final AllocationID allocationId, final TaskManagerLocation taskManagerLocation) {
-		return new AllocatedSlot(
-			allocationId,
-			taskManagerLocation,
-			0,
-			ResourceProfile.UNKNOWN,
-			new SimpleAckingTaskManagerGateway(),
-			new DummySlotOwner());
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AvailableSlotsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AvailableSlotsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AvailableSlotsTest.java
deleted file mode 100644
index 9ede899..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AvailableSlotsTest.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.jobmanager.slots.DummySlotOwner;
-import org.apache.flink.runtime.jobmanager.slots.SlotAndLocality;
-import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class AvailableSlotsTest extends TestLogger {
-
-	static final ResourceProfile DEFAULT_TESTING_PROFILE = new ResourceProfile(1.0, 512);
-
-	static final ResourceProfile DEFAULT_TESTING_BIG_PROFILE = new ResourceProfile(2.0, 1024);
-
-	@Test
-	public void testAddAndRemove() throws Exception {
-		SlotPool.AvailableSlots availableSlots = new SlotPool.AvailableSlots();
-
-		final ResourceID resource1 = new ResourceID("resource1");
-		final ResourceID resource2 = new ResourceID("resource2");
-
-		final AllocatedSlot slot1 = createAllocatedSlot(resource1);
-		final AllocatedSlot slot2 = createAllocatedSlot(resource1);
-		final AllocatedSlot slot3 = createAllocatedSlot(resource2);
-
-		availableSlots.add(slot1, 1L);
-		availableSlots.add(slot2, 2L);
-		availableSlots.add(slot3, 3L);
-
-		assertEquals(3, availableSlots.size());
-		assertTrue(availableSlots.contains(slot1.getAllocationId()));
-		assertTrue(availableSlots.contains(slot2.getAllocationId()));
-		assertTrue(availableSlots.contains(slot3.getAllocationId()));
-		assertTrue(availableSlots.containsTaskManager(resource1));
-		assertTrue(availableSlots.containsTaskManager(resource2));
-
-		availableSlots.removeAllForTaskManager(resource1);
-
-		assertEquals(1, availableSlots.size());
-		assertFalse(availableSlots.contains(slot1.getAllocationId()));
-		assertFalse(availableSlots.contains(slot2.getAllocationId()));
-		assertTrue(availableSlots.contains(slot3.getAllocationId()));
-		assertFalse(availableSlots.containsTaskManager(resource1));
-		assertTrue(availableSlots.containsTaskManager(resource2));
-
-		availableSlots.removeAllForTaskManager(resource2);
-
-		assertEquals(0, availableSlots.size());
-		assertFalse(availableSlots.contains(slot1.getAllocationId()));
-		assertFalse(availableSlots.contains(slot2.getAllocationId()));
-		assertFalse(availableSlots.contains(slot3.getAllocationId()));
-		assertFalse(availableSlots.containsTaskManager(resource1));
-		assertFalse(availableSlots.containsTaskManager(resource2));
-	}
-
-	@Test
-	public void testPollFreeSlot() {
-		SlotPool.AvailableSlots availableSlots = new SlotPool.AvailableSlots();
-
-		final ResourceID resource1 = new ResourceID("resource1");
-		final AllocatedSlot slot1 = createAllocatedSlot(resource1);
-
-		availableSlots.add(slot1, 1L);
-
-		assertEquals(1, availableSlots.size());
-		assertTrue(availableSlots.contains(slot1.getAllocationId()));
-		assertTrue(availableSlots.containsTaskManager(resource1));
-
-		assertNull(availableSlots.poll(DEFAULT_TESTING_BIG_PROFILE, null));
-
-		SlotAndLocality slotAndLocality = availableSlots.poll(DEFAULT_TESTING_PROFILE, null);
-		assertEquals(slot1, slotAndLocality.slot());
-		assertEquals(0, availableSlots.size());
-		assertFalse(availableSlots.contains(slot1.getAllocationId()));
-		assertFalse(availableSlots.containsTaskManager(resource1));
-	}
-
-	static AllocatedSlot createAllocatedSlot(final ResourceID resourceId) {
-		TaskManagerLocation mockTaskManagerLocation = mock(TaskManagerLocation.class);
-		when(mockTaskManagerLocation.getResourceID()).thenReturn(resourceId);
-
-		TaskManagerGateway mockTaskManagerGateway = mock(TaskManagerGateway.class);
-
-		return new AllocatedSlot(
-			new AllocationID(),
-			mockTaskManagerLocation,
-			0,
-			DEFAULT_TESTING_PROFILE,
-			mockTaskManagerGateway,
-			new DummySlotOwner());
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
index 229237d..6d7d1ae 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
@@ -84,10 +84,10 @@ public class InstanceTest {
 			}
 
 			// release the slots. this returns them to the instance
-			slot1.releaseInstanceSlot();
-			slot2.releaseInstanceSlot();
-			slot3.releaseInstanceSlot();
-			slot4.releaseInstanceSlot();
+			slot1.releaseSlot();
+			slot2.releaseSlot();
+			slot3.releaseSlot();
+			slot4.releaseSlot();
 
 			assertEquals(4, instance.getNumberOfAvailableSlots());
 			assertEquals(0, instance.getNumberOfAllocatedSlots());

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java
index 5104e48..860100a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java
@@ -83,7 +83,7 @@ public class SharedSlotsTest extends TestLogger {
 			assertEquals(0, slot.getRootSlotNumber());
 			
 			// release the slot immediately.
-			slot.releaseInstanceSlot();
+			slot.releaseSlot();
 
 			assertTrue(slot.isCanceled());
 			assertTrue(slot.isReleased());
@@ -202,7 +202,7 @@ public class SharedSlotsTest extends TestLogger {
 			assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid4));
 			
 			// release from the root.
-			sharedSlot.releaseInstanceSlot();
+			sharedSlot.releaseSlot();
 
 			assertTrue(sharedSlot.isReleased());
 			assertTrue(sub1.isReleased());
@@ -261,7 +261,7 @@ public class SharedSlotsTest extends TestLogger {
 			
 			// release from the leaves.
 			
-			sub2.releaseInstanceSlot();
+			sub2.releaseSlot();
 
 			assertTrue(sharedSlot.isAlive());
 			assertTrue(sub1.isAlive());
@@ -276,7 +276,7 @@ public class SharedSlotsTest extends TestLogger {
 			assertEquals(2, sharedSlot.getNumberLeaves());
 
 			
-			sub1.releaseInstanceSlot();
+			sub1.releaseSlot();
 
 			assertTrue(sharedSlot.isAlive());
 			assertTrue(sub1.isReleased());
@@ -290,7 +290,7 @@ public class SharedSlotsTest extends TestLogger {
 			
 			assertEquals(1, sharedSlot.getNumberLeaves());
 
-			sub3.releaseInstanceSlot();
+			sub3.releaseSlot();
 
 			assertTrue(sharedSlot.isReleased());
 			assertTrue(sub1.isReleased());
@@ -344,7 +344,7 @@ public class SharedSlotsTest extends TestLogger {
 			assertEquals(1, assignment.getNumberOfSlots());
 			
 			
-			sub2.releaseInstanceSlot();
+			sub2.releaseSlot();
 
 			assertEquals(1, sharedSlot.getNumberLeaves());
 			assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid1));
@@ -362,8 +362,8 @@ public class SharedSlotsTest extends TestLogger {
 			assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(vid3));
 			assertEquals(1, assignment.getNumberOfSlots());
 			
-			sub3.releaseInstanceSlot();
-			sub1.releaseInstanceSlot();
+			sub3.releaseSlot();
+			sub1.releaseSlot();
 
 			assertTrue(sharedSlot.isReleased());
 			assertEquals(0, sharedSlot.getNumberLeaves());
@@ -439,7 +439,7 @@ public class SharedSlotsTest extends TestLogger {
 			assertFalse(constraint.isAssigned());
 			
 			// we do not immediately lock the location
-			headSlot.releaseInstanceSlot();
+			headSlot.releaseSlot();
 			assertEquals(1, sharedSlot.getNumberLeaves());
 
 			assertNotNull(constraint.getSharedSlot());
@@ -464,8 +464,8 @@ public class SharedSlotsTest extends TestLogger {
 			assertEquals(4, sharedSlot.getNumberLeaves());
 			
 			// we release our co-location constraint tasks
-			headSlot.releaseInstanceSlot();
-			tailSlot.releaseInstanceSlot();
+			headSlot.releaseSlot();
+			tailSlot.releaseSlot();
 
 			assertEquals(2, sharedSlot.getNumberLeaves());
 			assertTrue(headSlot.isReleased());
@@ -497,10 +497,10 @@ public class SharedSlotsTest extends TestLogger {
 			assertEquals(constraint.getGroupId(), constraint.getSharedSlot().getGroupID());
 			
 			// release all
-			sourceSlot.releaseInstanceSlot();
-			headSlot.releaseInstanceSlot();
-			tailSlot.releaseInstanceSlot();
-			sinkSlot.releaseInstanceSlot();
+			sourceSlot.releaseSlot();
+			headSlot.releaseSlot();
+			tailSlot.releaseSlot();
+			sinkSlot.releaseSlot();
 			
 			assertTrue(sharedSlot.isReleased());
 			assertTrue(sourceSlot.isReleased());
@@ -573,10 +573,10 @@ public class SharedSlotsTest extends TestLogger {
 			assertEquals(4, sharedSlot.getNumberLeaves());
 
 			// release all
-			sourceSlot.releaseInstanceSlot();
-			headSlot.releaseInstanceSlot();
-			tailSlot.releaseInstanceSlot();
-			sinkSlot.releaseInstanceSlot();
+			sourceSlot.releaseSlot();
+			headSlot.releaseSlot();
+			tailSlot.releaseSlot();
+			sinkSlot.releaseSlot();
 
 			assertTrue(sharedSlot.isReleased());
 			assertTrue(sourceSlot.isReleased());
@@ -613,7 +613,7 @@ public class SharedSlotsTest extends TestLogger {
 			SharedSlot sharedSlot = instance.allocateSharedSlot(assignment);
 
 			SimpleSlot sub = assignment.addSharedSlotAndAllocateSubSlot(sharedSlot, Locality.UNCONSTRAINED, vid);
-			sub.releaseInstanceSlot();
+			sub.releaseSlot();
 			
 			assertTrue(sub.isReleased());
 			assertTrue(sharedSlot.isReleased());
@@ -648,7 +648,7 @@ public class SharedSlotsTest extends TestLogger {
 			assertNull(sub.getGroupID());
 			assertEquals(constraint.getSharedSlot(), sub.getParent());
 			
-			sub.releaseInstanceSlot();
+			sub.releaseSlot();
 
 			assertTrue(sub.isReleased());
 			assertTrue(sharedSlot.isReleased());

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
index 6d572ad..de2ae41 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.instance;
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.TestingPayload;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.TestLogger;
 
@@ -43,7 +44,7 @@ public class SimpleSlotTest extends  TestLogger {
 				SimpleSlot slot = getSlot();
 				assertTrue(slot.isAlive());
 
-				slot.releaseInstanceSlot();
+				slot.releaseSlot();
 				assertFalse(slot.isAlive());
 				assertTrue(slot.isCanceled());
 				assertTrue(slot.isReleased());
@@ -111,7 +112,7 @@ public class SimpleSlotTest extends  TestLogger {
 			// assign to released
 			{
 				SimpleSlot slot = getSlot();
-				slot.releaseInstanceSlot();
+				slot.releaseSlot();
 
 				assertFalse(slot.tryAssignPayload(payload1));
 				assertNull(slot.getPayload());