You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/12/14 16:42:25 UTC

[08/11] flink git commit: [FLINK-7956] [flip6] Add support for queued scheduling with slot sharing to SlotPool

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
new file mode 100644
index 0000000..996e445
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
@@ -0,0 +1,1559 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmanager.slots.SlotAndLocality;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.util.clock.Clock;
+import org.apache.flink.runtime.util.clock.SystemClock;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The slot pool serves slot requests issued by the {@link ExecutionGraph}. It will attempt to acquire new slots
+ * from the ResourceManager when it cannot serve a slot request. If no ResourceManager is currently available,
+ * or it gets a decline from the ResourceManager, or a request times out, it fails the slot request. The slot pool also
+ * holds all the slots that were offered to it and accepted, and can thus provide registered free slots even if the
+ * ResourceManager is down. The slots will only be released when they are useless, e.g. when the job is fully running
+ * but we still have some free slots.
+ *
+ * <p>All the allocation or the slot offering will be identified by self generated AllocationID, we will use it to
+ * eliminate ambiguities.
+ *
+ * TODO : Make pending requests location preference aware
+ * TODO : Pass location preferences to the ResourceManager when sending a slot request
+ */
+public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedSlotActions {
+
+	/** The log for the pool - shared also with the internal classes. */
+	static final Logger LOG = LoggerFactory.getLogger(SlotPool.class);
+
+	// ------------------------------------------------------------------------
+
+	/** Default slot request timeout, used by the two-argument constructor. */
+	private static final Time DEFAULT_SLOT_REQUEST_TIMEOUT = Time.minutes(5);
+
+	/** Default for {@link #resourceManagerAllocationTimeout}, used by the two-argument constructor. */
+	private static final Time DEFAULT_RM_ALLOCATION_TIMEOUT = Time.minutes(10);
+
+	/** Default for {@link #resourceManagerRequestsTimeout}, used by the two-argument constructor. */
+	private static final Time DEFAULT_RM_REQUEST_TIMEOUT = Time.seconds(10);
+
+	// ------------------------------------------------------------------------
+
+	/** The id of the job; it is part of every slot request sent to the ResourceManager. */
+	private final JobID jobId;
+
+	/** The object handed out by both {@link #getSlotOwner()} and {@link #getSlotProvider()}. */
+	private final ProviderAndOwner providerAndOwner;
+
+	/** All registered TaskManagers, slots will be accepted and used only if the resource is registered. */
+	private final HashSet<ResourceID> registeredTaskManagers;
+
+	/** The book-keeping of all allocated slots. */
+	private final AllocatedSlots allocatedSlots;
+
+	/** The book-keeping of all available slots. */
+	private final AvailableSlots availableSlots;
+
+	/** All pending requests waiting for slots. */
+	private final DualKeyMap<SlotRequestId, AllocationID, PendingRequest> pendingRequests;
+
+	/** The requests that are waiting for the resource manager to be connected. */
+	private final HashMap<SlotRequestId, PendingRequest> waitingForResourceManager;
+
+	/** Timeout for request calls to the ResourceManager. */
+	private final Time resourceManagerRequestsTimeout;
+
+	/** Timeout for allocation round trips (RM -> launch TM -> offer slot). */
+	private final Time resourceManagerAllocationTimeout;
+
+	/** The clock passed in at construction time. */
+	private final Clock clock;
+
+	/** Managers for the different slot sharing groups. */
+	protected final Map<SlotSharingGroupId, SlotSharingManager> slotSharingManagers;
+
+	/** the fencing token of the job manager. */
+	private JobMasterId jobMasterId;
+
+	/** The gateway to communicate with resource manager. */
+	private ResourceManagerGateway resourceManagerGateway;
+
+	/** The job manager address given to start(); it is part of every slot request sent to the ResourceManager. */
+	private String jobManagerAddress;
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a slot pool which uses the system clock and the default timeouts.
+	 *
+	 * @param rpcService the RPC service passed on to the full constructor
+	 * @param jobId the id of the job this pool belongs to
+	 */
+	public SlotPool(RpcService rpcService, JobID jobId) {
+		this(
+			rpcService,
+			jobId,
+			SystemClock.getInstance(),
+			DEFAULT_SLOT_REQUEST_TIMEOUT,
+			DEFAULT_RM_ALLOCATION_TIMEOUT,
+			DEFAULT_RM_REQUEST_TIMEOUT);
+	}
+
+	/**
+	 * Creates a slot pool with an explicit clock and explicit timeouts.
+	 *
+	 * @param rpcService the RPC service handed to the {@link RpcEndpoint} super class
+	 * @param jobId the id of the job this pool belongs to
+	 * @param clock the clock to use
+	 * @param slotRequestTimeout timeout handed to the pool's {@link ProviderAndOwner}
+	 * @param resourceManagerAllocationTimeout timeout for allocation round trips (RM -> launch TM -> offer slot)
+	 * @param resourceManagerRequestTimeout timeout for request calls to the ResourceManager
+	 */
+	public SlotPool(
+			RpcService rpcService,
+			JobID jobId,
+			Clock clock,
+			Time slotRequestTimeout,
+			Time resourceManagerAllocationTimeout,
+			Time resourceManagerRequestTimeout) {
+
+		super(rpcService);
+
+		// mandatory arguments
+		this.jobId = Preconditions.checkNotNull(jobId);
+		this.clock = Preconditions.checkNotNull(clock);
+		this.resourceManagerRequestsTimeout = Preconditions.checkNotNull(resourceManagerRequestTimeout);
+		this.resourceManagerAllocationTimeout = Preconditions.checkNotNull(resourceManagerAllocationTimeout);
+
+		// book-keeping, initially empty
+		this.registeredTaskManagers = new HashSet<>();
+		this.allocatedSlots = new AllocatedSlots();
+		this.availableSlots = new AvailableSlots();
+		this.pendingRequests = new DualKeyMap<>(16);
+		this.waitingForResourceManager = new HashMap<>(16);
+		this.slotSharingManagers = new HashMap<>(4);
+
+		this.providerAndOwner = new ProviderAndOwner(getSelfGateway(SlotPoolGateway.class), slotRequestTimeout);
+
+		// only known once the pool has been started / connected
+		this.jobMasterId = null;
+		this.resourceManagerGateway = null;
+		this.jobManagerAddress = null;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Starting and Stopping
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Not supported: the pool has to be started via {@link #start(JobMasterId, String)}.
+	 *
+	 * @throws UnsupportedOperationException always
+	 */
+	@Override
+	public void start() {
+		final String message = "Should never call start() without leader ID";
+		throw new UnsupportedOperationException(message);
+	}
+
+	/**
+	 * Starts the slot pool so that it accepts RPC calls.
+	 *
+	 * @param jobMasterId the leader id (fencing token) under which the pool runs; must not be null
+	 * @param newJobManagerAddress job manager address to put into the slot requests which are sent to
+	 *                             the resource manager; must not be null
+	 * @throws Exception declared for callers, although failures of the underlying endpoint start
+	 *                   are rethrown wrapped in a {@link RuntimeException}
+	 */
+	public void start(JobMasterId jobMasterId, String newJobManagerAddress) throws Exception {
+		this.jobMasterId = Preconditions.checkNotNull(jobMasterId);
+		this.jobManagerAddress = Preconditions.checkNotNull(newJobManagerAddress);
+
+		// TODO - start should not throw an exception
+		try {
+			super.start();
+		} catch (Exception startFailure) {
+			throw new RuntimeException("This should never happen", startFailure);
+		}
+	}
+
+	/**
+	 * Suspends this pool, meaning it has lost its authority to accept and distribute slots.
+	 *
+	 * <p>The RPC endpoint is stopped, the fencing token and the ResourceManager gateway are forgotten
+	 * and all slot and request book-keeping is dropped. Slots are only forgotten, not released.
+	 */
+	@Override
+	public void suspend() {
+		validateRunsInMainThread();
+
+		// suspend this RPC endpoint
+		stop();
+
+		// do not accept any requests
+		jobMasterId = null;
+		resourceManagerGateway = null;
+
+		// Clear (but not release!) the available slots. The TaskManagers should re-register them
+		// at the new leader JobManager/SlotPool
+		availableSlots.clear();
+		allocatedSlots.clear();
+		pendingRequests.clear();
+
+		// Also drop the requests which were stashed while no ResourceManager was connected.
+		// Otherwise connectToResourceManager() would send these stale requests off once the
+		// pool has been started again.
+		waitingForResourceManager.clear();
+	}
+
+	// ------------------------------------------------------------------------
+	//  Getting PoolOwner and PoolProvider
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Returns the {@link SlotOwner} view of this pool.
+	 *
+	 * <p>The call only reads a final field, so it may be invoked directly (no RPC indirection).
+	 *
+	 * @return The slot owner implementation for this pool.
+	 */
+	public SlotOwner getSlotOwner() {
+		return this.providerAndOwner;
+	}
+
+	/**
+	 * Returns the {@link SlotProvider} view of this pool.
+	 *
+	 * <p>The call only reads a final field, so it may be invoked directly (no RPC indirection).
+	 *
+	 * @return The slot provider implementation for this pool.
+	 */
+	public SlotProvider getSlotProvider() {
+		return this.providerAndOwner;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Resource Manager Connection
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Remembers the given ResourceManager gateway and sends off every request which was stashed
+	 * while no ResourceManager was connected.
+	 *
+	 * @param resourceManagerGateway gateway of the ResourceManager to use from now on; must not be null
+	 */
+	@Override
+	public void connectToResourceManager(ResourceManagerGateway resourceManagerGateway) {
+		this.resourceManagerGateway = checkNotNull(resourceManagerGateway);
+
+		// the stashed requests can be served now
+		waitingForResourceManager.values().forEach(
+			stashedRequest -> requestSlotFromResourceManager(resourceManagerGateway, stashedRequest));
+
+		// nothing is waiting for a connection any more
+		waitingForResourceManager.clear();
+	}
+
+	/**
+	 * Forgets the current ResourceManager gateway. New slot requests are stashed until
+	 * {@link #connectToResourceManager(ResourceManagerGateway)} is called again.
+	 */
+	@Override
+	public void disconnectResourceManager() {
+		resourceManagerGateway = null;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Slot Allocation
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Allocates a logical slot for the given scheduled unit by delegating to
+	 * {@code internalAllocateSlot}.
+	 *
+	 * @param slotRequestId identifying the slot request
+	 * @param scheduledUnit the task for which to allocate a slot
+	 * @param resourceProfile specifying the requirements for the requested slot
+	 * @param locationPreferences containing preferred TaskExecutors on which to allocate the slot
+	 * @param allowQueuedScheduling true if the returned future may be completed later
+	 * @param timeout not passed on to the internal allocation
+	 * @return future of the allocated logical slot
+	 */
+	@Override
+	public CompletableFuture<LogicalSlot> allocateSlot(
+			SlotRequestId slotRequestId,
+			ScheduledUnit scheduledUnit,
+			ResourceProfile resourceProfile,
+			Collection<TaskManagerLocation> locationPreferences,
+			boolean allowQueuedScheduling,
+			Time timeout) {
+
+		final CompletableFuture<LogicalSlot> logicalSlotFuture =
+			internalAllocateSlot(slotRequestId, scheduledUnit, resourceProfile, locationPreferences, allowQueuedScheduling);
+
+		return logicalSlotFuture;
+	}
+
+	/**
+	 * Allocates a logical slot for the given task: inside the task's slot sharing group if it has
+	 * one, otherwise on an allocated slot of its own.
+	 *
+	 * @param slotRequestId identifying the slot request
+	 * @param task for which to allocate the slot
+	 * @param resourceProfile specifying the requirements for the requested slot
+	 * @param locationPreferences containing preferred TaskExecutors on which to allocate the slot
+	 * @param allowQueuedScheduling true if the returned future may be completed later
+	 * @return future of the logical slot; completed exceptionally if no resource is available or
+	 * 		the logical slot could not be assigned to the allocated slot
+	 */
+	private CompletableFuture<LogicalSlot> internalAllocateSlot(
+			SlotRequestId slotRequestId,
+			ScheduledUnit task,
+			ResourceProfile resourceProfile,
+			Collection<TaskManagerLocation> locationPreferences,
+			boolean allowQueuedScheduling) {
+
+		final SlotSharingGroupId sharingGroupId = task.getSlotSharingGroupId();
+
+		if (sharingGroupId == null) {
+			// no slot sharing: request an allocated slot to assign a single logical slot to
+			return requestAllocatedSlot(slotRequestId, resourceProfile, locationPreferences, allowQueuedScheduling)
+				.thenApply(
+					(SlotAndLocality slotAndLocality) -> {
+						final AllocatedSlot allocatedSlot = slotAndLocality.getSlot();
+
+						final SingleLogicalSlot logicalSlot = new SingleLogicalSlot(
+							slotRequestId,
+							allocatedSlot,
+							null,
+							slotAndLocality.getLocality(),
+							providerAndOwner);
+
+						if (!allocatedSlot.tryAssignPayload(logicalSlot)) {
+							// the allocated slot did not accept the payload --> hand the slot back and fail
+							final FlinkException assignmentFailure = new FlinkException("Could not assign payload to allocated slot " + allocatedSlot.getAllocationId() + '.');
+							releaseSlot(slotRequestId, null, assignmentFailure);
+							throw new CompletionException(assignmentFailure);
+						}
+
+						return logicalSlot;
+					});
+		}
+
+		// allocate slot with slot sharing
+		final SlotSharingManager sharingManager = slotSharingManagers.computeIfAbsent(
+			sharingGroupId,
+			groupId -> new SlotSharingManager(groupId, this, providerAndOwner));
+
+		final SlotSharingManager.MultiTaskSlotLocality rootSlotLocality;
+
+		try {
+			rootSlotLocality = task.getCoLocationConstraint() != null
+				? allocateCoLocatedMultiTaskSlot(
+					task.getCoLocationConstraint(),
+					sharingManager,
+					resourceProfile,
+					locationPreferences,
+					allowQueuedScheduling)
+				: allocateMultiTaskSlot(
+					task.getJobVertexId(),
+					sharingManager,
+					resourceProfile,
+					locationPreferences,
+					allowQueuedScheduling);
+		} catch (NoResourceAvailableException noResourceException) {
+			return FutureUtils.completedExceptionally(noResourceException);
+		}
+
+		final SlotSharingManager.MultiTaskSlot rootSlot = rootSlotLocality.getMultiTaskSlot();
+
+		// sanity check
+		Preconditions.checkState(!rootSlot.contains(task.getJobVertexId()));
+
+		return rootSlot
+			.allocateSingleTaskSlot(slotRequestId, task.getJobVertexId(), rootSlotLocality.getLocality())
+			.getLogicalSlotFuture();
+	}
+
+	/**
+	 * Allocates a co-located {@link SlotSharingManager.MultiTaskSlot} for the given {@link CoLocationConstraint}.
+	 *
+	 * <p>If the constraint already refers to a task slot which is still known to the given
+	 * {@link SlotSharingManager}, then that slot is returned. Otherwise a new multi task slot is
+	 * allocated and a nested multi task slot for the co-location group is created in it.
+	 *
+	 * <p>If allowQueuedScheduling is true, then the returned {@link SlotSharingManager.MultiTaskSlot} can be
+	 * uncompleted.
+	 *
+	 * @param coLocationConstraint for which to allocate a {@link SlotSharingManager.MultiTaskSlot}
+	 * @param multiTaskSlotManager responsible for the slot sharing group for which to allocate the slot
+	 * @param resourceProfile specifying the requirements for the requested slot
+	 * @param locationPreferences containing preferred TaskExecutors on which to allocate the slot
+	 * @param allowQueuedScheduling true if queued scheduling (the returned task slot need not be completed yet) is allowed, otherwise false
+	 * @return A {@link SlotSharingManager.MultiTaskSlotLocality} which contains the allocated {@link SlotSharingManager.MultiTaskSlot}
+	 * 		and its locality wrt the given location preferences
+	 * @throws NoResourceAvailableException if no task slot could be allocated
+	 */
+	private SlotSharingManager.MultiTaskSlotLocality allocateCoLocatedMultiTaskSlot(
+			CoLocationConstraint coLocationConstraint,
+			SlotSharingManager multiTaskSlotManager,
+			ResourceProfile resourceProfile,
+			Collection<TaskManagerLocation> locationPreferences,
+			boolean allowQueuedScheduling) throws NoResourceAvailableException {
+		final SlotRequestId coLocationSlotRequestId = coLocationConstraint.getSlotRequestId();
+
+		if (coLocationSlotRequestId != null) {
+			// we have a slot assigned --> try to retrieve it
+			final SlotSharingManager.TaskSlot taskSlot = multiTaskSlotManager.getTaskSlot(coLocationSlotRequestId);
+
+			if (taskSlot != null) {
+				Preconditions.checkState(taskSlot instanceof SlotSharingManager.MultiTaskSlot);
+				return SlotSharingManager.MultiTaskSlotLocality.of(((SlotSharingManager.MultiTaskSlot) taskSlot), Locality.LOCAL);
+			} else {
+				// the slot may have been cancelled in the mean time
+				coLocationConstraint.setSlotRequestId(null);
+			}
+		}
+
+		final Collection<TaskManagerLocation> actualLocationPreferences;
+
+		if (coLocationConstraint.isAssigned()) {
+			// an assigned constraint dictates the location, the given preferences are ignored
+			actualLocationPreferences = Collections.singleton(coLocationConstraint.getLocation());
+		} else {
+			actualLocationPreferences = locationPreferences;
+		}
+
+		// get a new multi task slot
+		final SlotSharingManager.MultiTaskSlotLocality multiTaskSlotLocality = allocateMultiTaskSlot(
+			coLocationConstraint.getGroupId(), multiTaskSlotManager,
+			resourceProfile,
+			actualLocationPreferences,
+			allowQueuedScheduling);
+
+		// check whether we fulfill the co-location constraint
+		if (coLocationConstraint.isAssigned() && multiTaskSlotLocality.getLocality() != Locality.LOCAL) {
+			multiTaskSlotLocality.getMultiTaskSlot().release(
+				new FlinkException("Multi task slot is not local and, thus, does not fulfill the co-location constraint."));
+
+			throw new NoResourceAvailableException("Could not allocate a local multi task slot for the " +
+				"co location constraint " + coLocationConstraint + '.');
+		}
+
+		final SlotRequestId slotRequestId = new SlotRequestId();
+		final SlotSharingManager.MultiTaskSlot coLocationSlot = multiTaskSlotLocality.getMultiTaskSlot().allocateMultiTaskSlot(
+			slotRequestId,
+			coLocationConstraint.getGroupId());
+
+		// mark the requested slot as co-located slot for other co-located tasks
+		coLocationConstraint.setSlotRequestId(slotRequestId);
+
+		// lock the co-location constraint once we have obtained the allocated slot
+		coLocationSlot.getSlotContextFuture().whenComplete(
+			(SlotContext slotContext, Throwable throwable) -> {
+				if (throwable == null) {
+					// check whether we are still assigned to the co-location constraint
+					if (Objects.equals(coLocationConstraint.getSlotRequestId(), slotRequestId)) {
+						coLocationConstraint.lockLocation(slotContext.getTaskManagerLocation());
+					} else {
+						log.debug("Failed to lock colocation constraint {} because assigned slot " +
+							"request {} differs from fulfilled slot request {}.",
+							coLocationConstraint.getGroupId(),
+							coLocationConstraint.getSlotRequestId(),
+							slotRequestId);
+					}
+				} else {
+					// report the request whose allocation failed; the constraint may have been
+					// re-assigned to a different slot request (or to none) in the mean time
+					log.debug("Failed to lock colocation constraint {} because the slot " +
+						"allocation for slot request {} failed.",
+						coLocationConstraint.getGroupId(),
+						slotRequestId,
+						throwable);
+				}
+			});
+
+		return SlotSharingManager.MultiTaskSlotLocality.of(coLocationSlot, multiTaskSlotLocality.getLocality());
+	}
+
+	/**
+	 * Allocates a {@link SlotSharingManager.MultiTaskSlot} for the given groupId which is in the
+	 * slot sharing group for which the given {@link SlotSharingManager} is responsible.
+	 *
+	 * <p>The candidates are tried in this order: a resolved root slot of the slot sharing group
+	 * with {@link Locality#LOCAL}, a slot polled from the available slots, a resolved root slot
+	 * with any other locality and, only if queued scheduling is allowed, an unresolved root slot
+	 * or a new slot requested from the ResourceManager.
+	 *
+	 * <p>If allowQueuedScheduling is true, then the method can return an uncompleted {@link SlotSharingManager.MultiTaskSlot}.
+	 *
+	 * @param groupId for which to allocate a new {@link SlotSharingManager.MultiTaskSlot}
+	 * @param slotSharingManager responsible for the slot sharing group for which to allocate the slot
+	 * @param resourceProfile specifying the requirements for the requested slot
+	 * @param locationPreferences containing preferred TaskExecutors on which to allocate the slot
+	 * @param allowQueuedScheduling true if queued scheduling (the returned task slot need not be completed yet) is allowed, otherwise false
+	 * @return A {@link SlotSharingManager.MultiTaskSlotLocality} which contains the allocated {@link SlotSharingManager.MultiTaskSlot}
+	 * 		and its locality wrt the given location preferences
+	 * @throws NoResourceAvailableException if no task slot could be allocated
+	 */
+	private SlotSharingManager.MultiTaskSlotLocality allocateMultiTaskSlot(
+			AbstractID groupId,
+			SlotSharingManager slotSharingManager,
+			ResourceProfile resourceProfile,
+			Collection<TaskManagerLocation> locationPreferences,
+			boolean allowQueuedScheduling) throws NoResourceAvailableException {
+
+		// check first whether we have a resolved root slot which we can use
+		SlotSharingManager.MultiTaskSlotLocality multiTaskSlotLocality = slotSharingManager.getResolvedRootSlot(
+			groupId,
+			locationPreferences);
+
+		// a local root slot cannot be improved upon --> take it right away
+		if (multiTaskSlotLocality != null && multiTaskSlotLocality.getLocality() == Locality.LOCAL) {
+			return multiTaskSlotLocality;
+		}
+
+		// one id for the underlying allocated slot and one for the root multi task slot built on it
+		final SlotRequestId allocatedSlotRequestId = new SlotRequestId();
+		final SlotRequestId multiTaskSlotRequestId = new SlotRequestId();
+
+		// check whether we have an allocated slot available which we can use to create a new multi task slot in
+		final SlotAndLocality polledSlotAndLocality = pollAndAllocateSlot(allocatedSlotRequestId, resourceProfile, locationPreferences);
+
+		// use the polled slot if it is local or if there is no resolved root slot to fall back to
+		if (polledSlotAndLocality != null && (polledSlotAndLocality.getLocality() == Locality.LOCAL || multiTaskSlotLocality == null)) {
+
+			final AllocatedSlot allocatedSlot = polledSlotAndLocality.getSlot();
+			final SlotSharingManager.MultiTaskSlot multiTaskSlot = slotSharingManager.createRootSlot(
+				multiTaskSlotRequestId,
+				CompletableFuture.completedFuture(polledSlotAndLocality.getSlot()),
+				allocatedSlotRequestId);
+
+			if (allocatedSlot.tryAssignPayload(multiTaskSlot)) {
+				return SlotSharingManager.MultiTaskSlotLocality.of(multiTaskSlot, polledSlotAndLocality.getLocality());
+			} else {
+				// NOTE(review): after this release the method falls through and may reuse both
+				// request ids below - confirm that this is intended
+				multiTaskSlot.release(new FlinkException("Could not assign payload to allocated slot " +
+					allocatedSlot.getAllocationId() + '.'));
+			}
+		}
+
+		if (multiTaskSlotLocality != null) {
+			// prefer slot sharing group slots over unused slots
+			if (polledSlotAndLocality != null) {
+				// hand the polled slot back, it is not needed
+				releaseSlot(
+					allocatedSlotRequestId,
+					null,
+					new FlinkException("Locality constraint is not better fulfilled by allocated slot."));
+			}
+			return multiTaskSlotLocality;
+		}
+
+		if (allowQueuedScheduling) {
+			// there is no slot immediately available --> check first for uncompleted slots at the slot sharing group
+			SlotSharingManager.MultiTaskSlot multiTaskSlotFuture = slotSharingManager.getUnresolvedRootSlot(groupId);
+
+			if (multiTaskSlotFuture == null) {
+				// it seems as if we have to request a new slot from the resource manager, this is always the last resort!!!
+				final CompletableFuture<AllocatedSlot> futureSlot = requestNewAllocatedSlot(allocatedSlotRequestId, resourceProfile);
+
+				multiTaskSlotFuture = slotSharingManager.createRootSlot(
+					multiTaskSlotRequestId,
+					futureSlot,
+					allocatedSlotRequestId);
+
+				// once the allocation has finished, attach the root slot as payload or clean up
+				futureSlot.whenComplete(
+					(AllocatedSlot allocatedSlot, Throwable throwable) -> {
+						final SlotSharingManager.TaskSlot taskSlot = slotSharingManager.getTaskSlot(multiTaskSlotRequestId);
+
+						if (taskSlot != null) {
+							// still valid
+							// NOTE(review): throwable is null here if only the instanceof check failed
+							if (!(taskSlot instanceof SlotSharingManager.MultiTaskSlot) || throwable != null) {
+								taskSlot.release(throwable);
+							} else {
+								if (!allocatedSlot.tryAssignPayload(((SlotSharingManager.MultiTaskSlot) taskSlot))) {
+									taskSlot.release(new FlinkException("Could not assign payload to allocated slot " +
+										allocatedSlot.getAllocationId() + '.'));
+								}
+							}
+						} else {
+							// the root slot is gone in the mean time --> give the allocated slot back
+							releaseSlot(
+								allocatedSlotRequestId,
+								null,
+								new FlinkException("Could not find task slot with " + multiTaskSlotRequestId + '.'));
+						}
+					});
+			}
+
+			// the slot is not resolved yet, so nothing can be said about its locality
+			return SlotSharingManager.MultiTaskSlotLocality.of(multiTaskSlotFuture, Locality.UNKNOWN);
+
+		} else {
+			throw new NoResourceAvailableException("Could not allocate a shared slot for " + groupId + '.');
+		}
+	}
+
+	/**
+	 * Obtains an allocated slot: first by polling the available slots and, if no fitting slot is
+	 * found there and queued scheduling is allowed, by requesting a new slot.
+	 *
+	 * @param slotRequestId identifying the slot allocation request
+	 * @param resourceProfile which the allocated slot should fulfill
+	 * @param locationPreferences for the allocated slot
+	 * @param allowQueuedScheduling true if the slot allocation can be completed in the future
+	 * @return Future containing the allocated slot and its locality; completed exceptionally with a
+	 * 		{@link NoResourceAvailableException} if no slot is available and queued scheduling is not allowed
+	 */
+	private CompletableFuture<SlotAndLocality> requestAllocatedSlot(
+			SlotRequestId slotRequestId,
+			ResourceProfile resourceProfile,
+			Collection<TaskManagerLocation> locationPreferences,
+			boolean allowQueuedScheduling) {
+
+		// (1) do we have a slot available already?
+		final SlotAndLocality pooledSlot = pollAndAllocateSlot(slotRequestId, resourceProfile, locationPreferences);
+
+		if (pooledSlot != null) {
+			return CompletableFuture.completedFuture(pooledSlot);
+		}
+
+		// (2) nothing pooled and we are not allowed to wait
+		if (!allowQueuedScheduling) {
+			return FutureUtils.completedExceptionally(new NoResourceAvailableException("Could not allocate a simple slot for " +
+				slotRequestId + '.'));
+		}
+
+		// (3) we have to request a new allocated slot, its locality is not known
+		return requestNewAllocatedSlot(slotRequestId, resourceProfile)
+			.thenApply((AllocatedSlot allocatedSlot) -> new SlotAndLocality(allocatedSlot, Locality.UNKNOWN));
+	}
+
+	/**
+	 * Requests a new slot with the given {@link ResourceProfile} from the ResourceManager. If there is
+	 * currently no ResourceManager connected, then the request is stashed and sent once a new
+	 * ResourceManager is connected.
+	 *
+	 * @param slotRequestId identifying the requested slot
+	 * @param resourceProfile which the requested slot should fulfill
+	 * @return An {@link AllocatedSlot} future which is completed once the slot is offered to the {@link SlotPool}
+	 */
+	private CompletableFuture<AllocatedSlot> requestNewAllocatedSlot(
+		SlotRequestId slotRequestId,
+		ResourceProfile resourceProfile) {
+
+		final PendingRequest newRequest = new PendingRequest(slotRequestId, resourceProfile);
+
+		if (resourceManagerGateway != null) {
+			requestSlotFromResourceManager(resourceManagerGateway, newRequest);
+		} else {
+			stashRequestWaitingForResourceManager(newRequest);
+		}
+
+		return newRequest.getAllocatedSlotFuture();
+	}
+
+	/**
+	 * Registers the pending request under a freshly generated {@link AllocationID} and sends the
+	 * corresponding slot request to the given ResourceManager.
+	 *
+	 * <p>An acknowledged request gets an allocation timeout scheduled, a failed request call fails
+	 * the pending request. Whenever the pending request's future completes exceptionally, the
+	 * ResourceManager is asked to cancel the slot request.
+	 *
+	 * @param resourceManagerGateway gateway of the ResourceManager to ask; must not be null
+	 * @param pendingRequest the request to send; must not be null
+	 */
+	private void requestSlotFromResourceManager(
+			final ResourceManagerGateway resourceManagerGateway,
+			final PendingRequest pendingRequest) {
+
+		checkNotNull(resourceManagerGateway);
+		checkNotNull(pendingRequest);
+
+		LOG.info("Requesting slot with profile {} from resource manager (request = {}).", pendingRequest.getResourceProfile(), pendingRequest.getSlotRequestId());
+
+		final AllocationID allocationId = new AllocationID();
+		pendingRequests.put(pendingRequest.getSlotRequestId(), allocationId, pendingRequest);
+
+		// a failed pending request must not linger at the ResourceManager
+		pendingRequest.getAllocatedSlotFuture().whenComplete(
+			(ignoredSlot, requestFailure) -> {
+				if (requestFailure != null) {
+					resourceManagerGateway.cancelSlotRequest(allocationId);
+				}
+			});
+
+		final SlotRequest slotRequest = new SlotRequest(jobId, allocationId, pendingRequest.getResourceProfile(), jobManagerAddress);
+
+		resourceManagerGateway
+			.requestSlot(jobMasterId, slotRequest, resourceManagerRequestsTimeout)
+			.thenAcceptAsync(
+				(Acknowledge ack) -> slotRequestToResourceManagerSuccess(pendingRequest.getSlotRequestId()),
+				getMainThreadExecutor())
+			// on failure, fail the request future
+			.whenCompleteAsync(
+				(Void ignored, Throwable failure) -> {
+					if (failure != null) {
+						slotRequestToResourceManagerFailed(pendingRequest.getSlotRequestId(), failure);
+					}
+				},
+				getMainThreadExecutor());
+	}
+
+	/**
+	 * Called once the ResourceManager has acknowledged a slot request. Schedules the check which
+	 * times out the allocation after {@link #resourceManagerAllocationTimeout}.
+	 *
+	 * @param requestId identifying the acknowledged slot request
+	 */
+	private void slotRequestToResourceManagerSuccess(final SlotRequestId requestId) {
+		// a request is pending from the ResourceManager to a (future) TaskManager
+		// we only add the watcher here in case that request times out
+		scheduleRunAsync(() -> checkTimeoutSlotAllocation(requestId), resourceManagerAllocationTimeout);
+	}
+
+	private void slotRequestToResourceManagerFailed(SlotRequestId slotRequestID, Throwable failure) {
+		PendingRequest request = pendingRequests.removeKeyA(slotRequestID);
+		if (request != null) {
+			request.getAllocatedSlotFuture().completeExceptionally(new NoResourceAvailableException(
+					"No pooled slot available and request to ResourceManager for new slot failed", failure));
+		} else {
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Unregistered slot request {} failed.", slotRequestID, failure);
+			}
+		}
+	}
+
+	private void checkTimeoutSlotAllocation(SlotRequestId slotRequestID) {
+		PendingRequest request = pendingRequests.removeKeyA(slotRequestID);
+		if (request != null) {
+			failPendingRequest(request, new TimeoutException("Slot allocation request " + slotRequestID + " timed out"));
+		}
+	}
+
+	/**
+	 * Stashes the given request until a ResourceManager is connected and schedules the check which
+	 * fails it if it is still stashed after {@link #resourceManagerRequestsTimeout}.
+	 *
+	 * @param pendingRequest the request which cannot be sent at the moment
+	 */
+	private void stashRequestWaitingForResourceManager(final PendingRequest pendingRequest) {
+
+		LOG.info("Cannot serve slot request, no ResourceManager connected. " +
+				"Adding as pending request {}",  pendingRequest.getSlotRequestId());
+
+		waitingForResourceManager.put(pendingRequest.getSlotRequestId(), pendingRequest);
+
+		scheduleRunAsync(
+			() -> checkTimeoutRequestWaitingForResourceManager(pendingRequest.getSlotRequestId()),
+			resourceManagerRequestsTimeout);
+	}
+
+	private void checkTimeoutRequestWaitingForResourceManager(SlotRequestId slotRequestId) {
+		PendingRequest request = waitingForResourceManager.remove(slotRequestId);
+		if (request != null) {
+			failPendingRequest(
+				request,
+				new NoResourceAvailableException("No slot available and no connection to Resource Manager established."));
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Slot releasing & offering
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Releases the slot which belongs to the given slot request id.
+	 *
+	 * <p>With a slot sharing group id, the task slot is looked up in the group's
+	 * {@link SlotSharingManager} and released there. Without one, a still pending request with the
+	 * given id is failed. Otherwise the allocated slot registered under the id is removed, its
+	 * payload is released and the slot is passed on to {@code tryFulfillSlotRequestOrMakeAvailable}.
+	 * Unknown ids are only logged.
+	 *
+	 * @param slotRequestId identifying the slot (request) to release
+	 * @param slotSharingGroupId of the slot sharing group the slot belongs to; null if the slot is not shared
+	 * @param cause of the release; attached as cause to the exception which fails a still pending request
+	 * @return an already completed acknowledge future
+	 */
+	@Override
+	public CompletableFuture<Acknowledge> releaseSlot(SlotRequestId slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, Throwable cause) {
+
+		if (slotSharingGroupId != null) {
+			final SlotSharingManager multiTaskSlotManager = slotSharingManagers.get(slotSharingGroupId);
+
+			if (multiTaskSlotManager != null) {
+				final SlotSharingManager.TaskSlot taskSlot = multiTaskSlotManager.getTaskSlot(slotRequestId);
+
+				if (taskSlot != null) {
+					taskSlot.release(cause);
+				} else {
+					log.debug("Could not find slot {} in slot sharing group {}. Ignoring release slot request.", slotRequestId, slotSharingGroupId, cause);
+				}
+			} else {
+				log.debug("Could not find slot sharing group {}. Ignoring release slot request.", slotSharingGroupId, cause);
+			}
+		} else {
+			final PendingRequest pendingRequest = removePendingRequest(slotRequestId);
+
+			if (pendingRequest != null) {
+				// keep the reason for the release in the exception chain
+				failPendingRequest(
+					pendingRequest,
+					new FlinkException("Pending slot request with " + slotRequestId + " has been released.", cause));
+			} else {
+				final AllocatedSlot allocatedSlot = allocatedSlots.remove(slotRequestId);
+
+				if (allocatedSlot != null) {
+					// only reuse the slot if its payload could be released
+					if (allocatedSlot.releasePayload(cause)) {
+						tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);
+					}
+				} else {
+					log.debug("There is no allocated slot with slot request id {}. Ignoring the release slot request.", slotRequestId, cause);
+				}
+			}
+		}
+
+		return CompletableFuture.completedFuture(Acknowledge.get());
+	}
+
+	/**
+	 * Checks whether there exists a pending request with the given allocation id and removes it
+	 * from the internal data structures.
+	 *
+	 * @param requestId identifying the pending request
+	 * @return pending request if there is one, otherwise null
+	 */
+	@Nullable
+	private PendingRequest removePendingRequest(SlotRequestId requestId) {
+		PendingRequest result = waitingForResourceManager.remove(requestId);
+
+		if (result != null) {
+			// sanity check
+			assert !pendingRequests.containsKeyA(requestId) : "A pending requests should only be part of either " +
+				"the pendingRequests or waitingForResourceManager but not both.";
+
+			return result;
+		} else {
+			return pendingRequests.removeKeyA(requestId);
+		}
+	}
+
+	private void failPendingRequest(PendingRequest pendingRequest, Exception e) {
+		Preconditions.checkNotNull(pendingRequest);
+		Preconditions.checkNotNull(e);
+
+		if (!pendingRequest.getAllocatedSlotFuture().isDone()) {
+			LOG.info("Failing pending request {}.", pendingRequest.getSlotRequestId());
+			pendingRequest.getAllocatedSlotFuture().completeExceptionally(e);
+		}
+	}
+
+	@Nullable
+	private SlotAndLocality pollAndAllocateSlot(
+		SlotRequestId slotRequestId,
+		ResourceProfile resourceProfile,
+		Collection<TaskManagerLocation> locationPreferences) {
+		SlotAndLocality slotFromPool = availableSlots.poll(resourceProfile, locationPreferences);
+
+		if (slotFromPool != null) {
+			allocatedSlots.add(slotRequestId, slotFromPool.getSlot());
+		}
+
+		return slotFromPool;
+	}
+
+	/**
+	 * Hands the given slot to a matching pending slot request, or adds it to the set of
+	 * available slots if no pending request matches.
+	 *
+	 * @param allocatedSlot which shall be returned; must not be in use
+	 */
+	private void tryFulfillSlotRequestOrMakeAvailable(AllocatedSlot allocatedSlot) {
+		Preconditions.checkState(!allocatedSlot.isUsed(), "Provided slot is still in use.");
+
+		final PendingRequest matchingRequest = pollMatchingPendingRequest(allocatedSlot);
+
+		if (matchingRequest == null) {
+			LOG.debug("Adding returned slot [{}] to available slots", allocatedSlot.getAllocationId());
+			availableSlots.add(allocatedSlot, clock.relativeTimeMillis());
+			return;
+		}
+
+		final SlotRequestId matchingRequestId = matchingRequest.getSlotRequestId();
+
+		LOG.debug("Fulfilling pending request [{}] early with returned slot [{}]",
+			matchingRequestId, allocatedSlot.getAllocationId());
+
+		allocatedSlots.add(matchingRequestId, allocatedSlot);
+		matchingRequest.getAllocatedSlotFuture().complete(allocatedSlot);
+	}
+
+	/**
+	 * Finds and removes the first pending request whose resource profile is matched by the
+	 * given slot. Requests in {@code pendingRequests} are tried before the requests waiting
+	 * for a ResourceManager connection.
+	 *
+	 * @param slot for which to find a pending request
+	 * @return the removed matching request, or null if no request is pending or none matches
+	 */
+	private PendingRequest pollMatchingPendingRequest(final AllocatedSlot slot) {
+		final ResourceProfile offeredProfile = slot.getResourceProfile();
+
+		// requests which were sent to the resource manager take precedence
+		for (PendingRequest candidate : pendingRequests.values()) {
+			if (!offeredProfile.isMatching(candidate.getResourceProfile())) {
+				continue;
+			}
+
+			pendingRequests.removeKeyA(candidate.getSlotRequestId());
+			return candidate;
+		}
+
+		// afterwards the requests still waiting for a resource manager connection
+		for (PendingRequest candidate : waitingForResourceManager.values()) {
+			if (!offeredProfile.isMatching(candidate.getResourceProfile())) {
+				continue;
+			}
+
+			waitingForResourceManager.remove(candidate.getSlotRequestId());
+			return candidate;
+		}
+
+		// no request pending, or no request matches
+		return null;
+	}
+
+	@Override
+	public CompletableFuture<Collection<SlotOffer>> offerSlots(
+			TaskManagerLocation taskManagerLocation,
+			TaskManagerGateway taskManagerGateway,
+			Collection<SlotOffer> offers) {
+		validateRunsInMainThread();
+
+		List<CompletableFuture<Optional<SlotOffer>>> acceptedSlotOffers = offers.stream().map(
+			offer -> {
+				CompletableFuture<Optional<SlotOffer>> acceptedSlotOffer = offerSlot(
+					taskManagerLocation,
+					taskManagerGateway,
+					offer)
+					.thenApply(
+						(acceptedSlot) -> {
+							if (acceptedSlot) {
+								return Optional.of(offer);
+							} else {
+								return Optional.empty();
+							}
+						});
+
+				return acceptedSlotOffer;
+			}
+		).collect(Collectors.toList());
+
+		CompletableFuture<Collection<Optional<SlotOffer>>> optionalSlotOffers = FutureUtils.combineAll(acceptedSlotOffers);
+
+		CompletableFuture<Collection<SlotOffer>> resultingSlotOffers = optionalSlotOffers.thenApply(
+			collection -> {
+				Collection<SlotOffer> slotOffers = collection
+					.stream()
+					.flatMap(
+						opt -> opt.map(Stream::of).orElseGet(Stream::empty))
+					.collect(Collectors.toList());
+
+				return slotOffers;
+			});
+
+		return resultingSlotOffers;
+	}
+
+	/**
+	 * Slot offering by a TaskExecutor, identified by the offer's AllocationID. The AllocationID is
+	 * originally generated by this pool and transferred through the ResourceManager to the
+	 * TaskManager; it is used to tell the different allocations issued by this pool apart.
+	 *
+	 * <p>An offer from a TaskManager which is not registered is rejected. A repeated offer for a
+	 * slot which is already known is acknowledged positively. Every other offer is accepted: the
+	 * slot goes to the request waiting for exactly this allocation, or else to another matching
+	 * pending request, or else to the available slots.
+	 *
+	 * @param taskManagerLocation location from where the offer comes from
+	 * @param taskManagerGateway TaskManager gateway
+	 * @param slotOffer the offered slot
+	 * @return True if we accept the offering
+	 */
+	@Override
+	public CompletableFuture<Boolean> offerSlot(
+			final TaskManagerLocation taskManagerLocation,
+			final TaskManagerGateway taskManagerGateway,
+			final SlotOffer slotOffer) {
+		validateRunsInMainThread();
+
+		final ResourceID resourceID = taskManagerLocation.getResourceID();
+		final AllocationID allocationID = slotOffer.getAllocationId();
+
+		// offers are only valid if they come from a registered TaskManager
+		final boolean fromRegisteredTaskManager = registeredTaskManagers.contains(resourceID);
+		if (!fromRegisteredTaskManager) {
+			LOG.debug("Received outdated slot offering [{}] from unregistered TaskManager: {}",
+					allocationID, taskManagerLocation);
+			return CompletableFuture.completedFuture(false);
+		}
+
+		// check whether we already know this slot
+		final boolean alreadyKnown = allocatedSlots.contains(allocationID) || availableSlots.contains(allocationID);
+		if (alreadyKnown) {
+			LOG.debug("Received repeated offer for slot [{}]. Ignoring.", allocationID);
+
+			// answer with true so that the sender gets a positive acknowledgement for its retry
+			// and marks the offering as a success
+			return CompletableFuture.completedFuture(true);
+		}
+
+		final AllocatedSlot allocatedSlot = new AllocatedSlot(
+			allocationID,
+			taskManagerLocation,
+			slotOffer.getSlotIndex(),
+			slotOffer.getResourceProfile(),
+			taskManagerGateway);
+
+		// check whether a request is waiting for exactly this allocation
+		final PendingRequest awaitingRequest = pendingRequests.removeKeyB(allocationID);
+
+		if (awaitingRequest == null) {
+			// we were actually not waiting for this:
+			//   - could be that this request had been fulfilled
+			//   - we are receiving the slots from TaskManagers after becoming leaders
+			tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);
+		} else {
+			// we were waiting for this!
+			final SlotRequestId awaitingRequestId = awaitingRequest.getSlotRequestId();
+			allocatedSlots.add(awaitingRequestId, allocatedSlot);
+
+			final boolean fulfilled = awaitingRequest.getAllocatedSlotFuture().complete(allocatedSlot);
+
+			if (!fulfilled) {
+				// the pending slot future could not be completed --> try to fulfill another pending request
+				allocatedSlots.remove(awaitingRequestId);
+				tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);
+			}
+		}
+
+		// we accepted the request in any case. slot will be released after it idled for
+		// too long and timed out
+		return CompletableFuture.completedFuture(true);
+	}
+
+
+	// TODO - periodic (every minute or so) catch slots that were lost (check all slots, if they have any task active)
+
+	// TODO - release slots that were not used to the resource manager
+
+	// ------------------------------------------------------------------------
+	//  Error Handling
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Fail the specified allocation and release the corresponding slot if we have one.
+	 * This may triggered by JobManager when some slot allocation failed with timeout.
+	 * Or this could be triggered by TaskManager, when it finds out something went wrong with the slot,
+	 * and decided to take it back.
+	 *
+	 * @param allocationID Represents the allocation which should be failed
+	 * @param cause        The cause of the failure
+	 */
+	@Override
+	public void failAllocation(final AllocationID allocationID, final Exception cause) {
+		final PendingRequest pendingRequest = pendingRequests.removeKeyB(allocationID);
+		if (pendingRequest != null) {
+			// request was still pending
+			failPendingRequest(pendingRequest, cause);
+		}
+		else if (availableSlots.tryRemove(allocationID)) {
+			LOG.debug("Failed available slot [{}] with ", allocationID, cause);
+		}
+		else {
+			AllocatedSlot allocatedSlot = allocatedSlots.remove(allocationID);
+			if (allocatedSlot != null) {
+				// release the slot.
+				// since it is not in 'allocatedSlots' any more, it will be dropped o return'
+				allocatedSlot.releasePayload(cause);
+			}
+			else {
+				LOG.debug("Outdated request to fail slot [{}] with ", allocationID, cause);
+			}
+		}
+		// TODO: add some unit tests when the previous two are ready, the allocation may failed at any phase
+	}
+
+	// ------------------------------------------------------------------------
+	//  Resource
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Registers a TaskManager at this pool. Only slots offered by registered TaskManagers are
+	 * considered valid, which also provides a way to keep "dead" or "abnormal" TaskManagers out
+	 * of this pool.
+	 *
+	 * @param resourceID The id of the TaskManager
+	 * @return Future acknowledge which is already completed when this method returns
+	 */
+	@Override
+	public CompletableFuture<Acknowledge> registerTaskManager(final ResourceID resourceID) {
+		registeredTaskManagers.add(resourceID);
+
+		final Acknowledge acknowledge = Acknowledge.get();
+		return CompletableFuture.completedFuture(acknowledge);
+	}
+
+	/**
+	 * Unregisters a TaskManager from this pool. If it was registered, its available slots are
+	 * dropped, its allocated slots are removed and their payloads are released. Called when some
+	 * TaskManager becomes "dead" or "abnormal" and we decide not to use its slots anymore.
+	 *
+	 * @param resourceID The id of the TaskManager
+	 * @return Future acknowledge which is already completed when this method returns
+	 */
+	@Override
+	public CompletableFuture<Acknowledge> releaseTaskManager(final ResourceID resourceID) {
+		final boolean wasRegistered = registeredTaskManagers.remove(resourceID);
+
+		if (!wasRegistered) {
+			return CompletableFuture.completedFuture(Acknowledge.get());
+		}
+
+		availableSlots.removeAllForTaskManager(resourceID);
+
+		for (AllocatedSlot orphanedSlot : allocatedSlots.removeSlotsForTaskManager(resourceID)) {
+			orphanedSlot.releasePayload(new FlinkException("TaskManager " + resourceID + " was released."));
+		}
+
+		return CompletableFuture.completedFuture(Acknowledge.get());
+	}
+
+	// ------------------------------------------------------------------------
+	//  Methods for tests
+	// ------------------------------------------------------------------------
+
+	/** Test accessor which returns the internal allocated slots bookkeeping itself (no copy). */
+	@VisibleForTesting
+	protected AllocatedSlots getAllocatedSlots() {
+		return this.allocatedSlots;
+	}
+
+	/** Test accessor which returns the internal available slots bookkeeping itself (no copy). */
+	@VisibleForTesting
+	protected AvailableSlots getAvailableSlots() {
+		return this.availableSlots;
+	}
+
+	/** Test accessor which returns the internal map of pending requests itself (no copy). */
+	@VisibleForTesting
+	DualKeyMap<SlotRequestId, AllocationID, PendingRequest> getPendingRequests() {
+		return this.pendingRequests;
+	}
+
+	/** Test accessor which returns the internal map of requests waiting for a ResourceManager itself (no copy). */
+	@VisibleForTesting
+	Map<SlotRequestId, PendingRequest> getWaitingForResourceManager() {
+		return this.waitingForResourceManager;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Helper classes
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Bookkeeping of the allocated slots, indexed by TaskManager as well as by allocation id
+	 * and slot request id.
+	 */
+	static class AllocatedSlots {
+
+		/** All allocated slots organized by TaskManager's id. */
+		private final Map<ResourceID, Set<AllocatedSlot>> allocatedSlotsByTaskManager;
+
+		/** All allocated slots, addressable by AllocationID and by SlotRequestId. */
+		private final DualKeyMap<AllocationID, SlotRequestId, AllocatedSlot> allocatedSlotsById;
+
+		AllocatedSlots() {
+			this.allocatedSlotsByTaskManager = new HashMap<>(16);
+			this.allocatedSlotsById = new DualKeyMap<>(16);
+		}
+
+		/**
+		 * Adds a new slot to this collection.
+		 *
+		 * @param slotRequestId The id of the slot request the slot is registered under
+		 * @param allocatedSlot The allocated slot
+		 */
+		void add(SlotRequestId slotRequestId, AllocatedSlot allocatedSlot) {
+			allocatedSlotsById.put(allocatedSlot.getAllocationId(), slotRequestId, allocatedSlot);
+
+			final ResourceID owner = allocatedSlot.getTaskManagerLocation().getResourceID();
+
+			allocatedSlotsByTaskManager
+				.computeIfAbsent(owner, ignored -> new HashSet<>(4))
+				.add(allocatedSlot);
+		}
+
+		/**
+		 * Looks up an allocated slot by its allocation id.
+		 *
+		 * @param allocationID The allocation id
+		 * @return The allocated slot, null if we can't find a match
+		 */
+		AllocatedSlot get(final AllocationID allocationID) {
+			return allocatedSlotsById.getKeyA(allocationID);
+		}
+
+		/**
+		 * Looks up an allocated slot by the id of the slot request it was added with.
+		 *
+		 * @param slotRequestId The slot request id
+		 * @return The allocated slot, null if we can't find a match
+		 */
+		AllocatedSlot get(final SlotRequestId slotRequestId) {
+			return allocatedSlotsById.getKeyB(slotRequestId);
+		}
+
+		/**
+		 * Checks whether a slot with the given allocation id is contained.
+		 *
+		 * @param slotAllocationId The allocation id of the slot to check
+		 * @return True if the slot is contained
+		 */
+		boolean contains(AllocationID slotAllocationId) {
+			return allocatedSlotsById.containsKeyA(slotAllocationId);
+		}
+
+		/**
+		 * Removes the allocated slot specified by the provided slot allocation id.
+		 *
+		 * @param allocationID identifying the allocated slot to remove
+		 * @return The removed allocated slot or null.
+		 */
+		@Nullable
+		AllocatedSlot remove(final AllocationID allocationID) {
+			final AllocatedSlot removed = allocatedSlotsById.removeKeyA(allocationID);
+
+			if (removed == null) {
+				return null;
+			}
+
+			removeAllocatedSlot(removed);
+			return removed;
+		}
+
+		/**
+		 * Removes the allocated slot specified by the provided slot request id.
+		 *
+		 * @param slotRequestId identifying the allocated slot to remove
+		 * @return The removed allocated slot or null.
+		 */
+		@Nullable
+		AllocatedSlot remove(final SlotRequestId slotRequestId) {
+			final AllocatedSlot removed = allocatedSlotsById.removeKeyB(slotRequestId);
+
+			if (removed == null) {
+				return null;
+			}
+
+			removeAllocatedSlot(removed);
+			return removed;
+		}
+
+		/**
+		 * Removes the given slot from the by-TaskManager index and drops the TaskManager's
+		 * entry once its last slot is gone.
+		 */
+		private void removeAllocatedSlot(final AllocatedSlot allocatedSlot) {
+			Preconditions.checkNotNull(allocatedSlot);
+
+			final ResourceID owner = allocatedSlot.getTaskManagerLocation().getResourceID();
+			final Set<AllocatedSlot> ownerSlots = allocatedSlotsByTaskManager.get(owner);
+
+			ownerSlots.remove(allocatedSlot);
+
+			if (ownerSlots.isEmpty()) {
+				allocatedSlotsByTaskManager.remove(owner);
+			}
+		}
+
+		/**
+		 * Removes all allocated slots which belong to the given TaskManager.
+		 *
+		 * @param resourceID The id of the TaskManager
+		 * @return Set of the removed slots; empty if the TaskManager has no allocated slots
+		 */
+		Set<AllocatedSlot> removeSlotsForTaskManager(final ResourceID resourceID) {
+			final Set<AllocatedSlot> removedSlots = allocatedSlotsByTaskManager.remove(resourceID);
+
+			if (removedSlots == null) {
+				return Collections.emptySet();
+			}
+
+			for (AllocatedSlot removedSlot : removedSlots) {
+				allocatedSlotsById.removeKeyA(removedSlot.getAllocationId());
+			}
+
+			return removedSlots;
+		}
+
+		void clear() {
+			allocatedSlotsById.clear();
+			allocatedSlotsByTaskManager.clear();
+		}
+
+		@VisibleForTesting
+		boolean containResource(final ResourceID resourceID) {
+			return allocatedSlotsByTaskManager.containsKey(resourceID);
+		}
+
+		@VisibleForTesting
+		int size() {
+			return allocatedSlotsById.size();
+		}
+
+		@VisibleForTesting
+		Set<AllocatedSlot> getSlotsForTaskManager(ResourceID resourceId) {
+			return allocatedSlotsByTaskManager.getOrDefault(resourceId, Collections.<AllocatedSlot>emptySet());
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Organize all available slots from different points of view: by allocation id (together
+	 * with the time of insertion), by TaskManager and by host.
+	 */
+	protected static class AvailableSlots {
+
+		/** All available slots organized by TaskManager. */
+		private final HashMap<ResourceID, Set<AllocatedSlot>> availableSlotsByTaskManager;
+
+		/** All available slots organized by host. */
+		private final HashMap<String, Set<AllocatedSlot>> availableSlotsByHost;
+
+		/** The available slots, with the time when they were inserted. */
+		private final HashMap<AllocationID, SlotAndTimestamp> availableSlots;
+
+		AvailableSlots() {
+			this.availableSlotsByTaskManager = new HashMap<>();
+			this.availableSlotsByHost = new HashMap<>();
+			this.availableSlots = new HashMap<>();
+		}
+
+		/**
+		 * Adds an available slot.
+		 *
+		 * @param slot The slot to add
+		 * @param timestamp The time at which the slot is inserted
+		 * @throws IllegalStateException if a slot with the same allocation id is already
+		 *                               contained; the existing entry is left untouched
+		 */
+		void add(final AllocatedSlot slot, final long timestamp) {
+			checkNotNull(slot);
+
+			// putIfAbsent (instead of put) keeps the stored slot and timestamp intact
+			// when the duplicate is rejected below
+			final SlotAndTimestamp previous = availableSlots.putIfAbsent(
+					slot.getAllocationId(), new SlotAndTimestamp(slot, timestamp));
+
+			if (previous != null) {
+				throw new IllegalStateException("slot already contained");
+			}
+
+			final ResourceID resourceID = slot.getTaskManagerLocation().getResourceID();
+			final String host = slot.getTaskManagerLocation().getFQDNHostname();
+
+			availableSlotsByTaskManager.computeIfAbsent(resourceID, ignored -> new HashSet<>()).add(slot);
+			availableSlotsByHost.computeIfAbsent(host, ignored -> new HashSet<>()).add(slot);
+		}
+
+		/**
+		 * Check whether we have this slot.
+		 *
+		 * @param slotId The allocation id of the slot to check
+		 * @return True if a slot with this allocation id is available
+		 */
+		boolean contains(AllocationID slotId) {
+			return availableSlots.containsKey(slotId);
+		}
+
+		/**
+		 * Poll a slot which matches the required resource profile. The polling tries to satisfy the
+		 * location preferences, by TaskManager and by host, before falling back to any matching slot.
+		 * The returned slot is removed from the available slots.
+		 *
+		 * @param resourceProfile      The required resource profile.
+		 * @param locationPreferences  The location preferences, in order to be checked.
+		 *
+		 * @return Slot which matches the resource profile, null if we can't find a match
+		 */
+		@Nullable
+		SlotAndLocality poll(ResourceProfile resourceProfile, Collection<TaskManagerLocation> locationPreferences) {
+			// fast path if no slots are available
+			if (availableSlots.isEmpty()) {
+				return null;
+			}
+
+			final boolean hadLocationPreference = locationPreferences != null && !locationPreferences.isEmpty();
+
+			if (hadLocationPreference) {
+
+				// first search by TaskManager
+				for (TaskManagerLocation location : locationPreferences) {
+					final AllocatedSlot candidate = findMatchingSlot(
+						availableSlotsByTaskManager.get(location.getResourceID()),
+						resourceProfile);
+
+					if (candidate != null) {
+						remove(candidate.getAllocationId());
+						return new SlotAndLocality(candidate, Locality.LOCAL);
+					}
+				}
+
+				// now, search by host
+				for (TaskManagerLocation location : locationPreferences) {
+					final AllocatedSlot candidate = findMatchingSlot(
+						availableSlotsByHost.get(location.getFQDNHostname()),
+						resourceProfile);
+
+					if (candidate != null) {
+						remove(candidate.getAllocationId());
+						return new SlotAndLocality(candidate, Locality.HOST_LOCAL);
+					}
+				}
+			}
+
+			// take any slot
+			for (SlotAndTimestamp candidate : availableSlots.values()) {
+				final AllocatedSlot slot = candidate.slot();
+
+				if (slot.getResourceProfile().isMatching(resourceProfile)) {
+					remove(slot.getAllocationId());
+					return new SlotAndLocality(
+							slot, hadLocationPreference ? Locality.NON_LOCAL : Locality.UNCONSTRAINED);
+				}
+			}
+
+			// nothing available that matches
+			return null;
+		}
+
+		/**
+		 * Returns the first slot of the given candidates whose resource profile matches the
+		 * required one, without removing it.
+		 *
+		 * @param candidates The slots to search, may be null
+		 * @param resourceProfile The required resource profile
+		 * @return The first matching slot, null if the candidates are null or none matches
+		 */
+		@Nullable
+		private static AllocatedSlot findMatchingSlot(@Nullable Set<AllocatedSlot> candidates, ResourceProfile resourceProfile) {
+			if (candidates != null) {
+				for (AllocatedSlot candidate : candidates) {
+					if (candidate.getResourceProfile().isMatching(resourceProfile)) {
+						return candidate;
+					}
+				}
+			}
+
+			return null;
+		}
+
+		/**
+		 * Remove all available slots come from specified TaskManager.
+		 *
+		 * @param taskManager The id of the TaskManager
+		 */
+		void removeAllForTaskManager(final ResourceID taskManager) {
+			// remove from the by-TaskManager view
+			final Set<AllocatedSlot> slotsForTm = availableSlotsByTaskManager.remove(taskManager);
+
+			if (slotsForTm != null && !slotsForTm.isEmpty()) {
+				// the host is read from the first slot and used for all slots of this TaskManager
+				final String host = slotsForTm.iterator().next().getTaskManagerLocation().getFQDNHostname();
+				final Set<AllocatedSlot> slotsForHost = availableSlotsByHost.get(host);
+
+				// remove from the base set and the by-host view
+				for (AllocatedSlot slot : slotsForTm) {
+					availableSlots.remove(slot.getAllocationId());
+					slotsForHost.remove(slot);
+				}
+
+				if (slotsForHost.isEmpty()) {
+					availableSlotsByHost.remove(host);
+				}
+			}
+		}
+
+		/**
+		 * Removes the slot with the given allocation id from all views, if it is contained.
+		 *
+		 * @param slotId The allocation id of the slot to remove
+		 * @return True if the slot was contained and has been removed, otherwise false
+		 */
+		boolean tryRemove(AllocationID slotId) {
+			final SlotAndTimestamp sat = availableSlots.remove(slotId);
+
+			if (sat == null) {
+				return false;
+			}
+
+			final AllocatedSlot slot = sat.slot();
+			final ResourceID resourceID = slot.getTaskManagerLocation().getResourceID();
+			final String host = slot.getTaskManagerLocation().getFQDNHostname();
+
+			final Set<AllocatedSlot> slotsForTm = availableSlotsByTaskManager.get(resourceID);
+			final Set<AllocatedSlot> slotsForHost = availableSlotsByHost.get(host);
+
+			slotsForTm.remove(slot);
+			slotsForHost.remove(slot);
+
+			// drop index entries which became empty
+			if (slotsForTm.isEmpty()) {
+				availableSlotsByTaskManager.remove(resourceID);
+			}
+			if (slotsForHost.isEmpty()) {
+				availableSlotsByHost.remove(host);
+			}
+
+			return true;
+		}
+
+		/**
+		 * Removes the slot with the given allocation id, which must be contained.
+		 *
+		 * @param slotId The allocation id of the slot to remove
+		 * @throws IllegalStateException if the slot is not contained
+		 */
+		private void remove(AllocationID slotId) throws IllegalStateException {
+			if (!tryRemove(slotId)) {
+				throw new IllegalStateException("slot not contained");
+			}
+		}
+
+		@VisibleForTesting
+		boolean containsTaskManager(ResourceID resourceID) {
+			return availableSlotsByTaskManager.containsKey(resourceID);
+		}
+
+		@VisibleForTesting
+		public int size() {
+			return availableSlots.size();
+		}
+
+		@VisibleForTesting
+		void clear() {
+			availableSlots.clear();
+			availableSlotsByTaskManager.clear();
+			availableSlotsByHost.clear();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * {@link SlotOwner} and {@link SlotProvider} in one, which forwards every call to the
+	 * SlotPool's RPC gateway.
+	 */
+	private static class ProviderAndOwner implements SlotOwner, SlotProvider {
+
+		/** Gateway to which all calls are delegated. */
+		private final SlotPoolGateway gateway;
+
+		/** Timeout passed along with slot allocation calls. */
+		private final Time timeout;
+
+		ProviderAndOwner(SlotPoolGateway gateway, Time timeout) {
+			this.gateway = gateway;
+			this.timeout = timeout;
+		}
+
+		/**
+		 * Returns the given slot by releasing it at the gateway.
+		 *
+		 * @param slot to return
+		 * @return Future which is completed with true once the gateway acknowledged the release
+		 */
+		@Override
+		public CompletableFuture<Boolean> returnAllocatedSlot(LogicalSlot slot) {
+			final CompletableFuture<Acknowledge> releaseFuture = gateway.releaseSlot(
+				slot.getSlotRequestId(),
+				slot.getSlotSharingGroupId(),
+				new FlinkException("Slot is being returned to the SlotPool."));
+
+			return releaseFuture.thenApply(acknowledge -> true);
+		}
+
+		/**
+		 * Requests a slot for the given task from the gateway under a freshly created
+		 * {@link SlotRequestId}. If the request fails, the slot request is released again at
+		 * the gateway with the failure as cause.
+		 *
+		 * @param task for which to allocate the slot
+		 * @param allowQueued forwarded to the gateway as the queued scheduling flag
+		 * @param preferredLocations forwarded to the gateway as location preferences
+		 * @return Future of the allocated logical slot
+		 */
+		@Override
+		public CompletableFuture<LogicalSlot> allocateSlot(
+				ScheduledUnit task,
+				boolean allowQueued,
+				Collection<TaskManagerLocation> preferredLocations) {
+
+			final SlotRequestId requestId = new SlotRequestId();
+
+			final CompletableFuture<LogicalSlot> slotFuture = gateway.allocateSlot(
+				requestId,
+				task,
+				ResourceProfile.UNKNOWN,
+				preferredLocations,
+				allowQueued,
+				timeout);
+
+			slotFuture.whenComplete(
+				(ignoredSlot, failure) -> {
+					if (failure == null) {
+						return;
+					}
+
+					// release the failed request at the gateway
+					gateway.releaseSlot(requestId, task.getSlotSharingGroupId(), failure);
+				});
+
+			return slotFuture;
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * A slot request which has not been fulfilled yet, together with the future that will be
+	 * completed with the allocated slot.
+	 */
+	private static class PendingRequest {
+
+		/** Id of the slot request. */
+		private final SlotRequestId slotRequestId;
+
+		/** Resource profile of the request. */
+		private final ResourceProfile resourceProfile;
+
+		/** Future for the slot of this request; created uncompleted. */
+		private final CompletableFuture<AllocatedSlot> allocatedSlotFuture;
+
+		PendingRequest(
+				SlotRequestId slotRequestId,
+				ResourceProfile resourceProfile) {
+			this.slotRequestId = Preconditions.checkNotNull(slotRequestId);
+			this.resourceProfile = Preconditions.checkNotNull(resourceProfile);
+			this.allocatedSlotFuture = new CompletableFuture<>();
+		}
+
+		public SlotRequestId getSlotRequestId() {
+			return this.slotRequestId;
+		}
+
+		public ResourceProfile getResourceProfile() {
+			return this.resourceProfile;
+		}
+
+		public CompletableFuture<AllocatedSlot> getAllocatedSlotFuture() {
+			return this.allocatedSlotFuture;
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Immutable pair of a slot and the timestamp at which it was added.
+	 */
+	private static class SlotAndTimestamp {
+
+		/** The slot. */
+		private final AllocatedSlot slot;
+
+		/** The timestamp recorded together with the slot. */
+		private final long timestamp;
+
+		SlotAndTimestamp(AllocatedSlot slot, long timestamp) {
+			this.slot = slot;
+			this.timestamp = timestamp;
+		}
+
+		public AllocatedSlot slot() {
+			return this.slot;
+		}
+
+		public long timestamp() {
+			return this.timestamp;
+		}
+
+		@Override
+		public String toString() {
+			return String.valueOf(slot) + " @ " + timestamp;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
new file mode 100644
index 0000000..d3b51f7
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcTimeout;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * The gateway for calls on the {@link SlotPool}.
+ */
+public interface SlotPoolGateway extends AllocatedSlotActions, RpcGateway {
+
+	// ------------------------------------------------------------------------
+	//  shutdown
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Suspends the {@link SlotPool}.
+	 *
+	 * <p>NOTE(review): the exact effects of suspending are defined by the implementation and
+	 * are not visible from this interface — confirm against {@link SlotPool}.
+	 */
+	void suspend();
+
+	// ------------------------------------------------------------------------
+	//  resource manager connection
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Connects the SlotPool to the given ResourceManager. After this method is called, the
+	 * SlotPool will be able to request resources from the given ResourceManager.
+	 *
+	 * @param resourceManagerGateway  The RPC gateway for the resource manager.
+	 */
+	void connectToResourceManager(ResourceManagerGateway resourceManagerGateway);
+
+	/**
+	 * Disconnects the slot pool from its current Resource Manager. After this call, the pool will not
+	 * be able to request further slots from the Resource Manager, and all currently pending requests
+	 * to the resource manager will be canceled.
+	 *
+	 * <p>The slot pool will still be able to serve slots from its internal pool.
+	 */
+	void disconnectResourceManager();
+
+	// ------------------------------------------------------------------------
+	//  registering / un-registering TaskManagers and slots
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Registers a TaskExecutor with the given {@link ResourceID} at {@link SlotPool}.
+	 *
+	 * @param resourceID identifying the TaskExecutor to register
+	 * @return Future acknowledge which is completed after the TaskExecutor has been registered
+	 */
+	CompletableFuture<Acknowledge> registerTaskManager(ResourceID resourceID);
+
+	/**
+	 * Releases a TaskExecutor with the given {@link ResourceID} from the {@link SlotPool}.
+	 *
+	 * @param resourceID identifying the TaskExecutor which shall be released from the SlotPool
+	 * @return Future acknowledge which is completed after the TaskExecutor has been released
+	 */
+	CompletableFuture<Acknowledge> releaseTaskManager(ResourceID resourceID);
+
+	/**
+	 * Offers a slot to the {@link SlotPool}. The slot offer can be accepted or
+	 * rejected.
+	 *
+	 * @param taskManagerLocation from which the slot offer originates
+	 * @param taskManagerGateway to talk to the slot offerer
+	 * @param slotOffer slot which is offered to the {@link SlotPool}
+	 * @return True (future) if the slot has been accepted, otherwise false (future)
+	 */
+	CompletableFuture<Boolean> offerSlot(
+		TaskManagerLocation taskManagerLocation,
+		TaskManagerGateway taskManagerGateway,
+		SlotOffer slotOffer);
+
+	/**
+	 * Offers multiple slots to the {@link SlotPool}. The slot offerings can be
+	 * individually accepted or rejected by returning the collection of accepted
+	 * slot offers.
+	 *
+	 * @param taskManagerLocation from which the slot offers originate
+	 * @param taskManagerGateway to talk to the slot offerer
+	 * @param offers slot offers which are offered to the {@link SlotPool}
+	 * @return A collection of accepted slot offers (future). The remaining slot offers are
+	 * 			implicitly rejected.
+	 */
+	CompletableFuture<Collection<SlotOffer>> offerSlots(
+		TaskManagerLocation taskManagerLocation,
+		TaskManagerGateway taskManagerGateway,
+		Collection<SlotOffer> offers);
+
+	/**
+	 * Fails the slot with the given allocation id.
+	 *
+	 * @param allocationID identifying the slot which is being failed
+	 * @param cause of the failure
+	 */
+	void failAllocation(AllocationID allocationID, Exception cause);
+
+	// ------------------------------------------------------------------------
+	//  allocating and disposing slots
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Requests to allocate a slot for the given {@link ScheduledUnit}. The request
+	 * is uniquely identified by the provided {@link SlotRequestId} which can also
+	 * be used to release the slot via {@link #releaseSlot(SlotRequestId, SlotSharingGroupId, Throwable)}.
+	 * The allocated slot will fulfill the requested {@link ResourceProfile} and the
+	 * pool tries to place it on one of the location preferences.
+	 *
+	 * <p>If the returned future need not be completed right away (i.e. the slot request
+	 * can be queued), allowQueuedScheduling must be set to true.
+	 *
+	 * @param slotRequestId identifying the requested slot
+	 * @param scheduledUnit for which to allocate slot
+	 * @param resourceProfile which the allocated slot must fulfill
+	 * @param locationPreferences which define where the allocated slot should be placed, this can also be empty
+	 * @param allowQueuedScheduling true if the slot request can be queued (i.e. the returned future need not be completed right away)
+	 * @param timeout for the operation
+	 * @return Future which is completed with the allocated {@link LogicalSlot}
+	 */
+	CompletableFuture<LogicalSlot> allocateSlot(
+			SlotRequestId slotRequestId,
+			ScheduledUnit scheduledUnit,
+			ResourceProfile resourceProfile,
+			Collection<TaskManagerLocation> locationPreferences,
+			boolean allowQueuedScheduling,
+			@RpcTimeout Time timeout);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotProvider.java
new file mode 100644
index 0000000..8e8d019
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotProvider.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * The slot provider is responsible for preparing slots for ready-to-run tasks.
+ *
+ * <p>It supports two allocation modes:
+ * <ul>
+ *     <li>Immediate allocation: A request for a task slot is satisfied immediately, so the
+ *         allocated slot can be obtained via {@link CompletableFuture#getNow(Object)}.</li>
+ *     <li>Queued allocation: A request for a task slot is queued and returns a future that will be
+ *         fulfilled as soon as a slot becomes available.</li>
+ * </ul>
+ */
+public interface SlotProvider {
+
+	/**
+	 * Allocates a slot for the given task.
+	 *
+	 * @param task The task to allocate the slot for
+	 * @param allowQueued Whether the request may be queued if not enough resources are available
+	 * @param preferredLocations The preferred locations for the slot allocation
+	 * @return The future of the allocation
+	 */
+	CompletableFuture<LogicalSlot> allocateSlot(
+		ScheduledUnit task,
+		boolean allowQueued,
+		Collection<TaskManagerLocation> preferredLocations);
+}