You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/03 12:10:13 UTC

[06/50] [abbrv] flink git commit: [FLINK-5810] [flip-6] Introduce a hardened slot manager

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index f055971..31edbf3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -21,519 +21,893 @@ package org.apache.flink.runtime.resourcemanager.slotmanager;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.TaskManagerSlot;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.concurrent.BiFunction;
+import org.apache.flink.runtime.concurrent.CompletableFuture;
 import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerServices;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
-import org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestRegistered;
-import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRejected;
-import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply;
-import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorRegistration;
+import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
+import org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 /**
- * SlotManager is responsible for receiving slot requests and do slot allocations. It allows to request
- * slots from registered TaskManagers and issues container allocation requests in case of there are not
- * enough available slots. Besides, it should sync its slot allocation with TaskManager's heartbeat.
- * <p>
- * The main operation principle of SlotManager is:
- * <ul>
- * <li>1. All slot allocation status should be synced with TaskManager, which is the ground truth.</li>
- * <li>2. All slots that have registered must be tracked, either by free pool or allocated pool.</li>
- * <li>3. All slot requests will be handled by best efforts, there is no guarantee that one request will be
- * fulfilled in time or correctly allocated. Conflicts or timeout or some special error will happen, it should
- * be handled outside SlotManager. SlotManager will make each decision based on the information it currently
- * holds.</li>
- * </ul>
- * <b>IMPORTANT:</b> This class is <b>Not Thread-safe</b>.
+ * The slot manager is responsible for maintaining a view on all registered task manager slots,
+ * their allocation and all pending slot requests. Whenever a new slot is registered or an
+ * allocated slot is freed, then it tries to fulfill another pending slot request. Whenever there
+ * are not enough slots available the slot manager will notify the resource manager about it via
+ * {@link ResourceManagerActions#allocateResource(ResourceProfile)}.
+ *
+ * In order to free resources and avoid resource leaks, idle task managers (task managers whose
+ * slots are currently not used) and unfulfilled pending slot requests time out, triggering their
+ * release and failure, respectively.
  */
-public abstract class SlotManager {
+public class SlotManager implements AutoCloseable {
+
+	private static final Logger LOG = LoggerFactory.getLogger(SlotManager.class);
+
+	/** Scheduled executor for timeouts */
+	private final ScheduledExecutor scheduledExecutor;
+
+	/** Timeout for slot requests to the task manager */
+	private final Time taskManagerRequestTimeout;
+
+	/** Timeout after which an allocation is discarded */
+	private final Time slotRequestTimeout;
+
+	/** Timeout after which an unused TaskManager is released */
+	private final Time taskManagerTimeout;
+
+	/** Map for all registered slots */
+	private final HashMap<SlotID, TaskManagerSlot> slots;
 
-	protected final Logger LOG = LoggerFactory.getLogger(getClass());
+	/** Index of all currently free slots */
+	private final LinkedHashMap<SlotID, TaskManagerSlot> freeSlots;
 
-	/** The Resource allocation provider */
-	protected final ResourceManagerServices rmServices;
+	/** All currently registered task managers */
+	private final HashMap<InstanceID, TaskManagerRegistration> taskManagerRegistrations;
 
-	/** All registered task managers with ResourceID and gateway. */
-	private final Map<ResourceID, TaskExecutorRegistration> taskManagers;
+	/** Map of fulfilled and active allocations for request deduplication purposes */
+	private final HashMap<AllocationID, SlotID> fulfilledSlotRequests;
 
-	/** All registered slots, including free and allocated slots */
-	private final Map<ResourceID, Map<SlotID, ResourceSlot>> registeredSlots;
+	/** Map of pending/unfulfilled slot allocation requests */
+	private final HashMap<AllocationID, PendingSlotRequest> pendingSlotRequests;
 
-	/** All pending slot requests, waiting available slots to fulfil */
-	private final Map<AllocationID, SlotRequest> pendingSlotRequests;
+	/** Leader id of the containing component */
+	private UUID leaderId;
 
-	/** All free slots that can be used to be allocated */
-	private final Map<SlotID, ResourceSlot> freeSlots;
+	/** Executor for future callbacks which have to be "synchronized" */
+	private Executor mainThreadExecutor;
 
-	/** All allocations, we can lookup allocations either by SlotID or AllocationID */
-	private final AllocationMap allocationMap;
+	/** Callbacks for resource (de-)allocations */
+	private ResourceManagerActions resourceManagerActions;
 
-	private final Time timeout;
+	/** True iff the component has been started */
+	private boolean started;
 
-	public SlotManager(ResourceManagerServices rmServices) {
-		this.rmServices = checkNotNull(rmServices);
-		this.registeredSlots = new HashMap<>(16);
-		this.pendingSlotRequests = new LinkedHashMap<>(16);
-		this.freeSlots = new HashMap<>(16);
-		this.allocationMap = new AllocationMap();
-		this.taskManagers = new HashMap<>();
-		this.timeout = Time.seconds(10);
+	public SlotManager(
+			ScheduledExecutor scheduledExecutor,
+			Time taskManagerRequestTimeout,
+			Time slotRequestTimeout,
+			Time taskManagerTimeout) {
+		this.scheduledExecutor = Preconditions.checkNotNull(scheduledExecutor);
+		this.taskManagerRequestTimeout = Preconditions.checkNotNull(taskManagerRequestTimeout);
+		this.slotRequestTimeout = Preconditions.checkNotNull(slotRequestTimeout);
+		this.taskManagerTimeout = Preconditions.checkNotNull(taskManagerTimeout);
+
+		slots = new HashMap<>(16);
+		freeSlots = new LinkedHashMap<>(16);
+		taskManagerRegistrations = new HashMap<>(4);
+		fulfilledSlotRequests = new HashMap<>(16);
+		pendingSlotRequests = new HashMap<>(16);
+
+		leaderId = null;
+		resourceManagerActions = null;
+		started = false;
 	}
 
-	// ------------------------------------------------------------------------
-	//  slot managements
-	// ------------------------------------------------------------------------
+	// ---------------------------------------------------------------------------------------------
+	// Component lifecycle methods
+	// ---------------------------------------------------------------------------------------------
 
 	/**
-	 * Request a slot with requirements, we may either fulfill the request or pending it. Trigger container
-	 * allocation if we don't have enough resource. If we have free slot which can match the request, record
-	 * this allocation and forward the request to TaskManager through ResourceManager (we want this done by
-	 * RPC's main thread to avoid race condition).
+	 * Starts the slot manager with the given leader id and resource manager actions.
 	 *
-	 * @param request The detailed request of the slot
-	 * @return RMSlotRequestRegistered The confirmation message to be send to the caller
+	 * @param newLeaderId to use for communication with the task managers
+	 * @param newResourceManagerActions to use for resource (de-)allocations
+	 */
+	public void start(UUID newLeaderId, Executor newMainThreadExecutor, ResourceManagerActions newResourceManagerActions) {
+		leaderId = Preconditions.checkNotNull(newLeaderId);
+		mainThreadExecutor = Preconditions.checkNotNull(newMainThreadExecutor);
+		resourceManagerActions = Preconditions.checkNotNull(newResourceManagerActions);
+
+		started = true;
+	}
+
+	/**
+	 * Suspends the component. This clears the internal state of the slot manager.
 	 */
-	public RMSlotRequestRegistered requestSlot(final SlotRequest request) {
-		final AllocationID allocationId = request.getAllocationId();
-		if (isRequestDuplicated(request)) {
-			LOG.warn("Duplicated slot request, AllocationID:{}", allocationId);
-			return new RMSlotRequestRegistered(allocationId);
+	public void suspend() {
+		for (PendingSlotRequest pendingSlotRequest : pendingSlotRequests.values()) {
+			cancelPendingSlotRequest(pendingSlotRequest);
 		}
 
-		// try to fulfil the request with current free slots
-		final ResourceSlot slot = chooseSlotToUse(request, freeSlots);
-		if (slot != null) {
-			LOG.info("Assigning SlotID({}) to AllocationID({}), JobID:{}", slot.getSlotId(),
-				allocationId, request.getJobId());
+		pendingSlotRequests.clear();
+
+		ArrayList<InstanceID> registeredTaskManagers = new ArrayList<>(taskManagerRegistrations.keySet());
+
+		for (InstanceID registeredTaskManager : registeredTaskManagers) {
+			unregisterTaskManager(registeredTaskManager);
+		}
 
-			// record this allocation in bookkeeping
-			allocationMap.addAllocation(slot.getSlotId(), allocationId);
-			// remove selected slot from free pool
-			freeSlots.remove(slot.getSlotId());
+		leaderId = null;
+		resourceManagerActions = null;
+		started = false;
+	}
+
+	/**
+	 * Closes the slot manager.
+	 *
+	 * @throws Exception if the close operation fails
+	 */
+	@Override
+	public void close() throws Exception {
+		suspend();
+	}
+
+	// ---------------------------------------------------------------------------------------------
+	// Public API
+	// ---------------------------------------------------------------------------------------------
+
+	/**
+	 * Requests a slot with the respective resource profile.
+	 *
+	 * @param slotRequest specifying the requested slot specs
+	 * @return true if the slot request was registered; false if the request is a duplicate
+	 * @throws SlotManagerException if the slot request failed (e.g. not enough resources left)
+	 */
+	public boolean registerSlotRequest(SlotRequest slotRequest) throws SlotManagerException {
+		checkInit();
 
-			sendSlotRequest(slot, request);
+		if (checkDuplicateRequest(slotRequest.getAllocationId())) {
+			LOG.debug("Ignoring a duplicate slot request with allocation id {}.", slotRequest.getAllocationId());
+
+			return false;
 		} else {
-			LOG.info("Cannot fulfil slot request, try to allocate a new container for it, " +
-				"AllocationID:{}, JobID:{}", allocationId, request.getJobId());
-			Preconditions.checkState(rmServices != null,
-				"Attempted to allocate resources but no ResourceManagerServices set.");
-			rmServices.allocateResource(request.getResourceProfile());
-			pendingSlotRequests.put(allocationId, request);
+			PendingSlotRequest pendingSlotRequest = new PendingSlotRequest(slotRequest);
+
+			pendingSlotRequests.put(slotRequest.getAllocationId(), pendingSlotRequest);
+
+			try {
+				internalRequestSlot(pendingSlotRequest);
+			} catch (ResourceManagerException e) {
+				// requesting the slot failed --> remove pending slot request
+				pendingSlotRequests.remove(slotRequest.getAllocationId());
+
+				throw new SlotManagerException("Could not fulfill slot request " + slotRequest.getAllocationId() + '.', e);
+			}
+
+			return true;
 		}
+	}
+
+	/**
+	 * Cancels and removes a pending slot request with the given allocation id. If there is no such
+	 * pending request, then nothing is done.
+	 *
+	 * @param allocationId identifying the pending slot request
+	 * @return True if a pending slot request was found; otherwise false
+	 */
+	public boolean unregisterSlotRequest(AllocationID allocationId) {
+		checkInit();
+
+		PendingSlotRequest pendingSlotRequest = pendingSlotRequests.remove(allocationId);
+
+		if (null != pendingSlotRequest) {
+			cancelPendingSlotRequest(pendingSlotRequest);
+
+			return true;
+		} else {
+			LOG.debug("No pending slot request with allocation id {} found.", allocationId);
 
-		return new RMSlotRequestRegistered(allocationId);
+			return false;
+		}
 	}
 
 	/**
-	 * Notifies the SlotManager that a slot is available again after being allocated.
-	 * @param slotID slot id of available slot
+	 * Registers a new task manager at the slot manager. This will make the task manager's slots
+	 * known and, thus, available for allocation.
+	 *
+	 * @param taskExecutorConnection for the new task manager
+	 * @param initialSlotReport for the new task manager
 	 */
-	public void notifySlotAvailable(ResourceID resourceID, SlotID slotID) {
-		if (!allocationMap.isAllocated(slotID)) {
-			throw new IllegalStateException("Slot was not previously allocated but " +
-				"TaskManager reports it as available again");
+	public void registerTaskManager(final TaskExecutorConnection taskExecutorConnection, SlotReport initialSlotReport) {
+		checkInit();
+
+		// we identify task managers by their instance id
+		if (!taskManagerRegistrations.containsKey(taskExecutorConnection.getInstanceID())) {
+			TaskManagerRegistration taskManagerRegistration = new TaskManagerRegistration(taskExecutorConnection);
+			taskManagerRegistrations.put(taskExecutorConnection.getInstanceID(), taskManagerRegistration);
 		}
-		allocationMap.removeAllocation(slotID);
-		final Map<SlotID, ResourceSlot> slots = registeredSlots.get(resourceID);
-		ResourceSlot freeSlot = slots.get(slotID);
-		if (freeSlot == null) {
-			throw new IllegalStateException("Slot was not registered with SlotManager but " +
-				"TaskManager reported it to be available.");
+
+		reportSlotStatus(taskExecutorConnection.getInstanceID(), initialSlotReport);
+	}
+
+	/**
+	 * Unregisters the task manager identified by the given instance id and its associated slots
+	 * from the slot manager.
+	 *
+	 * @param instanceId identifying the task manager to unregister
+	 * @return True if there existed a registered task manager with the given instance id
+	 */
+	public boolean unregisterTaskManager(InstanceID instanceId) {
+		checkInit();
+
+		TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.remove(instanceId);
+
+		if (null != taskManagerRegistration) {
+			removeSlots(taskManagerRegistration.getSlots());
+
+			taskManagerRegistration.cancelTimeout();
+
+			return true;
+		} else {
+			LOG.debug("There is no task manager registered with instance ID {}. Ignoring this message.", instanceId);
+
+			return false;
 		}
-		handleFreeSlot(freeSlot);
 	}
 
 	/**
-	 * The slot request to TaskManager may be either failed by rpc communication (timeout, network error, etc.)
-	 * or really rejected by TaskManager. We shall retry this request by:
-	 * <ul>
-	 * <li>1. verify and clear all the previous allocate information for this request
-	 * <li>2. try to request slot again
-	 * </ul>
-	 * <p>
-	 * This may cause some duplicate allocation, e.g. the slot request to TaskManager is successful but the response
-	 * is lost somehow, so we may request a slot in another TaskManager, this causes two slots assigned to one request,
-	 * but it can be taken care of by rejecting registration at JobManager.
+	 * Reports the current slot allocations for a task manager identified by the given instance id.
 	 *
-	 * @param originalRequest The original slot request
-	 * @param slotId          The target SlotID
+	 * @param instanceId identifying the task manager for which to report the slot status
+	 * @param slotReport containing the status for all of its slots
+	 * @return true if the slot status has been updated successfully, otherwise false
 	 */
-	void handleSlotRequestFailedAtTaskManager(final SlotRequest originalRequest, final SlotID slotId) {
-		final AllocationID originalAllocationId = originalRequest.getAllocationId();
-		LOG.info("Slot request failed at TaskManager, SlotID:{}, AllocationID:{}, JobID:{}",
-			slotId, originalAllocationId, originalRequest.getJobId());
-
-		if (allocationMap.isAllocated(slotId)) {
-			final AllocationID expectedAllocationId = allocationMap.getAllocationID(slotId);
-
-			// check whether we have an agreement on whom this slot belongs to
-			if (originalAllocationId.equals(expectedAllocationId)) {
-				LOG.info("De-allocate this request and retry");
-				allocationMap.removeAllocation(expectedAllocationId);
-				pendingSlotRequests.put(originalRequest.getAllocationId(), originalRequest);
-				ResourceSlot slot = checkNotNull(getRegisteredSlot(slotId));
-				// treat this slot as empty and retry with a different request
-				handleFreeSlot(slot);
-			} else {
-				LOG.error("Slot request failed for slot {} with allocation id {}:" +
-						" Allocation id did not match the expected allocation id {}.",
-					slotId, originalAllocationId, expectedAllocationId);
+	public boolean reportSlotStatus(InstanceID instanceId, SlotReport slotReport) {
+		checkInit();
+
+		TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceId);
+
+		if (null != taskManagerRegistration) {
+			ArrayList<SlotID> slotsToRemove = new ArrayList<>(taskManagerRegistration.getSlots());
+			boolean idle = true;
+
+			for (SlotStatus slotStatus : slotReport) {
+				if (slotsToRemove.remove(slotStatus.getSlotID())) {
+					// slot which was already registered
+					updateSlot(slotStatus.getSlotID(), slotStatus.getAllocationID());
+				} else {
+					// new slot
+					registerSlot(
+						taskManagerRegistration,
+						slotStatus.getSlotID(),
+						slotStatus.getAllocationID(),
+						slotStatus.getResourceProfile(),
+						taskManagerRegistration.getTaskManagerConnection());
+				}
+
+				TaskManagerSlot slot = slots.get(slotStatus.getSlotID());
+
+				idle &= slot.isFree();
+			}
+
+			// remove the slots for which we haven't received a slot status message
+			removeSlots(slotsToRemove);
+
+			if (idle) {
+				// no slot of this task manager is being used --> register timer to free this resource
+				registerTaskManagerTimeout(taskManagerRegistration);
 			}
+
+			return true;
 		} else {
-			LOG.error("Slot request failed for slot {} with allocation id {}: " +
-					"Slot was not previously registered.",
-				slotId, originalAllocationId);
+			LOG.debug("Received slot report for unknown task manager with instance id {}. Ignoring this report.", instanceId);
+
+			return false;
 		}
 	}
 
 	/**
-	 * Registers a TaskExecutor
-	 * @param resourceID TaskExecutor's ResourceID
-	 * @param registration TaskExecutor's registration
-	 * @param slotReport TaskExecutor's free and allocated slots
+	 * Frees the given slot from the given allocation. If the slot is still allocated by the given
+	 * allocation id, then the slot will be marked as free and will be subject to new slot requests.
+	 *
+	 * @param slotId identifying the slot to free
+	 * @param allocationId with which the slot is presumably allocated
 	 */
-	public void registerTaskExecutor(
-			ResourceID resourceID,
-			TaskExecutorRegistration registration,
-			SlotReport slotReport) {
+	public void freeSlot(SlotID slotId, AllocationID allocationId) {
+		checkInit();
 
-		if (taskManagers.get(resourceID) != null) {
-			notifyTaskManagerFailure(resourceID);
-		}
+		TaskManagerSlot slot = slots.get(slotId);
+
+		if (null != slot) {
+			if (slot.isAllocated()) {
+				if (Objects.equals(allocationId, slot.getAllocationId())) {
+					// free the slot
+					slot.setAllocationId(null);
+					fulfilledSlotRequests.remove(allocationId);
 
-		this.taskManagers.put(resourceID, registration);
+					if (slot.isFree()) {
+						handleFreeSlot(slot);
+					}
 
-		for (SlotStatus slotStatus : slotReport.getSlotsStatus()) {
-			final SlotID slotId = slotStatus.getSlotID();
+					TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(slot.getInstanceId());
 
-			final TaskExecutorRegistration taskExecutorRegistration = taskManagers.get(slotId.getResourceID());
-			if (taskExecutorRegistration == null) {
-				LOG.info("Received SlotStatus but ResourceID {} is unknown to the SlotManager",
-					slotId.getResourceID());
-				return;
+					if (null != taskManagerRegistration && !anySlotUsed(taskManagerRegistration.getSlots())) {
+						registerTaskManagerTimeout(taskManagerRegistration);
+					}
+				} else {
+					LOG.debug("Received request to free slot {} with expected allocation id {}, " +
+						"but actual allocation id {} differs. Ignoring the request.", slotId, allocationId, slot.getAllocationId());
+				}
+			} else {
+				LOG.debug("Slot {} has not been allocated.", slotId);
 			}
+		} else {
+			LOG.debug("Trying to free a slot {} which has not been registered. Ignoring this message.", slotId);
+		}
+	}
 
-			final ResourceSlot slot = new ResourceSlot(slotId, slotStatus.getProfiler(), taskExecutorRegistration);
+	// ---------------------------------------------------------------------------------------------
+	// Behaviour methods
+	// ---------------------------------------------------------------------------------------------
 
-			registerNewSlot(slot);
-			LOG.info("New slot appeared, SlotID:{}, AllocationID:{}", slotId, slotStatus.getAllocationID());
+	/**
+	 * Finds a matching slot request for a given resource profile. If there is no such request,
+	 * the method returns null.
+	 *
+	 * Note: If you want to change the behaviour of the slot manager wrt slot allocation and
+	 * request fulfillment, then you should override this method.
+	 *
+	 * @param slotResourceProfile defining the resources of an available slot
+	 * @return A matching slot request which can be deployed in a slot with the given resource
+	 * profile. Null if there is no such slot request pending.
+	 */
+	protected PendingSlotRequest findMatchingRequest(ResourceProfile slotResourceProfile) {
 
-			if (slotStatus.getAllocationID() != null) {
-				// slot in use, record this in bookkeeping
-				allocationMap.addAllocation(slotId, slotStatus.getAllocationID());
-			} else {
-				handleFreeSlot(slot);
+		for (PendingSlotRequest pendingSlotRequest : pendingSlotRequests.values()) {
+			if (!pendingSlotRequest.isAssigned() && slotResourceProfile.isMatching(pendingSlotRequest.getResourceProfile())) {
+				return pendingSlotRequest;
 			}
 		}
+
+		return null;
 	}
 
 	/**
-	 * Callback for TaskManager failures. In case that a TaskManager fails, we have to clean up all its slots.
+	 * Finds a matching slot for a given resource profile. A matching slot has at least as many
+	 * resources available as the given resource profile. If there is no such slot available, then
+	 * the method returns null.
 	 *
-	 * @param resourceId The ResourceID of the TaskManager
+	 * Note: If you want to change the behaviour of the slot manager wrt slot allocation and
+	 * request fulfillment, then you should override this method.
+	 *
+	 * @param requestResourceProfile specifying the resource requirements for a slot request
+	 * @return A matching slot which fulfills the given resource profile. Null if there is no such
+	 * slot available.
 	 */
-	public void notifyTaskManagerFailure(final ResourceID resourceId) {
-		LOG.info("Resource:{} been notified failure", resourceId);
-		taskManagers.remove(resourceId);
-		final Map<SlotID, ResourceSlot> slotIdsToRemove = registeredSlots.remove(resourceId);
-		if (slotIdsToRemove != null) {
-			for (SlotID slotId : slotIdsToRemove.keySet()) {
-				LOG.info("Removing Slot: {} upon resource failure", slotId);
-				if (freeSlots.containsKey(slotId)) {
-					freeSlots.remove(slotId);
-				} else if (allocationMap.isAllocated(slotId)) {
-					allocationMap.removeAllocation(slotId);
-				} else {
-					LOG.error("BUG! {} is neither in free pool nor in allocated pool", slotId);
-				}
+	protected TaskManagerSlot findMatchingSlot(ResourceProfile requestResourceProfile) {
+		Iterator<Map.Entry<SlotID, TaskManagerSlot>> iterator = freeSlots.entrySet().iterator();
+
+		while (iterator.hasNext()) {
+			TaskManagerSlot taskManagerSlot = iterator.next().getValue();
+
+			// sanity check
+			Preconditions.checkState(taskManagerSlot.isFree());
+
+			if (taskManagerSlot.getResourceProfile().isMatching(requestResourceProfile)) {
+				iterator.remove();
+				return taskManagerSlot;
 			}
 		}
+
+		return null;
 	}
 
-	// ------------------------------------------------------------------------
-	//  internal behaviors
-	// ------------------------------------------------------------------------
+	// ---------------------------------------------------------------------------------------------
+	// Internal slot operations
+	// ---------------------------------------------------------------------------------------------
 
 	/**
-	 * When we have a free slot, try to fulfill the pending request first. If any request can be fulfilled,
-	 * record this allocation in bookkeeping and send slot request to TaskManager, else we just add this slot
-	 * to the free pool.
+	 * Registers a slot for the given task manager at the slot manager. The slot is identified by
+	 * the given slot id. The given resource profile defines the available resources for the slot.
+	 * The task manager connection can be used to communicate with the task manager.
 	 *
-	 * @param freeSlot The free slot
+	 * @param taskManagerRegistration Task manager for which to register the given slot
+	 * @param slotId identifying the slot on the task manager
+	 * @param allocationId which is currently deployed in the slot
+	 * @param resourceProfile of the slot
+	 * @param taskManagerConnection to communicate with the remote task manager
 	 */
-	private void handleFreeSlot(final ResourceSlot freeSlot) {
-		SlotRequest chosenRequest = chooseRequestToFulfill(freeSlot, pendingSlotRequests);
+	private void registerSlot(
+			TaskManagerRegistration taskManagerRegistration,
+			SlotID slotId,
+			AllocationID allocationId,
+			ResourceProfile resourceProfile,
+			TaskExecutorConnection taskManagerConnection) {
+
+		Preconditions.checkNotNull(taskManagerRegistration);
+
+		TaskManagerSlot slot = new TaskManagerSlot(
+			slotId,
+			resourceProfile,
+			taskManagerConnection,
+			allocationId);
+
+		slots.put(slotId, slot);
+
+		taskManagerRegistration.addSlot(slotId);
+
+		if (slot.isFree()) {
+			handleFreeSlot(slot);
+		}
 
-		if (chosenRequest != null) {
-			final AllocationID allocationId = chosenRequest.getAllocationId();
-			final SlotRequest slotRequest = pendingSlotRequests.remove(allocationId);
+		if (slot.isAllocated()) {
+			fulfilledSlotRequests.put(slot.getAllocationId(), slotId);
+		}
+	}
+
+	/**
+	 * Updates a slot with the given allocation id.
+	 *
+	 * @param slotId to update
+	 * @param allocationId specifying the current allocation of the slot
+	 */
+	private void updateSlot(SlotID slotId, AllocationID allocationId) {
+		TaskManagerSlot slot = slots.get(slotId);
+
+		if (null != slot) {
+			// we assume the given allocation id to be the ground truth (coming from the TM)
+			slot.setAllocationId(allocationId);
+
+			if (null != allocationId) {
+				if (slot.hasPendingSlotRequest()){
+					// we have a pending slot request --> check whether we have to reject it
+					PendingSlotRequest pendingSlotRequest = slot.getAssignedSlotRequest();
+
+					if (Objects.equals(pendingSlotRequest.getAllocationId(), allocationId)) {
+						// we can cancel the slot request because it has been fulfilled
+						cancelPendingSlotRequest(pendingSlotRequest);
+
+						// remove the pending slot request, since it has been completed
+						pendingSlotRequests.remove(pendingSlotRequest.getAllocationId());
+					} else {
+						// this will try to find a new slot for the request
+						rejectPendingSlotRequest(
+							pendingSlotRequest,
+							new Exception("Task manager reported slot " + slotId + " being already allocated."));
+					}
+
+					slot.setAssignedSlotRequest(null);
+				}
 
-			LOG.info("Assigning SlotID({}) to AllocationID({}), JobID:{}", freeSlot.getSlotId(),
-				allocationId, chosenRequest.getJobId());
-			allocationMap.addAllocation(freeSlot.getSlotId(), allocationId);
+				fulfilledSlotRequests.put(allocationId, slotId);
 
-			sendSlotRequest(freeSlot, slotRequest);
+				TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(slot.getInstanceId());
+
+				if (null != taskManagerRegistration) {
+					// disable any registered time out for the task manager
+					taskManagerRegistration.cancelTimeout();
+				}
+			}
 		} else {
-			freeSlots.put(freeSlot.getSlotId(), freeSlot);
+			LOG.debug("Trying to update unknown slot with slot id {}.", slotId);
 		}
 	}
 
-	private void sendSlotRequest(final ResourceSlot freeSlot, final SlotRequest slotRequest) {
+	/**
+	 * Tries to allocate a slot for the given slot request. If there is no slot available, the
+	 * resource manager is informed to allocate more resources and a timeout for the request is
+	 * registered.
+	 *
+	 * @param pendingSlotRequest to allocate a slot for
+	 * @throws ResourceManagerException if the resource manager cannot allocate more resource
+	 */
+	private void internalRequestSlot(PendingSlotRequest pendingSlotRequest) throws ResourceManagerException {
+		TaskManagerSlot taskManagerSlot = findMatchingSlot(pendingSlotRequest.getResourceProfile());
 
-		final AllocationID allocationID = slotRequest.getAllocationId();
-		final TaskExecutorRegistration registration = freeSlot.getTaskExecutorRegistration();
-		final Future<TMSlotRequestReply> slotRequestReplyFuture =
-			registration.getTaskExecutorGateway()
-				.requestSlot(
-					freeSlot.getSlotId(),
-					slotRequest.getJobId(),
-					allocationID,
-					"foobar", // TODO: set proper JM address
-					rmServices.getLeaderID(),
-					timeout);
+		if (taskManagerSlot != null) {
+			allocateSlot(taskManagerSlot, pendingSlotRequest);
+		} else {
+			final UUID timeoutIdentifier = UUID.randomUUID();
+			final AllocationID allocationId = pendingSlotRequest.getAllocationId();
+
+			// register timeout for slot request
+			ScheduledFuture<?> timeoutFuture = scheduledExecutor.schedule(new Runnable() {
+				@Override
+				public void run() {
+					mainThreadExecutor.execute(new Runnable() {
+						@Override
+						public void run() {
+							timeoutSlotRequest(allocationId, timeoutIdentifier);
+						}
+					});
+				}
+			}, slotRequestTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+			pendingSlotRequest.registerTimeout(timeoutFuture, timeoutIdentifier);
 
-		slotRequestReplyFuture.handleAsync(new BiFunction<TMSlotRequestReply, Throwable, Void>() {
+			resourceManagerActions.allocateResource(pendingSlotRequest.getResourceProfile());
+		}
+	}
+
+	/**
+	 * Allocates the given slot for the given slot request. This entails sending a registration
+	 * message to the task manager and treating failures.
+	 *
+	 * @param taskManagerSlot to allocate for the given slot request
+	 * @param pendingSlotRequest to allocate the given slot for
+	 */
+	private void allocateSlot(TaskManagerSlot taskManagerSlot, PendingSlotRequest pendingSlotRequest) {
+		TaskExecutorConnection taskExecutorConnection = taskManagerSlot.getTaskManagerConnection();
+		TaskExecutorGateway gateway = taskExecutorConnection.getTaskExecutorGateway();
+
+		final CompletableFuture<Acknowledge> completableFuture = new FlinkCompletableFuture<>();
+		final AllocationID allocationId = pendingSlotRequest.getAllocationId();
+		final SlotID slotId = taskManagerSlot.getSlotId();
+
+		taskManagerSlot.setAssignedSlotRequest(pendingSlotRequest);
+		pendingSlotRequest.setRequestFuture(completableFuture);
+
+		// RPC call to the task manager
+		Future<Acknowledge> requestFuture = gateway.requestSlot(
+			slotId,
+			pendingSlotRequest.getJobId(),
+			allocationId,
+			pendingSlotRequest.getTargetAddress(),
+			leaderId,
+			taskManagerRequestTimeout);
+
+		requestFuture.handle(new BiFunction<Acknowledge, Throwable, Void>() {
+			@Override
+			public Void apply(Acknowledge acknowledge, Throwable throwable) {
+				if (acknowledge != null) {
+					completableFuture.complete(acknowledge);
+				} else {
+					completableFuture.completeExceptionally(throwable);
+				}
+
+				return null;
+			}
+		});
+
+		completableFuture.handleAsync(new BiFunction<Acknowledge, Throwable, Void>() {
 			@Override
-			public Void apply(TMSlotRequestReply slotRequestReply, Throwable throwable) {
-				TaskExecutorRegistration current = taskManagers.get(slotRequestReply.getResourceID());
-				if (current != null && current.getInstanceID().equals(slotRequestReply.getInstanceID())) {
-					if (throwable != null || slotRequestReply instanceof TMSlotRequestRejected) {
-						handleSlotRequestFailedAtTaskManager(slotRequest, freeSlot.getSlotId());
+			public Void apply(Acknowledge acknowledge, Throwable throwable) {
+				if (acknowledge != null) {
+					updateSlot(slotId, allocationId);
+				} else {
+					if (throwable instanceof SlotOccupiedException) {
+						SlotOccupiedException exception = (SlotOccupiedException) throwable;
+						updateSlot(slotId, exception.getAllocationId());
 					} else {
-						LOG.debug("Successfully registered slot {} ", freeSlot.getSlotId());
+						removeSlotRequestFromSlot(slotId, allocationId);
+					}
+
+					if (!(throwable instanceof CancellationException)) {
+						handleFailedSlotRequest(slotId, allocationId, throwable);
+					} else {
+						LOG.debug("Slot allocation request {} has been cancelled.", allocationId, throwable);
 					}
-				} else {
-					LOG.debug("Discarding message from obsolete TaskExecutor with InstanceID {}",
-						slotRequestReply.getInstanceID());
 				}
+
 				return null;
 			}
-		}, rmServices.getMainThreadExecutor());
+		}, mainThreadExecutor);
 	}
 
 	/**
-	 * Check whether the request is duplicated. We use AllocationID to identify slot request, for each
-	 * formerly received slot request, it is either in pending list or already been allocated.
+	 * Handles a free slot. It first tries to find a pending slot request which can be fulfilled.
+	 * If there is no such request, then it will add the slot to the set of free slots.
 	 *
-	 * @param request The slot request
-	 * @return <tt>true</tt> if the request is duplicated
+	 * @param freeSlot to find a new slot request for
 	 */
-	private boolean isRequestDuplicated(final SlotRequest request) {
-		final AllocationID allocationId = request.getAllocationId();
-		return pendingSlotRequests.containsKey(allocationId)
-			|| allocationMap.isAllocated(allocationId);
+	private void handleFreeSlot(TaskManagerSlot freeSlot) {
+		PendingSlotRequest pendingSlotRequest = findMatchingRequest(freeSlot.getResourceProfile());
+
+		if (null != pendingSlotRequest) {
+			allocateSlot(freeSlot, pendingSlotRequest);
+		} else {
+			freeSlots.put(freeSlot.getSlotId(), freeSlot);
+		}
 	}
 
 	/**
-	 * Registers a new slot with the SlotManager.
+	 * Removes the given set of slots from the slot manager.
 	 *
-	 * @param slot The ResourceSlot which will be registered
+	 * @param slotsToRemove identifying the slots to remove from the slot manager
 	 */
-	private void registerNewSlot(final ResourceSlot slot) {
-		final SlotID slotId = slot.getSlotId();
-		final ResourceID resourceId = slotId.getResourceID();
-		if (!registeredSlots.containsKey(resourceId)) {
-			registeredSlots.put(resourceId, new HashMap<SlotID, ResourceSlot>());
+	private void removeSlots(Iterable<SlotID> slotsToRemove) {
+		for (SlotID slotId : slotsToRemove) {
+			removeSlot(slotId);
 		}
-		registeredSlots.get(resourceId).put(slotId, slot);
 	}
 
-	private ResourceSlot getRegisteredSlot(final SlotID slotId) {
-		final ResourceID resourceId = slotId.getResourceID();
-		if (!registeredSlots.containsKey(resourceId)) {
-			return null;
+	/**
+	 * Removes the given slot from the slot manager.
+	 *
+	 * @param slotId identifying the slot to remove
+	 */
+	private void removeSlot(SlotID slotId) {
+		TaskManagerSlot slot = slots.remove(slotId);
+
+		if (null != slot) {
+			freeSlots.remove(slotId);
+
+			if (slot.hasPendingSlotRequest()) {
+				// reject the pending slot request --> triggering a new allocation attempt
+				rejectPendingSlotRequest(
+					slot.getAssignedSlotRequest(),
+					new Exception("The assigned slot " + slot.getSlotId() + " was removed."));
+			}
+
+			AllocationID oldAllocationId = slot.getAllocationId();
+
+			fulfilledSlotRequests.remove(oldAllocationId);
+
+			TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(slot.getInstanceId());
+
+			if (null != taskManagerRegistration) {
+				taskManagerRegistration.removeSlot(slotId);
+			}
+		} else {
+			LOG.debug("There was no slot registered with slot id {}.", slotId);
 		}
-		return registeredSlots.get(resourceId).get(slotId);
 	}
 
-	// ------------------------------------------------------------------------
-	//  Framework specific behavior
-	// ------------------------------------------------------------------------
+	// ---------------------------------------------------------------------------------------------
+	// Internal request handling methods
+	// ---------------------------------------------------------------------------------------------
 
 	/**
-	 * Choose a slot to use among all free slots, the behavior is framework specified.
+	 * Removes a pending slot request identified by the given allocation id from a slot identified
+	 * by the given slot id.
 	 *
-	 * @param request   The slot request
-	 * @param freeSlots All slots which can be used
-	 * @return The slot we choose to use, <tt>null</tt> if we did not find a match
+	 * @param slotId identifying the slot
+	 * @param allocationId identifying the presumable assigned pending slot request
 	 */
-	protected abstract ResourceSlot chooseSlotToUse(final SlotRequest request,
-		final Map<SlotID, ResourceSlot> freeSlots);
+	private void removeSlotRequestFromSlot(SlotID slotId, AllocationID allocationId) {
+		TaskManagerSlot taskManagerSlot = slots.get(slotId);
+
+		if (null != taskManagerSlot) {
+			if (taskManagerSlot.hasPendingSlotRequest() && Objects.equals(allocationId, taskManagerSlot.getAssignedSlotRequest().getAllocationId())) {
+				taskManagerSlot.setAssignedSlotRequest(null);
+			}
+
+			if (taskManagerSlot.isFree()) {
+				handleFreeSlot(taskManagerSlot);
+			}
+
+			TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(taskManagerSlot.getInstanceId());
+
+			if (null != taskManagerRegistration && !anySlotUsed(taskManagerRegistration.getSlots())) {
+				registerTaskManagerTimeout(taskManagerRegistration);
+			}
+		} else {
+			LOG.debug("There was no slot with {} registered. Probably this slot has been already freed.", slotId);
+		}
+	}
 
 	/**
-	 * Choose a pending request to fulfill when we have a free slot, the behavior is framework specified.
+	 * Handles a failed slot request. The slot manager tries to find a new slot fulfilling
+	 * the resource requirements for the failed slot request.
 	 *
-	 * @param offeredSlot     The free slot
-	 * @param pendingRequests All the pending slot requests
-	 * @return The chosen SlotRequest, <tt>null</tt> if we did not find a match
+	 * @param slotId identifying the slot which was assigned to the slot request before
+	 * @param allocationId identifying the failed slot request
+	 * @param cause of the failure
 	 */
-	protected abstract SlotRequest chooseRequestToFulfill(final ResourceSlot offeredSlot,
-		final Map<AllocationID, SlotRequest> pendingRequests);
+	private void handleFailedSlotRequest(SlotID slotId, AllocationID allocationId, Throwable cause) {
+		PendingSlotRequest pendingSlotRequest = pendingSlotRequests.get(allocationId);
+
+		LOG.debug("Slot request with allocation id {} failed for slot {}.", allocationId, slotId, cause);
+
+		if (null != pendingSlotRequest) {
+			pendingSlotRequest.setRequestFuture(null);
+
+			try {
+				internalRequestSlot(pendingSlotRequest);
+			} catch (ResourceManagerException e) {
+				pendingSlotRequests.remove(allocationId);
 
-	// ------------------------------------------------------------------------
-	//  Helper classes
-	// ------------------------------------------------------------------------
+				resourceManagerActions.notifyAllocationFailure(
+					pendingSlotRequest.getJobId(),
+					allocationId,
+					e);
+			}
+		} else {
+			LOG.debug("There was not pending slot request with allocation id {}. Probably the request has been fulfilled or cancelled.", allocationId);
+		}
+	}
 
 	/**
-	 * We maintain all the allocations with SlotID and AllocationID. We are able to get or remove the allocation info
-	 * either by SlotID or AllocationID.
+	 * Rejects the pending slot request by failing the request future with a
+	 * {@link SlotAllocationException}.
+	 *
+	 * @param pendingSlotRequest to reject
+	 * @param cause of the rejection
 	 */
-	private static class AllocationMap {
+	private void rejectPendingSlotRequest(PendingSlotRequest pendingSlotRequest, Exception cause) {
+		CompletableFuture<Acknowledge> request = pendingSlotRequest.getRequestFuture();
 
-		/** All allocated slots (by SlotID) */
-		private final Map<SlotID, AllocationID> allocatedSlots;
+		if (null != request) {
+			request.completeExceptionally(new SlotAllocationException(cause));
+		} else {
+			LOG.debug("Cannot reject pending slot request {}, since no request has been sent.", pendingSlotRequest.getAllocationId());
+		}
+	}
 
-		/** All allocated slots (by AllocationID), it'a a inverse view of allocatedSlots */
-		private final Map<AllocationID, SlotID> allocatedSlotsByAllocationId;
+	/**
+	 * Cancels the given slot request.
+	 *
+	 * @param pendingSlotRequest to cancel
+	 */
+	private void cancelPendingSlotRequest(PendingSlotRequest pendingSlotRequest) {
+		pendingSlotRequest.cancelTimeout();
 
-		AllocationMap() {
-			this.allocatedSlots = new HashMap<>(16);
-			this.allocatedSlotsByAllocationId = new HashMap<>(16);
-		}
+		CompletableFuture<Acknowledge> request = pendingSlotRequest.getRequestFuture();
 
-		/**
-		 * Add a allocation
-		 *
-		 * @param slotId       The slot id
-		 * @param allocationId The allocation id
-		 */
-		void addAllocation(final SlotID slotId, final AllocationID allocationId) {
-			allocatedSlots.put(slotId, allocationId);
-			allocatedSlotsByAllocationId.put(allocationId, slotId);
+		if (null != request) {
+			request.cancel(false);
 		}
+	}
+
+	// ---------------------------------------------------------------------------------------------
+	// Internal timeout methods
+	// ---------------------------------------------------------------------------------------------
+
+	private void timeoutTaskManager(InstanceID instanceId, UUID timeoutIdentifier) { // called on the main thread by the timeout scheduled in registerTaskManagerTimeout
+		TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.remove(instanceId); // removed up front; only the identifier-mismatch branch puts it back
 
-		/**
-		 * De-allocation with slot id
-		 *
-		 * @param slotId The slot id
-		 */
-		void removeAllocation(final SlotID slotId) {
-			if (allocatedSlots.containsKey(slotId)) {
-				final AllocationID allocationId = allocatedSlots.get(slotId);
-				allocatedSlots.remove(slotId);
-				allocatedSlotsByAllocationId.remove(allocationId);
+		if (null != taskManagerRegistration) {
+			if (Objects.equals(timeoutIdentifier, taskManagerRegistration.getTimeoutIdentifier())) {
+				if (anySlotUsed(taskManagerRegistration.getSlots())) {
+					LOG.debug("Cannot release the task manager with instance id {}, because some " +
+						"of its slots are still being used.", instanceId); // NOTE(review): the registration is not re-inserted here although its slots are still in use - confirm intended
+				} else {
+					unregisterTaskManager(instanceId); // NOTE(review): registration was already removed above; verify unregisterTaskManager does not need to find it in taskManagerRegistrations
+
+					resourceManagerActions.releaseResource(instanceId);
+				}
+			} else {
+				taskManagerRegistrations.put(instanceId, taskManagerRegistration);
+
+				LOG.debug("Expected timeout identifier {} differs from the task manager's " +
+					"timeout identifier {}. Ignoring the task manager timeout call.",
+					timeoutIdentifier, taskManagerRegistration.getTimeoutIdentifier());
 			}
+		} else {
+			LOG.debug("Could not find a registered task manager with instance id {}. Ignoring the task manager timeout call.", instanceId);
 		}
+	}
 
-		/**
-		 * De-allocation with allocation id
-		 *
-		 * @param allocationId The allocation id
-		 */
-		void removeAllocation(final AllocationID allocationId) {
-			if (allocatedSlotsByAllocationId.containsKey(allocationId)) {
-				SlotID slotId = allocatedSlotsByAllocationId.get(allocationId);
-				allocatedSlotsByAllocationId.remove(allocationId);
-				allocatedSlots.remove(slotId);
+	private void timeoutSlotRequest(AllocationID allocationId, UUID timeoutIdentifier) { // called on the main thread by the timeout scheduled in internalRequestSlot
+		PendingSlotRequest pendingSlotRequest = pendingSlotRequests.remove(allocationId); // removed up front; only the identifier-mismatch branch puts it back
+
+		if (null != pendingSlotRequest) {
+			if (Objects.equals(timeoutIdentifier, pendingSlotRequest.getTimeoutIdentifier())) {
+				if (!pendingSlotRequest.isAssigned()) {
+
+					resourceManagerActions.notifyAllocationFailure(
+						pendingSlotRequest.getJobId(),
+						allocationId,
+						new TimeoutException("The allocation could not be fulfilled in time."));
+				} else {
+					LOG.debug("Cannot fail pending slot request {} because it has been assigned.", allocationId); // NOTE(review): the assigned request is not re-inserted into pendingSlotRequests, so e.g. handleFailedSlotRequest can no longer retry it - confirm intended
+				}
+			} else {
+				pendingSlotRequests.put(allocationId, pendingSlotRequest);
+
+				LOG.debug("Expected timeout identifier {} differs from the pending slot request's " +
+					"timeout identifier {}. Ignoring the slot request timeout call.",
+					timeoutIdentifier, pendingSlotRequest.getTimeoutIdentifier());
 			}
+		} else {
+			LOG.debug("Could not find pending slot request with allocation id {}. Ignoring the slot request timeout call.", allocationId);
 		}
+	}
 
-		/**
-		 * Check whether allocation exists by slot id
-		 *
-		 * @param slotId The slot id
-		 * @return true if the allocation exists
-		 */
-		boolean isAllocated(final SlotID slotId) {
-			return allocatedSlots.containsKey(slotId);
-		}
+	// ---------------------------------------------------------------------------------------------
+	// Internal utility methods
+	// ---------------------------------------------------------------------------------------------
 
-		/**
-		 * Check whether allocation exists by allocation id
-		 *
-		 * @param allocationId The allocation id
-		 * @return true if the allocation exists
-		 */
-		boolean isAllocated(final AllocationID allocationId) {
-			return allocatedSlotsByAllocationId.containsKey(allocationId);
-		}
+	private boolean checkDuplicateRequest(AllocationID allocationId) {
+		return pendingSlotRequests.containsKey(allocationId) || fulfilledSlotRequests.containsKey(allocationId);
+	}
 
-		AllocationID getAllocationID(final SlotID slotId) {
-			return allocatedSlots.get(slotId);
-		}
+	private boolean anySlotUsed(Iterable<SlotID> slotsToCheck) {
 
-		SlotID getSlotID(final AllocationID allocationId) {
-			return allocatedSlotsByAllocationId.get(allocationId);
-		}
+		if (null != slotsToCheck) {
+			boolean idle = true;
 
-		public int size() {
-			return allocatedSlots.size();
-		}
+			for (SlotID slotId : slotsToCheck) {
+				TaskManagerSlot taskManagerSlot = slots.get(slotId);
 
-		public void clear() {
-			allocatedSlots.clear();
-			allocatedSlotsByAllocationId.clear();
+				if (null != taskManagerSlot) {
+					idle &= taskManagerSlot.isFree();
+				}
+			}
+
+			return !idle;
+		} else {
+			return false;
 		}
 	}
 
-	/**
-	 * Clears the state of the SlotManager after leadership revokal
-	 */
-	public void clearState() {
-		taskManagers.clear();
-		registeredSlots.clear();
-		pendingSlotRequests.clear();
-		freeSlots.clear();
-		allocationMap.clear();
-	}
+	private void registerTaskManagerTimeout(final TaskManagerRegistration taskManagerRegistration) {
+		final UUID timeoutIdentifier = UUID.randomUUID();
 
-	// ------------------------------------------------------------------------
-	//  Testing utilities
-	// ------------------------------------------------------------------------
+		ScheduledFuture<?> timeoutFuture = scheduledExecutor.schedule(new Runnable() {
+			@Override
+			public void run() {
+				mainThreadExecutor.execute(new Runnable() {
+					@Override
+					public void run() {
+						timeoutTaskManager(taskManagerRegistration.getInstanceId(), timeoutIdentifier);
+					}
+				});
+			}
+		}, taskManagerTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 
-	@VisibleForTesting
-	boolean isAllocated(final SlotID slotId) {
-		return allocationMap.isAllocated(slotId);
+		taskManagerRegistration.registerTimeout(timeoutFuture, timeoutIdentifier);
 	}
 
-	@VisibleForTesting
-	boolean isAllocated(final AllocationID allocationId) {
-		return allocationMap.isAllocated(allocationId);
+	private void checkInit() {
+		Preconditions.checkState(started, "The slot manager has not been started.");
 	}
 
-	/**
-	 * Add free slots directly to the free pool, this will not trigger pending requests allocation
-	 *
-	 * @param slot The resource slot
-	 */
-	@VisibleForTesting
-	void addFreeSlot(final ResourceSlot slot) {
-		final ResourceID resourceId = slot.getResourceID();
-		final SlotID slotId = slot.getSlotId();
+	// ---------------------------------------------------------------------------------------------
+	// Testing methods
+	// ---------------------------------------------------------------------------------------------
 
-		if (!registeredSlots.containsKey(resourceId)) {
-			registeredSlots.put(resourceId, new HashMap<SlotID, ResourceSlot>());
-		}
-		registeredSlots.get(resourceId).put(slot.getSlotId(), slot);
-		freeSlots.put(slotId, slot);
+	@VisibleForTesting
+	TaskManagerSlot getSlot(SlotID slotId) {
+		return slots.get(slotId);
 	}
 
 	@VisibleForTesting
-	int getAllocatedSlotCount() {
-		return allocationMap.size();
+	int getNumberRegisteredSlots() {
+		return slots.size();
 	}
 
 	@VisibleForTesting
-	int getFreeSlotCount() {
-		return freeSlots.size();
+	PendingSlotRequest getSlotRequest(AllocationID allocationId) {
+		return pendingSlotRequests.get(allocationId);
 	}
 
 	@VisibleForTesting
-	int getPendingRequestCount() {
-		return pendingSlotRequests.size();
+	boolean hasTimeoutRegistered(InstanceID instanceId) {
+		TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceId);
+
+		if (null != taskManagerRegistration) {
+			return taskManagerRegistration.getTimeoutIdentifier() != null;
+		} else {
+			return false;
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
new file mode 100644
index 0000000..d21c251
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException;
+import org.apache.flink.util.Preconditions;
+import scala.concurrent.duration.Duration;
+
+/**
+ * Configuration of the {@link SlotManager}: the timeouts used for task manager requests,
+ * pending slot requests and idle task managers.
+ */
+public class SlotManagerConfiguration {
+
+	/** Timeout for requests sent to the task managers. */
+	private final Time taskManagerRequestTimeout;
+
+	/** Timeout after which a slot request which could not be fulfilled is failed. */
+	private final Time slotRequestTimeout;
+
+	/** Timeout after which an idle task manager can be released. */
+	private final Time taskManagerTimeout;
+
+	/**
+	 * Creates a new slot manager configuration.
+	 *
+	 * @param taskManagerRequestTimeout timeout for requests sent to the task managers
+	 * @param slotRequestTimeout timeout for pending slot requests
+	 * @param taskManagerTimeout timeout for idle task managers
+	 * @throws NullPointerException if any of the timeouts is {@code null}
+	 */
+	public SlotManagerConfiguration(
+			Time taskManagerRequestTimeout,
+			Time slotRequestTimeout,
+			Time taskManagerTimeout) {
+		this.taskManagerRequestTimeout = Preconditions.checkNotNull(taskManagerRequestTimeout);
+		this.slotRequestTimeout = Preconditions.checkNotNull(slotRequestTimeout);
+		this.taskManagerTimeout = Preconditions.checkNotNull(taskManagerTimeout);
+	}
+
+	public Time getTaskManagerRequestTimeout() {
+		return taskManagerRequestTimeout;
+	}
+
+	public Time getSlotRequestTimeout() {
+		return slotRequestTimeout;
+	}
+
+	public Time getTaskManagerTimeout() {
+		return taskManagerTimeout;
+	}
+
+	/**
+	 * Creates a slot manager configuration from the given Flink configuration. All three
+	 * timeouts are set to the value configured for {@link ConfigConstants#AKKA_ASK_TIMEOUT}.
+	 *
+	 * @param configuration to read the timeout from
+	 * @return slot manager configuration using the configured timeout
+	 * @throws ConfigurationException if the configured timeout cannot be parsed
+	 */
+	public static SlotManagerConfiguration fromConfiguration(Configuration configuration) throws ConfigurationException {
+		ConfigOption<String> timeoutOption = ConfigOptions
+			.key(ConfigConstants.AKKA_ASK_TIMEOUT)
+			.defaultValue(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT);
+
+		final String strTimeout = configuration.getString(timeoutOption);
+		final Time timeout;
+
+		try {
+			timeout = Time.milliseconds(Duration.apply(strTimeout).toMillis());
+		} catch (IllegalArgumentException e) {
+			// covers NumberFormatException (malformed value) as well as the IllegalArgumentException
+			// which toMillis() throws for infinite durations such as "Inf"
+			throw new ConfigurationException("Could not parse the resource manager's timeout " +
+				"value " + strTimeout + " configured for " + timeoutOption.key() + '.', e);
+		}
+
+		return new SlotManagerConfiguration(timeout, timeout, timeout);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerException.java
new file mode 100644
index 0000000..c322c81
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerException.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+
+/**
+ * {@link ResourceManagerException} signalling an error in the {@link SlotManager}.
+ */
+public class SlotManagerException extends ResourceManagerException {
+
+	private static final long serialVersionUID = -3723028616920379071L;
+
+	/**
+	 * Creates a new slot manager exception with the given detail message.
+	 *
+	 * @param message detail message of the exception
+	 */
+	public SlotManagerException(String message) {
+		super(message);
+	}
+
+	/**
+	 * Creates a new slot manager exception with the given detail message and cause.
+	 *
+	 * @param message detail message of the exception
+	 * @param cause of the exception
+	 */
+	public SlotManagerException(String message, Throwable cause) {
+		super(message, cause);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerFactory.java
deleted file mode 100644
index b4e9c99..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerFactory.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.runtime.resourcemanager.slotmanager;
-
-import org.apache.flink.runtime.resourcemanager.ResourceManagerServices;
-
-/**
- * Factory to create a SlotManager and provide it with dependencies.
- */
-public interface SlotManagerFactory {
-
-	/**
-	 * Creates a SlotManager and provides it with ResourceManager services.
-	 */
-	SlotManager create(ResourceManagerServices rmServices);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java
new file mode 100644
index 0000000..8e23dbb
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ScheduledFuture;
+
+/**
+ * Registration of a task manager at the {@link SlotManager}. It holds the connection to the
+ * task manager, the ids of the slots the task manager has registered and the currently
+ * registered timeout (future and identifier), if any.
+ */
+public class TaskManagerRegistration {
+
+	/** Connection to the registered task manager. */
+	private final TaskExecutorConnection taskManagerConnection;
+
+	/** Ids of the slots belonging to the registered task manager. */
+	private final Set<SlotID> slots;
+
+	/** Identifier of the registered timeout; {@code null} if no timeout is registered. */
+	private UUID timeoutIdentifier;
+
+	/** Future of the registered timeout; {@code null} if no timeout is registered. */
+	private ScheduledFuture<?> timeoutFuture;
+
+	/**
+	 * Creates a registration without any slots and without a registered timeout.
+	 *
+	 * @param taskManagerConnection connection to the task manager
+	 * @throws NullPointerException if the connection is {@code null}
+	 */
+	public TaskManagerRegistration(TaskExecutorConnection taskManagerConnection) {
+		this.taskManagerConnection = Preconditions.checkNotNull(taskManagerConnection);
+
+		slots = new HashSet<>(4);
+
+		timeoutIdentifier = null;
+		timeoutFuture = null;
+	}
+
+	public TaskExecutorConnection getTaskManagerConnection() {
+		return taskManagerConnection;
+	}
+
+	public InstanceID getInstanceId() {
+		return taskManagerConnection.getInstanceID();
+	}
+
+	/** Returns the identifier of the registered timeout or {@code null} if none is registered. */
+	public UUID getTimeoutIdentifier() {
+		return timeoutIdentifier;
+	}
+
+	/**
+	 * Returns a read-only view of the ids of the slots belonging to the task manager. The set
+	 * is modified via {@link #addSlot(SlotID)} and {@link #removeSlot(SlotID)}.
+	 *
+	 * @return unmodifiable view of the registered slot ids
+	 */
+	public Set<SlotID> getSlots() {
+		return Collections.unmodifiableSet(slots);
+	}
+
+	/**
+	 * Removes the given slot from this registration.
+	 *
+	 * @param slotId of the slot to remove
+	 * @return {@code true} if the slot was part of this registration
+	 */
+	public boolean removeSlot(SlotID slotId) {
+		return slots.remove(slotId);
+	}
+
+	/**
+	 * Adds the given slot to this registration.
+	 *
+	 * @param slotId of the slot to add
+	 */
+	public void addSlot(SlotID slotId) {
+		slots.add(slotId);
+	}
+
+	/**
+	 * Cancels the registered timeout, if any. A timeout action which is already running is not
+	 * interrupted. Afterwards no timeout is registered.
+	 */
+	public void cancelTimeout() {
+		if (null != timeoutFuture) {
+			timeoutFuture.cancel(false);
+		}
+
+		// reset both fields together so that the identifier never outlives its future
+		timeoutFuture = null;
+		timeoutIdentifier = null;
+	}
+
+	/**
+	 * Registers the given timeout, cancelling any previously registered timeout first.
+	 *
+	 * @param newTimeoutFuture future of the scheduled timeout action
+	 * @param newTimeoutIdentifier identifier of the new timeout
+	 */
+	public void registerTimeout(ScheduledFuture<?> newTimeoutFuture, UUID newTimeoutIdentifier) {
+		cancelTimeout();
+
+		timeoutFuture = newTimeoutFuture;
+		timeoutIdentifier = newTimeoutIdentifier;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotReport.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotReport.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotReport.java
index 54adce6..7a9da28 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotReport.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotReport.java
@@ -19,8 +19,9 @@
 package org.apache.flink.runtime.taskexecutor;
 
 import java.io.Serializable;
+import java.util.Collection;
 import java.util.Collections;
-import java.util.List;
+import java.util.Iterator;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -29,12 +30,12 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * which slots are available and allocated, and what jobs (JobManagers) the allocated slots
  * have been allocated to.
  */
-public class SlotReport implements Serializable {
+public class SlotReport implements Serializable, Iterable<SlotStatus> {
 
 	private static final long serialVersionUID = -3150175198722481689L;
 
 	/** The slots status of the TaskManager */
-	private final List<SlotStatus> slotsStatus;
+	private final Collection<SlotStatus> slotsStatus;
 
 	public SlotReport() {
 		this(Collections.<SlotStatus>emptyList());
@@ -44,12 +45,20 @@ public class SlotReport implements Serializable {
 		this(Collections.singletonList(slotStatus));
 	}
 
-	public SlotReport(final List<SlotStatus> slotsStatus) {
+	public SlotReport(final Collection<SlotStatus> slotsStatus) {
 		this.slotsStatus = checkNotNull(slotsStatus);
 	}
 
-	public List<SlotStatus> getSlotsStatus() {
+	public Collection<SlotStatus> getSlotsStatus() {
 		return slotsStatus;
 	}
 
+	/**
+	 * Returns an iterator over the reported slot statuses. The iterator is read-only, so
+	 * iterating over a report can never remove entries from it.
+	 */
+	@Override
+	public Iterator<SlotStatus> iterator() {
+		return Collections.unmodifiableCollection(slotsStatus).iterator();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java
index 0f57bb1..a3bc4a6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java
@@ -38,7 +38,7 @@ public class SlotStatus implements Serializable {
 	private final SlotID slotID;
 
 	/** the resource profile of the slot */
-	private final ResourceProfile profiler;
+	private final ResourceProfile resourceProfile;
 
 	/** if the slot is allocated, allocationId identify its allocation; else, allocationId is null */
 	private final AllocationID allocationID;
@@ -46,16 +46,17 @@ public class SlotStatus implements Serializable {
 	/** if the slot is allocated, jobId identify which job this slot is allocated to; else, jobId is null */
 	private final JobID jobID;
 
-	public SlotStatus(SlotID slotID, ResourceProfile profiler) {
-		this(slotID, profiler, null, null);
+	public SlotStatus(SlotID slotID, ResourceProfile resourceProfile) {
+		this(slotID, resourceProfile, null, null);
 	}
 
 	public SlotStatus(
-			SlotID slotID, ResourceProfile profiler,
+			SlotID slotID,
+			ResourceProfile resourceProfile,
 			JobID jobID,
 			AllocationID allocationID) {
 		this.slotID = checkNotNull(slotID, "slotID cannot be null");
-		this.profiler = checkNotNull(profiler, "profile cannot be null");
+		this.resourceProfile = checkNotNull(resourceProfile, "profile cannot be null");
 		this.allocationID = allocationID;
 		this.jobID = jobID;
 	}
@@ -74,8 +75,8 @@ public class SlotStatus implements Serializable {
 	 *
 	 * @return The resource profile
 	 */
-	public ResourceProfile getProfiler() {
-		return profiler;
+	public ResourceProfile getResourceProfile() {
+		return resourceProfile;
 	}
 
 	/**
@@ -110,7 +111,7 @@ public class SlotStatus implements Serializable {
 		if (!slotID.equals(that.slotID)) {
 			return false;
 		}
-		if (!profiler.equals(that.profiler)) {
+		if (!resourceProfile.equals(that.resourceProfile)) {
 			return false;
 		}
 		if (allocationID != null ? !allocationID.equals(that.allocationID) : that.allocationID != null) {
@@ -123,7 +124,7 @@ public class SlotStatus implements Serializable {
 	@Override
 	public int hashCode() {
 		int result = slotID.hashCode();
-		result = 31 * result + profiler.hashCode();
+		result = 31 * result + resourceProfile.hashCode();
 		result = 31 * result + (allocationID != null ? allocationID.hashCode() : 0);
 		result = 31 * result + (jobID != null ? jobID.hashCode() : 0);
 		return result;

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index d04cabb..5b8c8ee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -61,8 +61,6 @@ import org.apache.flink.runtime.registration.RegistrationConnectionListener;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
-import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRegistered;
-import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcMethod;
@@ -71,6 +69,7 @@ import org.apache.flink.runtime.rpc.RpcServiceUtils;
 import org.apache.flink.runtime.taskexecutor.exceptions.CheckpointException;
 import org.apache.flink.runtime.taskexecutor.exceptions.PartitionException;
 import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
+import org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException;
 import org.apache.flink.runtime.taskexecutor.exceptions.TaskException;
 import org.apache.flink.runtime.taskexecutor.exceptions.TaskSubmissionException;
 import org.apache.flink.runtime.taskexecutor.rpc.RpcCheckpointResponder;
@@ -99,6 +98,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.TimeoutException;
 
@@ -315,7 +315,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 			throw new TaskSubmissionException(message);
 		}
 
-		if (!jobManagerConnection.getLeaderId().equals(jobManagerLeaderId)) {
+		if (!Objects.equals(jobManagerConnection.getLeaderId(), jobManagerLeaderId)) {
 			final String message = "Rejecting the task submission because the job manager leader id " +
 				jobManagerLeaderId + " does not match the expected job manager leader id " +
 				jobManagerConnection.getLeaderId() + '.';
@@ -355,30 +355,30 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		PartitionProducerStateChecker partitionStateChecker = jobManagerConnection.getPartitionStateChecker();
 
 		Task task = new Task(
-				jobInformation,
-				taskInformation,
-				tdd.getExecutionAttemptId(),
-				tdd.getAllocationId(),
-				tdd.getSubtaskIndex(),
-				tdd.getAttemptNumber(),
-				tdd.getProducedPartitions(),
-				tdd.getInputGates(),
-				tdd.getTargetSlotNumber(),
-				tdd.getTaskStateHandles(),
-				memoryManager,
-				ioManager,
-				networkEnvironment,
-				broadcastVariableManager,
-				taskManagerActions,
-				inputSplitProvider,
-				checkpointResponder,
-				libraryCache,
-				fileCache,
-				taskManagerConfiguration,
-				taskMetricGroup,
-				resultPartitionConsumableNotifier,
-				partitionStateChecker,
-				getRpcService().getExecutor());
+			jobInformation,
+			taskInformation,
+			tdd.getExecutionAttemptId(),
+			tdd.getAllocationId(),
+			tdd.getSubtaskIndex(),
+			tdd.getAttemptNumber(),
+			tdd.getProducedPartitions(),
+			tdd.getInputGates(),
+			tdd.getTargetSlotNumber(),
+			tdd.getTaskStateHandles(),
+			memoryManager,
+			ioManager,
+			networkEnvironment,
+			broadcastVariableManager,
+			taskManagerActions,
+			inputSplitProvider,
+			checkpointResponder,
+			libraryCache,
+			fileCache,
+			taskManagerConfiguration,
+			taskMetricGroup,
+			resultPartitionConsumableNotifier,
+			partitionStateChecker,
+			getRpcService().getExecutor());
 
 		log.info("Received task {}.", task.getTaskInfo().getTaskNameWithSubtasks());
 
@@ -561,7 +561,6 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 	// ----------------------------------------------------------------------
 
 	/**
-	 * /**
 	 * Requests a slot from the TaskManager
 	 *
 	 * @param slotId identifying the requested slot
@@ -573,12 +572,14 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 	 * @return answer to the slot request
 	 */
 	@RpcMethod
-	public TMSlotRequestReply requestSlot(
+	public Acknowledge requestSlot(
 		final SlotID slotId,
 		final JobID jobId,
 		final AllocationID allocationId,
 		final String targetAddress,
 		final UUID rmLeaderId) throws SlotAllocationException {
+		// TODO: Filter invalid requests from the resource manager by using the instance/registration Id
+
 		log.info("Receive slot request {} for job {} from resource manager with leader id {}.",
 			allocationId, jobId, rmLeaderId);
 
@@ -608,7 +609,8 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 			final String message = "The slot " + slotId + " has already been allocated for a different job.";
 
 			log.info(message);
-			throw new SlotAllocationException(message);
+
+			throw new SlotOccupiedException(message, taskSlotTable.getCurrentAllocation(slotId.getSlotNumber()));
 		}
 
 		if (jobManagerTable.contains(jobId)) {
@@ -635,7 +637,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 			}
 		}
 
-		return new TMSlotRequestRegistered(resourceManagerConnection.getRegistrationId(), getResourceID(), allocationId);
+		return Acknowledge.get();
 	}
 
 	// ----------------------------------------------------------------------
@@ -1046,7 +1048,8 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 				resourceManagerGateway.notifySlotAvailable(
 					resourceManagerConnection.getTargetLeaderId(),
 					resourceManagerConnection.getRegistrationId(),
-					new SlotID(getResourceID(), freedSlotIndex));
+					new SlotID(getResourceID(), freedSlotIndex),
+					allocationId);
 			}
 		} catch (SlotNotFoundException e) {
 			log.debug("Could not free slot for allocation id {}.", allocationId, e);
@@ -1079,7 +1082,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 	private boolean isJobManagerConnectionValid(JobID jobId, UUID leaderId) {
 		JobManagerConnection jmConnection = jobManagerTable.get(jobId);
 
-		return jmConnection != null && jmConnection.getLeaderId().equals(leaderId);
+		return jmConnection != null && Objects.equals(jmConnection.getLeaderId(), leaderId);
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
index 2bbf0e6..bedf8ec 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -25,7 +25,6 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.PartitionInfo;
@@ -51,7 +50,7 @@ public interface TaskExecutorGateway extends RpcGateway {
 	 * @throws SlotAllocationException if the slot allocation fails
 	 * @return answer to the slot request
 	 */
-	Future<TMSlotRequestReply> requestSlot(
+	Future<Acknowledge> requestSlot(
 		SlotID slotId,
 		JobID jobId,
 		AllocationID allocationId,

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotOccupiedException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotOccupiedException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotOccupiedException.java
new file mode 100644
index 0000000..93e67a8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotOccupiedException.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.exceptions;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Exception signalling that a slot request failed because the requested slot is already
+ * occupied by another allocation. The id of the occupying allocation can be retrieved via
+ * {@link #getAllocationId()}.
+ */
+public class SlotOccupiedException extends SlotAllocationException {
+	private static final long serialVersionUID = -3986333914244338888L;
+
+	/** Id of the allocation which occupies the slot; never null. */
+	private final AllocationID allocationId;
+
+	public SlotOccupiedException(String message, AllocationID allocationId) {
+		super(message);
+		this.allocationId = Preconditions.checkNotNull(allocationId, "allocationId cannot be null");
+	}
+
+	public SlotOccupiedException(String message, Throwable cause, AllocationID allocationId) {
+		super(message, cause);
+		this.allocationId = Preconditions.checkNotNull(allocationId, "allocationId cannot be null");
+	}
+
+	public SlotOccupiedException(Throwable cause, AllocationID allocationId) {
+		super(cause);
+		this.allocationId = Preconditions.checkNotNull(allocationId, "allocationId cannot be null");
+	}
+
+	/** Returns the id of the allocation which occupies the requested slot. */
+	public AllocationID getAllocationId() {
+		return allocationId;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
index 081d8f8..5c51c7c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
@@ -509,6 +509,16 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
 		return new TaskIterator(jobId);
 	}
 
+	/**
+	 * Returns the current allocation id of the task slot with the given index (index is not range-checked).
+	 *
+	 * @param index identifying the slot for which the allocation id shall be retrieved
+	 * @return Allocation id of the specified slot if allocated; otherwise null
+	 */
+	public AllocationID getCurrentAllocation(int index) {
+		return taskSlots.get(index).getAllocationId();
+	}
+
 	// ---------------------------------------------------------------------
 	// TimeoutListener methods
 	// ---------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
index e4e20b9..41c2e16 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
@@ -49,7 +49,7 @@ import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
 import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
-import org.apache.flink.runtime.resourcemanager.TestingSlotManagerFactory;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.TestingSerialRpcService;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
@@ -391,10 +391,14 @@ public class ResourceManagerTest extends TestLogger {
 		final ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class);
 		final HeartbeatServices heartbeatServices = new TestingHeartbeatServices(heartbeatInterval, heartbeatTimeout, scheduledExecutor);
 
-		final TestingSlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory();
 		final MetricRegistry metricRegistry = mock(MetricRegistry.class);
 		final JobLeaderIdService jobLeaderIdService = mock(JobLeaderIdService.class);
 		final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
+		final SlotManager slotManager = new SlotManager(
+			rpcService.getScheduledExecutor(),
+			TestingUtils.infiniteTime(),
+			TestingUtils.infiniteTime(),
+			TestingUtils.infiniteTime());
 
 		try {
 			final StandaloneResourceManager resourceManager = new StandaloneResourceManager(
@@ -404,7 +408,7 @@ public class ResourceManagerTest extends TestLogger {
 				resourceManagerConfiguration,
 				highAvailabilityServices,
 				heartbeatServices,
-				slotManagerFactory,
+				slotManager,
 				metricRegistry,
 				jobLeaderIdService,
 				testingFatalErrorHandler);

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
index 89fd22f..b444640 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
@@ -80,7 +80,7 @@ public class SlotPoolRpcTest {
 				Time.days(1), Time.days(1),
 				Time.milliseconds(100) // this is the timeout for the request tested here
 		);
-		pool.start(UUID.randomUUID());
+		pool.start(UUID.randomUUID(), "foobar");
 
 		Future<SimpleSlot> future = pool.allocateSlot(mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
index b4149b2..cf95461 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
@@ -78,7 +78,9 @@ public class SlotPoolTest extends TestLogger {
 
 		mainThreadValidatorUtil.enterMainThread();
 
-		slotPool.start(UUID.randomUUID());
+		final String jobManagerAddress = "foobar";
+
+		slotPool.start(UUID.randomUUID(), jobManagerAddress);
 
 		this.resourceManagerGateway = mock(ResourceManagerGateway.class);
 		when(resourceManagerGateway

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
index 49bc570..986f848 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
@@ -25,9 +25,10 @@ import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerConfiguration;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.TestingSerialRpcService;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.TestLogger;
 import org.junit.Assert;
@@ -57,13 +58,17 @@ public class ResourceManagerHATest extends TestLogger {
 			Time.seconds(5L),
 			Time.seconds(5L));
 
-		ResourceManagerRuntimeServicesConfiguration resourceManagerRuntimeServicesConfiguration = new ResourceManagerRuntimeServicesConfiguration(Time.seconds(5L));
+		ResourceManagerRuntimeServicesConfiguration resourceManagerRuntimeServicesConfiguration = new ResourceManagerRuntimeServicesConfiguration(
+			Time.seconds(5L),
+			new SlotManagerConfiguration(
+				TestingUtils.infiniteTime(),
+				TestingUtils.infiniteTime(),
+				TestingUtils.infiniteTime()));
 		ResourceManagerRuntimeServices resourceManagerRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
 			resourceManagerRuntimeServicesConfiguration,
 			highAvailabilityServices,
 			rpcService.getScheduledExecutor());
 
-		SlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory();
 		MetricRegistry metricRegistry = mock(MetricRegistry.class);
 
 		TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
@@ -76,7 +81,7 @@ public class ResourceManagerHATest extends TestLogger {
 				resourceManagerConfiguration,
 				highAvailabilityServices,
 				heartbeatServices,
-				slotManagerFactory,
+				resourceManagerRuntimeServices.getSlotManager(),
 				metricRegistry,
 				resourceManagerRuntimeServices.getJobLeaderIdService(),
 				testingFatalErrorHandler);

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
index 6a151ac..4836f74 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
@@ -31,10 +31,11 @@ import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.TestingSerialRpcService;
 import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.TestLogger;
 import org.junit.After;
@@ -240,12 +241,17 @@ public class ResourceManagerJobMasterTest extends TestLogger {
 		ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(
 			Time.seconds(5L),
 			Time.seconds(5L));
-		SlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory();
 		MetricRegistry metricRegistry = mock(MetricRegistry.class);
 		JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
 			highAvailabilityServices,
 			rpcService.getScheduledExecutor(),
 			Time.minutes(5L));
+			
+		final SlotManager slotManager = new SlotManager(
+			rpcService.getScheduledExecutor(),
+			TestingUtils.infiniteTime(),
+			TestingUtils.infiniteTime(),
+			TestingUtils.infiniteTime());
 
 		ResourceManager resourceManager = new StandaloneResourceManager(
 			rpcService,
@@ -254,7 +260,7 @@ public class ResourceManagerJobMasterTest extends TestLogger {
 			resourceManagerConfiguration,
 			highAvailabilityServices,
 			heartbeatServices,
-			slotManagerFactory,
+			slotManager,
 			metricRegistry,
 			jobLeaderIdService,
 			fatalErrorHandler);