You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/12/14 16:42:27 UTC

[10/11] flink git commit: [FLINK-7956] [flip6] Add support for queued scheduling with slot sharing to SlotPool

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
deleted file mode 100644
index 68f5be6..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ /dev/null
@@ -1,1219 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.jobmanager.scheduler.Locality;
-import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
-import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
-import org.apache.flink.runtime.jobmanager.slots.SlotAndLocality;
-import org.apache.flink.runtime.jobmanager.slots.SlotException;
-import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
-import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
-import org.apache.flink.runtime.jobmaster.JobMasterId;
-import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
-import org.apache.flink.runtime.resourcemanager.SlotRequest;
-import org.apache.flink.runtime.rpc.RpcEndpoint;
-import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.runtime.util.clock.Clock;
-import org.apache.flink.runtime.util.clock.SystemClock;
-import org.apache.flink.util.Preconditions;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.TimeoutException;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * The slot pool serves slot request issued by Scheduler or ExecutionGraph. It will will attempt to acquire new slots
- * from the ResourceManager when it cannot serve a slot request. If no ResourceManager is currently available,
- * or it gets a decline from the ResourceManager, or a request times out, it fails the slot request. The slot pool also
- * holds all the slots that were offered to it and accepted, and can thus provides registered free slots even if the
- * ResourceManager is down. The slots will only be released when they are useless, e.g. when the job is fully running
- * but we still have some free slots.
- * <p>
- * All the allocation or the slot offering will be identified by self generated AllocationID, we will use it to
- * eliminate ambiguities.
- * 
- * TODO : Make pending requests location preference aware
- * TODO : Make pass location preferences to ResourceManager when sending a slot request
- */
-public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
-
-	/** The log for the pool - shared also with the internal classes */
-	static final Logger LOG = LoggerFactory.getLogger(SlotPool.class);
-
-	// ------------------------------------------------------------------------
-
-	private static final Time DEFAULT_SLOT_REQUEST_TIMEOUT = Time.minutes(5);
-
-	private static final Time DEFAULT_RM_ALLOCATION_TIMEOUT = Time.minutes(10);
-
-	private static final Time DEFAULT_RM_REQUEST_TIMEOUT = Time.seconds(10);
-
-	// ------------------------------------------------------------------------
-
-	private final JobID jobId;
-
-	private final ProviderAndOwner providerAndOwner;
-
-	/** All registered TaskManagers, slots will be accepted and used only if the resource is registered */
-	private final HashSet<ResourceID> registeredTaskManagers;
-
-	/** The book-keeping of all allocated slots */
-	private final AllocatedSlots allocatedSlots;
-
-	/** The book-keeping of all available slots */
-	private final AvailableSlots availableSlots;
-
-	/** All pending requests waiting for slots */
-	private final DualKeyMap<SlotRequestID, AllocationID, PendingRequest> pendingRequests;
-
-	/** The requests that are waiting for the resource manager to be connected */
-	private final HashMap<SlotRequestID, PendingRequest> waitingForResourceManager;
-
-	/** Timeout for request calls to the ResourceManager */
-	private final Time resourceManagerRequestsTimeout;
-
-	/** Timeout for allocation round trips (RM -> launch TM -> offer slot) */
-	private final Time resourceManagerAllocationTimeout;
-
-	private final Clock clock;
-
-	/** the fencing token of the job manager */
-	private JobMasterId jobMasterId;
-
-	/** The gateway to communicate with resource manager */
-	private ResourceManagerGateway resourceManagerGateway;
-
-	private String jobManagerAddress;
-
-	// ------------------------------------------------------------------------
-
-	public SlotPool(RpcService rpcService, JobID jobId) {
-		this(rpcService, jobId, SystemClock.getInstance(),
-				DEFAULT_SLOT_REQUEST_TIMEOUT, DEFAULT_RM_ALLOCATION_TIMEOUT, DEFAULT_RM_REQUEST_TIMEOUT);
-	}
-
-	public SlotPool(
-			RpcService rpcService,
-			JobID jobId,
-			Clock clock,
-			Time slotRequestTimeout,
-			Time resourceManagerAllocationTimeout,
-			Time resourceManagerRequestTimeout) {
-
-		super(rpcService);
-
-		this.jobId = checkNotNull(jobId);
-		this.clock = checkNotNull(clock);
-		this.resourceManagerRequestsTimeout = checkNotNull(resourceManagerRequestTimeout);
-		this.resourceManagerAllocationTimeout = checkNotNull(resourceManagerAllocationTimeout);
-
-		this.registeredTaskManagers = new HashSet<>();
-		this.allocatedSlots = new AllocatedSlots();
-		this.availableSlots = new AvailableSlots();
-		this.pendingRequests = new DualKeyMap<>(16);
-		this.waitingForResourceManager = new HashMap<>(16);
-
-		this.providerAndOwner = new ProviderAndOwner(getSelfGateway(SlotPoolGateway.class), slotRequestTimeout);
-
-		this.jobMasterId = null;
-		this.resourceManagerGateway = null;
-		this.jobManagerAddress = null;
-	}
-
-	// ------------------------------------------------------------------------
-	//  Starting and Stopping
-	// ------------------------------------------------------------------------
-
-	@Override
-	public void start() {
-		throw new UnsupportedOperationException("Should never call start() without leader ID");
-	}
-
-	/**
-	 * Start the slot pool to accept RPC calls.
-	 *
-	 * @param jobMasterId The necessary leader id for running the job.
-	 * @param newJobManagerAddress for the slot requests which are sent to the resource manager
-	 */
-	public void start(JobMasterId jobMasterId, String newJobManagerAddress) throws Exception {
-		this.jobMasterId = checkNotNull(jobMasterId);
-		this.jobManagerAddress = checkNotNull(newJobManagerAddress);
-
-		// TODO - start should not throw an exception
-		try {
-			super.start();
-		} catch (Exception e) {
-			throw new RuntimeException("This should never happen", e);
-		}
-	}
-
-	/**
-	 * Suspends this pool, meaning it has lost its authority to accept and distribute slots.
-	 */
-	@Override
-	public void suspend() {
-		validateRunsInMainThread();
-
-		// suspend this RPC endpoint
-		stop();
-
-		// do not accept any requests
-		jobMasterId = null;
-		resourceManagerGateway = null;
-
-		// Clear (but not release!) the available slots. The TaskManagers should re-register them
-		// at the new leader JobManager/SlotPool
-		availableSlots.clear();
-		allocatedSlots.clear();
-		pendingRequests.clear();
-	}
-
-	// ------------------------------------------------------------------------
-	//  Getting PoolOwner and PoolProvider
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Gets the slot owner implementation for this pool.
-	 * 
-	 * <p>This method does not mutate state and can be called directly (no RPC indirection)
-	 * 
-	 * @return The slot owner implementation for this pool.
-	 */
-	public SlotOwner getSlotOwner() {
-		return providerAndOwner;
-	}
-
-	/**
-	 * Gets the slot provider implementation for this pool.
-	 *
-	 * <p>This method does not mutate state and can be called directly (no RPC indirection)
-	 *
-	 * @return The slot provider implementation for this pool.
-	 */
-	public SlotProvider getSlotProvider() {
-		return providerAndOwner;
-	}
-
-	// ------------------------------------------------------------------------
-	//  Resource Manager Connection
-	// ------------------------------------------------------------------------
-
-	@Override
-	public void connectToResourceManager(ResourceManagerGateway resourceManagerGateway) {
-		this.resourceManagerGateway = checkNotNull(resourceManagerGateway);
-
-		// work on all slots waiting for this connection
-		for (PendingRequest pendingRequest : waitingForResourceManager.values()) {
-			requestSlotFromResourceManager(resourceManagerGateway, pendingRequest);
-		}
-
-		// all sent off
-		waitingForResourceManager.clear();
-	}
-
-	@Override
-	public void disconnectResourceManager() {
-		this.resourceManagerGateway = null;
-	}
-
-	// ------------------------------------------------------------------------
-	//  Slot Allocation
-	// ------------------------------------------------------------------------
-
-	@Override
-	public CompletableFuture<LogicalSlot> allocateSlot(
-			SlotRequestID requestId,
-			ScheduledUnit task,
-			ResourceProfile resources,
-			Iterable<TaskManagerLocation> locationPreferences,
-			Time timeout) {
-
-		return internalAllocateSlot(requestId, task, resources, locationPreferences);
-	}
-
-	@Override
-	public void returnAllocatedSlot(SlotRequestID slotRequestId) {
-		final AllocatedSlot allocatedSlot = allocatedSlots.remove(slotRequestId);
-
-		if (allocatedSlot != null) {
-			if (allocatedSlot.releaseLogicalSlot()) {
-				tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);
-			} else {
-				throw new RuntimeException("Could not release allocated slot " + allocatedSlot + '.');
-			}
-		} else {
-			log.debug("There is no allocated slot with request id {}. Ignoring this request.", slotRequestId);
-		}
-	}
-
-	@Override
-	public CompletableFuture<Acknowledge> cancelSlotRequest(SlotRequestID slotRequestId) {
-		final PendingRequest pendingRequest = removePendingRequest(slotRequestId);
-
-		if (pendingRequest != null) {
-			failPendingRequest(pendingRequest, new CancellationException("Allocation with request id" + slotRequestId + " cancelled."));
-		} else {
-			final AllocatedSlot allocatedSlot = allocatedSlots.get(slotRequestId);
-
-			if (allocatedSlot != null) {
-				LOG.info("Returning allocated slot {} because the corresponding allocation request {} was cancelled.", allocatedSlot, slotRequestId);
-				// TODO: Avoid having to send another message to do the slot releasing (e.g. introduce Slot#cancelExecution) and directly return slot
-				allocatedSlot.triggerLogicalSlotRelease();
-			} else {
-				LOG.debug("There was no slot allocation with {} to be cancelled.", slotRequestId);
-			}
-		}
-
-		return CompletableFuture.completedFuture(Acknowledge.get());
-	}
-
-	CompletableFuture<LogicalSlot> internalAllocateSlot(
-			SlotRequestID requestId,
-			ScheduledUnit task,
-			ResourceProfile resources,
-			Iterable<TaskManagerLocation> locationPreferences) {
-
-		// (1) do we have a slot available already?
-		SlotAndLocality slotFromPool = availableSlots.poll(resources, locationPreferences);
-		if (slotFromPool != null) {
-			final AllocatedSlot allocatedSlot = slotFromPool.slot();
-
-			final SimpleSlot simpleSlot;
-			try {
-				simpleSlot = allocatedSlot.allocateSimpleSlot(requestId, slotFromPool.locality());
-			} catch (SlotException e) {
-				availableSlots.add(allocatedSlot, clock.relativeTimeMillis());
-
-				return FutureUtils.completedExceptionally(e);
-			}
-
-			allocatedSlots.add(requestId, allocatedSlot);
-			return CompletableFuture.completedFuture(simpleSlot);
-		}
-
-		// we have to request a new allocated slot
-		CompletableFuture<AllocatedSlot> allocatedSlotFuture = requestSlot(
-			requestId,
-			resources);
-
-		return allocatedSlotFuture.thenApply(
-			(AllocatedSlot allocatedSlot) -> {
-				try {
-					return allocatedSlot.allocateSimpleSlot(requestId, Locality.UNKNOWN);
-				} catch (SlotException e) {
-					throw new CompletionException("Could not allocate a logical simple slot from allocate slot " +
-						allocatedSlot + '.', e);
-				}
-			});
-	}
-
-	/**
-	 * Checks whether there exists a pending request with the given allocation id and removes it
-	 * from the internal data structures.
-	 *
-	 * @param requestId identifying the pending request
-	 * @return pending request if there is one, otherwise null
-	 */
-	@Nullable
-	private PendingRequest removePendingRequest(SlotRequestID requestId) {
-		PendingRequest result = waitingForResourceManager.remove(requestId);
-
-		if (result != null) {
-			// sanity check
-			assert !pendingRequests.containsKeyA(requestId) : "A pending requests should only be part of either " +
-				"the pendingRequests or waitingForResourceManager but not both.";
-
-			return result;
-		} else {
-			return pendingRequests.removeKeyA(requestId);
-		}
-	}
-
-	private CompletableFuture<AllocatedSlot> requestSlot(
-		SlotRequestID slotRequestId,
-		ResourceProfile resourceProfile) {
-
-		final PendingRequest pendingRequest = new PendingRequest(
-			slotRequestId,
-			resourceProfile);
-
-		if (resourceManagerGateway == null) {
-			stashRequestWaitingForResourceManager(pendingRequest);
-		} else {
-			requestSlotFromResourceManager(resourceManagerGateway, pendingRequest);
-		}
-
-		return pendingRequest.getAllocatedSlotFuture();
-	}
-
-	private void requestSlotFromResourceManager(
-			final ResourceManagerGateway resourceManagerGateway,
-			final PendingRequest pendingRequest) {
-
-		Preconditions.checkNotNull(resourceManagerGateway);
-		Preconditions.checkNotNull(pendingRequest);
-
-		LOG.info("Requesting slot with profile {} from resource manager (request = {}).", pendingRequest.getResourceProfile(), pendingRequest.getSlotRequestId());
-
-		final AllocationID allocationId = new AllocationID();
-
-		pendingRequests.put(pendingRequest.getSlotRequestId(), allocationId, pendingRequest);
-
-		pendingRequest.getAllocatedSlotFuture().whenComplete(
-			(value, throwable) -> {
-				if (throwable != null) {
-					resourceManagerGateway.cancelSlotRequest(allocationId);
-				}
-			});
-
-		CompletableFuture<Acknowledge> rmResponse = resourceManagerGateway.requestSlot(
-			jobMasterId,
-			new SlotRequest(jobId, allocationId, pendingRequest.getResourceProfile(), jobManagerAddress),
-			resourceManagerRequestsTimeout);
-
-		CompletableFuture<Void> slotRequestProcessingFuture = rmResponse.thenAcceptAsync(
-			(Acknowledge value) -> {
-				slotRequestToResourceManagerSuccess(pendingRequest.getSlotRequestId());
-			},
-			getMainThreadExecutor());
-
-		// on failure, fail the request future
-		slotRequestProcessingFuture.whenCompleteAsync(
-			(Void v, Throwable failure) -> {
-				if (failure != null) {
-					slotRequestToResourceManagerFailed(pendingRequest.getSlotRequestId(), failure);
-				}
-			},
-			getMainThreadExecutor());
-	}
-
-	private void slotRequestToResourceManagerSuccess(final SlotRequestID requestId) {
-		// a request is pending from the ResourceManager to a (future) TaskManager
-		// we only add the watcher here in case that request times out
-		scheduleRunAsync(new Runnable() {
-			@Override
-			public void run() {
-				checkTimeoutSlotAllocation(requestId);
-			}
-		}, resourceManagerAllocationTimeout);
-	}
-
-	private void slotRequestToResourceManagerFailed(SlotRequestID slotRequestID, Throwable failure) {
-		PendingRequest request = pendingRequests.removeKeyA(slotRequestID);
-		if (request != null) {
-			request.getAllocatedSlotFuture().completeExceptionally(new NoResourceAvailableException(
-					"No pooled slot available and request to ResourceManager for new slot failed", failure));
-		} else {
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Unregistered slot request {} failed.", slotRequestID, failure);
-			}
-		}
-	}
-
-	private void checkTimeoutSlotAllocation(SlotRequestID slotRequestID) {
-		PendingRequest request = pendingRequests.removeKeyA(slotRequestID);
-		if (request != null) {
-			failPendingRequest(request, new TimeoutException("Slot allocation request " + slotRequestID + " timed out"));
-		}
-	}
-
-	private void failPendingRequest(PendingRequest pendingRequest, Exception e) {
-		Preconditions.checkNotNull(pendingRequest);
-		Preconditions.checkNotNull(e);
-
-		if (!pendingRequest.getAllocatedSlotFuture().isDone()) {
-			LOG.info("Failing pending request {}.", pendingRequest.getSlotRequestId());
-			pendingRequest.getAllocatedSlotFuture().completeExceptionally(e);
-		}
-	}
-
-	private void stashRequestWaitingForResourceManager(final PendingRequest pendingRequest) {
-
-		LOG.info("Cannot serve slot request, no ResourceManager connected. " +
-				"Adding as pending request {}",  pendingRequest.getSlotRequestId());
-
-		waitingForResourceManager.put(pendingRequest.getSlotRequestId(), pendingRequest);
-
-		scheduleRunAsync(new Runnable() {
-			@Override
-			public void run() {
-				checkTimeoutRequestWaitingForResourceManager(pendingRequest.getSlotRequestId());
-			}
-		}, resourceManagerRequestsTimeout);
-	}
-
-	private void checkTimeoutRequestWaitingForResourceManager(SlotRequestID slotRequestId) {
-		PendingRequest request = waitingForResourceManager.remove(slotRequestId);
-		if (request != null) {
-			failPendingRequest(
-				request,
-				new NoResourceAvailableException("No slot available and no connection to Resource Manager established."));
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  Slot releasing & offering
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Tries to fulfill with the given allocated slot a pending slot request or add the
-	 * allocated slot to the set of available slots if no matching request is available.
-	 *
-	 * @param allocatedSlot which shall be returned
-	 */
-	private void tryFulfillSlotRequestOrMakeAvailable(AllocatedSlot allocatedSlot) {
-		Preconditions.checkState(!allocatedSlot.isUsed(), "Provided slot is still in use.");
-
-		final PendingRequest pendingRequest = pollMatchingPendingRequest(allocatedSlot);
-
-		if (pendingRequest != null) {
-			LOG.debug("Fulfilling pending request [{}] early with returned slot [{}]",
-				pendingRequest.getSlotRequestId(), allocatedSlot.getAllocationId());
-
-			allocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot);
-			pendingRequest.getAllocatedSlotFuture().complete(allocatedSlot);
-		} else {
-			LOG.debug("Adding returned slot [{}] to available slots", allocatedSlot.getAllocationId());
-			availableSlots.add(allocatedSlot, clock.relativeTimeMillis());
-		}
-	}
-
-	private PendingRequest pollMatchingPendingRequest(final AllocatedSlot slot) {
-		final ResourceProfile slotResources = slot.getResourceProfile();
-
-		// try the requests sent to the resource manager first
-		for (PendingRequest request : pendingRequests.values()) {
-			if (slotResources.isMatching(request.getResourceProfile())) {
-				pendingRequests.removeKeyA(request.getSlotRequestId());
-				return request;
-			}
-		}
-
-		// try the requests waiting for a resource manager connection next
-		for (PendingRequest request : waitingForResourceManager.values()) {
-			if (slotResources.isMatching(request.getResourceProfile())) {
-				waitingForResourceManager.remove(request.getSlotRequestId());
-				return request;
-			}
-		}
-
-		// no request pending, or no request matches
-		return null;
-	}
-
-	@Override
-	public CompletableFuture<Collection<SlotOffer>> offerSlots(
-			TaskManagerLocation taskManagerLocation,
-			TaskManagerGateway taskManagerGateway,
-			Collection<SlotOffer> offers) {
-		validateRunsInMainThread();
-
-		List<CompletableFuture<Optional<SlotOffer>>> acceptedSlotOffers = offers.stream().map(
-			offer -> {
-				CompletableFuture<Optional<SlotOffer>> acceptedSlotOffer = offerSlot(
-					taskManagerLocation,
-					taskManagerGateway,
-					offer)
-					.thenApply(
-						(acceptedSlot) -> {
-							if (acceptedSlot) {
-								return Optional.of(offer);
-							} else {
-								return Optional.empty();
-							}
-						});
-
-				return acceptedSlotOffer;
-			}
-		).collect(Collectors.toList());
-
-		CompletableFuture<Collection<Optional<SlotOffer>>> optionalSlotOffers = FutureUtils.combineAll(acceptedSlotOffers);
-
-		CompletableFuture<Collection<SlotOffer>> resultingSlotOffers = optionalSlotOffers.thenApply(
-			collection -> {
-				Collection<SlotOffer> slotOffers = collection
-					.stream()
-					.flatMap(
-						opt -> opt.map(Stream::of).orElseGet(Stream::empty))
-					.collect(Collectors.toList());
-
-				return slotOffers;
-			});
-
-		return resultingSlotOffers;
-	}
-	
-	/**
-	 * Slot offering by TaskManager with AllocationID. The AllocationID is originally generated by this pool and
-	 * transfer through the ResourceManager to TaskManager. We use it to distinguish the different allocation
-	 * we issued. Slot offering may be rejected if we find something mismatching or there is actually no pending
-	 * request waiting for this slot (maybe fulfilled by some other returned slot).
-	 *
-	 * @param taskManagerLocation location from where the offer comes from
-	 * @param taskManagerGateway TaskManager gateway
-	 * @param slotOffer the offered slot
-	 * @return True if we accept the offering
-	 */
-	@Override
-	public CompletableFuture<Boolean> offerSlot(
-			final TaskManagerLocation taskManagerLocation,
-			final TaskManagerGateway taskManagerGateway,
-			final SlotOffer slotOffer) {
-		validateRunsInMainThread();
-
-		// check if this TaskManager is valid
-		final ResourceID resourceID = taskManagerLocation.getResourceID();
-		final AllocationID allocationID = slotOffer.getAllocationId();
-
-		if (!registeredTaskManagers.contains(resourceID)) {
-			LOG.debug("Received outdated slot offering [{}] from unregistered TaskManager: {}",
-					slotOffer.getAllocationId(), taskManagerLocation);
-			return CompletableFuture.completedFuture(false);
-		}
-
-		// check whether we have already using this slot
-		if (allocatedSlots.contains(allocationID) || availableSlots.contains(allocationID)) {
-			LOG.debug("Received repeated offer for slot [{}]. Ignoring.", allocationID);
-
-			// return true here so that the sender will get a positive acknowledgement to the retry
-			// and mark the offering as a success
-			return CompletableFuture.completedFuture(true);
-		}
-
-		final AllocatedSlot allocatedSlot = new AllocatedSlot(
-			slotOffer.getAllocationId(),
-			taskManagerLocation,
-			slotOffer.getSlotIndex(),
-			slotOffer.getResourceProfile(),
-			taskManagerGateway,
-			providerAndOwner);
-
-		// check whether we have request waiting for this slot
-		PendingRequest pendingRequest = pendingRequests.removeKeyB(allocationID);
-		if (pendingRequest != null) {
-			// we were waiting for this!
-			allocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot);
-			pendingRequest.getAllocatedSlotFuture().complete(allocatedSlot);
-		}
-		else {
-			// we were actually not waiting for this:
-			//   - could be that this request had been fulfilled
-			//   - we are receiving the slots from TaskManagers after becoming leaders
-			tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);
-		}
-
-		// we accepted the request in any case. slot will be released after it idled for
-		// too long and timed out
-		return CompletableFuture.completedFuture(true);
-	}
-
-	
-	// TODO - periodic (every minute or so) catch slots that were lost (check all slots, if they have any task active)
-
-	// TODO - release slots that were not used to the resource manager
-
-	// ------------------------------------------------------------------------
-	//  Error Handling
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Fail the specified allocation and release the corresponding slot if we have one.
-	 * This may triggered by JobManager when some slot allocation failed with timeout.
-	 * Or this could be triggered by TaskManager, when it finds out something went wrong with the slot,
-	 * and decided to take it back.
-	 *
-	 * @param allocationID Represents the allocation which should be failed
-	 * @param cause        The cause of the failure
-	 */
-	@Override
-	public void failAllocation(final AllocationID allocationID, final Exception cause) {
-		final PendingRequest pendingRequest = pendingRequests.removeKeyB(allocationID);
-		if (pendingRequest != null) {
-			// request was still pending
-			failPendingRequest(pendingRequest, cause);
-		}
-		else if (availableSlots.tryRemove(allocationID)) {
-			LOG.debug("Failed available slot [{}] with ", allocationID, cause);
-		}
-		else {
-			AllocatedSlot allocatedSlot = allocatedSlots.remove(allocationID);
-			if (allocatedSlot != null) {
-				// release the slot.
-				// since it is not in 'allocatedSlots' any more, it will be dropped o return'
-				allocatedSlot.triggerLogicalSlotRelease();
-			}
-			else {
-				LOG.debug("Outdated request to fail slot [{}] with ", allocationID, cause);
-			}
-		}
-		// TODO: add some unit tests when the previous two are ready, the allocation may failed at any phase
-	}
-
-	// ------------------------------------------------------------------------
-	//  Resource
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Register TaskManager to this pool, only those slots come from registered TaskManager will be considered valid.
-	 * Also it provides a way for us to keep "dead" or "abnormal" TaskManagers out of this pool.
-	 *
-	 * @param resourceID The id of the TaskManager
-	 * @return Future acknowledge if th operation was successful
-	 */
-	@Override
-	public CompletableFuture<Acknowledge> registerTaskManager(final ResourceID resourceID) {
-		registeredTaskManagers.add(resourceID);
-
-		return CompletableFuture.completedFuture(Acknowledge.get());
-	}
-
-	/**
-	 * Unregister TaskManager from this pool, all the related slots will be released and tasks be canceled. Called
-	 * when we find some TaskManager becomes "dead" or "abnormal", and we decide to not using slots from it anymore.
-	 *
-	 * @param resourceID The id of the TaskManager
-	 */
-	@Override
-	public CompletableFuture<Acknowledge> releaseTaskManager(final ResourceID resourceID) {
-		if (registeredTaskManagers.remove(resourceID)) {
-			availableSlots.removeAllForTaskManager(resourceID);
-
-			final Set<AllocatedSlot> allocatedSlotsForResource = allocatedSlots.removeSlotsForTaskManager(resourceID);
-			for (AllocatedSlot allocatedSlot : allocatedSlotsForResource) {
-				allocatedSlot.triggerLogicalSlotRelease();
-				// TODO: This is a work-around to mark the logical slot as released. We should split up the internalReturnSlot method to not poll pending requests
-				allocatedSlot.releaseLogicalSlot();
-			}
-		}
-
-		return CompletableFuture.completedFuture(Acknowledge.get());
-	}
-
-	// ------------------------------------------------------------------------
-	//  Methods for tests
-	// ------------------------------------------------------------------------
-
-	@VisibleForTesting
-	AllocatedSlots getAllocatedSlots() {
-		return allocatedSlots;
-	}
-
-	@VisibleForTesting
-	AvailableSlots getAvailableSlots() {
-		return availableSlots;
-	}
-
-	@VisibleForTesting
-	DualKeyMap<SlotRequestID, AllocationID, PendingRequest> getPendingRequests() {
-		return pendingRequests;
-	}
-
-	@VisibleForTesting
-	Map<SlotRequestID, PendingRequest> getWaitingForResourceManager() {
-		return waitingForResourceManager;
-	}
-
-	// ------------------------------------------------------------------------
-	//  Helper classes
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Organize allocated slots from different points of view.
-	 */
-	static class AllocatedSlots {
-
-		/** All allocated slots organized by TaskManager's id */
-		private final Map<ResourceID, Set<AllocatedSlot>> allocatedSlotsByTaskManager;
-
-		/** All allocated slots organized by AllocationID */
-		private final DualKeyMap<AllocationID, SlotRequestID, AllocatedSlot> allocatedSlotsById;
-
-		AllocatedSlots() {
-			this.allocatedSlotsByTaskManager = new HashMap<>(16);
-			this.allocatedSlotsById = new DualKeyMap<>(16);
-		}
-
-		/**
-		 * Adds a new slot to this collection.
-		 *
-		 * @param allocatedSlot The allocated slot
-		 */
-		void add(SlotRequestID slotRequestId, AllocatedSlot allocatedSlot) {
-			allocatedSlotsById.put(allocatedSlot.getAllocationId(), slotRequestId, allocatedSlot);
-
-			final ResourceID resourceID = allocatedSlot.getTaskManagerLocation().getResourceID();
-
-			Set<AllocatedSlot> slotsForTaskManager = allocatedSlotsByTaskManager.computeIfAbsent(
-				resourceID,
-				resourceId -> new HashSet<>(4));
-
-			slotsForTaskManager.add(allocatedSlot);
-		}
-
-		/**
-		 * Get allocated slot with allocation id
-		 *
-		 * @param allocationID The allocation id
-		 * @return The allocated slot, null if we can't find a match
-		 */
-		AllocatedSlot get(final AllocationID allocationID) {
-			return allocatedSlotsById.getKeyA(allocationID);
-		}
-
-		AllocatedSlot get(final SlotRequestID slotRequestId) {
-			return allocatedSlotsById.getKeyB(slotRequestId);
-		}
-
-		/**
-		 * Check whether we have allocated this slot
-		 *
-		 * @param slotAllocationId The allocation id of the slot to check
-		 * @return True if we contains this slot
-		 */
-		boolean contains(AllocationID slotAllocationId) {
-			return allocatedSlotsById.containsKeyA(slotAllocationId);
-		}
-
-		/**
-		 * Removes the allocated slot specified by the provided slot allocation id.
-		 *
-		 * @param allocationID identifying the allocated slot to remove
-		 * @return The removed allocated slot or null.
-		 */
-		@Nullable
-		AllocatedSlot remove(final AllocationID allocationID) {
-			AllocatedSlot allocatedSlot = allocatedSlotsById.removeKeyA(allocationID);
-
-			if (allocatedSlot != null) {
-				removeAllocatedSlot(allocatedSlot);
-			}
-
-			return allocatedSlot;
-		}
-
-		/**
-		 * Removes the allocated slot specified by the provided slot request id.
-		 *
-		 * @param slotRequestId identifying the allocated slot to remove
-		 * @return The removed allocated slot or null.
-		 */
-		@Nullable
-		AllocatedSlot remove(final SlotRequestID slotRequestId) {
-			final AllocatedSlot allocatedSlot = allocatedSlotsById.removeKeyB(slotRequestId);
-
-			if (allocatedSlot != null) {
-				removeAllocatedSlot(allocatedSlot);
-			}
-
-			return allocatedSlot;
-		}
-
-		private void removeAllocatedSlot(final AllocatedSlot allocatedSlot) {
-			Preconditions.checkNotNull(allocatedSlot);
-			final ResourceID taskManagerId = allocatedSlot.getTaskManagerLocation().getResourceID();
-			Set<AllocatedSlot> slotsForTM = allocatedSlotsByTaskManager.get(taskManagerId);
-
-			slotsForTM.remove(allocatedSlot);
-
-			if (slotsForTM.isEmpty()) {
-				allocatedSlotsByTaskManager.remove(taskManagerId);
-			}
-		}
-
-		/**
-		 * Get all allocated slot from same TaskManager.
-		 *
-		 * @param resourceID The id of the TaskManager
-		 * @return Set of slots which are allocated from the same TaskManager
-		 */
-		Set<AllocatedSlot> removeSlotsForTaskManager(final ResourceID resourceID) {
-			Set<AllocatedSlot> slotsForTaskManager = allocatedSlotsByTaskManager.remove(resourceID);
-			if (slotsForTaskManager != null) {
-				for (AllocatedSlot allocatedSlot : slotsForTaskManager) {
-					allocatedSlotsById.removeKeyA(allocatedSlot.getAllocationId());
-				}
-				return slotsForTaskManager;
-			}
-			else {
-				return Collections.emptySet();
-			}
-		}
-
-		void clear() {
-			allocatedSlotsById.clear();
-			allocatedSlotsByTaskManager.clear();
-		}
-
-		@VisibleForTesting
-		boolean containResource(final ResourceID resourceID) {
-			return allocatedSlotsByTaskManager.containsKey(resourceID);
-		}
-
-		@VisibleForTesting
-		int size() {
-			return allocatedSlotsById.size();
-		}
-
-		@VisibleForTesting
-		Set<AllocatedSlot> getSlotsForTaskManager(ResourceID resourceId) {
-			if (allocatedSlotsByTaskManager.containsKey(resourceId)) {
-				return allocatedSlotsByTaskManager.get(resourceId);
-			} else {
-				return Collections.emptySet();
-			}
-		}
-	}
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Organize all available slots from different points of view.
-	 */
-	static class AvailableSlots {
-
-		/** All available slots organized by TaskManager */
-		private final HashMap<ResourceID, Set<AllocatedSlot>> availableSlotsByTaskManager;
-
-		/** All available slots organized by host */
-		private final HashMap<String, Set<AllocatedSlot>> availableSlotsByHost;
-
-		/** The available slots, with the time when they were inserted */
-		private final HashMap<AllocationID, SlotAndTimestamp> availableSlots;
-
-		AvailableSlots() {
-			this.availableSlotsByTaskManager = new HashMap<>();
-			this.availableSlotsByHost = new HashMap<>();
-			this.availableSlots = new HashMap<>();
-		}
-
-		/**
-		 * Adds an available slot.
-		 *
-		 * @param slot The slot to add
-		 */
-		void add(final AllocatedSlot slot, final long timestamp) {
-			checkNotNull(slot);
-
-			SlotAndTimestamp previous = availableSlots.put(
-					slot.getAllocationId(), new SlotAndTimestamp(slot, timestamp));
-
-			if (previous == null) {
-				final ResourceID resourceID = slot.getTaskManagerLocation().getResourceID();
-				final String host = slot.getTaskManagerLocation().getFQDNHostname();
-
-				Set<AllocatedSlot> slotsForTaskManager = availableSlotsByTaskManager.get(resourceID);
-				if (slotsForTaskManager == null) {
-					slotsForTaskManager = new HashSet<>();
-					availableSlotsByTaskManager.put(resourceID, slotsForTaskManager);
-				}
-				slotsForTaskManager.add(slot);
-
-				Set<AllocatedSlot> slotsForHost = availableSlotsByHost.get(host);
-				if (slotsForHost == null) {
-					slotsForHost = new HashSet<>();
-					availableSlotsByHost.put(host, slotsForHost);
-				}
-				slotsForHost.add(slot);
-			}
-			else {
-				throw new IllegalStateException("slot already contained");
-			}
-		}
-
-		/**
-		 * Check whether we have this slot.
-		 */
-		boolean contains(AllocationID slotId) {
-			return availableSlots.containsKey(slotId);
-		}
-
-		/**
-		 * Poll a slot which matches the required resource profile. The polling tries to satisfy the
-		 * location preferences, by TaskManager and by host.
-		 *
-		 * @param resourceProfile      The required resource profile.
-		 * @param locationPreferences  The location preferences, in order to be checked.
-		 * 
-		 * @return Slot which matches the resource profile, null if we can't find a match
-		 */
-		SlotAndLocality poll(ResourceProfile resourceProfile, Iterable<TaskManagerLocation> locationPreferences) {
-			// fast path if no slots are available
-			if (availableSlots.isEmpty()) {
-				return null;
-			}
-
-			boolean hadLocationPreference = false;
-
-			if (locationPreferences != null) {
-
-				// first search by TaskManager
-				for (TaskManagerLocation location : locationPreferences) {
-					hadLocationPreference = true;
-
-					final Set<AllocatedSlot> onTaskManager = availableSlotsByTaskManager.get(location.getResourceID());
-					if (onTaskManager != null) {
-						for (AllocatedSlot candidate : onTaskManager) {
-							if (candidate.getResourceProfile().isMatching(resourceProfile)) {
-								remove(candidate.getAllocationId());
-								return new SlotAndLocality(candidate, Locality.LOCAL);
-							}
-						}
-					}
-				}
-
-				// now, search by host
-				for (TaskManagerLocation location : locationPreferences) {
-					final Set<AllocatedSlot> onHost = availableSlotsByHost.get(location.getFQDNHostname());
-					if (onHost != null) {
-						for (AllocatedSlot candidate : onHost) {
-							if (candidate.getResourceProfile().isMatching(resourceProfile)) {
-								remove(candidate.getAllocationId());
-								return new SlotAndLocality(candidate, Locality.HOST_LOCAL);
-							}
-						}
-					}
-				}
-			}
-
-			// take any slot
-			for (SlotAndTimestamp candidate : availableSlots.values()) {
-				final AllocatedSlot slot = candidate.slot();
-
-				if (slot.getResourceProfile().isMatching(resourceProfile)) {
-					remove(slot.getAllocationId());
-					return new SlotAndLocality(
-							slot, hadLocationPreference ? Locality.NON_LOCAL : Locality.UNCONSTRAINED);
-				}
-			}
-
-			// nothing available that matches
-			return null;
-		}
-
-		/**
-		 * Remove all available slots come from specified TaskManager.
-		 *
-		 * @param taskManager The id of the TaskManager
-		 */
-		void removeAllForTaskManager(final ResourceID taskManager) {
-			// remove from the by-TaskManager view
-			final Set<AllocatedSlot> slotsForTm = availableSlotsByTaskManager.remove(taskManager);
-
-			if (slotsForTm != null && slotsForTm.size() > 0) {
-				final String host = slotsForTm.iterator().next().getTaskManagerLocation().getFQDNHostname();
-				final Set<AllocatedSlot> slotsForHost = availableSlotsByHost.get(host);
-
-				// remove from the base set and the by-host view
-				for (AllocatedSlot slot : slotsForTm) {
-					availableSlots.remove(slot.getAllocationId());
-					slotsForHost.remove(slot);
-				}
-
-				if (slotsForHost.isEmpty()) {
-					availableSlotsByHost.remove(host);
-				}
-			}
-		}
-
-		boolean tryRemove(AllocationID slotId) {
-			final SlotAndTimestamp sat = availableSlots.remove(slotId);
-			if (sat != null) {
-				final AllocatedSlot slot = sat.slot();
-				final ResourceID resourceID = slot.getTaskManagerLocation().getResourceID();
-				final String host = slot.getTaskManagerLocation().getFQDNHostname();
-
-				final Set<AllocatedSlot> slotsForTm = availableSlotsByTaskManager.get(resourceID);
-				final Set<AllocatedSlot> slotsForHost = availableSlotsByHost.get(host);
-
-				slotsForTm.remove(slot);
-				slotsForHost.remove(slot);
-
-				if (slotsForTm.isEmpty()) {
-					availableSlotsByTaskManager.remove(resourceID);
-				}
-				if (slotsForHost.isEmpty()) {
-					availableSlotsByHost.remove(host);
-				}
-
-				return true;
-			}
-			else {
-				return false;
-			}
-		}
-
-		private void remove(AllocationID slotId) throws IllegalStateException {
-			if (!tryRemove(slotId)) {
-				throw new IllegalStateException("slot not contained");
-			}
-		}
-
-		@VisibleForTesting
-		boolean containsTaskManager(ResourceID resourceID) {
-			return availableSlotsByTaskManager.containsKey(resourceID);
-		}
-
-		@VisibleForTesting
-		int size() {
-			return availableSlots.size();
-		}
-
-		@VisibleForTesting
-		void clear() {
-			availableSlots.clear();
-			availableSlotsByTaskManager.clear();
-			availableSlotsByHost.clear();
-		}
-	}
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * An implementation of the {@link SlotOwner} and {@link SlotProvider} interfaces
-	 * that delegates methods as RPC calls to the SlotPool's RPC gateway.
-	 */
-	private static class ProviderAndOwner implements SlotOwner, SlotProvider {
-
-		private final SlotPoolGateway gateway;
-
-		private final Time timeout;
-
-		ProviderAndOwner(SlotPoolGateway gateway, Time timeout) {
-			this.gateway = gateway;
-			this.timeout = timeout;
-		}
-
-		@Override
-		public CompletableFuture<Boolean> returnAllocatedSlot(LogicalSlot slot) {
-			gateway.returnAllocatedSlot(slot.getSlotRequestId());
-			return CompletableFuture.completedFuture(true);
-		}
-
-		@Override
-		public CompletableFuture<LogicalSlot> allocateSlot(
-				ScheduledUnit task,
-				boolean allowQueued,
-				Collection<TaskManagerLocation> preferredLocations) {
-
-			final SlotRequestID requestId = new SlotRequestID();
-			CompletableFuture<LogicalSlot> slotFuture = gateway.allocateSlot(requestId, task, ResourceProfile.UNKNOWN, preferredLocations, timeout);
-			slotFuture.whenComplete(
-				(LogicalSlot slot, Throwable failure) -> {
-					if (failure != null) {
-						gateway.cancelSlotRequest(requestId);
-					}
-			});
-			return slotFuture;
-		}
-	}
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * A pending request for a slot
-	 */
-	private static class PendingRequest {
-
-		private final SlotRequestID slotRequestId;
-
-		private final ResourceProfile resourceProfile;
-
-		private final CompletableFuture<AllocatedSlot> allocatedSlotFuture;
-
-		PendingRequest(
-				SlotRequestID slotRequestId,
-				ResourceProfile resourceProfile) {
-			this.slotRequestId = Preconditions.checkNotNull(slotRequestId);
-			this.resourceProfile = Preconditions.checkNotNull(resourceProfile);
-
-			allocatedSlotFuture = new CompletableFuture<>();
-		}
-
-		public SlotRequestID getSlotRequestId() {
-			return slotRequestId;
-		}
-
-		public CompletableFuture<AllocatedSlot> getAllocatedSlotFuture() {
-			return allocatedSlotFuture;
-		}
-
-		public ResourceProfile getResourceProfile() {
-			return resourceProfile;
-		}
-	}
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * A slot, together with the timestamp when it was added
-	 */
-	private static class SlotAndTimestamp {
-
-		private final AllocatedSlot slot;
-
-		private final long timestamp;
-
-		SlotAndTimestamp(AllocatedSlot slot, long timestamp) {
-			this.slot = slot;
-			this.timestamp = timestamp;
-		}
-
-		public AllocatedSlot slot() {
-			return slot;
-		}
-
-		public long timestamp() {
-			return timestamp;
-		}
-
-		@Override
-		public String toString() {
-			return slot + " @ " + timestamp;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
deleted file mode 100644
index 103bc61..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
-import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
-import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
-import org.apache.flink.runtime.rpc.RpcGateway;
-import org.apache.flink.runtime.rpc.RpcTimeout;
-import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-
-import java.util.Collection;
-import java.util.concurrent.CompletableFuture;
-
-/**
- * The gateway for calls on the {@link SlotPool}. 
- */
-public interface SlotPoolGateway extends RpcGateway {
-
-	// ------------------------------------------------------------------------
-	//  shutdown
-	// ------------------------------------------------------------------------
-
-	void suspend();
-
-	// ------------------------------------------------------------------------
-	//  resource manager connection
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Connects the SlotPool to the given ResourceManager. After this method is called, the
-	 * SlotPool will be able to request resources from the given ResourceManager.
-	 * 
-	 * @param resourceManagerGateway  The RPC gateway for the resource manager.
-	 */
-	void connectToResourceManager(ResourceManagerGateway resourceManagerGateway);
-
-	/**
-	 * Disconnects the slot pool from its current Resource Manager. After this call, the pool will not
-	 * be able to request further slots from the Resource Manager, and all currently pending requests
-	 * to the resource manager will be canceled.
-	 * 
-	 * <p>The slot pool will still be able to serve slots from its internal pool.
-	 */
-	void disconnectResourceManager();
-
-	// ------------------------------------------------------------------------
-	//  registering / un-registering TaskManagers and slots
-	// ------------------------------------------------------------------------
-
-	CompletableFuture<Acknowledge> registerTaskManager(ResourceID resourceID);
-
-	CompletableFuture<Acknowledge> releaseTaskManager(ResourceID resourceID);
-
-	CompletableFuture<Boolean> offerSlot(
-		TaskManagerLocation taskManagerLocation,
-		TaskManagerGateway taskManagerGateway,
-		SlotOffer slotOffer);
-
-	CompletableFuture<Collection<SlotOffer>> offerSlots(
-		TaskManagerLocation taskManagerLocation,
-		TaskManagerGateway taskManagerGateway,
-		Collection<SlotOffer> offers);
-	
-	void failAllocation(AllocationID allocationID, Exception cause);
-
-	// ------------------------------------------------------------------------
-	//  allocating and disposing slots
-	// ------------------------------------------------------------------------
-
-	CompletableFuture<LogicalSlot> allocateSlot(
-			SlotRequestID requestId,
-			ScheduledUnit task,
-			ResourceProfile resources,
-			Iterable<TaskManagerLocation> locationPreferences,
-			@RpcTimeout Time timeout);
-
-	void returnAllocatedSlot(SlotRequestID slotRequestId);
-
-	/**
-	 * Cancel a slot allocation request.
-	 *
-	 * @param slotRequestId identifying the slot allocation request
-	 * @return Future acknowledge if the slot allocation has been cancelled
-	 */
-	CompletableFuture<Acknowledge> cancelSlotRequest(SlotRequestID slotRequestId);
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java
deleted file mode 100644
index 98427c2..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-
-import java.util.Collection;
-import java.util.concurrent.CompletableFuture;
-
-/**
- * The slot provider is responsible for preparing slots for ready-to-run tasks.
- * 
- * <p>It supports two allocating modes:
- * <ul>
- *     <li>Immediate allocating: A request for a task slot immediately gets satisfied, we can call
- *         {@link CompletableFuture#getNow(Object)} to get the allocated slot.</li>
- *     <li>Queued allocating: A request for a task slot is queued and returns a future that will be
- *         fulfilled as soon as a slot becomes available.</li>
- * </ul>
- */
-public interface SlotProvider {
-
-	/**
-	 * Allocating slot with specific requirement.
-	 *
-	 * @param task         The task to allocate the slot for
-	 * @param allowQueued  Whether allow the task be queued if we do not have enough resource
-	 * @param preferredLocations preferred locations for the slot allocation
-	 * @return The future of the allocation
-	 */
-	CompletableFuture<LogicalSlot> allocateSlot(
-		ScheduledUnit task,
-		boolean allowQueued,
-		Collection<TaskManagerLocation> preferredLocations);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotRequestID.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotRequestID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotRequestID.java
deleted file mode 100644
index 8e19944..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotRequestID.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import org.apache.flink.util.AbstractID;
-
-/**
- * Request ID identifying different slot requests.
- */
-public final class SlotRequestID extends AbstractID {
-    private static final long serialVersionUID = -6072105912250154283L;
-
-    public SlotRequestID(long lowerPart, long upperPart) {
-        super(lowerPart, upperPart);
-    }
-
-    public SlotRequestID() {}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
index 45b4a96..289762c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
@@ -18,28 +18,29 @@
 
 package org.apache.flink.runtime.instance;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.Locality;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.AbstractID;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.FlinkException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
 
 /**
  * The SlotSharingGroupAssignment manages a set of shared slots, which are shared between
@@ -215,7 +216,7 @@ public class SlotSharingGroupAssignment {
 						// note that this does implicitly release the slot we have just added
 						// as well, because we release its last child slot. That is expected
 						// and desired.
-						constraintGroupSlot.releaseInstanceSlot();
+						constraintGroupSlot.releaseSlot(new FlinkException("Could not create a sub slot in this shared slot."));
 					}
 				}
 				else {
@@ -273,7 +274,7 @@ public class SlotSharingGroupAssignment {
 	 */
 	public SimpleSlot getSlotForTask(JobVertexID vertexID, Iterable<TaskManagerLocation> locationPreferences) {
 		synchronized (lock) {
-			Tuple2<SharedSlot, Locality> p = getSlotForTaskInternal(vertexID, locationPreferences, false);
+			Tuple2<SharedSlot, Locality> p = getSharedSlotForTask(vertexID, locationPreferences, false);
 
 			if (p != null) {
 				SharedSlot ss = p.f0;
@@ -324,7 +325,7 @@ public class SlotSharingGroupAssignment {
 				}
 
 				TaskManagerLocation location = previous.getTaskManagerLocation();
-				Tuple2<SharedSlot, Locality> p = getSlotForTaskInternal(
+				Tuple2<SharedSlot, Locality> p = getSharedSlotForTask(
 						constraint.getGroupId(), Collections.singleton(location), true);
 
 				if (p == null) {
@@ -355,7 +356,7 @@ public class SlotSharingGroupAssignment {
 				// grab a new slot and initialize the constraint with that one.
 				// preferred locations are defined by the vertex
 				Tuple2<SharedSlot, Locality> p =
-						getSlotForTaskInternal(constraint.getGroupId(), locationPreferences, false);
+						getSharedSlotForTask(constraint.getGroupId(), locationPreferences, false);
 				if (p == null) {
 					// could not get a shared slot for this co-location-group
 					return null;
@@ -382,9 +383,10 @@ public class SlotSharingGroupAssignment {
 	}
 
 
-	private Tuple2<SharedSlot, Locality> getSlotForTaskInternal(
-			AbstractID groupId, Iterable<TaskManagerLocation> preferredLocations, boolean localOnly)
-	{
+	public Tuple2<SharedSlot, Locality> getSharedSlotForTask(
+			AbstractID groupId,
+			Iterable<TaskManagerLocation> preferredLocations,
+			boolean localOnly) {
 		// check if there is anything at all in this group assignment
 		if (allSlots.isEmpty()) {
 			return null;
@@ -507,7 +509,7 @@ public class SlotSharingGroupAssignment {
 	}
 
 	/**
-	 * Called from {@link org.apache.flink.runtime.instance.SharedSlot#releaseInstanceSlot()}.
+	 * Called from {@link org.apache.flink.runtime.instance.SharedSlot#releaseSlot(Throwable)}.
 	 * 
 	 * @param sharedSlot The slot to be released.
 	 */
@@ -517,10 +519,11 @@ public class SlotSharingGroupAssignment {
 				// we are releasing this slot
 				
 				if (sharedSlot.hasChildren()) {
+					final FlinkException cause = new FlinkException("Releasing shared slot parent.");
 					// by simply releasing all children, we should eventually release this slot.
 					Set<Slot> children = sharedSlot.getSubSlots();
 					while (children.size() > 0) {
-						children.iterator().next().releaseInstanceSlot();
+						children.iterator().next().releaseSlot(cause);
 					}
 				}
 				else {

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupId.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupId.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupId.java
new file mode 100644
index 0000000..e5d4467
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupId.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import org.apache.flink.util.AbstractID;
+
+public class SlotSharingGroupId extends AbstractID {
+	private static final long serialVersionUID = 8837647978345422042L;
+
+	public SlotSharingGroupId(long lowerPart, long upperPart) {
+		super(lowerPart, upperPart);
+	}
+
+	public SlotSharingGroupId() {
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
index ffc1a7c..baa452f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
@@ -18,16 +18,20 @@
 
 package org.apache.flink.runtime.jobmanager.scheduler;
 
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.SharedSlot;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.AbstractID;
-import org.apache.flink.runtime.instance.Instance;
-
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
-import org.apache.flink.runtime.instance.SharedSlot;
 
-import static org.apache.flink.util.Preconditions.checkState;
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * A CoLocationConstraint manages the location of a set of tasks
@@ -43,12 +47,14 @@ public class CoLocationConstraint {
 
 	private volatile SharedSlot sharedSlot;
 
-	private volatile ResourceID lockedLocation;
+	private volatile TaskManagerLocation lockedLocation;
 
+	private volatile SlotRequestId slotRequestId;
 
 	CoLocationConstraint(CoLocationGroup group) {
 		Preconditions.checkNotNull(group);
 		this.group = group;
+		this.slotRequestId = null;
 	}
 
 	// ------------------------------------------------------------------------
@@ -107,7 +113,7 @@ public class CoLocationConstraint {
 	 */
 	public TaskManagerLocation getLocation() {
 		if (lockedLocation != null) {
-			return sharedSlot.getTaskManagerLocation();
+			return lockedLocation;
 		} else {
 			throw new IllegalStateException("Location not yet locked");
 		}
@@ -136,12 +142,12 @@ public class CoLocationConstraint {
 			this.sharedSlot = newSlot;
 		}
 		else if (newSlot != this.sharedSlot){
-			if (lockedLocation != null && lockedLocation != newSlot.getTaskManagerID()) {
+			if (lockedLocation != null && !Objects.equals(lockedLocation, newSlot.getTaskManagerLocation())) {
 				throw new IllegalArgumentException(
 						"Cannot assign different location to a constraint whose location is locked.");
 			}
 			if (this.sharedSlot.isAlive()) {
-				this.sharedSlot.releaseInstanceSlot();
+				this.sharedSlot.releaseSlot(new FlinkException("Setting new shared slot for co-location constraint."));
 			}
 
 			this.sharedSlot = newSlot;
@@ -159,7 +165,43 @@ public class CoLocationConstraint {
 		checkState(lockedLocation == null, "Location is already locked");
 		checkState(sharedSlot != null, "Cannot lock location without a slot.");
 
-		lockedLocation = sharedSlot.getTaskManagerID();
+		lockedLocation = sharedSlot.getTaskManagerLocation();
+	}
+
+	/**
+	 * Locks the location of this slot. The location can be locked only once
+	 * and only after a shared slot has been assigned.
+	 *
+	 * <p>Note: This method exists for compatibility reasons with the Flip-6 SlotPool
+	 *
+	 * @param taskManagerLocation to lock this co-location constraint to
+	 */
+	public void lockLocation(TaskManagerLocation taskManagerLocation) {
+		checkNotNull(taskManagerLocation);
+		checkState(lockedLocation == null, "Location is already locked.");
+
+		lockedLocation = taskManagerLocation;
+	}
+
+	/**
+	 * Sets the slot request id of the currently assigned slot to the co-location constraint.
+	 * All other tasks belonging to this co-location constraint will be deployed to the same slot.
+	 *
+	 * @param slotRequestId identifying the assigned slot for this co-location constraint
+	 */
+	public void setSlotRequestId(@Nullable SlotRequestId slotRequestId) {
+		this.slotRequestId = slotRequestId;
+	}
+
+	/**
+	 * Returns the currently assigned slot request id identifying the slot to which tasks
+	 * belonging to this co-location constraint will be deployed to.
+	 *
+	 * @return Slot request id of the assigned slot or null if none
+	 */
+	@Nullable
+	public SlotRequestId getSlotRequestId() {
+		return slotRequestId;
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
index 546f31f..e1c1657 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
@@ -44,8 +44,8 @@ public class NoResourceAvailableException extends JobException {
 	NoResourceAvailableException(ScheduledUnit task, int numInstances, int numSlotsTotal, int availableSlots) {
 		super(String.format("%s Task to schedule: < %s > with groupID < %s > in sharing group < %s >. Resources available to scheduler: Number of instances=%d, total number of slots=%d, available slots=%d",
 				BASE_MESSAGE, task.getTaskToExecute(),
-				task.getLocationConstraint() == null ? task.getTaskToExecute().getVertex().getJobvertexId() : task.getLocationConstraint().getGroupId(),
-				task.getSlotSharingGroup(),
+				task.getCoLocationConstraint() == null ? task.getTaskToExecute().getVertex().getJobvertexId() : task.getCoLocationConstraint().getGroupId(),
+				task.getSlotSharingGroupId(),
 				numInstances,
 				numSlotsTotal,
 				availableSlots));

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduledUnit.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduledUnit.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduledUnit.java
index 7348c9d..903872b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduledUnit.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduledUnit.java
@@ -19,68 +19,108 @@
 package org.apache.flink.runtime.jobmanager.scheduler;
 
 import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nullable;
+
+/**
+ * ScheduledUnit contains the information necessary to allocate a slot for the given
+ * {@link JobVertexID}.
+ */
 public class ScheduledUnit {
-	
+
+	@Nullable
 	private final Execution vertexExecution;
-	
-	private final SlotSharingGroup sharingGroup;
-	
-	private final CoLocationConstraint locationConstraint;
+
+	private final JobVertexID jobVertexId;
+
+	@Nullable
+	private final SlotSharingGroupId slotSharingGroupId;
+
+	@Nullable
+	private final CoLocationConstraint coLocationConstraint;
 	
 	// --------------------------------------------------------------------------------------------
 	
 	public ScheduledUnit(Execution task) {
-		Preconditions.checkNotNull(task);
-		
-		this.vertexExecution = task;
-		this.sharingGroup = null;
-		this.locationConstraint = null;
+		this(
+			Preconditions.checkNotNull(task),
+			task.getVertex().getJobvertexId(),
+			null,
+			null);
 	}
 	
-	public ScheduledUnit(Execution task, SlotSharingGroup sharingUnit) {
-		Preconditions.checkNotNull(task);
-		
-		this.vertexExecution = task;
-		this.sharingGroup = sharingUnit;
-		this.locationConstraint = null;
+	public ScheduledUnit(Execution task, @Nullable SlotSharingGroupId slotSharingGroupId) {
+		this(
+			Preconditions.checkNotNull(task),
+			task.getVertex().getJobvertexId(),
+			slotSharingGroupId,
+			null);
 	}
 	
-	public ScheduledUnit(Execution task, SlotSharingGroup sharingUnit, CoLocationConstraint locationConstraint) {
-		Preconditions.checkNotNull(task);
-		Preconditions.checkNotNull(sharingUnit);
-		Preconditions.checkNotNull(locationConstraint);
-		
+	public ScheduledUnit(
+			Execution task,
+			@Nullable SlotSharingGroupId slotSharingGroupId,
+			@Nullable CoLocationConstraint coLocationConstraint) {
+		this(
+			Preconditions.checkNotNull(task),
+			task.getVertex().getJobvertexId(),
+			slotSharingGroupId,
+			coLocationConstraint);
+	}
+
+	public ScheduledUnit(
+			JobVertexID jobVertexId,
+			@Nullable SlotSharingGroupId slotSharingGroupId,
+			@Nullable CoLocationConstraint coLocationConstraint) {
+		this(
+			null,
+			jobVertexId,
+			slotSharingGroupId,
+			coLocationConstraint);
+	}
+
+	public ScheduledUnit(
+		@Nullable Execution task,
+		JobVertexID jobVertexId,
+		@Nullable SlotSharingGroupId slotSharingGroupId,
+		@Nullable CoLocationConstraint coLocationConstraint) {
+
 		this.vertexExecution = task;
-		this.sharingGroup = sharingUnit;
-		this.locationConstraint = locationConstraint;
+		this.jobVertexId = Preconditions.checkNotNull(jobVertexId);
+		this.slotSharingGroupId = slotSharingGroupId;
+		this.coLocationConstraint = coLocationConstraint;
+
 	}
 
 	// --------------------------------------------------------------------------------------------
 	
 	public JobVertexID getJobVertexId() {
-		return this.vertexExecution.getVertex().getJobvertexId();
+		return jobVertexId;
 	}
-	
+
+	@Nullable
 	public Execution getTaskToExecute() {
 		return vertexExecution;
 	}
-	
-	public SlotSharingGroup getSlotSharingGroup() {
-		return sharingGroup;
+
+	@Nullable
+	public SlotSharingGroupId getSlotSharingGroupId() {
+		return slotSharingGroupId;
 	}
-	
-	public CoLocationConstraint getLocationConstraint() {
-		return locationConstraint;
+
+	@Nullable
+	public CoLocationConstraint getCoLocationConstraint() {
+		return coLocationConstraint;
 	}
 
 	// --------------------------------------------------------------------------------------------
 	
 	@Override
 	public String toString() {
-		return "{task=" + vertexExecution.getVertexWithAttempt() + ", sharingUnit=" + sharingGroup + 
-				", locationConstraint=" + locationConstraint + '}';
+		return "{task=" + vertexExecution.getVertexWithAttempt() + ", sharingUnit=" + slotSharingGroupId +
+				", locationConstraint=" + coLocationConstraint + '}';
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index a3c38e0..40fb760 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -18,20 +18,22 @@
 
 package org.apache.flink.runtime.jobmanager.scheduler;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceDiedException;
 import org.apache.flink.runtime.instance.InstanceListener;
-import org.apache.flink.runtime.instance.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.instance.SharedSlot;
 import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.runtime.instance.SlotProvider;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.instance.SlotSharingGroupAssignment;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.commons.lang3.tuple.ImmutablePair;
@@ -39,6 +41,8 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -49,6 +53,7 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
@@ -177,7 +182,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
 	
 		synchronized (globalLock) {
 			
-			SlotSharingGroup sharingUnit = task.getSlotSharingGroup();
+			SlotSharingGroup sharingUnit = vertex.getJobVertex().getSlotSharingGroup();
 			
 			if (sharingUnit != null) {
 
@@ -189,7 +194,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
 				}
 				
 				final SlotSharingGroupAssignment assignment = sharingUnit.getTaskAssignment();
-				final CoLocationConstraint constraint = task.getLocationConstraint();
+				final CoLocationConstraint constraint = task.getCoLocationConstraint();
 				
 				// sanity check that we do not use an externally forced location and a co-location constraint together
 				if (constraint != null && forceExternalLocation) {
@@ -274,7 +279,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
 						// if there is no slot from the group, or the new slot is local,
 						// then we use the new slot
 						if (slotFromGroup != null) {
-							slotFromGroup.releaseInstanceSlot();
+							slotFromGroup.releaseSlot(null);
 						}
 						toUse = newSlot;
 					}
@@ -282,7 +287,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
 						// both are available and usable. neither is local. in that case, we may
 						// as well use the slot from the sharing group, to minimize the number of
 						// instances that the job occupies
-						newSlot.releaseInstanceSlot();
+						newSlot.releaseSlot(null);
 						toUse = slotFromGroup;
 					}
 
@@ -299,10 +304,10 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
 				}
 				catch (Throwable t) {
 					if (slotFromGroup != null) {
-						slotFromGroup.releaseInstanceSlot();
+						slotFromGroup.releaseSlot(t);
 					}
 					if (newSlot != null) {
-						newSlot.releaseInstanceSlot();
+						newSlot.releaseSlot(t);
 					}
 
 					ExceptionUtils.rethrow(t, "An error occurred while allocating a slot in a sharing group");
@@ -444,7 +449,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
 					}
 					else {
 						// could not add and allocate the sub-slot, so release shared slot
-						sharedSlot.releaseInstanceSlot();
+						sharedSlot.releaseSlot(new FlinkException("Could not allocate sub-slot."));
 					}
 				}
 			}
@@ -854,4 +859,19 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
 			return future;
 		}
 	}
+
+	// ------------------------------------------------------------------------
+	//  Testing methods
+	// ------------------------------------------------------------------------
+
+	@VisibleForTesting
+	@Nullable
+	public Instance getInstance(ResourceID resourceId) {
+		for (Instance instance : allInstances) {
+			if (Objects.equals(resourceId, instance.getTaskManagerID())) {
+				return instance;
+			}
+		}
+		return null;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java
index 0fa1362..86be9d4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java
@@ -23,6 +23,7 @@ import java.util.Set;
 import java.util.TreeSet;
 
 import org.apache.flink.runtime.instance.SlotSharingGroupAssignment;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 /**
@@ -39,7 +40,8 @@ public class SlotSharingGroup implements java.io.Serializable {
 	
 	/** Mapping of tasks to subslots. This field is only needed inside the JobManager, and is not RPCed. */
 	private transient SlotSharingGroupAssignment taskAssignment;
-	
+
+	private final SlotSharingGroupId slotSharingGroupId = new SlotSharingGroupId();
 	
 	public SlotSharingGroup() {}
 	
@@ -62,8 +64,11 @@ public class SlotSharingGroup implements java.io.Serializable {
 	public Set<JobVertexID> getJobVertexIds() {
 		return Collections.unmodifiableSet(ids);
 	}
-	
-	
+
+	public SlotSharingGroupId getSlotSharingGroupId() {
+		return slotSharingGroupId;
+	}
+
 	public SlotSharingGroupAssignment getTaskAssignment() {
 		if (this.taskAssignment == null) {
 			this.taskAssignment = new SlotSharingGroupAssignment();

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SimpleSlotContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SimpleSlotContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SimpleSlotContext.java
deleted file mode 100644
index a5b75d7..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SimpleSlotContext.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager.slots;
-
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.instance.SlotRequestID;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.util.Preconditions;
-
-/**
- * Simple implementation of the {@link SlotContext} interface for the legacy code.
- */
-public class SimpleSlotContext implements SlotContext {
-
-	private final SlotRequestID slotRequestId;
-
-	private final AllocationID allocationId;
-
-	private final TaskManagerLocation taskManagerLocation;
-
-	private final int physicalSlotNumber;
-
-	private final TaskManagerGateway taskManagerGateway;
-
-	public SimpleSlotContext(
-			SlotRequestID slotRequestId,
-			AllocationID allocationId,
-			TaskManagerLocation taskManagerLocation,
-			int physicalSlotNumber,
-			TaskManagerGateway taskManagerGateway) {
-		this.slotRequestId = Preconditions.checkNotNull(slotRequestId);
-		this.allocationId = Preconditions.checkNotNull(allocationId);
-		this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation);
-		this.physicalSlotNumber = physicalSlotNumber;
-		this.taskManagerGateway = Preconditions.checkNotNull(taskManagerGateway);
-	}
-
-	@Override
-	public SlotRequestID getSlotRequestId() {
-		return slotRequestId;
-	}
-
-	@Override
-	public AllocationID getAllocationId() {
-		return allocationId;
-	}
-
-	@Override
-	public TaskManagerLocation getTaskManagerLocation() {
-		return taskManagerLocation;
-	}
-
-	@Override
-	public int getPhysicalSlotNumber() {
-		return physicalSlotNumber;
-	}
-
-	@Override
-	public TaskManagerGateway getTaskManagerGateway() {
-		return taskManagerGateway;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotAndLocality.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotAndLocality.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotAndLocality.java
index 5ae057d..85871c8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotAndLocality.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotAndLocality.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.jobmanager.slots;
 
-import org.apache.flink.runtime.instance.AllocatedSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot;
 import org.apache.flink.runtime.jobmanager.scheduler.Locality;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -39,11 +39,11 @@ public class SlotAndLocality {
 
 	// ------------------------------------------------------------------------
 
-	public AllocatedSlot slot() {
+	public AllocatedSlot getSlot() {
 		return slot;
 	}
 
-	public Locality locality() {
+	public Locality getLocality() {
 		return locality;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotContext.java
deleted file mode 100644
index 1e0317a..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotContext.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager.slots;
-
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.instance.Slot;
-import org.apache.flink.runtime.instance.SlotRequestID;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-
-/**
- * Interface for the context of a logical {@link Slot}. This context contains information
- * about the underlying allocated slot and how to communicate with the TaskManager on which
- * it was allocated.
- */
-public interface SlotContext {
-
-	/**
-	 * Gets the slot request id under which the slot has been requested. This id uniquely identifies the logical slot.
-	 *
-	 * @return The id under which the slot has been requested
-	 */
-	SlotRequestID getSlotRequestId();
-
-	/**
-	 * Gets the id under which the slot has been allocated on the TaskManager. This id uniquely identifies the
-	 * physical slot.
-	 *
-	 * @return The id under whic teh slot has been allocated on the TaskManager
-	 */
-	AllocationID getAllocationId();
-
-	/**
-	 * Gets the location info of the TaskManager that offers this slot.
-	 *
-	 * @return The location info of the TaskManager that offers this slot
-	 */
-	TaskManagerLocation getTaskManagerLocation();
-
-	/**
-	 * Gets the number of the slot.
-	 *
-	 * @return The number of the slot on the TaskManager.
-	 */
-	int getPhysicalSlotNumber();
-
-	/**
-	 * Gets the actor gateway that can be used to send messages to the TaskManager.
-	 * <p>
-	 * This method should be removed once the new interface-based RPC abstraction is in place
-	 *
-	 * @return The gateway that can be used to send messages to the TaskManager.
-	 */
-	TaskManagerGateway getTaskManagerGateway();
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotOwner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotOwner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotOwner.java
deleted file mode 100644
index bc1ced4..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotOwner.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager.slots;
-
-import org.apache.flink.runtime.instance.LogicalSlot;
-
-import java.util.concurrent.CompletableFuture;
-
-/**
- * Interface for components that hold slots and to which slots get released / recycled.
- */
-public interface SlotOwner {
-
-	/**
-	 * Return the given slot to the slot owner.
-	 *
-	 * @param logicalSlot to return
-	 * @return Future which is completed with true if the slot could be returned, otherwise with false
-	 */
-	CompletableFuture<Boolean> returnAllocatedSlot(LogicalSlot logicalSlot);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 324557f..7a2844d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -55,9 +55,8 @@ import org.apache.flink.runtime.heartbeat.HeartbeatManager;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.instance.LogicalSlot;
-import org.apache.flink.runtime.instance.SlotPool;
-import org.apache.flink.runtime.instance.SlotPoolGateway;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolGateway;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobGraph;