You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/10/06 11:48:42 UTC

[22/50] [abbrv] flink git commit: [FLINK-4538][FLINK-4348] ResourceManager slot allocation protocol

[FLINK-4538][FLINK-4348] ResourceManager slot allocation protocol

- associates JobMasters with JobID instead of InstanceID
- adds TaskExecutorGateway to slot
- adds SlotManager as RM constructor parameter
- adds LeaderRetrievalListener to SlotManager to keep track of the leader id

- tests the interaction JM->RM requestSlot
- tests the interaction RM->TM requestSlot

This closes #2463


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8a48fa50
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8a48fa50
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8a48fa50

Branch: refs/heads/flip-6
Commit: 8a48fa503d808cd588daee06f90d9e010ee47e89
Parents: 8e9db6b
Author: Maximilian Michels <mx...@apache.org>
Authored: Thu Sep 1 16:53:31 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Oct 6 13:38:41 2016 +0200

----------------------------------------------------------------------
 .../clusterframework/types/ResourceProfile.java |   2 +-
 .../clusterframework/types/ResourceSlot.java    |  14 +-
 .../resourcemanager/JobMasterRegistration.java  |  10 +-
 .../resourcemanager/RegistrationResponse.java   |   9 +-
 .../resourcemanager/ResourceManager.java        | 167 +++---
 .../resourcemanager/ResourceManagerGateway.java |   2 +-
 .../runtime/resourcemanager/SlotAssignment.java |  25 -
 .../runtime/resourcemanager/SlotManager.java    | 523 -----------------
 .../resourcemanager/SlotRequestRegistered.java  |  33 ++
 .../resourcemanager/SlotRequestRejected.java    |  34 ++
 .../resourcemanager/SlotRequestReply.java       |  41 ++
 .../slotmanager/SimpleSlotManager.java          |  59 ++
 .../slotmanager/SlotManager.java                | 579 +++++++++++++++++++
 .../flink/runtime/taskexecutor/SlotStatus.java  |   5 +-
 .../taskexecutor/TaskExecutorGateway.java       |  17 +
 .../resourcemanager/ResourceManagerHATest.java  |   4 +-
 .../resourcemanager/SlotManagerTest.java        | 538 -----------------
 .../slotmanager/SlotManagerTest.java            | 554 ++++++++++++++++++
 .../slotmanager/SlotProtocolTest.java           | 225 +++++++
 .../flink/runtime/rpc/TestingRpcService.java    |   6 +-
 .../runtime/rpc/TestingSerialRpcService.java    |   4 +
 21 files changed, 1677 insertions(+), 1174 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8a48fa50/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
index ff1c4bf..fa3aabc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
@@ -68,6 +68,6 @@ public class ResourceProfile implements Serializable {
 	 * @return true if the requirement is matched, otherwise false
 	 */
 	public boolean isMatching(ResourceProfile required) {
-		return Double.compare(cpuCores, required.getCpuCores()) >= 0 && memoryInMB >= required.getMemoryInMB();
+		return cpuCores >= required.getCpuCores() && memoryInMB >= required.getMemoryInMB();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8a48fa50/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceSlot.java
index 8a6db5f..5fb8aee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceSlot.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.clusterframework.types;
 
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+
 import java.io.Serializable;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -26,7 +28,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * A ResourceSlot represents a slot located in TaskManager from ResourceManager's view. It has a unique
  * identification and resource profile which we can compare to the resource request.
  */
-public class ResourceSlot implements ResourceIDRetrievable, Serializable {
+public class ResourceSlot implements ResourceIDRetrievable {
 
 	private static final long serialVersionUID = -5853720153136840674L;
 
@@ -36,9 +38,13 @@ public class ResourceSlot implements ResourceIDRetrievable, Serializable {
 	/** The resource profile of this slot */
 	private final ResourceProfile resourceProfile;
 
-	public ResourceSlot(SlotID slotId, ResourceProfile resourceProfile) {
+	/** Gateway to the TaskExecutor which owns the slot */
+	private final TaskExecutorGateway taskExecutorGateway;
+
+	public ResourceSlot(SlotID slotId, ResourceProfile resourceProfile, TaskExecutorGateway taskExecutorGateway) {
 		this.slotId = checkNotNull(slotId);
 		this.resourceProfile = checkNotNull(resourceProfile);
+		this.taskExecutorGateway = taskExecutorGateway;
 	}
 
 	@Override
@@ -54,6 +60,10 @@ public class ResourceSlot implements ResourceIDRetrievable, Serializable {
 		return resourceProfile;
 	}
 
+	public TaskExecutorGateway getTaskExecutorGateway() {
+		return taskExecutorGateway;
+	}
+
 	/**
 	 * Check whether required resource profile can be matched by this slot.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/8a48fa50/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java
index 309dcc1..439e56b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java
@@ -18,18 +18,26 @@
 
 package org.apache.flink.runtime.resourcemanager;
 
+import org.apache.flink.api.common.JobID;
+
 import java.io.Serializable;
 
 public class JobMasterRegistration implements Serializable {
 	private static final long serialVersionUID = 8411214999193765202L;
 
 	private final String address;
+	private final JobID jobID;
 
-	public JobMasterRegistration(String address) {
+	public JobMasterRegistration(String address, JobID jobID) {
 		this.address = address;
+		this.jobID = jobID;
 	}
 
 	public String getAddress() {
 		return address;
 	}
+
+	public JobID getJobID() {
+		return jobID;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8a48fa50/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/RegistrationResponse.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/RegistrationResponse.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/RegistrationResponse.java
index fb6c401..796e634 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/RegistrationResponse.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/RegistrationResponse.java
@@ -18,26 +18,19 @@
 
 package org.apache.flink.runtime.resourcemanager;
 
-import org.apache.flink.runtime.instance.InstanceID;
-
 import java.io.Serializable;
 
 public class RegistrationResponse implements Serializable {
 	private static final long serialVersionUID = -2379003255993119993L;
 
 	private final boolean isSuccess;
-	private final InstanceID instanceID;
 
-	public RegistrationResponse(boolean isSuccess, InstanceID instanceID) {
+	public RegistrationResponse(boolean isSuccess) {
 		this.isSuccess = isSuccess;
-		this.instanceID = instanceID;
 	}
 
 	public boolean isSuccess() {
 		return isSuccess;
 	}
 
-	public InstanceID getInstanceID() {
-		return instanceID;
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8a48fa50/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 44c022b..29aba1a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -21,11 +21,13 @@ package org.apache.flink.runtime.resourcemanager;
 import akka.dispatch.Mapper;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
@@ -33,6 +35,8 @@ import org.apache.flink.runtime.jobmaster.JobMaster;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
 
 import java.util.HashMap;
@@ -51,16 +55,28 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  *     <li>{@link #requestSlot(SlotRequest)} requests a slot from the resource manager</li>
  * </ul>
  */
-public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
-	private final Map<JobMasterGateway, InstanceID> jobMasterGateways;
+public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> implements LeaderContender {
+
+	private final Logger LOG = LoggerFactory.getLogger(getClass());
+
+	private final Map<JobID, JobMasterGateway> jobMasterGateways;
+
 	private final HighAvailabilityServices highAvailabilityServices;
-	private LeaderElectionService leaderElectionService = null;
-	private UUID leaderSessionID = null;
 
-	public ResourceManager(RpcService rpcService, HighAvailabilityServices highAvailabilityServices) {
+	private LeaderElectionService leaderElectionService;
+
+	private final SlotManager slotManager;
+
+	private UUID leaderSessionID;
+
+	public ResourceManager(
+			RpcService rpcService,
+			HighAvailabilityServices highAvailabilityServices,
+			SlotManager slotManager) {
 		super(rpcService);
 		this.highAvailabilityServices = checkNotNull(highAvailabilityServices);
 		this.jobMasterGateways = new HashMap<>();
+		this.slotManager = slotManager;
 	}
 
 	@Override
@@ -69,7 +85,7 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
 		try {
 			super.start();
 			leaderElectionService = highAvailabilityServices.getResourceManagerLeaderElectionService();
-			leaderElectionService.start(new ResourceManagerLeaderContender());
+			leaderElectionService.start(this);
 		} catch (Throwable e) {
 			log.error("A fatal error happened when starting the ResourceManager", e);
 			throw new RuntimeException("A fatal error happened when starting the ResourceManager", e);
@@ -94,7 +110,7 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
 	 */
 	@VisibleForTesting
 	UUID getLeaderSessionID() {
-		return leaderSessionID;
+		return this.leaderSessionID;
 	}
 
 	/**
@@ -105,21 +121,20 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
 	 */
 	@RpcMethod
 	public Future<RegistrationResponse> registerJobMaster(JobMasterRegistration jobMasterRegistration) {
-		Future<JobMasterGateway> jobMasterFuture = getRpcService().connect(jobMasterRegistration.getAddress(), JobMasterGateway.class);
+		final Future<JobMasterGateway> jobMasterFuture =
+			getRpcService().connect(jobMasterRegistration.getAddress(), JobMasterGateway.class);
+		final JobID jobID = jobMasterRegistration.getJobID();
 
 		return jobMasterFuture.map(new Mapper<JobMasterGateway, RegistrationResponse>() {
 			@Override
 			public RegistrationResponse apply(final JobMasterGateway jobMasterGateway) {
-				InstanceID instanceID;
 
-				if (jobMasterGateways.containsKey(jobMasterGateway)) {
-					instanceID = jobMasterGateways.get(jobMasterGateway);
-				} else {
-					instanceID = new InstanceID();
-					jobMasterGateways.put(jobMasterGateway, instanceID);
+				final JobMasterGateway existingGateway = jobMasterGateways.put(jobID, jobMasterGateway);
+				if (existingGateway != null) {
+					LOG.info("Replacing existing gateway {} for JobID {} with  {}.",
+						existingGateway, jobID, jobMasterGateway);
 				}
-
-				return new RegistrationResponse(true, instanceID);
+				return new RegistrationResponse(true);
 			}
 		}, getMainThreadExecutionContext());
 	}
@@ -131,9 +146,16 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
 	 * @return Slot assignment
 	 */
 	@RpcMethod
-	public SlotAssignment requestSlot(SlotRequest slotRequest) {
-		System.out.println("SlotRequest: " + slotRequest);
-		return new SlotAssignment();
+	public SlotRequestReply requestSlot(SlotRequest slotRequest) {
+		final JobID jobId = slotRequest.getJobId();
+		final JobMasterGateway jobMasterGateway = jobMasterGateways.get(jobId);
+
+		if (jobMasterGateway != null) {
+			return slotManager.requestSlot(slotRequest);
+		} else {
+			LOG.info("Ignoring slot request for unknown JobMaster with JobID {}", jobId);
+			return new SlotRequestRejected(slotRequest.getAllocationId());
+		}
 	}
 
 
@@ -154,61 +176,62 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
 		return new TaskExecutorRegistrationSuccess(new InstanceID(), 5000);
 	}
 
-	private class ResourceManagerLeaderContender implements LeaderContender {
-
-		/**
-		 * Callback method when current resourceManager is granted leadership
-		 *
-		 * @param leaderSessionID unique leadershipID
-		 */
-		@Override
-		public void grantLeadership(final UUID leaderSessionID) {
-			runAsync(new Runnable() {
-				@Override
-				public void run() {
-					log.info("ResourceManager {} was granted leadership with leader session ID {}", getAddress(), leaderSessionID);
-					ResourceManager.this.leaderSessionID = leaderSessionID;
-					// confirming the leader session ID might be blocking,
-					leaderElectionService.confirmLeaderSessionID(leaderSessionID);
-				}
-			});
-		}
 
-		/**
-		 * Callback method when current resourceManager lose leadership.
-		 */
-		@Override
-		public void revokeLeadership() {
-			runAsync(new Runnable() {
-				@Override
-				public void run() {
-					log.info("ResourceManager {} was revoked leadership.", getAddress());
-					jobMasterGateways.clear();
-					leaderSessionID = null;
-				}
-			});
-		}
+	// ------------------------------------------------------------------------
+	//  Leader Contender
+	// ------------------------------------------------------------------------
 
-		@Override
-		public String getAddress() {
-			return ResourceManager.this.getAddress();
-		}
+	/**
+	 * Callback method when current resourceManager is granted leadership
+	 *
+	 * @param leaderSessionID unique leadershipID
+	 */
+	@Override
+	public void grantLeadership(final UUID leaderSessionID) {
+		runAsync(new Runnable() {
+			@Override
+			public void run() {
+				log.info("ResourceManager {} was granted leadership with leader session ID {}", getAddress(), leaderSessionID);
+				// confirming the leader session ID might be blocking,
+				leaderElectionService.confirmLeaderSessionID(leaderSessionID);
+				// notify SlotManager
+				slotManager.notifyLeaderAddress(getAddress(), leaderSessionID);
+				ResourceManager.this.leaderSessionID = leaderSessionID;
+			}
+		});
+	}
 
-		/**
-		 * Handles error occurring in the leader election service
-		 *
-		 * @param exception Exception being thrown in the leader election service
-		 */
-		@Override
-		public void handleError(final Exception exception) {
-			runAsync(new Runnable() {
-				@Override
-				public void run() {
-					log.error("ResourceManager received an error from the LeaderElectionService.", exception);
-					// terminate ResourceManager in case of an error
-					shutDown();
-				}
-			});
-		}
+	/**
+	 * Callback method when current resourceManager lose leadership.
+	 */
+	@Override
+	public void revokeLeadership() {
+		runAsync(new Runnable() {
+			@Override
+			public void run() {
+				log.info("ResourceManager {} was revoked leadership.", getAddress());
+				jobMasterGateways.clear();
+				ResourceManager.this.leaderSessionID = null;
+			}
+		});
+	}
+
+	/**
+	 * Handles error occurring in the leader election service
+	 *
+	 * @param exception Exception being thrown in the leader election service
+	 */
+	@Override
+	public void handleError(final Exception exception) {
+		runAsync(new Runnable() {
+			@Override
+			public void run() {
+				log.error("ResourceManager received an error from the LeaderElectionService.", exception);
+				// notify SlotManager
+				slotManager.handleError(exception);
+				// terminate ResourceManager in case of an error
+				shutDown();
+			}
+		});
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8a48fa50/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index b5782b0..e5c8b64 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -58,7 +58,7 @@ public interface ResourceManagerGateway extends RpcGateway {
 	 * @param slotRequest Slot request
 	 * @return Future slot assignment
 	 */
-	Future<SlotAssignment> requestSlot(SlotRequest slotRequest);
+	Future<SlotRequestRegistered> requestSlot(SlotRequest slotRequest);
 
 	/**
 	 * 

http://git-wip-us.apache.org/repos/asf/flink/blob/8a48fa50/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotAssignment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotAssignment.java
deleted file mode 100644
index 695204d..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotAssignment.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager;
-
-import java.io.Serializable;
-
-public class SlotAssignment implements Serializable{
-	private static final long serialVersionUID = -6990813455942742322L;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8a48fa50/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotManager.java
deleted file mode 100644
index 5c06648..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotManager.java
+++ /dev/null
@@ -1,523 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager;
-
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
-import org.apache.flink.runtime.clusterframework.types.SlotID;
-import org.apache.flink.runtime.taskexecutor.SlotReport;
-import org.apache.flink.runtime.taskexecutor.SlotStatus;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.Map;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * SlotManager is responsible for receiving slot requests and do slot allocations. It allows to request
- * slots from registered TaskManagers and issues container allocation requests in case of there are not
- * enough available slots. Besides, it should sync its slot allocation with TaskManager's heartbeat.
- * <p>
- * The main operation principle of SlotManager is:
- * <ul>
- * <li>1. All slot allocation status should be synced with TaskManager, which is the ground truth.</li>
- * <li>2. All slots that have registered must be tracked, either by free pool or allocated pool.</li>
- * <li>3. All slot requests will be handled by best efforts, there is no guarantee that one request will be
- * fulfilled in time or correctly allocated. Conflicts or timeout or some special error will happen, it should
- * be handled outside SlotManager. SlotManager will make each decision based on the information it currently
- * holds.</li>
- * </ul>
- * <b>IMPORTANT:</b> This class is <b>Not Thread-safe</b>.
- */
-public abstract class SlotManager {
-
-	private static final Logger LOG = LoggerFactory.getLogger(SlotManager.class);
-
-	/** Gateway to communicate with ResourceManager */
-	private final ResourceManagerGateway resourceManagerGateway;
-
-	/** All registered slots, including free and allocated slots */
-	private final Map<ResourceID, Map<SlotID, ResourceSlot>> registeredSlots;
-
-	/** All pending slot requests, waiting available slots to fulfil */
-	private final Map<AllocationID, SlotRequest> pendingSlotRequests;
-
-	/** All free slots that can be used to be allocated */
-	private final Map<SlotID, ResourceSlot> freeSlots;
-
-	/** All allocations, we can lookup allocations either by SlotID or AllocationID */
-	private final AllocationMap allocationMap;
-
-	public SlotManager(ResourceManagerGateway resourceManagerGateway) {
-		this.resourceManagerGateway = checkNotNull(resourceManagerGateway);
-		this.registeredSlots = new HashMap<>(16);
-		this.pendingSlotRequests = new LinkedHashMap<>(16);
-		this.freeSlots = new HashMap<>(16);
-		this.allocationMap = new AllocationMap();
-	}
-
-	// ------------------------------------------------------------------------
-	//  slot managements
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Request a slot with requirements, we may either fulfill the request or pending it. Trigger container
-	 * allocation if we don't have enough resource. If we have free slot which can match the request, record
-	 * this allocation and forward the request to TaskManager through ResourceManager (we want this done by
-	 * RPC's main thread to avoid race condition).
-	 *
-	 * @param request The detailed request of the slot
-	 */
-	public void requestSlot(final SlotRequest request) {
-		if (isRequestDuplicated(request)) {
-			LOG.warn("Duplicated slot request, AllocationID:{}", request.getAllocationId());
-			return;
-		}
-
-		// try to fulfil the request with current free slots
-		ResourceSlot slot = chooseSlotToUse(request, freeSlots);
-		if (slot != null) {
-			LOG.info("Assigning SlotID({}) to AllocationID({}), JobID:{}", slot.getSlotId(),
-				request.getAllocationId(), request.getJobId());
-
-			// record this allocation in bookkeeping
-			allocationMap.addAllocation(slot.getSlotId(), request.getAllocationId());
-
-			// remove selected slot from free pool
-			freeSlots.remove(slot.getSlotId());
-
-			// TODO: send slot request to TaskManager
-		} else {
-			LOG.info("Cannot fulfil slot request, try to allocate a new container for it, " +
-				"AllocationID:{}, JobID:{}", request.getAllocationId(), request.getJobId());
-			allocateContainer(request.getResourceProfile());
-			pendingSlotRequests.put(request.getAllocationId(), request);
-		}
-	}
-
-	/**
-	 * Sync slot status with TaskManager's SlotReport.
-	 */
-	public void updateSlotStatus(final SlotReport slotReport) {
-		for (SlotStatus slotStatus : slotReport.getSlotsStatus()) {
-			updateSlotStatus(slotStatus);
-		}
-	}
-
-	/**
-	 * The slot request to TaskManager may be either failed by rpc communication (timeout, network error, etc.)
-	 * or really rejected by TaskManager. We shall retry this request by:
-	 * <ul>
-	 * <li>1. verify and clear all the previous allocate information for this request
-	 * <li>2. try to request slot again
-	 * </ul>
-	 * <p>
-	 * This may cause some duplicate allocation, e.g. the slot request to TaskManager is successful but the response
-	 * is lost somehow, so we may request a slot in another TaskManager, this causes two slots assigned to one request,
-	 * but it can be taken care of by rejecting registration at JobManager.
-	 *
-	 * @param originalRequest The original slot request
-	 * @param slotId          The target SlotID
-	 */
-	public void handleSlotRequestFailedAtTaskManager(final SlotRequest originalRequest, final SlotID slotId) {
-		final AllocationID originalAllocationId = originalRequest.getAllocationId();
-		LOG.info("Slot request failed at TaskManager, SlotID:{}, AllocationID:{}, JobID:{}",
-			slotId, originalAllocationId, originalRequest.getJobId());
-
-		// verify the allocation info before we do anything
-		if (freeSlots.containsKey(slotId)) {
-			// this slot is currently empty, no need to de-allocate it from our allocations
-			LOG.info("Original slot is somehow empty, retrying this request");
-
-			// before retry, we should double check whether this request was allocated by some other ways
-			if (!allocationMap.isAllocated(originalAllocationId)) {
-				requestSlot(originalRequest);
-			} else {
-				LOG.info("The failed request has somehow been allocated, SlotID:{}",
-					allocationMap.getSlotID(originalAllocationId));
-			}
-		} else if (allocationMap.isAllocated(slotId)) {
-			final AllocationID currentAllocationId = allocationMap.getAllocationID(slotId);
-
-			// check whether we have an agreement on whom this slot belongs to
-			if (originalAllocationId.equals(currentAllocationId)) {
-				LOG.info("De-allocate this request and retry");
-				allocationMap.removeAllocation(currentAllocationId);
-
-				// put this slot back to free pool
-				ResourceSlot slot = checkNotNull(getRegisteredSlot(slotId));
-				freeSlots.put(slotId, slot);
-
-				// retry the request
-				requestSlot(originalRequest);
-			} else {
-				// the slot is taken by someone else, no need to de-allocate it from our allocations
-				LOG.info("Original slot is taken by someone else, current AllocationID:{}", currentAllocationId);
-
-				// before retry, we should double check whether this request was allocated by some other ways
-				if (!allocationMap.isAllocated(originalAllocationId)) {
-					requestSlot(originalRequest);
-				} else {
-					LOG.info("The failed request is somehow been allocated, SlotID:{}",
-						allocationMap.getSlotID(originalAllocationId));
-				}
-			}
-		} else {
-			LOG.error("BUG! {} is neither in free pool nor in allocated pool", slotId);
-		}
-	}
-
-	/**
-	 * Callback for TaskManager failures. In case that a TaskManager fails, we have to clean up all its slots.
-	 *
-	 * @param resourceId The ResourceID of the TaskManager
-	 */
-	public void notifyTaskManagerFailure(final ResourceID resourceId) {
-		LOG.info("Resource:{} been notified failure", resourceId);
-		final Map<SlotID, ResourceSlot> slotIdsToRemove = registeredSlots.remove(resourceId);
-		if (slotIdsToRemove != null) {
-			for (SlotID slotId : slotIdsToRemove.keySet()) {
-				LOG.info("Removing Slot:{} upon resource failure", slotId);
-				if (freeSlots.containsKey(slotId)) {
-					freeSlots.remove(slotId);
-				} else if (allocationMap.isAllocated(slotId)) {
-					allocationMap.removeAllocation(slotId);
-				} else {
-					LOG.error("BUG! {} is neither in free pool nor in allocated pool", slotId);
-				}
-			}
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  internal behaviors
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Update slot status based on TaskManager's report. There are mainly two situations when we receive the report:
-	 * <ul>
-	 * <li>1. The slot is newly registered.</li>
-	 * <li>2. The slot has registered, it contains its current status.</li>
-	 * </ul>
-	 * <p>
-	 * Regarding 1: It's fairly simple, we just record this slot's status, and trigger schedule if slot is empty.
-	 * <p>
-	 * Regarding 2: It will cause some weird situation since we may have some time-gap on how the slot's status really
-	 * is. We may have some updates on the slot's allocation, but it doesn't reflected by TaskManager's heartbeat yet,
-	 * and we may make some wrong decision if we cannot guarantee we have the exact status about all the slots. So
-	 * the principle here is: We always trust TaskManager's heartbeat, we will correct our information based on that
-	 * and take next action based on the diff between our information and heartbeat status.
-	 *
-	 * @param reportedStatus Reported slot status
-	 */
-	void updateSlotStatus(final SlotStatus reportedStatus) {
-		final SlotID slotId = reportedStatus.getSlotID();
-		final ResourceSlot slot = new ResourceSlot(slotId, reportedStatus.getProfiler());
-
-		if (registerNewSlot(slot)) {
-			// we have a newly registered slot
-			LOG.info("New slot appeared, SlotID:{}, AllocationID:{}", slotId, reportedStatus.getAllocationID());
-
-			if (reportedStatus.getAllocationID() != null) {
-				// slot in use, record this in bookkeeping
-				allocationMap.addAllocation(slotId, reportedStatus.getAllocationID());
-			} else {
-				handleFreeSlot(new ResourceSlot(slotId, reportedStatus.getProfiler()));
-			}
-		} else {
-			// slot exists, update current information
-			if (reportedStatus.getAllocationID() != null) {
-				// slot is reported in use
-				final AllocationID reportedAllocationId = reportedStatus.getAllocationID();
-
-				// check whether we also thought this slot is in use
-				if (allocationMap.isAllocated(slotId)) {
-					// we also think that slot is in use, check whether the AllocationID matches
-					final AllocationID currentAllocationId = allocationMap.getAllocationID(slotId);
-
-					if (!reportedAllocationId.equals(currentAllocationId)) {
-						LOG.info("Slot allocation info mismatch! SlotID:{}, current:{}, reported:{}",
-							slotId, currentAllocationId, reportedAllocationId);
-
-						// seems we have a disagreement about the slot assignments, need to correct it
-						allocationMap.removeAllocation(slotId);
-						allocationMap.addAllocation(slotId, reportedAllocationId);
-					}
-				} else {
-					LOG.info("Slot allocation info mismatch! SlotID:{}, current:null, reported:{}",
-						slotId, reportedAllocationId);
-
-					// we thought the slot is free, should correct this information
-					allocationMap.addAllocation(slotId, reportedStatus.getAllocationID());
-
-					// remove this slot from free slots pool
-					freeSlots.remove(slotId);
-				}
-			} else {
-				// slot is reported empty
-
-				// check whether we also thought this slot is empty
-				if (allocationMap.isAllocated(slotId)) {
-					LOG.info("Slot allocation info mismatch! SlotID:{}, current:{}, reported:null",
-						slotId, allocationMap.getAllocationID(slotId));
-
-					// we thought the slot is in use, correct it
-					allocationMap.removeAllocation(slotId);
-
-					// we have a free slot!
-					handleFreeSlot(new ResourceSlot(slotId, reportedStatus.getProfiler()));
-				}
-			}
-		}
-	}
-
-	/**
-	 * When we have a free slot, try to fulfill the pending request first. If any request can be fulfilled,
-	 * record this allocation in bookkeeping and send slot request to TaskManager, else we just add this slot
-	 * to the free pool.
-	 *
-	 * @param freeSlot The free slot
-	 */
-	private void handleFreeSlot(final ResourceSlot freeSlot) {
-		SlotRequest chosenRequest = chooseRequestToFulfill(freeSlot, pendingSlotRequests);
-
-		if (chosenRequest != null) {
-			pendingSlotRequests.remove(chosenRequest.getAllocationId());
-
-			LOG.info("Assigning SlotID({}) to AllocationID({}), JobID:{}", freeSlot.getSlotId(),
-				chosenRequest.getAllocationId(), chosenRequest.getJobId());
-			allocationMap.addAllocation(freeSlot.getSlotId(), chosenRequest.getAllocationId());
-
-			// TODO: send slot request to TaskManager
-		} else {
-			freeSlots.put(freeSlot.getSlotId(), freeSlot);
-		}
-	}
-
-	/**
-	 * Check whether the request is duplicated. We use AllocationID to identify slot request, for each
-	 * formerly received slot request, it is either in pending list or already been allocated.
-	 *
-	 * @param request The slot request
-	 * @return <tt>true</tt> if the request is duplicated
-	 */
-	private boolean isRequestDuplicated(final SlotRequest request) {
-		final AllocationID allocationId = request.getAllocationId();
-		return pendingSlotRequests.containsKey(allocationId)
-			|| allocationMap.isAllocated(allocationId);
-	}
-
-	/**
-	 * Try to register slot, and tell if this slot is newly registered.
-	 *
-	 * @param slot The ResourceSlot which will be checked and registered
-	 * @return <tt>true</tt> if we meet a new slot
-	 */
-	private boolean registerNewSlot(final ResourceSlot slot) {
-		final SlotID slotId = slot.getSlotId();
-		final ResourceID resourceId = slotId.getResourceID();
-		if (!registeredSlots.containsKey(resourceId)) {
-			registeredSlots.put(resourceId, new HashMap<SlotID, ResourceSlot>());
-		}
-		return registeredSlots.get(resourceId).put(slotId, slot) == null;
-	}
-
-	private ResourceSlot getRegisteredSlot(final SlotID slotId) {
-		final ResourceID resourceId = slotId.getResourceID();
-		if (!registeredSlots.containsKey(resourceId)) {
-			return null;
-		}
-		return registeredSlots.get(resourceId).get(slotId);
-	}
-
-	// ------------------------------------------------------------------------
-	//  Framework specific behavior
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Choose a slot to use among all free slots, the behavior is framework specified.
-	 *
-	 * @param request   The slot request
-	 * @param freeSlots All slots which can be used
-	 * @return The slot we choose to use, <tt>null</tt> if we did not find a match
-	 */
-	protected abstract ResourceSlot chooseSlotToUse(final SlotRequest request,
-		final Map<SlotID, ResourceSlot> freeSlots);
-
-	/**
-	 * Choose a pending request to fulfill when we have a free slot, the behavior is framework specified.
-	 *
-	 * @param offeredSlot     The free slot
-	 * @param pendingRequests All the pending slot requests
-	 * @return The chosen SlotRequest, <tt>null</tt> if we did not find a match
-	 */
-	protected abstract SlotRequest chooseRequestToFulfill(final ResourceSlot offeredSlot,
-		final Map<AllocationID, SlotRequest> pendingRequests);
-
-	/**
-	 * The framework specific code for allocating a container for specified resource profile.
-	 *
-	 * @param resourceProfile The resource profile
-	 */
-	protected abstract void allocateContainer(final ResourceProfile resourceProfile);
-
-
-	// ------------------------------------------------------------------------
-	//  Helper classes
-	// ------------------------------------------------------------------------
-
-	/**
-	 * We maintain all the allocations with SlotID and AllocationID. We are able to get or remove the allocation info
-	 * either by SlotID or AllocationID.
-	 */
-	private static class AllocationMap {
-
-		/** All allocated slots (by SlotID) */
-		private final Map<SlotID, AllocationID> allocatedSlots;
-
-		/** All allocated slots (by AllocationID), it'a a inverse view of allocatedSlots */
-		private final Map<AllocationID, SlotID> allocatedSlotsByAllocationId;
-
-		AllocationMap() {
-			this.allocatedSlots = new HashMap<>(16);
-			this.allocatedSlotsByAllocationId = new HashMap<>(16);
-		}
-
-		/**
-		 * Add a allocation
-		 *
-		 * @param slotId       The slot id
-		 * @param allocationId The allocation id
-		 */
-		void addAllocation(final SlotID slotId, final AllocationID allocationId) {
-			allocatedSlots.put(slotId, allocationId);
-			allocatedSlotsByAllocationId.put(allocationId, slotId);
-		}
-
-		/**
-		 * De-allocation with slot id
-		 *
-		 * @param slotId The slot id
-		 */
-		void removeAllocation(final SlotID slotId) {
-			if (allocatedSlots.containsKey(slotId)) {
-				final AllocationID allocationId = allocatedSlots.get(slotId);
-				allocatedSlots.remove(slotId);
-				allocatedSlotsByAllocationId.remove(allocationId);
-			}
-		}
-
-		/**
-		 * De-allocation with allocation id
-		 *
-		 * @param allocationId The allocation id
-		 */
-		void removeAllocation(final AllocationID allocationId) {
-			if (allocatedSlotsByAllocationId.containsKey(allocationId)) {
-				SlotID slotId = allocatedSlotsByAllocationId.get(allocationId);
-				allocatedSlotsByAllocationId.remove(allocationId);
-				allocatedSlots.remove(slotId);
-			}
-		}
-
-		/**
-		 * Check whether allocation exists by slot id
-		 *
-		 * @param slotId The slot id
-		 * @return true if the allocation exists
-		 */
-		boolean isAllocated(final SlotID slotId) {
-			return allocatedSlots.containsKey(slotId);
-		}
-
-		/**
-		 * Check whether allocation exists by allocation id
-		 *
-		 * @param allocationId The allocation id
-		 * @return true if the allocation exists
-		 */
-		boolean isAllocated(final AllocationID allocationId) {
-			return allocatedSlotsByAllocationId.containsKey(allocationId);
-		}
-
-		AllocationID getAllocationID(final SlotID slotId) {
-			return allocatedSlots.get(slotId);
-		}
-
-		SlotID getSlotID(final AllocationID allocationId) {
-			return allocatedSlotsByAllocationId.get(allocationId);
-		}
-
-		public int size() {
-			return allocatedSlots.size();
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  Testing utilities
-	// ------------------------------------------------------------------------
-
-	@VisibleForTesting
-	boolean isAllocated(final SlotID slotId) {
-		return allocationMap.isAllocated(slotId);
-	}
-
-	@VisibleForTesting
-	boolean isAllocated(final AllocationID allocationId) {
-		return allocationMap.isAllocated(allocationId);
-	}
-
-	/**
-	 * Add free slots directly to the free pool, this will not trigger pending requests allocation
-	 *
-	 * @param slot The resource slot
-	 */
-	@VisibleForTesting
-	void addFreeSlot(final ResourceSlot slot) {
-		final ResourceID resourceId = slot.getResourceID();
-		final SlotID slotId = slot.getSlotId();
-
-		if (!registeredSlots.containsKey(resourceId)) {
-			registeredSlots.put(resourceId, new HashMap<SlotID, ResourceSlot>());
-		}
-		registeredSlots.get(resourceId).put(slot.getSlotId(), slot);
-		freeSlots.put(slotId, slot);
-	}
-
-	@VisibleForTesting
-	int getAllocatedSlotCount() {
-		return allocationMap.size();
-	}
-
-	@VisibleForTesting
-	int getFreeSlotCount() {
-		return freeSlots.size();
-	}
-
-	@VisibleForTesting
-	int getPendingRequestCount() {
-		return pendingSlotRequests.size();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8a48fa50/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestRegistered.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestRegistered.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestRegistered.java
new file mode 100644
index 0000000..6b7f6dc
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestRegistered.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+
+import java.io.Serializable;
+
+/**
+ * Acknowledgment by the ResourceManager for a SlotRequest from the JobManager
+ */
+public class SlotRequestRegistered extends SlotRequestReply {
+
+	public SlotRequestRegistered(AllocationID allocationID) {
+		super(allocationID);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8a48fa50/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestRejected.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestRejected.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestRejected.java
new file mode 100644
index 0000000..cb3ec72
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestRejected.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+
+import java.io.Serializable;
+
+/**
+ * Rejection message by the ResourceManager for a SlotRequest from the JobManager
+ */
+public class SlotRequestRejected extends SlotRequestReply {
+
+	public SlotRequestRejected(AllocationID allocationID) {
+		super(allocationID);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8a48fa50/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestReply.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestReply.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestReply.java
new file mode 100644
index 0000000..1b85d0c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestReply.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+
+import java.io.Serializable;
+
+/**
+ * Base class for the replies sent by the ResourceManager in response to a SlotRequest from the JobManager
+ */
+public abstract class SlotRequestReply implements Serializable {
+
+	private static final long serialVersionUID = 42;
+
+	private final AllocationID allocationID;
+
+	public SlotRequestReply(AllocationID allocationID) {
+		this.allocationID = allocationID;
+	}
+
+	public AllocationID getAllocationID() {
+		return allocationID;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8a48fa50/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SimpleSlotManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SimpleSlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SimpleSlotManager.java
new file mode 100644
index 0000000..ef5ce31
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SimpleSlotManager.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * A simple SlotManager which ignores resource profiles.
+ */
+public class SimpleSlotManager extends SlotManager {
+
+	@Override
+	protected ResourceSlot chooseSlotToUse(SlotRequest request, Map<SlotID, ResourceSlot> freeSlots) {
+		final Iterator<ResourceSlot> slotIterator = freeSlots.values().iterator();
+		if (slotIterator.hasNext()) {
+			return slotIterator.next();
+		} else {
+			return null;
+		}
+	}
+
+	@Override
+	protected SlotRequest chooseRequestToFulfill(ResourceSlot offeredSlot, Map<AllocationID, SlotRequest> pendingRequests) {
+		final Iterator<SlotRequest> requestIterator = pendingRequests.values().iterator();
+		if (requestIterator.hasNext()) {
+			return requestIterator.next();
+		} else {
+			return null;
+		}
+	}
+
+	@Override
+	protected void allocateContainer(ResourceProfile resourceProfile) {
+		// TODO
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8a48fa50/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
new file mode 100644
index 0000000..96fde7d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -0,0 +1,579 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered;
+import org.apache.flink.runtime.resourcemanager.SlotRequestReply;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * SlotManager is responsible for receiving slot requests and performing slot allocations. It allows requesting
+ * slots from registered TaskManagers and issues container allocation requests in case there are not
+ * enough available slots. Besides, it should sync its slot allocation with TaskManager's heartbeat.
+ * <p>
+ * The main operation principle of SlotManager is:
+ * <ul>
+ * <li>1. All slot allocation status should be synced with TaskManager, which is the ground truth.</li>
+ * <li>2. All slots that have registered must be tracked, either by free pool or allocated pool.</li>
+ * <li>3. All slot requests will be handled by best efforts, there is no guarantee that one request will be
+ * fulfilled in time or correctly allocated. Conflicts, timeouts or other errors may happen; they should
+ * be handled outside SlotManager. SlotManager will make each decision based on the information it currently
+ * holds.</li>
+ * </ul>
+ * <b>IMPORTANT:</b> This class is <b>Not Thread-safe</b>.
+ */
+public abstract class SlotManager implements LeaderRetrievalListener {
+
+	protected final Logger LOG = LoggerFactory.getLogger(getClass());
+
+	/** All registered task managers with ResourceID and gateway. */
+	private final Map<ResourceID, TaskExecutorGateway> taskManagerGateways;
+
+	/** All registered slots, including free and allocated slots */
+	private final Map<ResourceID, Map<SlotID, ResourceSlot>> registeredSlots;
+
+	/** All pending slot requests, waiting for available slots to fulfil them */
+	private final Map<AllocationID, SlotRequest> pendingSlotRequests;
+
+	/** All free slots that are available for allocation */
+	private final Map<SlotID, ResourceSlot> freeSlots;
+
+	/** All allocations, we can lookup allocations either by SlotID or AllocationID */
+	private final AllocationMap allocationMap;
+
+	private final FiniteDuration timeout;
+
+	/** The current leader id set by the ResourceManager */
+	private UUID leaderID;
+
+	public SlotManager() {
+		this.registeredSlots = new HashMap<>(16);
+		this.pendingSlotRequests = new LinkedHashMap<>(16);
+		this.freeSlots = new HashMap<>(16);
+		this.allocationMap = new AllocationMap();
+		this.taskManagerGateways = new HashMap<>();
+		this.timeout = new FiniteDuration(10, TimeUnit.SECONDS);
+	}
+
+
+	// ------------------------------------------------------------------------
+	//  slot managements
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Request a slot with requirements; we may either fulfill the request or leave it pending. Trigger container
+	 * allocation if we don't have enough resources. If we have a free slot which can match the request, record
+	 * this allocation and forward the request to TaskManager through ResourceManager (we want this done by
+	 * RPC's main thread to avoid race condition).
+	 *
+	 * @param request The detailed request of the slot
+	 * @return The confirmation message to be sent to the caller, or <tt>null</tt> if the request is a duplicate
+	 */
+	public SlotRequestRegistered requestSlot(final SlotRequest request) {
+		final AllocationID allocationId = request.getAllocationId();
+		if (isRequestDuplicated(request)) {
+			LOG.warn("Duplicated slot request, AllocationID:{}", allocationId);
+			return null;
+		}
+
+		// try to fulfil the request with current free slots
+		final ResourceSlot slot = chooseSlotToUse(request, freeSlots);
+		if (slot != null) {
+			LOG.info("Assigning SlotID({}) to AllocationID({}), JobID:{}", slot.getSlotId(),
+				allocationId, request.getJobId());
+
+			// record this allocation in bookkeeping
+			allocationMap.addAllocation(slot.getSlotId(), allocationId);
+
+			// remove selected slot from free pool
+			freeSlots.remove(slot.getSlotId());
+
+			final Future<SlotRequestReply> slotRequestReplyFuture =
+				slot.getTaskExecutorGateway().requestSlot(allocationId, leaderID, timeout);
+			// TODO handle timeouts and response
+		} else {
+			LOG.info("Cannot fulfil slot request, try to allocate a new container for it, " +
+				"AllocationID:{}, JobID:{}", allocationId, request.getJobId());
+			allocateContainer(request.getResourceProfile());
+			pendingSlotRequests.put(allocationId, request);
+		}
+
+		return new SlotRequestRegistered(allocationId);
+	}
+
+	/**
+	 * Sync slot status with TaskManager's SlotReport.
+	 */
+	public void updateSlotStatus(final SlotReport slotReport) {
+		for (SlotStatus slotStatus : slotReport.getSlotsStatus()) {
+			updateSlotStatus(slotStatus);
+		}
+	}
+
+	/**
+	 * Registers a TaskExecutor
+	 * @param resourceID TaskExecutor's ResourceID
+	 * @param gateway TaskExecutor's gateway
+	 */
+	public void registerTaskExecutor(ResourceID resourceID, TaskExecutorGateway gateway) {
+		this.taskManagerGateways.put(resourceID, gateway);
+	}
+
+	/**
+	 * The slot request to TaskManager may be either failed by rpc communication (timeout, network error, etc.)
+	 * or really rejected by TaskManager. We shall retry this request by:
+	 * <ul>
+	 * <li>1. verify and clear all the previous allocation information for this request</li>
+	 * <li>2. try to request slot again</li>
+	 * </ul>
+	 * <p>
+	 * This may cause some duplicate allocation, e.g. the slot request to TaskManager is successful but the response
+	 * is lost somehow, so we may request a slot in another TaskManager, this causes two slots assigned to one request,
+	 * but it can be taken care of by rejecting registration at JobManager.
+	 *
+	 * @param originalRequest The original slot request
+	 * @param slotId          The target SlotID
+	 */
+	public void handleSlotRequestFailedAtTaskManager(final SlotRequest originalRequest, final SlotID slotId) {
+		final AllocationID originalAllocationId = originalRequest.getAllocationId();
+		LOG.info("Slot request failed at TaskManager, SlotID:{}, AllocationID:{}, JobID:{}",
+			slotId, originalAllocationId, originalRequest.getJobId());
+
+		// verify the allocation info before we do anything
+		if (freeSlots.containsKey(slotId)) {
+			// this slot is currently empty, no need to de-allocate it from our allocations
+			LOG.info("Original slot is somehow empty, retrying this request");
+
+			// before retry, we should double check whether this request was allocated by some other ways
+			if (!allocationMap.isAllocated(originalAllocationId)) {
+				requestSlot(originalRequest);
+			} else {
+				LOG.info("The failed request has somehow been allocated, SlotID:{}",
+					allocationMap.getSlotID(originalAllocationId));
+			}
+		} else if (allocationMap.isAllocated(slotId)) {
+			final AllocationID currentAllocationId = allocationMap.getAllocationID(slotId);
+
+			// check whether we have an agreement on whom this slot belongs to
+			if (originalAllocationId.equals(currentAllocationId)) {
+				LOG.info("De-allocate this request and retry");
+				allocationMap.removeAllocation(currentAllocationId);
+
+				// put this slot back to free pool
+				ResourceSlot slot = checkNotNull(getRegisteredSlot(slotId));
+				freeSlots.put(slotId, slot);
+
+				// retry the request
+				requestSlot(originalRequest);
+			} else {
+				// the slot is taken by someone else, no need to de-allocate it from our allocations
+				LOG.info("Original slot is taken by someone else, current AllocationID:{}", currentAllocationId);
+
+				// before retry, we should double check whether this request was allocated by some other ways
+				if (!allocationMap.isAllocated(originalAllocationId)) {
+					requestSlot(originalRequest);
+				} else {
+					LOG.info("The failed request is somehow been allocated, SlotID:{}",
+						allocationMap.getSlotID(originalAllocationId));
+				}
+			}
+		} else {
+			LOG.error("BUG! {} is neither in free pool nor in allocated pool", slotId);
+		}
+	}
+
+	/**
+	 * Callback for TaskManager failures. In case that a TaskManager fails, we have to clean up all its slots.
+	 *
+	 * @param resourceId The ResourceID of the TaskManager
+	 */
+	public void notifyTaskManagerFailure(final ResourceID resourceId) {
+		LOG.info("Resource:{} been notified failure", resourceId);
+		taskManagerGateways.remove(resourceId);
+		final Map<SlotID, ResourceSlot> slotIdsToRemove = registeredSlots.remove(resourceId);
+		if (slotIdsToRemove != null) {
+			for (SlotID slotId : slotIdsToRemove.keySet()) {
+				LOG.info("Removing Slot: {} upon resource failure", slotId);
+				if (freeSlots.containsKey(slotId)) {
+					freeSlots.remove(slotId);
+				} else if (allocationMap.isAllocated(slotId)) {
+					allocationMap.removeAllocation(slotId);
+				} else {
+					LOG.error("BUG! {} is neither in free pool nor in allocated pool", slotId);
+				}
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  internal behaviors
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Update slot status based on TaskManager's report. There are mainly two situations when we receive the report:
+	 * <ul>
+	 * <li>1. The slot is newly registered.</li>
+	 * <li>2. The slot has registered, it contains its current status.</li>
+	 * </ul>
+	 * <p>
+	 * Regarding 1: It's fairly simple, we just record this slot's status, and trigger schedule if slot is empty.
+	 * <p>
+	 * Regarding 2: It will cause some weird situation since we may have some time-gap on how the slot's status really
+	 * is. We may have some updates on the slot's allocation that are not yet reflected by TaskManager's heartbeat,
+	 * and we may make some wrong decision if we cannot guarantee we have the exact status about all the slots. So
+	 * the principle here is: We always trust TaskManager's heartbeat, we will correct our information based on that
+	 * and take next action based on the diff between our information and heartbeat status.
+	 *
+	 * @param reportedStatus Reported slot status
+	 */
+	void updateSlotStatus(final SlotStatus reportedStatus) {
+		final SlotID slotId = reportedStatus.getSlotID();
+
+		final TaskExecutorGateway taskExecutorGateway = taskManagerGateways.get(slotId.getResourceID());
+		if (taskExecutorGateway == null) {
+			LOG.info("Received SlotStatus but ResourceID {} is unknown to the SlotManager",
+				slotId.getResourceID());
+			return; // status reports of resources without a known gateway are dropped
+		}
+
+		final ResourceSlot slot = new ResourceSlot(slotId, reportedStatus.getProfiler(), taskExecutorGateway);
+
+		if (registerNewSlot(slot)) {
+			// first report for this SlotID (registerNewSlot stores the slot in either case)
+			LOG.info("New slot appeared, SlotID:{}, AllocationID:{}", slotId, reportedStatus.getAllocationID());
+
+			if (reportedStatus.getAllocationID() != null) {
+				// slot in use, record this in bookkeeping
+				allocationMap.addAllocation(slotId, reportedStatus.getAllocationID());
+			} else {
+				handleFreeSlot(slot); // serve a pending request with it, or put it into the free pool
+			}
+		} else {
+			// slot is already known: reconcile our bookkeeping with the reported status
+			if (reportedStatus.getAllocationID() != null) {
+				// slot is reported in use
+				final AllocationID reportedAllocationId = reportedStatus.getAllocationID();
+
+				// check whether we also thought this slot is in use
+				if (allocationMap.isAllocated(slotId)) {
+					// we also think that slot is in use, check whether the AllocationID matches
+					final AllocationID currentAllocationId = allocationMap.getAllocationID(slotId);
+
+					if (!reportedAllocationId.equals(currentAllocationId)) {
+						LOG.info("Slot allocation info mismatch! SlotID:{}, current:{}, reported:{}",
+							slotId, currentAllocationId, reportedAllocationId);
+
+						// we disagree about which allocation holds the slot; the reported one wins
+						allocationMap.removeAllocation(slotId);
+						allocationMap.addAllocation(slotId, reportedAllocationId);
+					}
+				} else {
+					LOG.info("Slot allocation info mismatch! SlotID:{}, current:null, reported:{}",
+						slotId, reportedAllocationId);
+
+					// we thought the slot was free; record the reported allocation instead
+					allocationMap.addAllocation(slotId, reportedStatus.getAllocationID());
+
+					// remove this slot from the free slots pool
+					freeSlots.remove(slotId);
+				}
+			} else {
+				// slot is reported empty
+
+				// check whether we also thought this slot is empty
+				if (allocationMap.isAllocated(slotId)) {
+					LOG.info("Slot allocation info mismatch! SlotID:{}, current:{}, reported:null",
+						slotId, allocationMap.getAllocationID(slotId));
+
+					// we thought the slot was in use; drop the stale allocation
+					allocationMap.removeAllocation(slotId);
+
+					// the slot is free again: offer it to the pending requests
+					handleFreeSlot(slot);
+				}
+			}
+		}
+	}
+
+	/**
+	 * Handles a slot that has become free. If a pending request can be fulfilled with it, the allocation is
+	 * recorded in the bookkeeping and a slot request is sent to the slot's TaskExecutor; otherwise the slot
+	 * is added to the free pool.
+	 *
+	 * @param freeSlot The free slot
+	 */
+	private void handleFreeSlot(final ResourceSlot freeSlot) {
+		SlotRequest chosenRequest = chooseRequestToFulfill(freeSlot, pendingSlotRequests);
+
+		if (chosenRequest != null) {
+			final AllocationID allocationId = chosenRequest.getAllocationId();
+			pendingSlotRequests.remove(allocationId); // the request stops being pending once a slot is chosen
+
+			LOG.info("Assigning SlotID({}) to AllocationID({}), JobID:{}", freeSlot.getSlotId(),
+				allocationId, chosenRequest.getJobId());
+			allocationMap.addAllocation(freeSlot.getSlotId(), allocationId); // recorded before the TaskExecutor replies
+
+			final Future<SlotRequestReply> slotRequestReplyFuture =
+				freeSlot.getTaskExecutorGateway().requestSlot(allocationId, leaderID, timeout);
+			// TODO handle timeouts and response (the reply future is currently not inspected)
+		} else {
+			freeSlots.put(freeSlot.getSlotId(), freeSlot);
+		}
+	}
+
+	/**
+	 * Tells whether a slot request has been seen before. Requests are identified by their AllocationID, and
+	 * every request received earlier is either still in the pending list or has already been allocated.
+	 *
+	 * @param request The slot request
+	 * @return <tt>true</tt> if the request is duplicated
+	 */
+	private boolean isRequestDuplicated(final SlotRequest request) {
+		final AllocationID requestedId = request.getAllocationId();
+		final boolean stillPending = pendingSlotRequests.containsKey(requestedId);
+		return stillPending || allocationMap.isAllocated(requestedId);
+	}
+
+	/**
+	 * Registers the slot, replacing any instance stored under the same SlotID, and tells whether it is new.
+	 *
+	 * @param slot The ResourceSlot which will be checked and registered
+	 * @return <tt>true</tt> if we meet a new slot
+	 */
+	private boolean registerNewSlot(final ResourceSlot slot) {
+		final SlotID slotId = slot.getSlotId();
+		final ResourceID resourceId = slotId.getResourceID();
+		if (!registeredSlots.containsKey(resourceId)) {
+			registeredSlots.put(resourceId, new HashMap<SlotID, ResourceSlot>()); // first slot of this resource
+		}
+		return registeredSlots.get(resourceId).put(slotId, slot) == null; // put() yields the previous mapping
+	}
+
+	private ResourceSlot getRegisteredSlot(final SlotID slotId) {
+		final ResourceID owner = slotId.getResourceID();
+		// without an entry for the owning resource there cannot be a registered slot
+		return registeredSlots.containsKey(owner)
+			? registeredSlots.get(owner).get(slotId)
+			: null;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Framework specific behavior
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Chooses the slot to use for a request among all free slots; the strategy is framework specific.
+	 *
+	 * @param request   The slot request
+	 * @param freeSlots All slots which can be used
+	 * @return The slot we choose to use, <tt>null</tt> if we did not find a match
+	 */
+	protected abstract ResourceSlot chooseSlotToUse(final SlotRequest request,
+		final Map<SlotID, ResourceSlot> freeSlots);
+
+	/**
+	 * Chooses the pending request to fulfill when a slot becomes free; the strategy is framework specific.
+	 *
+	 * @param offeredSlot     The free slot
+	 * @param pendingRequests All the pending slot requests
+	 * @return The chosen SlotRequest, <tt>null</tt> if we did not find a match
+	 */
+	protected abstract SlotRequest chooseRequestToFulfill(final ResourceSlot offeredSlot,
+		final Map<AllocationID, SlotRequest> pendingRequests);
+
+	/**
+	 * Framework specific code for allocating a container for the specified resource profile.
+	 *
+	 * @param resourceProfile The resource profile
+	 */
+	protected abstract void allocateContainer(final ResourceProfile resourceProfile);
+
+	// ------------------------------------------------------------------------
+	//  Helper classes
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Bookkeeping of all allocations as pairs of SlotID and AllocationID. The allocation info can be looked up
+	 * or removed either by SlotID or by AllocationID.
+	 */
+	private static class AllocationMap {
+
+		/** All allocated slots (by SlotID) */
+		private final Map<SlotID, AllocationID> allocatedSlots;
+
+		/** All allocated slots (by AllocationID), it is the inverse view of allocatedSlots */
+		private final Map<AllocationID, SlotID> allocatedSlotsByAllocationId;
+
+		AllocationMap() {
+			this.allocatedSlots = new HashMap<>(16);
+			this.allocatedSlotsByAllocationId = new HashMap<>(16);
+		}
+
+		/**
+		 * Add an allocation to both maps
+		 *
+		 * @param slotId       The slot id
+		 * @param allocationId The allocation id
+		 */
+		void addAllocation(final SlotID slotId, final AllocationID allocationId) {
+			allocatedSlots.put(slotId, allocationId); // NOTE(review): an older allocation of slotId stays in the inverse map
+			allocatedSlotsByAllocationId.put(allocationId, slotId); // likewise, an older slot of allocationId stays above
+		}
+
+		/**
+		 * Remove the allocation recorded for the given slot from both maps, if there is one
+		 *
+		 * @param slotId The slot id
+		 */
+		void removeAllocation(final SlotID slotId) {
+			if (allocatedSlots.containsKey(slotId)) {
+				final AllocationID allocationId = allocatedSlots.get(slotId);
+				allocatedSlots.remove(slotId);
+				allocatedSlotsByAllocationId.remove(allocationId);
+			}
+		}
+
+		/**
+		 * Remove the allocation with the given allocation id from both maps, if there is one
+		 *
+		 * @param allocationId The allocation id
+		 */
+		void removeAllocation(final AllocationID allocationId) {
+			if (allocatedSlotsByAllocationId.containsKey(allocationId)) {
+				SlotID slotId = allocatedSlotsByAllocationId.get(allocationId);
+				allocatedSlotsByAllocationId.remove(allocationId);
+				allocatedSlots.remove(slotId);
+			}
+		}
+
+		/**
+		 * Check whether allocation exists by slot id
+		 *
+		 * @param slotId The slot id
+		 * @return true if the allocation exists
+		 */
+		boolean isAllocated(final SlotID slotId) {
+			return allocatedSlots.containsKey(slotId);
+		}
+
+		/**
+		 * Check whether allocation exists by allocation id
+		 *
+		 * @param allocationId The allocation id
+		 * @return true if the allocation exists
+		 */
+		boolean isAllocated(final AllocationID allocationId) {
+			return allocatedSlotsByAllocationId.containsKey(allocationId);
+		}
+
+		AllocationID getAllocationID(final SlotID slotId) {
+			return allocatedSlots.get(slotId); // null if no allocation is recorded for the slot
+		}
+
+		SlotID getSlotID(final AllocationID allocationId) {
+			return allocatedSlotsByAllocationId.get(allocationId); // null if the allocation id is not recorded
+		}
+
+		public int size() {
+			return allocatedSlots.size(); // number of entries in the by-SlotID map
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  High availability
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
+		this.leaderID = leaderSessionID; // sent along with slot requests to TaskExecutors; the address is not used here
+	}
+
+	@Override
+	public void handleError(Exception exception) {
+		LOG.error("Slot Manager received an error from the leader service", exception); // only logged, no other action
+	}
+
+	// ------------------------------------------------------------------------
+	//  Testing utilities
+	// ------------------------------------------------------------------------
+
+	@VisibleForTesting
+	boolean isAllocated(final SlotID slotId) {
+		return allocationMap.isAllocated(slotId); // true if the bookkeeping holds an allocation for this slot
+	}
+
+	@VisibleForTesting
+	boolean isAllocated(final AllocationID allocationId) {
+		return allocationMap.isAllocated(allocationId); // true if the bookkeeping maps this allocation to a slot
+	}
+
+	/**
+	 * Registers the slot and adds it directly to the free pool; pending requests are not served with it
+	 *
+	 * @param slot The resource slot
+	 */
+	@VisibleForTesting
+	void addFreeSlot(final ResourceSlot slot) {
+		final ResourceID resourceId = slot.getResourceID();
+		final SlotID slotId = slot.getSlotId();
+
+		if (!registeredSlots.containsKey(resourceId)) {
+			registeredSlots.put(resourceId, new HashMap<SlotID, ResourceSlot>()); // first slot of this resource
+		}
+		registeredSlots.get(resourceId).put(slot.getSlotId(), slot);
+		freeSlots.put(slotId, slot); // handleFreeSlot is bypassed on purpose, see the Javadoc
+	}
+
+	@VisibleForTesting
+	int getAllocatedSlotCount() {
+		return allocationMap.size(); // number of slots recorded as allocated
+	}
+
+	@VisibleForTesting
+	int getFreeSlotCount() {
+		return freeSlots.size(); // number of slots currently in the free pool
+	}
+
+	@VisibleForTesting
+	int getPendingRequestCount() {
+		return pendingSlotRequests.size(); // number of slot requests still held in the pending map
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8a48fa50/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java
index 744b674..0f57bb1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java
@@ -50,7 +50,10 @@ public class SlotStatus implements Serializable {
 		this(slotID, profiler, null, null);
 	}
 
-	public SlotStatus(SlotID slotID, ResourceProfile profiler, AllocationID allocationID, JobID jobID) {
+	public SlotStatus(
+			SlotID slotID, ResourceProfile profiler,
+			JobID jobID,
+			AllocationID allocationID) {
 		this.slotID = checkNotNull(slotID, "slotID cannot be null");
 		this.profiler = checkNotNull(profiler, "profile cannot be null");
 		this.allocationID = allocationID;

http://git-wip-us.apache.org/repos/asf/flink/blob/8a48fa50/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
index 6c99706..7257436 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -18,7 +18,12 @@
 
 package org.apache.flink.runtime.taskexecutor;
 
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.resourcemanager.SlotRequestReply;
 import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcTimeout;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
 
 import java.util.UUID;
 
@@ -32,4 +37,16 @@ public interface TaskExecutorGateway extends RpcGateway {
 	// ------------------------------------------------------------------------
 
 	void notifyOfNewResourceManagerLeader(String address, UUID resourceManagerLeaderId);
+
+	/**
+	 * Sent by the ResourceManager to the TaskExecutor
+	 * @param allocationID id for the request
+	 * @param resourceManagerLeaderID current leader id of the ResourceManager
+	 * @return SlotRequestReply Answer to the request
+	 */
+
+	Future<SlotRequestReply> requestSlot(
+		AllocationID allocationID,
+		UUID resourceManagerLeaderID,
+		@RpcTimeout FiniteDuration timeout);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8a48fa50/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
index 5799e62..8183c0a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.MainThreadExecutor;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcGateway;
@@ -53,7 +54,8 @@ public class ResourceManagerHATest {
 		TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
 		highAvailabilityServices.setResourceManagerLeaderElectionService(leaderElectionService);
 
-		final ResourceManager resourceManager = new ResourceManager(rpcService, highAvailabilityServices);
+		SlotManager slotManager = mock(SlotManager.class);
+		final ResourceManager resourceManager = new ResourceManager(rpcService, highAvailabilityServices, slotManager);
 		resourceManager.start();
 		// before grant leadership, resourceManager's leaderId is null
 		Assert.assertNull(resourceManager.getLeaderSessionID());