You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/03 12:10:14 UTC

[07/50] [abbrv] flink git commit: [FLINK-5810] [flip-6] Introduce a hardened slot manager

[FLINK-5810] [flip-6] Introduce a hardened slot manager

Harden the slot manager so that it better deals with lost and out of order messages
from the TaskManagers. The basic idea is that the TaskManagers are considered the ground
truth and the SlotManager tries to maintain a consistent view of what is reported to it
by the TaskManagers. This assumes that the TaskManagers regularly report their
slot status to the SlotManager, piggybacked on the heartbeat signals to the ResourceManager.
That way it is possible to handle lost and out of order messages because the SlotManager
will eventually converge on a consistent view of the actual slot allocation.

Additionally, the hardened SlotManager registers a timeout for idle TaskManagers and for
pending slot requests. If the timeout expires, then the TaskManagers are released and the
slot request is failed. This prevents resource leaks and wasteful resource allocation.

This closes #3394.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/59aefb57
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/59aefb57
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/59aefb57

Branch: refs/heads/table-retraction
Commit: 59aefb57160f04e090e6620af1dd70fd4c121ae6
Parents: f3da8f6
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Feb 9 11:59:45 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Apr 28 15:27:48 2017 +0200

----------------------------------------------------------------------
 .../clusterframework/types/ResourceSlot.java    |   72 -
 .../clusterframework/types/TaskManagerSlot.java |  114 ++
 .../flink/runtime/concurrent/Executors.java     |    2 +
 .../apache/flink/runtime/instance/SlotPool.java |   42 +-
 .../flink/runtime/jobmaster/JobMaster.java      |    2 +-
 .../resourcemanager/ResourceManager.java        |  142 +-
 .../resourcemanager/ResourceManagerGateway.java |    9 +-
 .../resourcemanager/ResourceManagerRunner.java  |    2 +-
 .../ResourceManagerRuntimeServices.java         |   25 +-
 ...urceManagerRuntimeServicesConfiguration.java |   14 +-
 .../runtime/resourcemanager/SlotRequest.java    |   14 +-
 .../StandaloneResourceManager.java              |   12 +-
 .../jobmanager/RMSlotRequestRegistered.java     |   33 -
 .../jobmanager/RMSlotRequestRejected.java       |   34 -
 .../messages/jobmanager/RMSlotRequestReply.java |   41 -
 .../taskexecutor/SlotAvailableReply.java        |   47 -
 .../taskexecutor/TMSlotRequestRegistered.java   |   35 -
 .../taskexecutor/TMSlotRequestRejected.java     |   35 -
 .../taskexecutor/TMSlotRequestReply.java        |   58 -
 .../registration/TaskExecutorConnection.java    |   47 +
 .../registration/TaskExecutorRegistration.java  |   51 -
 .../registration/WorkerRegistration.java        |    6 +-
 .../slotmanager/DefaultSlotManager.java         |   69 -
 .../slotmanager/PendingSlotRequest.java         |   93 ++
 .../slotmanager/ResourceManagerActions.java     |   34 +
 .../slotmanager/SlotManager.java                | 1086 ++++++++++-----
 .../slotmanager/SlotManagerConfiguration.java   |   74 ++
 .../slotmanager/SlotManagerException.java       |   34 +
 .../slotmanager/SlotManagerFactory.java         |   31 -
 .../slotmanager/TaskManagerRegistration.java    |   89 ++
 .../flink/runtime/taskexecutor/SlotReport.java  |   15 +-
 .../flink/runtime/taskexecutor/SlotStatus.java  |   19 +-
 .../runtime/taskexecutor/TaskExecutor.java      |   69 +-
 .../taskexecutor/TaskExecutorGateway.java       |    3 +-
 .../exceptions/SlotOccupiedException.java       |   47 +
 .../taskexecutor/slot/TaskSlotTable.java        |   10 +
 .../clusterframework/ResourceManagerTest.java   |   10 +-
 .../flink/runtime/instance/SlotPoolRpcTest.java |    2 +-
 .../flink/runtime/instance/SlotPoolTest.java    |    4 +-
 .../resourcemanager/ResourceManagerHATest.java  |   13 +-
 .../ResourceManagerJobMasterTest.java           |   12 +-
 .../ResourceManagerTaskExecutorTest.java        |   11 +-
 .../resourcemanager/TestingSlotManager.java     |   86 --
 .../TestingSlotManagerFactory.java              |   30 -
 .../slotmanager/SlotManagerTest.java            | 1254 +++++++++++++-----
 .../slotmanager/SlotProtocolTest.java           |  328 +----
 .../taskexecutor/TaskExecutorITCase.java        |   14 +-
 .../runtime/taskexecutor/TaskExecutorTest.java  |   38 +-
 .../runtime/testingUtils/TestingUtils.scala     |   12 +
 .../yarn/YarnFlinkApplicationMasterRunner.java  |    2 +-
 .../apache/flink/yarn/YarnResourceManager.java  |   12 +-
 51 files changed, 2540 insertions(+), 1798 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceSlot.java
deleted file mode 100644
index 0b9367d..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceSlot.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.clusterframework.types;
-
-import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorRegistration;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A ResourceSlot represents a slot located in TaskManager from ResourceManager's view. It has a unique
- * identification and resource profile which we can compare to the resource request.
- */
-public class ResourceSlot implements ResourceIDRetrievable {
-
-	/** The unique identification of this slot */
-	private final SlotID slotId;
-
-	/** The resource profile of this slot */
-	private final ResourceProfile resourceProfile;
-
-	/** Gateway to the TaskExecutor which owns the slot */
-	private final TaskExecutorRegistration taskExecutorRegistration;
-
-	public ResourceSlot(SlotID slotId, ResourceProfile resourceProfile, TaskExecutorRegistration taskExecutorRegistration) {
-		this.slotId = checkNotNull(slotId);
-		this.resourceProfile = checkNotNull(resourceProfile);
-		this.taskExecutorRegistration = checkNotNull(taskExecutorRegistration);
-	}
-
-	@Override
-	public ResourceID getResourceID() {
-		return slotId.getResourceID();
-	}
-
-	public SlotID getSlotId() {
-		return slotId;
-	}
-
-	public ResourceProfile getResourceProfile() {
-		return resourceProfile;
-	}
-
-	public TaskExecutorRegistration getTaskExecutorRegistration() {
-		return taskExecutorRegistration;
-	}
-
-	/**
-	 * Check whether required resource profile can be matched by this slot.
-	 *
-	 * @param required The required resource profile
-	 * @return true if requirement can be matched
-	 */
-	public boolean isMatchingRequirement(ResourceProfile required) {
-		return resourceProfile.isMatching(required);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java
new file mode 100644
index 0000000..cb9d349
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.types;
+
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
+import org.apache.flink.runtime.resourcemanager.slotmanager.PendingSlotRequest;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A TaskManagerSlot represents a slot located in a TaskManager. It has a unique identification and
+ * resource profile associated.
+ */
+public class TaskManagerSlot {
+
+	/** The unique identification of this slot */
+	private final SlotID slotId;
+
+	/** The resource profile of this slot */
+	private final ResourceProfile resourceProfile;
+
+	/** Gateway to the TaskExecutor which owns the slot */
+	private final TaskExecutorConnection taskManagerConnection;
+
+	/** Allocation id for which this slot has been allocated */
+	private AllocationID allocationId;
+
+	/** Assigned slot request if there is currently an ongoing request */
+	private PendingSlotRequest assignedSlotRequest;
+
+	public TaskManagerSlot(
+			SlotID slotId,
+			ResourceProfile resourceProfile,
+			TaskExecutorConnection taskManagerConnection,
+			AllocationID allocationId) {
+		this.slotId = checkNotNull(slotId);
+		this.resourceProfile = checkNotNull(resourceProfile);
+		this.taskManagerConnection = checkNotNull(taskManagerConnection);
+
+		this.allocationId = allocationId;
+		this.assignedSlotRequest = null;
+	}
+
+	public SlotID getSlotId() {
+		return slotId;
+	}
+
+	public ResourceProfile getResourceProfile() {
+		return resourceProfile;
+	}
+
+	public TaskExecutorConnection getTaskManagerConnection() {
+		return taskManagerConnection;
+	}
+
+	public AllocationID getAllocationId() {
+		return allocationId;
+	}
+
+	public void setAllocationId(AllocationID allocationId) {
+		this.allocationId = allocationId;
+	}
+
+	public PendingSlotRequest getAssignedSlotRequest() {
+		return assignedSlotRequest;
+	}
+
+	public void setAssignedSlotRequest(PendingSlotRequest assignedSlotRequest) {
+		this.assignedSlotRequest = assignedSlotRequest;
+	}
+
+	public InstanceID getInstanceId() {
+		return taskManagerConnection.getInstanceID();
+	}
+
+	/**
+	 * Check whether required resource profile can be matched by this slot.
+	 *
+	 * @param required The required resource profile
+	 * @return true if requirement can be matched
+	 */
+	public boolean isMatchingRequirement(ResourceProfile required) {
+		return resourceProfile.isMatching(required);
+	}
+
+	public boolean isFree() {
+		return !isAllocated() && !hasPendingSlotRequest();
+	}
+
+	public boolean isAllocated() {
+		return null != allocationId;
+	}
+
+	public boolean hasPendingSlotRequest() {
+		return null != assignedSlotRequest;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
index 63b6a25..e8a9be9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
@@ -92,6 +92,8 @@ public class Executors {
 					executorService.shutdownNow();
 
 					wasInterrupted = true;
+
+					Thread.currentThread().interrupt();
 				}
 
 				timeLeft = endTime - System.currentTimeMillis();

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index 6d482b4..8cf6a9b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -35,11 +35,9 @@ import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
 import org.apache.flink.runtime.jobmanager.slots.SlotAndLocality;
 import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
-import org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestRegistered;
-import org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestRejected;
-import org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestReply;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcService;
@@ -129,6 +127,8 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
 	/** The gateway to communicate with resource manager */
 	private ResourceManagerGateway resourceManagerGateway;
 
+	private String jobManagerAddress;
+
 	// ------------------------------------------------------------------------
 
 	public SlotPool(RpcService rpcService, JobID jobId) {
@@ -172,10 +172,12 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
 	/**
 	 * Start the slot pool to accept RPC calls.
 	 *
-	 * @param jobManagerLeaderId The necessary leader id for running the job.
+	 * @param newJobManagerLeaderId The necessary leader id for running the job.
+	 * @param newJobManagerAddress for the slot requests which are sent to the resource manager
 	 */
-	public void start(UUID jobManagerLeaderId) throws Exception {
-		this.jobManagerLeaderId = jobManagerLeaderId;
+	public void start(UUID newJobManagerLeaderId, String newJobManagerAddress) throws Exception {
+		this.jobManagerLeaderId = checkNotNull(newJobManagerLeaderId);
+		this.jobManagerAddress = checkNotNull(newJobManagerAddress);
 
 		// TODO - start should not throw an exception
 		try {
@@ -315,33 +317,15 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
 
 		pendingRequests.put(allocationID, new PendingRequest(allocationID, future, resources));
 
-		Future<RMSlotRequestReply> rmResponse = resourceManagerGateway.requestSlot(
+		Future<Acknowledge> rmResponse = resourceManagerGateway.requestSlot(
 				jobManagerLeaderId, resourceManagerLeaderId,
-				new SlotRequest(jobId, allocationID, resources),
+				new SlotRequest(jobId, allocationID, resources, jobManagerAddress),
 				resourceManagerRequestsTimeout);
 
-		// on success, trigger let the slot pool know
-		Future<Void> slotRequestProcessingFuture = rmResponse.thenAcceptAsync(new AcceptFunction<RMSlotRequestReply>() {
+		Future<Void> slotRequestProcessingFuture = rmResponse.thenAcceptAsync(new AcceptFunction<Acknowledge>() {
 			@Override
-			public void accept(RMSlotRequestReply reply) {
-				if (reply.getAllocationID() != null && reply.getAllocationID().equals(allocationID)) {
-					if (reply instanceof RMSlotRequestRegistered) {
-						slotRequestToResourceManagerSuccess(allocationID);
-					}
-					else if (reply instanceof RMSlotRequestRejected) {
-						slotRequestToResourceManagerFailed(allocationID,
-								new Exception("ResourceManager rejected slot request"));
-					}
-					else {
-						slotRequestToResourceManagerFailed(allocationID, 
-								new Exception("Unknown ResourceManager response: " + reply));
-					}
-				}
-				else {
-					future.completeExceptionally(new Exception(String.format(
-							"Bug: ResourceManager response had wrong AllocationID. Request: %s , Response: %s", 
-							allocationID, reply.getAllocationID())));
-				}
+			public void accept(Acknowledge value) {
+				slotRequestToResourceManagerSuccess(allocationID);
 			}
 		}, getMainThreadExecutor());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index ab43577..99dbc86 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -345,7 +345,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 		try {
 			// start the slot pool make sure the slot pool now accepts messages for this leader
 			log.debug("Staring SlotPool component");
-			slotPool.start(leaderSessionID);
+			slotPool.start(leaderSessionID, getAddress());
 		} catch (Exception e) {
 			log.error("Faild to start job {} ({})", jobGraph.getName(), jobGraph.getJobID(), e);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index f17dbe5..2ef8276 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
@@ -40,14 +41,13 @@ import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
-import org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestRejected;
-import org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestReply;
 import org.apache.flink.runtime.resourcemanager.registration.JobManagerRegistration;
 import org.apache.flink.runtime.resourcemanager.registration.WorkerRegistration;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
-import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceManagerActions;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcMethod;
@@ -56,6 +56,7 @@ import org.apache.flink.runtime.jobmaster.JobMaster;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 
+import org.apache.flink.runtime.rpc.exceptions.LeaderSessionIDException;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
@@ -68,7 +69,6 @@ import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executor;
 import java.util.concurrent.TimeoutException;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -114,17 +114,14 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	/** The heartbeat manager with job managers. */
 	private final HeartbeatManager<Void, Void> jobManagerHeartbeatManager;
 
-	/** The factory to construct the SlotManager. */
-	private final SlotManagerFactory slotManagerFactory;
-
 	/** Registry to use for metrics */
 	private final MetricRegistry metricRegistry;
 
 	/** Fatal error handler */
 	private final FatalErrorHandler fatalErrorHandler;
 
-	/** The SlotManager created by the slotManagerFactory when the ResourceManager is started. */
-	private SlotManager slotManager;
+	/** The slot manager maintains the available slots */
+	private final SlotManager slotManager;
 
 	/** The service to elect a ResourceManager leader. */
 	private LeaderElectionService leaderElectionService;
@@ -142,7 +139,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 			ResourceManagerConfiguration resourceManagerConfiguration,
 			HighAvailabilityServices highAvailabilityServices,
 			HeartbeatServices heartbeatServices,
-			SlotManagerFactory slotManagerFactory,
+			SlotManager slotManager,
 			MetricRegistry metricRegistry,
 			JobLeaderIdService jobLeaderIdService,
 			FatalErrorHandler fatalErrorHandler) {
@@ -152,7 +149,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 		this.resourceId = checkNotNull(resourceId);
 		this.resourceManagerConfiguration = checkNotNull(resourceManagerConfiguration);
 		this.highAvailabilityServices = checkNotNull(highAvailabilityServices);
-		this.slotManagerFactory = checkNotNull(slotManagerFactory);
+		this.slotManager = checkNotNull(slotManager);
 		this.metricRegistry = checkNotNull(metricRegistry);
 		this.jobLeaderIdService = checkNotNull(jobLeaderIdService);
 		this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
@@ -187,13 +184,6 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 		// start a leader
 		super.start();
 
-		try {
-			// SlotManager should start first
-			slotManager = slotManagerFactory.create(createResourceManagerServices());
-		} catch (Exception e) {
-			throw new ResourceManagerException("Could not create the slot manager.", e);
-		}
-
 		leaderElectionService = highAvailabilityServices.getResourceManagerLeaderElectionService();
 
 		try {
@@ -220,19 +210,25 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 		jobManagerHeartbeatManager.stop();
 
 		try {
-			super.shutDown();
+			slotManager.close();
 		} catch (Exception e) {
 			exception = ExceptionUtils.firstOrSuppressed(e, exception);
 		}
 
-		clearState();
-
 		try {
 			leaderElectionService.stop();
 		} catch (Exception e) {
 			exception = ExceptionUtils.firstOrSuppressed(e, exception);
 		}
 
+		try {
+			super.shutDown();
+		} catch (Exception e) {
+			exception = ExceptionUtils.firstOrSuppressed(e, exception);
+		}
+
+		clearState();
+
 		if (exception != null) {
 			ExceptionUtils.rethrowException(exception, "Error while shutting the ResourceManager down.");
 		}
@@ -397,10 +393,10 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	 */
 	@RpcMethod
 	public Future<RegistrationResponse> registerTaskExecutor(
-		final UUID resourceManagerLeaderId,
-		final String taskExecutorAddress,
+			final UUID resourceManagerLeaderId,
+			final String taskExecutorAddress,
 		final ResourceID taskExecutorResourceId,
-		final SlotReport slotReport) {
+			final SlotReport slotReport) {
 
 		if (Objects.equals(leaderSessionId, resourceManagerLeaderId)) {
 			Future<TaskExecutorGateway> taskExecutorGatewayFuture = getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class);
@@ -415,6 +411,9 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 						if (oldRegistration != null) {
 							// TODO :: suggest old taskExecutor to stop itself
 							log.info("Replacing old instance of worker for ResourceID {}", taskExecutorResourceId);
+
+							// remove old task manager registration from slot manager
+							slotManager.unregisterTaskManager(oldRegistration.getInstanceID());
 						}
 
 						WorkerType newWorker = workerStarted(taskExecutorResourceId);
@@ -422,7 +421,8 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 							new WorkerRegistration<>(taskExecutorGateway, newWorker);
 
 						taskExecutors.put(taskExecutorResourceId, registration);
-						slotManager.registerTaskExecutor(taskExecutorResourceId, registration, slotReport);
+
+						slotManager.registerTaskManager(registration, slotReport);
 
 						taskManagerHeartbeatManager.monitorTarget(taskExecutorResourceId, new HeartbeatTarget<Void>() {
 							@Override
@@ -483,26 +483,34 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	 * @return Slot assignment
 	 */
 	@RpcMethod
-	public RMSlotRequestReply requestSlot(
+	public Acknowledge requestSlot(
 			UUID jobMasterLeaderID,
 			UUID resourceManagerLeaderID,
-			SlotRequest slotRequest) {
+			SlotRequest slotRequest) throws ResourceManagerException, LeaderSessionIDException {
 
-		log.info("Request slot with profile {} for job {} with allocation id {}.",
-			slotRequest.getResourceProfile(),
-			slotRequest.getJobId(),
-			slotRequest.getAllocationId());
+		if (!Objects.equals(resourceManagerLeaderID, leaderSessionId)) {
+			throw new LeaderSessionIDException(resourceManagerLeaderID, leaderSessionId);
+		}
 
 		JobID jobId = slotRequest.getJobId();
 		JobManagerRegistration jobManagerRegistration = jobManagerRegistrations.get(jobId);
 
-		if (jobManagerRegistration != null
-				&& jobMasterLeaderID.equals(jobManagerRegistration.getLeaderID())
-				&& resourceManagerLeaderID.equals(leaderSessionId)) {
-			return slotManager.requestSlot(slotRequest);
+		if (null != jobManagerRegistration) {
+			if (Objects.equals(jobMasterLeaderID, jobManagerRegistration.getLeaderID())) {
+				log.info("Request slot with profile {} for job {} with allocation id {}.",
+					slotRequest.getResourceProfile(),
+					slotRequest.getJobId(),
+					slotRequest.getAllocationId());
+
+				slotManager.registerSlotRequest(slotRequest);
+
+				return Acknowledge.get();
+			} else {
+				throw new LeaderSessionIDException(jobMasterLeaderID, jobManagerRegistration.getLeaderID());
+			}
+
 		} else {
-			log.info("Ignoring slot request for unknown JobMaster with JobID {}", jobId);
-			return new RMSlotRequestRejected(slotRequest.getAllocationId());
+			throw new ResourceManagerException("Could not find registered job manager for job " + jobId + '.');
 		}
 	}
 
@@ -511,23 +519,23 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	 * @param resourceManagerLeaderId TaskExecutor's resource manager leader id
 	 * @param instanceID TaskExecutor's instance id
 	 * @param slotId The slot id of the available slot
-	 * @return SlotAvailableReply
 	 */
 	@RpcMethod
 	public void notifySlotAvailable(
 			final UUID resourceManagerLeaderId,
 			final InstanceID instanceID,
-			final SlotID slotId) {
+			final SlotID slotId,
+			final AllocationID allocationId) {
 
-		if (resourceManagerLeaderId.equals(leaderSessionId)) {
+		if (Objects.equals(resourceManagerLeaderId, leaderSessionId)) {
 			final ResourceID resourceId = slotId.getResourceID();
 			WorkerRegistration<WorkerType> registration = taskExecutors.get(resourceId);
 
 			if (registration != null) {
 				InstanceID registrationId = registration.getInstanceID();
 
-				if (registrationId.equals(instanceID)) {
-					slotManager.notifySlotAvailable(resourceId, slotId);
+				if (Objects.equals(registrationId, instanceID)) {
+					slotManager.freeSlot(slotId, allocationId);
 				} else {
 					log.debug("Invalid registration id for slot available message. This indicates an" +
 						" outdated request.");
@@ -587,8 +595,8 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	/**
 	 * Cleanup application and shut down cluster
 	 *
-	 * @param finalStatus
-	 * @param optionalDiagnostics
+	 * @param finalStatus of the Flink application
+	 * @param optionalDiagnostics for the Flink application
 	 */
 	@RpcMethod
 	public void shutDownCluster(final ApplicationStatus finalStatus, final String optionalDiagnostics) {
@@ -597,12 +605,12 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	}
 
 	@RpcMethod
-	public Integer getNumberOfRegisteredTaskManagers(UUID leaderSessionId) throws LeaderIdMismatchException {
-		if (this.leaderSessionId != null && this.leaderSessionId.equals(leaderSessionId)) {
+	public Integer getNumberOfRegisteredTaskManagers(UUID requestLeaderSessionId) throws LeaderIdMismatchException {
+		if (Objects.equals(leaderSessionId, requestLeaderSessionId)) {
 			return taskExecutors.size();
 		}
 		else {
-			throw new LeaderIdMismatchException(this.leaderSessionId, leaderSessionId);
+			throw new LeaderIdMismatchException(leaderSessionId, requestLeaderSessionId);
 		}
 	}
 
@@ -628,7 +636,6 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 		jobManagerRegistrations.clear();
 		jmResourceIdRegistrations.clear();
 		taskExecutors.clear();
-		slotManager.clearState();
 
 		try {
 			jobLeaderIdService.clear();
@@ -686,7 +693,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 			log.info("Task manager {} failed because {}.", resourceID, cause);
 
 			// TODO :: suggest failed task executor to stop itself
-			slotManager.notifyTaskManagerFailure(resourceID);
+			slotManager.unregisterTaskManager(workerRegistration.getInstanceID());
 
 			workerRegistration.getTaskExecutorGateway().disconnectResourceManager(cause);
 		} else {
@@ -702,11 +709,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	 * @return True if the given leader id matches the actual leader id and is not null; otherwise false
 	 */
 	protected boolean isValid(UUID resourceManagerLeaderId) {
-		if (resourceManagerLeaderId == null) {
-			return false;
-		} else {
-			return resourceManagerLeaderId.equals(leaderSessionId);
-		}
+		return Objects.equals(resourceManagerLeaderId, leaderSessionId);
 	}
 
 	protected void removeJob(JobID jobId) {
@@ -725,7 +728,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 		if (jobManagerRegistrations.containsKey(jobId)) {
 			JobManagerRegistration jobManagerRegistration = jobManagerRegistrations.get(jobId);
 
-			if (jobManagerRegistration.getLeaderID().equals(oldJobLeaderId)) {
+			if (Objects.equals(jobManagerRegistration.getLeaderID(), oldJobLeaderId)) {
 				disconnectJobManager(jobId, new Exception("Job leader lost leadership."));
 			} else {
 				log.debug("Discarding job leader lost leadership, because a new job leader was found for job {}. ", jobId);
@@ -806,6 +809,8 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 
 				leaderSessionId = newLeaderSessionID;
 
+				slotManager.start(leaderSessionId, getMainThreadExecutor(), new ResourceManagerActionsImpl());
+
 				getRpcService().execute(new Runnable() {
 					@Override
 					public void run() {
@@ -829,6 +834,8 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 
 				clearState();
 
+				slotManager.suspend();
+
 				leaderSessionId = null;
 			}
 		});
@@ -874,6 +881,8 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	@VisibleForTesting
 	public abstract void startNewWorker(ResourceProfile resourceProfile);
 
+	public abstract void stopWorker(InstanceID instanceId);
+
 	/**
 	 * Callback when a worker was started.
 	 * @param resourceID The worker resource id
@@ -884,30 +893,21 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	//  Static utility classes
 	// ------------------------------------------------------------------------
 
-	protected ResourceManagerServices createResourceManagerServices() {
-		return new DefaultResourceManagerServices();
-	}
-
-	private class DefaultResourceManagerServices implements ResourceManagerServices {
-
-		@Override
-		public UUID getLeaderID() {
-			return ResourceManager.this.leaderSessionId;
-		}
+	private class ResourceManagerActionsImpl implements ResourceManagerActions {
 
 		@Override
-		public void allocateResource(ResourceProfile resourceProfile) {
-			ResourceManager.this.startNewWorker(resourceProfile);
+		public void releaseResource(InstanceID instanceId) {
+			stopWorker(instanceId);
 		}
 
 		@Override
-		public Executor getAsyncExecutor() {
-			return ResourceManager.this.getRpcService().getExecutor();
+		public void allocateResource(ResourceProfile resourceProfile) throws ResourceManagerException {
+			startNewWorker(resourceProfile);
 		}
 
 		@Override
-		public Executor getMainThreadExecutor() {
-			return ResourceManager.this.getMainThreadExecutor();
+		public void notifyAllocationFailure(JobID jobId, AllocationID allocationId, Exception cause) {
+			log.info("Slot request with allocation id {} for job {} failed.", allocationId, jobId, cause);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index 530113f..fcbedcb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -21,11 +21,12 @@ package org.apache.flink.runtime.resourcemanager;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestReply;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.jobmaster.JobMaster;
@@ -66,7 +67,7 @@ public interface ResourceManagerGateway extends RpcGateway {
 	 * @param slotRequest The slot to request
 	 * @return The confirmation that the slot gets allocated
 	 */
-	Future<RMSlotRequestReply> requestSlot(
+	Future<Acknowledge> requestSlot(
 		UUID resourceManagerLeaderID,
 		UUID jobMasterLeaderID,
 		SlotRequest slotRequest,
@@ -96,11 +97,13 @@ public interface ResourceManagerGateway extends RpcGateway {
 	 * @param resourceManagerLeaderId The ResourceManager leader id
 	 * @param instanceId TaskExecutor's instance id
 	 * @param slotID The SlotID of the freed slot
+	 * @param oldAllocationId The AllocationID to which the slot was allocated before it was freed
 	 */
 	void notifySlotAvailable(
 		UUID resourceManagerLeaderId,
 		InstanceID instanceId,
-		SlotID slotID);
+		SlotID slotID,
+		AllocationID oldAllocationId);
 
 	/**
 	 * Registers an infoMessage listener

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
index c6d64db..d0c411c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
@@ -76,7 +76,7 @@ public class ResourceManagerRunner implements FatalErrorHandler {
 			resourceManagerConfiguration,
 			highAvailabilityServices,
 			heartbeatServices,
-			resourceManagerRuntimeServices.getSlotManagerFactory(),
+			resourceManagerRuntimeServices.getSlotManager(),
 			metricRegistry,
 			resourceManagerRuntimeServices.getJobLeaderIdService(),
 			this);

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServices.java
index 56edde4..ed8f1e0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServices.java
@@ -20,8 +20,8 @@ package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager;
-import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerConfiguration;
 import org.apache.flink.util.Preconditions;
 
 /**
@@ -29,16 +29,16 @@ import org.apache.flink.util.Preconditions;
  */
 public class ResourceManagerRuntimeServices {
 
-	private final SlotManagerFactory slotManagerFactory;
+	private final SlotManager slotManager;
 	private final JobLeaderIdService jobLeaderIdService;
 
-	public ResourceManagerRuntimeServices(SlotManagerFactory slotManagerFactory, JobLeaderIdService jobLeaderIdService) {
-		this.slotManagerFactory = Preconditions.checkNotNull(slotManagerFactory);
+	public ResourceManagerRuntimeServices(SlotManager slotManager, JobLeaderIdService jobLeaderIdService) {
+		this.slotManager = Preconditions.checkNotNull(slotManager);
 		this.jobLeaderIdService = Preconditions.checkNotNull(jobLeaderIdService);
 	}
 
-	public SlotManagerFactory getSlotManagerFactory() {
-		return slotManagerFactory;
+	public SlotManager getSlotManager() {
+		return slotManager;
 	}
 
 	public JobLeaderIdService getJobLeaderIdService() {
@@ -58,12 +58,19 @@ public class ResourceManagerRuntimeServices {
 			HighAvailabilityServices highAvailabilityServices,
 			ScheduledExecutor scheduledExecutor) throws Exception {
 
-		final SlotManagerFactory slotManagerFactory = new DefaultSlotManager.Factory();
+		final SlotManagerConfiguration slotManagerConfiguration = configuration.getSlotManagerConfiguration();
+
+		final SlotManager slotManager = new SlotManager(
+			scheduledExecutor,
+			slotManagerConfiguration.getTaskManagerRequestTimeout(),
+			slotManagerConfiguration.getSlotRequestTimeout(),
+			slotManagerConfiguration.getTaskManagerTimeout());
+
 		final JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
 			highAvailabilityServices,
 			scheduledExecutor,
 			configuration.getJobTimeout());
 
-		return new ResourceManagerRuntimeServices(slotManagerFactory, jobLeaderIdService);
+		return new ResourceManagerRuntimeServices(slotManager, jobLeaderIdService);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServicesConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServicesConfiguration.java
index 6de5f4d..79d1c02 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServicesConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServicesConfiguration.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ResourceManagerOptions;
 import org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerConfiguration;
 import org.apache.flink.util.Preconditions;
 import scala.concurrent.duration.Duration;
 
@@ -32,14 +33,21 @@ public class ResourceManagerRuntimeServicesConfiguration {
 
 	private final Time jobTimeout;
 
-	public ResourceManagerRuntimeServicesConfiguration(Time jobTimeout) {
+	private final SlotManagerConfiguration slotManagerConfiguration;
+
+	public ResourceManagerRuntimeServicesConfiguration(Time jobTimeout, SlotManagerConfiguration slotManagerConfiguration) {
 		this.jobTimeout = Preconditions.checkNotNull(jobTimeout);
+		this.slotManagerConfiguration = Preconditions.checkNotNull(slotManagerConfiguration);
 	}
 
 	public Time getJobTimeout() {
 		return jobTimeout;
 	}
 
+	public SlotManagerConfiguration getSlotManagerConfiguration() {
+		return slotManagerConfiguration;
+	}
+
 	// ---------------------------- Static methods ----------------------------------
 
 	public static ResourceManagerRuntimeServicesConfiguration fromConfiguration(Configuration configuration) throws ConfigurationException {
@@ -54,6 +62,8 @@ public class ResourceManagerRuntimeServicesConfiguration {
 				"value " + ResourceManagerOptions.JOB_TIMEOUT + '.', e);
 		}
 
-		return new ResourceManagerRuntimeServicesConfiguration(jobTimeout);
+		final SlotManagerConfiguration slotManagerConfiguration = SlotManagerConfiguration.fromConfiguration(configuration);
+
+		return new ResourceManagerRuntimeServicesConfiguration(jobTimeout, slotManagerConfiguration);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequest.java
index 896421b..779e913 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequest.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequest.java
@@ -42,10 +42,18 @@ public class SlotRequest implements Serializable {
 	/** The resource profile of the required slot */
 	private final ResourceProfile resourceProfile;
 
-	public SlotRequest(JobID jobId, AllocationID allocationId, ResourceProfile resourceProfile) {
+	/** Address of the emitting job manager */
+	private final String targetAddress;
+
+	public SlotRequest(
+			JobID jobId,
+			AllocationID allocationId,
+			ResourceProfile resourceProfile,
+			String targetAddress) {
 		this.jobId = checkNotNull(jobId);
 		this.allocationId = checkNotNull(allocationId);
 		this.resourceProfile = checkNotNull(resourceProfile);
+		this.targetAddress = checkNotNull(targetAddress);
 	}
 
 	/**
@@ -71,4 +79,8 @@ public class SlotRequest implements Serializable {
 	public ResourceProfile getResourceProfile() {
 		return resourceProfile;
 	}
+
+	public String getTargetAddress() {
+		return targetAddress;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
index 389ca5c..a921a29 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
@@ -23,9 +23,10 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
-import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 
@@ -44,7 +45,7 @@ public class StandaloneResourceManager extends ResourceManager<ResourceID> {
 			ResourceManagerConfiguration resourceManagerConfiguration,
 			HighAvailabilityServices highAvailabilityServices,
 			HeartbeatServices heartbeatServices,
-			SlotManagerFactory slotManagerFactory,
+			SlotManager slotManager,
 			MetricRegistry metricRegistry,
 			JobLeaderIdService jobLeaderIdService,
 			FatalErrorHandler fatalErrorHandler) {
@@ -55,7 +56,7 @@ public class StandaloneResourceManager extends ResourceManager<ResourceID> {
 			resourceManagerConfiguration,
 			highAvailabilityServices,
 			heartbeatServices,
-			slotManagerFactory,
+			slotManager,
 			metricRegistry,
 			jobLeaderIdService,
 			fatalErrorHandler);
@@ -75,6 +76,11 @@ public class StandaloneResourceManager extends ResourceManager<ResourceID> {
 	}
 
 	@Override
+	public void stopWorker(InstanceID instanceId) {
+
+	}
+
+	@Override
 	protected ResourceID workerStarted(ResourceID resourceID) {
 		return resourceID;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/jobmanager/RMSlotRequestRegistered.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/jobmanager/RMSlotRequestRegistered.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/jobmanager/RMSlotRequestRegistered.java
deleted file mode 100644
index 01bc532..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/jobmanager/RMSlotRequestRegistered.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager.messages.jobmanager;
-
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-
-/**
- * Acknowledgment by the ResourceManager for a SlotRequest from the JobManager
- */
-public class RMSlotRequestRegistered extends RMSlotRequestReply {
-
-	private static final long serialVersionUID = 4760320859275256855L;
-
-	public RMSlotRequestRegistered(AllocationID allocationID) {
-		super(allocationID);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/jobmanager/RMSlotRequestRejected.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/jobmanager/RMSlotRequestRejected.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/jobmanager/RMSlotRequestRejected.java
deleted file mode 100644
index 649d61c..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/jobmanager/RMSlotRequestRejected.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager.messages.jobmanager;
-
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-
-/**
- * Rejection message by the ResourceManager for a SlotRequest from the JobManager
- */
-public class RMSlotRequestRejected extends RMSlotRequestReply {
-
-	private static final long serialVersionUID = 9049346740895325144L;
-
-	public RMSlotRequestRejected(AllocationID allocationID) {
-		super(allocationID);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/jobmanager/RMSlotRequestReply.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/jobmanager/RMSlotRequestReply.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/jobmanager/RMSlotRequestReply.java
deleted file mode 100644
index 66e1911..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/jobmanager/RMSlotRequestReply.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager.messages.jobmanager;
-
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-
-import java.io.Serializable;
-
-/**
- * Acknowledgment by the ResourceManager for a SlotRequest from the JobManager
- */
-public abstract class RMSlotRequestReply implements Serializable {
-
-	private static final long serialVersionUID = 42;
-
-	private final AllocationID allocationID;
-
-	public RMSlotRequestReply(AllocationID allocationID) {
-		this.allocationID = allocationID;
-	}
-
-	public AllocationID getAllocationID() {
-		return allocationID;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/taskexecutor/SlotAvailableReply.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/taskexecutor/SlotAvailableReply.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/taskexecutor/SlotAvailableReply.java
deleted file mode 100644
index f2e0105..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/taskexecutor/SlotAvailableReply.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.runtime.resourcemanager.messages.taskexecutor;
-
-import org.apache.flink.runtime.clusterframework.types.SlotID;
-
-import java.io.Serializable;
-import java.util.UUID;
-
-/**
- * Sent by the ResourceManager to the TaskExecutor confirm receipt of
- * {@code org.apache.flink.runtime.resourcemanager.ResourceManagerGateway.notifySlotAvailable}.
- */
-public class SlotAvailableReply implements Serializable {
-
-	private final UUID resourceManagerLeaderID;
-
-	private final SlotID slotID;
-
-	public SlotAvailableReply(UUID resourceManagerLeaderID, SlotID slotID) {
-		this.resourceManagerLeaderID = resourceManagerLeaderID;
-		this.slotID = slotID;
-	}
-
-	public UUID getResourceManagerLeaderID() {
-		return resourceManagerLeaderID;
-	}
-
-	public SlotID getSlotID() {
-		return slotID;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/taskexecutor/TMSlotRequestRegistered.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/taskexecutor/TMSlotRequestRegistered.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/taskexecutor/TMSlotRequestRegistered.java
deleted file mode 100644
index c0f0f49..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/taskexecutor/TMSlotRequestRegistered.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager.messages.taskexecutor;
-
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.instance.InstanceID;
-
-/**
- * Acknowledgment by the TaskExecutor for a SlotRequest from the ResourceManager
- */
-public class TMSlotRequestRegistered extends TMSlotRequestReply {
-
-	private static final long serialVersionUID = 4760320859275256855L;
-
-	public TMSlotRequestRegistered(InstanceID instanceID, ResourceID resourceID, AllocationID allocationID) {
-		super(instanceID, resourceID, allocationID);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/taskexecutor/TMSlotRequestRejected.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/taskexecutor/TMSlotRequestRejected.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/taskexecutor/TMSlotRequestRejected.java
deleted file mode 100644
index 9b10a35..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/taskexecutor/TMSlotRequestRejected.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager.messages.taskexecutor;
-
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.instance.InstanceID;
-
-/**
- * Rejection by the TaskExecutor for a SlotRequest from the ResourceManager
- */
-public class TMSlotRequestRejected extends TMSlotRequestReply {
-
-	private static final long serialVersionUID = 9049346740895325144L;
-
-	public TMSlotRequestRejected(InstanceID instanceID, ResourceID resourceID, AllocationID allocationID) {
-		super(instanceID, resourceID, allocationID);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/taskexecutor/TMSlotRequestReply.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/taskexecutor/TMSlotRequestReply.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/taskexecutor/TMSlotRequestReply.java
deleted file mode 100644
index b23b6e9..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/taskexecutor/TMSlotRequestReply.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager.messages.taskexecutor;
-
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.instance.InstanceID;
-
-import java.io.Serializable;
-
-/**
- * Acknowledgment by the TaskExecutor for a SlotRequest from the ResourceManager
- */
-public abstract class TMSlotRequestReply implements Serializable {
-
-	private static final long serialVersionUID = 42;
-
-	private final InstanceID instanceID;
-
-	private final ResourceID resourceID;
-
-	private final AllocationID allocationID;
-
-	protected TMSlotRequestReply(InstanceID instanceID, ResourceID resourceID, AllocationID allocationID) {
-		this.instanceID = instanceID;
-		this.resourceID = resourceID;
-		this.allocationID = allocationID;
-	}
-
-	public InstanceID getInstanceID() {
-		return instanceID;
-	}
-
-	public ResourceID getResourceID() {
-		return resourceID;
-	}
-
-	public AllocationID getAllocationID() {
-		return allocationID;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorConnection.java
new file mode 100644
index 0000000..e4522f2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorConnection.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.registration;
+
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+
+/**
+ * Pairs the {@link TaskExecutorGateway} of a registered task executor with an
+ * {@link InstanceID} that is freshly generated when the connection is created.
+ */
+public class TaskExecutorConnection {
+
+	private final InstanceID instanceID; // created in the constructor, never supplied by the caller
+
+	private final TaskExecutorGateway taskExecutorGateway; // assigned once in the constructor, hence final
+
+	public TaskExecutorConnection(TaskExecutorGateway taskExecutorGateway) {
+		this.instanceID = new InstanceID();
+		this.taskExecutorGateway = taskExecutorGateway;
+	}
+
+	public InstanceID getInstanceID() {
+		return instanceID;
+	}
+
+	public TaskExecutorGateway getTaskExecutorGateway() {
+		return taskExecutorGateway;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorRegistration.java
deleted file mode 100644
index bfa9c00..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorRegistration.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager.registration;
-
-import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
-
-import java.io.Serializable;
-
-/**
- * This class is responsible for grouping the TaskExecutorGateway and the InstanceID
- * of a registered task executor.
- */
-public class TaskExecutorRegistration implements Serializable {
-
-	private static final long serialVersionUID = -2062957799469434614L;
-
-	private final InstanceID instanceID;
-
-	private TaskExecutorGateway taskExecutorGateway;
-
-	public TaskExecutorRegistration(TaskExecutorGateway taskExecutorGateway) {
-		this.instanceID = new InstanceID();
-		this.taskExecutorGateway = taskExecutorGateway;
-	}
-
-	public InstanceID getInstanceID() {
-		return instanceID;
-	}
-
-	public TaskExecutorGateway getTaskExecutorGateway() {
-		return taskExecutorGateway;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
index 7ee7a1f..8f949b0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
@@ -24,11 +24,9 @@ import org.apache.flink.util.Preconditions;
 import java.io.Serializable;
 
 /**
- * This class extends the {@link TaskExecutorRegistration}, adding the worker information.
+ * This class extends the {@link TaskExecutorConnection}, adding the worker information.
  */
-public class WorkerRegistration<WorkerType extends Serializable> extends TaskExecutorRegistration {
-
-	private static final long serialVersionUID = -2062957799469434614L;
+public class WorkerRegistration<WorkerType extends Serializable> extends TaskExecutorConnection {
 
 	private final WorkerType worker;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotManager.java
deleted file mode 100644
index 9508936..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotManager.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager.slotmanager;
-
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
-import org.apache.flink.runtime.clusterframework.types.SlotID;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerServices;
-import org.apache.flink.runtime.resourcemanager.SlotRequest;
-
-import java.util.Iterator;
-import java.util.Map;
-
-/**
- * A slot manager that answers requests with slots without any special logic. The first slot
- * in the maps to match a request is chosen.
- */
-public class DefaultSlotManager extends SlotManager {
-
-	public DefaultSlotManager(ResourceManagerServices rmServices) {
-		super(rmServices);
-	}
-
-	@Override
-	protected ResourceSlot chooseSlotToUse(SlotRequest request, Map<SlotID, ResourceSlot> freeSlots) {
-		final Iterator<ResourceSlot> slotIterator = freeSlots.values().iterator();
-		if (slotIterator.hasNext()) {
-			return slotIterator.next();
-		} else {
-			return null;
-		}
-	}
-
-	@Override
-	protected SlotRequest chooseRequestToFulfill(ResourceSlot offeredSlot, Map<AllocationID, SlotRequest> pendingRequests) {
-		final Iterator<SlotRequest> requestIterator = pendingRequests.values().iterator();
-		if (requestIterator.hasNext()) {
-			return requestIterator.next();
-		} else {
-			return null;
-		}
-	}
-
-	// ------------------------------------------------------------------------
-
-	public static class Factory implements SlotManagerFactory {
-
-		@Override
-		public SlotManager create(ResourceManagerServices rmServices) {
-			return new DefaultSlotManager(rmServices);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java
new file mode 100644
index 0000000..894f146
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.CompletableFuture;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.util.Preconditions;
+
+import java.util.UUID;
+import java.util.concurrent.ScheduledFuture;
+
+public class PendingSlotRequest { // Wraps a SlotRequest together with its request future and timeout handle.
+
+	private final SlotRequest slotRequest; // the wrapped request; null-checked in the constructor
+
+	private CompletableFuture<Acknowledge> requestFuture; // null until setRequestFuture is called with a non-null future
+
+	private UUID timeoutIdentifier; // identifier handed to registerTimeout; reset to null by cancelTimeout
+
+	private ScheduledFuture<?> timeoutFuture; // handle handed to registerTimeout; null while no timeout is registered
+
+	public PendingSlotRequest(SlotRequest slotRequest) {
+		this.slotRequest = Preconditions.checkNotNull(slotRequest);
+	}
+
+	public AllocationID getAllocationId() { // delegates to the wrapped SlotRequest
+		return slotRequest.getAllocationId();
+	}
+
+	public ResourceProfile getResourceProfile() { // delegates to the wrapped SlotRequest
+		return slotRequest.getResourceProfile();
+	}
+
+	public UUID getTimeoutIdentifier() { // null if no timeout is registered or it was cancelled
+		return timeoutIdentifier;
+	}
+
+	public JobID getJobId() { // delegates to the wrapped SlotRequest
+		return slotRequest.getJobId();
+	}
+
+	public String getTargetAddress() { // delegates to the wrapped SlotRequest
+		return slotRequest.getTargetAddress();
+	}
+
+	public boolean isAssigned() { // "assigned" means a request future has been set
+		return null != requestFuture;
+	}
+
+	public void setRequestFuture(CompletableFuture<Acknowledge> requestFuture) { // passing null makes isAssigned() false again
+		this.requestFuture = requestFuture;
+	}
+
+	public CompletableFuture<Acknowledge> getRequestFuture() { // may return null, see isAssigned()
+		return requestFuture;
+	}
+
+	public void cancelTimeout() { // no-op when no timeout future is registered
+		if (timeoutFuture != null) {
+			timeoutFuture.cancel(true); // true = mayInterruptIfRunning
+
+			timeoutIdentifier = null; // identifier and future are always cleared together
+			timeoutFuture = null;
+		}
+	}
+
+	public void registerTimeout(ScheduledFuture<?> newTimeoutFuture, UUID newTimeoutIdentifier) {
+		cancelTimeout(); // cancel a previously registered timeout before replacing it
+
+		timeoutFuture = newTimeoutFuture;
+		timeoutIdentifier = newTimeoutIdentifier;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceManagerActions.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceManagerActions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceManagerActions.java
new file mode 100644
index 0000000..c8b288e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceManagerActions.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+
+public interface ResourceManagerActions { // Three callbacks; no implementor or caller is visible in this hunk.
+
+	void releaseResource(InstanceID instanceId); // NOTE(review): presumably releases the resource registered under instanceId — confirm
+
+	void allocateResource(ResourceProfile resourceProfile) throws ResourceManagerException; // may fail with the declared checked exception
+
+	void notifyAllocationFailure(JobID jobId, AllocationID allocationId, Exception cause); // NOTE(review): presumably reports a failed slot allocation for the job — confirm
+}