You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/10/21 12:22:16 UTC

[36/50] [abbrv] flink git commit: [FLINK-4339] [cluster management] Implement Slot Pool core on JobManager side

[FLINK-4339] [cluster management] Implement Slot Pool core on JobManager side


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fe999e03
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fe999e03
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fe999e03

Branch: refs/heads/flip-6
Commit: fe999e038ebec1ea6e6fb35150daa53e01a0a391
Parents: 0df6a20
Author: Kurt Young <yk...@gmail.com>
Authored: Thu Oct 13 04:59:46 2016 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Oct 20 19:49:24 2016 +0200

----------------------------------------------------------------------
 .../runtime/clusterframework/types/SlotID.java  |  16 +-
 .../flink/runtime/instance/SlotDescriptor.java  | 161 +++++
 .../apache/flink/runtime/instance/SlotPool.java | 675 +++++++++++++++++++
 .../apache/flink/runtime/akka/AkkaUtils.scala   |   4 +-
 .../runtime/instance/AllocatedSlotsTest.java    | 135 ++++
 .../runtime/instance/AvailableSlotsTest.java    | 123 ++++
 .../flink/runtime/instance/SlotPoolTest.java    | 297 ++++++++
 7 files changed, 1403 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fe999e03/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java
index e831a5d..237597b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java
@@ -33,11 +33,11 @@ public class SlotID implements ResourceIDRetrievable, Serializable {
 	private final ResourceID resourceId;
 
 	/** The numeric id for single slot */
-	private final int slotId;
+	private final int slotNumber;
 
-	public SlotID(ResourceID resourceId, int slotId) {
+	public SlotID(ResourceID resourceId, int slotNumber) {
 		this.resourceId = checkNotNull(resourceId, "ResourceID must not be null");
-		this.slotId = slotId;
+		this.slotNumber = slotNumber;
 	}
 
 	// ------------------------------------------------------------------------
@@ -47,6 +47,10 @@ public class SlotID implements ResourceIDRetrievable, Serializable {
 		return resourceId;
 	}
 
+	public int getSlotNumber() {
+		return slotNumber;
+	}
+
 	// ------------------------------------------------------------------------
 
 	@Override
@@ -60,7 +64,7 @@ public class SlotID implements ResourceIDRetrievable, Serializable {
 
 		SlotID slotID = (SlotID) o;
 
-		if (slotId != slotID.slotId) {
+		if (slotNumber != slotID.slotNumber) {
 			return false;
 		}
 		return resourceId.equals(slotID.resourceId);
@@ -69,13 +73,13 @@ public class SlotID implements ResourceIDRetrievable, Serializable {
 	@Override
 	public int hashCode() {
 		int result = resourceId.hashCode();
-		result = 31 * result + slotId;
+		result = 31 * result + slotNumber;
 		return result;
 	}
 
 	@Override
 	public String toString() {
-		return resourceId + "_" + slotId;
+		return resourceId + "_" + slotNumber;
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/fe999e03/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotDescriptor.java
new file mode 100644
index 0000000..be7cf96
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotDescriptor.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Describes a slot. TaskManagers offer one or more task slots, each of which defines a slice of
+ * their resources. This description contains some static information about the slot, such
+ * as the location and numeric id of the slot, and the rpc gateway to communicate with the TaskManager
+ * which owns the slot.
+ */
+public class SlotDescriptor {
+
+	/** The ID of the job this slot belongs to. */
+	private final JobID jobID;
+
+	/** The location information of the TaskManager to which this slot belongs */
+	private final TaskManagerLocation taskManagerLocation;
+
+	/** The number of the slot on which the task is deployed */
+	private final int slotNumber;
+
+	/** The resource profile this slot provides */
+	private final ResourceProfile resourceProfile;
+
+	/** TEMP until the new RPC is in place: The actor gateway to communicate with the TaskManager */
+	private final ActorGateway taskManagerActorGateway;
+
+	/**
+	 * Creates a descriptor from its individual parts. Every reference argument must be non-null.
+	 *
+	 * @param jobID           The ID of the job the slot belongs to
+	 * @param location        The location of the TaskManager which owns the slot
+	 * @param slotNumber      The number of the slot on the TaskManager
+	 * @param resourceProfile The resource profile the slot provides
+	 * @param actorGateway    The actor gateway to communicate with the TaskManager
+	 */
+	public SlotDescriptor(
+		final JobID jobID,
+		final TaskManagerLocation location,
+		final int slotNumber,
+		final ResourceProfile resourceProfile,
+		final ActorGateway actorGateway)
+	{
+		this.jobID = checkNotNull(jobID);
+		this.taskManagerLocation = checkNotNull(location);
+		this.slotNumber = slotNumber;
+		this.resourceProfile = checkNotNull(resourceProfile);
+		this.taskManagerActorGateway = checkNotNull(actorGateway);
+	}
+
+	/**
+	 * Copy constructor, takes over all fields of the given descriptor.
+	 *
+	 * @param other The descriptor to copy
+	 */
+	public SlotDescriptor(final SlotDescriptor other) {
+		this.jobID = other.jobID;
+		this.taskManagerLocation = other.taskManagerLocation;
+		this.slotNumber = other.slotNumber;
+		this.resourceProfile = other.resourceProfile;
+		this.taskManagerActorGateway = other.taskManagerActorGateway;
+	}
+	
+	// TODO - temporary workaround until we have the SlotDescriptor in the Slot
+	// The Slot does not carry a resource profile here, so a ResourceProfile(0, 0) placeholder is used.
+	public SlotDescriptor(final Slot slot) {
+		this.jobID = slot.getJobID();
+		this.taskManagerLocation = slot.getTaskManagerLocation();
+		this.slotNumber = slot.getRootSlotNumber();
+		this.resourceProfile = new ResourceProfile(0, 0);
+		this.taskManagerActorGateway = slot.getTaskManagerActorGateway();
+	}
+
+	/**
+	 * Returns the ID of the job this allocated slot belongs to.
+	 *
+	 * @return the ID of the job this allocated slot belongs to
+	 */
+	public JobID getJobID() {
+		return jobID;
+	}
+
+	/**
+	 * Gets the number of the slot.
+	 *
+	 * @return The number of the slot on the TaskManager.
+	 */
+	public int getSlotNumber() {
+		return slotNumber;
+	}
+
+	/**
+	 * Gets the resource profile of the slot.
+	 *
+	 * @return The resource profile of the slot.
+	 */
+	public ResourceProfile getResourceProfile() {
+		return resourceProfile;
+	}
+
+	/**
+	 * Gets the location info of the TaskManager that offers this slot.
+	 *
+	 * @return The location info of the TaskManager that offers this slot
+	 */
+	public TaskManagerLocation getTaskManagerLocation() {
+		return taskManagerLocation;
+	}
+
+	/**
+	 * Gets the actor gateway that can be used to send messages to the TaskManager.
+	 * <p>
+	 * This method should be removed once the new interface-based RPC abstraction is in place
+	 *
+	 * @return The actor gateway that can be used to send messages to the TaskManager.
+	 */
+	public ActorGateway getTaskManagerActorGateway() {
+		return taskManagerActorGateway;
+	}
+
+	/**
+	 * Two descriptors are equal if they have the same slot number, job ID and TaskManager location.
+	 * The resource profile and the actor gateway are not part of the comparison.
+	 */
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+
+		SlotDescriptor that = (SlotDescriptor) o;
+
+		if (slotNumber != that.slotNumber) {
+			return false;
+		}
+		if (!jobID.equals(that.jobID)) {
+			return false;
+		}
+		return taskManagerLocation.equals(that.taskManagerLocation);
+
+	}
+
+	/** Hashes exactly the fields compared by {@link #equals(Object)}. */
+	@Override
+	public int hashCode() {
+		int result = jobID.hashCode();
+		result = 31 * result + taskManagerLocation.hashCode();
+		result = 31 * result + slotNumber;
+		return result;
+	}
+
+	@Override
+	public String toString() {
+		return taskManagerLocation + " - " + slotNumber;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fe999e03/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
new file mode 100644
index 0000000..e7857c1
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -0,0 +1,675 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ApplyFunction;
+import org.apache.flink.runtime.concurrent.BiFunction;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestRejected;
+import org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestReply;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The slot pool serves slot requests issued by the Scheduler or ExecutionGraph. It will attempt to acquire new slots
+ * from the ResourceManager when it cannot serve a slot request. If no ResourceManager is currently available,
+ * or it gets a decline from the ResourceManager, or a request times out, it fails the slot request. The slot pool also
+ * holds all the slots that were offered to it and accepted, and can thus provide registered free slots even if the
+ * ResourceManager is down. The slots will only be released when they are useless, e.g. when the job is fully running
+ * but we still have some free slots.
+ * <p>
+ * All the allocation or the slot offering will be identified by self generated AllocationID, we will use it to
+ * eliminate ambiguities.
+ */
+public class SlotPool implements SlotOwner {
+
+	private static final Logger LOG = LoggerFactory.getLogger(SlotPool.class);
+
+	/** The monitor the methods of this pool synchronize on when touching the book-keeping below */
+	private final Object lock = new Object();
+
+	/** The executor which is used to execute futures */
+	private final Executor executor;
+
+	/** All registered resources, slots will be accepted and used only if the resource is registered */
+	private final Set<ResourceID> registeredResources;
+
+	/** The book-keeping of all allocated slots */
+	private final AllocatedSlots allocatedSlots;
+
+	/** The book-keeping of all available slots */
+	private final AvailableSlots availableSlots;
+
+	/** All pending requests waiting for slots */
+	private final Map<AllocationID, Tuple2<SlotRequest, FlinkCompletableFuture<SlotDescriptor>>> pendingRequests;
+
+	/** Timeout of slot allocation */
+	private final Time timeout;
+
+	/** the leader id of job manager */
+	private UUID jobManagerLeaderId;
+
+	/** The leader id of resource manager */
+	private UUID resourceManagerLeaderId;
+
+	/** The gateway to communicate with resource manager */
+	private ResourceManagerGateway resourceManagerGateway;
+
+	/**
+	 * Creates an empty slot pool: no registered resources, no slots, no pending requests and no
+	 * resource manager connection. The slot allocation timeout is fixed to 5 seconds.
+	 *
+	 * @param executor The executor on which the callbacks of the allocation futures are run
+	 */
+	public SlotPool(final Executor executor) {
+		this.executor = executor;
+		this.registeredResources = new HashSet<>();
+		this.allocatedSlots = new AllocatedSlots();
+		this.availableSlots = new AvailableSlots();
+		this.pendingRequests = new HashMap<>();
+		this.timeout = Time.of(5, TimeUnit.SECONDS);
+	}
+
+	/**
+	 * Sets the leader id of the JobManager, which is sent along with slot requests to the resource manager.
+	 * <p>
+	 * The field is read under the pool lock when a slot request is issued, so it is also written under
+	 * that lock, in the same way as the resource manager connection.
+	 *
+	 * @param jobManagerLeaderId The leader id of the job manager
+	 */
+	public void setJobManagerLeaderId(final UUID jobManagerLeaderId) {
+		synchronized (lock) {
+			this.jobManagerLeaderId = jobManagerLeaderId;
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Slot Allocation
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Try to allocate a simple slot with specified resource profile. A fresh {@link AllocationID} is
+	 * generated to identify this allocation.
+	 *
+	 * @param jobID           The job id which the slot allocated for
+	 * @param resourceProfile The needed resource profile
+	 * @return The future of allocated simple slot
+	 */
+	public Future<SimpleSlot> allocateSimpleSlot(final JobID jobID, final ResourceProfile resourceProfile) {
+		return allocateSimpleSlot(jobID, resourceProfile, new AllocationID());
+	}
+
+
+	/**
+	 * Try to allocate a simple slot with specified resource profile and specified allocation id. It's mainly
+	 * for testing purpose since we need to specify whatever allocation id we want.
+	 * <p>
+	 * Once a slot descriptor has been granted, the slot is created and registered as allocated on the pool's
+	 * executor. If the owning TaskManager is no longer registered at that point, the mapping function throws
+	 * a {@link RuntimeException} instead of handing out the slot.
+	 *
+	 * @param jobID           The job id which the slot is allocated for
+	 * @param resourceProfile The needed resource profile
+	 * @param allocationID    The id used to identify this allocation
+	 * @return The future of the allocated simple slot
+	 */
+	@VisibleForTesting
+	Future<SimpleSlot> allocateSimpleSlot(
+		final JobID jobID,
+		final ResourceProfile resourceProfile,
+		final AllocationID allocationID)
+	{
+		final FlinkCompletableFuture<SlotDescriptor> future = new FlinkCompletableFuture<>();
+
+		internalAllocateSlot(jobID, allocationID, resourceProfile, future);
+
+		return future.thenApplyAsync(
+			new ApplyFunction<SlotDescriptor, SimpleSlot>() {
+				@Override
+				public SimpleSlot apply(SlotDescriptor descriptor) {
+					SimpleSlot slot = new SimpleSlot(
+							descriptor.getJobID(), SlotPool.this,
+							descriptor.getTaskManagerLocation(), descriptor.getSlotNumber(),
+							descriptor.getTaskManagerActorGateway());
+					synchronized (lock) {
+						// double validation since we are out of the lock protection after the slot is granted
+						if (registeredResources.contains(descriptor.getTaskManagerLocation().getResourceID())) {
+							LOG.info("Allocation[{}] Allocated simple slot: {} for job {}.", allocationID, slot, jobID);
+							allocatedSlots.add(allocationID, descriptor, slot);
+						}
+						else {
+							throw new RuntimeException("Resource was marked dead asynchronously.");
+						}
+					}
+					return slot;
+				}
+			},
+			executor
+		);
+	}
+
+
+	/**
+	 * Try to allocate a shared slot with specified resource profile. A fresh {@link AllocationID} is
+	 * generated to identify this allocation.
+	 *
+	 * @param jobID                  The job id which the slot allocated for
+	 * @param resourceProfile        The needed resource profile
+	 * @param sharingGroupAssignment The slot sharing group of the vertex
+	 * @return The future of allocated shared slot
+	 */
+	public Future<SharedSlot> allocateSharedSlot(
+		final JobID jobID,
+		final ResourceProfile resourceProfile,
+		final SlotSharingGroupAssignment sharingGroupAssignment)
+	{
+		return allocateSharedSlot(jobID, resourceProfile, sharingGroupAssignment, new AllocationID());
+	}
+
+	/**
+	 * Try to allocate a shared slot with specified resource profile and specified allocation id. It's mainly
+	 * for testing purpose since we need to specify whatever allocation id we want.
+	 * <p>
+	 * Once a slot descriptor has been granted, the shared slot is created and registered as allocated on the
+	 * pool's executor. If the owning TaskManager is no longer registered at that point, the mapping function
+	 * throws a {@link RuntimeException} instead of handing out the slot.
+	 *
+	 * @param jobID                  The job id which the slot is allocated for
+	 * @param resourceProfile        The needed resource profile
+	 * @param sharingGroupAssignment The slot sharing group of the vertex
+	 * @param allocationID           The id used to identify this allocation
+	 * @return The future of the allocated shared slot
+	 */
+	@VisibleForTesting
+	Future<SharedSlot> allocateSharedSlot(
+		final JobID jobID,
+		final ResourceProfile resourceProfile,
+		final SlotSharingGroupAssignment sharingGroupAssignment,
+		final AllocationID allocationID)
+	{
+		final FlinkCompletableFuture<SlotDescriptor> future = new FlinkCompletableFuture<>();
+
+		internalAllocateSlot(jobID, allocationID, resourceProfile, future);
+
+		return future.thenApplyAsync(
+			new ApplyFunction<SlotDescriptor, SharedSlot>() {
+				@Override
+				public SharedSlot apply(SlotDescriptor descriptor) {
+					SharedSlot slot = new SharedSlot(
+							descriptor.getJobID(), SlotPool.this, descriptor.getTaskManagerLocation(),
+							descriptor.getSlotNumber(), descriptor.getTaskManagerActorGateway(),
+							sharingGroupAssignment);
+
+					synchronized (lock) {
+						// double validation since we are out of the lock protection after the slot is granted
+						if (registeredResources.contains(descriptor.getTaskManagerLocation().getResourceID())) {
+							LOG.info("Allocation[{}] Allocated shared slot: {} for job {}.", allocationID, slot, jobID);
+							allocatedSlots.add(allocationID, descriptor, slot);
+						}
+						else {
+							throw new RuntimeException("Resource was marked dead asynchronously.");
+						}
+					}
+					return slot;
+				}
+			},
+			executor
+		);
+	}
+
+	/**
+	 * Internally allocate the slot with specified resource profile. We will first check whether we have some
+	 * free slot which can meet the requirement already and allocate it immediately. Otherwise, we will try to
+	 * allocate the slot from the resource manager.
+	 * <p>
+	 * If the request to the resource manager fails or is rejected, the pending request is dropped from the
+	 * pool before the future is failed. Otherwise a slot returned later could be matched against a request
+	 * whose future has already failed, and that slot would be lost.
+	 *
+	 * @param jobID           The job id which the slot is allocated for
+	 * @param allocationID    The id identifying this allocation
+	 * @param resourceProfile The needed resource profile
+	 * @param future          The future which is completed with the granted slot descriptor, or failed
+	 */
+	private void internalAllocateSlot(
+		final JobID jobID,
+		final AllocationID allocationID,
+		final ResourceProfile resourceProfile,
+		final FlinkCompletableFuture<SlotDescriptor> future)
+	{
+		LOG.info("Allocation[{}] Allocating slot with {} for Job {}.", allocationID, resourceProfile, jobID);
+
+		synchronized (lock) {
+			// check whether we have any free slot which can match the required resource profile
+			final SlotDescriptor freeSlot = availableSlots.poll(resourceProfile);
+			if (freeSlot != null) {
+				future.complete(freeSlot);
+				return;
+			}
+
+			if (resourceManagerGateway == null) {
+				LOG.warn("Allocation[{}] Resource manager not available right now.", allocationID);
+				future.completeExceptionally(new Exception("Resource manager not available right now."));
+				return;
+			}
+
+			LOG.info("Allocation[{}] No available slot exists, trying to allocate from resource manager.",
+				allocationID);
+			final SlotRequest slotRequest = new SlotRequest(jobID, allocationID, resourceProfile);
+			pendingRequests.put(allocationID, new Tuple2<>(slotRequest, future));
+			resourceManagerGateway.requestSlot(jobManagerLeaderId, resourceManagerLeaderId, slotRequest, timeout)
+				.handleAsync(new BiFunction<RMSlotRequestReply, Throwable, Void>() {
+					@Override
+					public Void apply(RMSlotRequestReply slotRequestReply, Throwable throwable) {
+						if (throwable != null) {
+							failPendingRequest(allocationID, future,
+								new Exception("Slot allocation from resource manager failed", throwable));
+						} else if (slotRequestReply instanceof RMSlotRequestRejected) {
+							failPendingRequest(allocationID, future,
+								new Exception("Slot allocation rejected by resource manager"));
+						}
+						return null;
+					}
+				}, executor);
+		}
+	}
+
+	/**
+	 * Drops the pending request registered under the given allocation id, if it still belongs to the given
+	 * future, and then fails the future with the given cause.
+	 *
+	 * @param allocationID The id of the allocation which failed
+	 * @param future       The future of the failed request
+	 * @param cause        The failure cause
+	 */
+	private void failPendingRequest(
+		final AllocationID allocationID,
+		final FlinkCompletableFuture<SlotDescriptor> future,
+		final Exception cause)
+	{
+		synchronized (lock) {
+			final Tuple2<SlotRequest, FlinkCompletableFuture<SlotDescriptor>> pending =
+				pendingRequests.get(allocationID);
+			// the request may have been fulfilled by a returned slot in the meantime, then there is nothing to drop
+			if (pending != null && pending.f1 == future) {
+				pendingRequests.remove(allocationID);
+			}
+		}
+		future.completeExceptionally(cause);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Slot De-allocation
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Return the slot back to this pool without releasing it. It's mainly called by failed / cancelled tasks, and the
+	 * slot can be reused by other pending requests if the resource profile matches.
+	 * <p>
+	 * The slot must no longer be alive and must be owned by this pool. If a pending request matches the
+	 * resource profile of the slot, that request is completed with it, otherwise the slot is put into the
+	 * available slots.
+	 *
+	 * @param slot The slot needs to be returned
+	 * @return True if the returning slot been accepted, false if the slot could not be marked as released or
+	 *         its TaskManager is not registered anymore
+	 * @throws IllegalArgumentException if the slot is still alive, belongs to another owner or was not
+	 *                                  allocated from this pool
+	 */
+	@Override
+	public boolean returnAllocatedSlot(Slot slot) {
+		checkNotNull(slot);
+		checkArgument(!slot.isAlive(), "slot is still alive");
+		checkArgument(slot.getOwner() == this, "slot belongs to the wrong pool.");
+
+		if (slot.markReleased()) {
+			synchronized (lock) {
+				final SlotDescriptor slotDescriptor = allocatedSlots.remove(slot);
+				if (slotDescriptor != null) {
+					// check if this TaskManager is valid
+					if (!registeredResources.contains(slot.getTaskManagerID())) {
+						return false;
+					}
+
+					// prefer handing the slot directly to a waiting request over parking it as available
+					final FlinkCompletableFuture<SlotDescriptor> pendingRequest = pollPendingRequest(slotDescriptor);
+					if (pendingRequest != null) {
+						pendingRequest.complete(slotDescriptor);
+					}
+					else {
+						availableSlots.add(slotDescriptor);
+					}
+
+					return true;
+				}
+				else {
+					throw new IllegalArgumentException("Slot was not allocated from this pool.");
+				}
+			}
+		}
+		else {
+			return false;
+		}
+	}
+
+	/**
+	 * Finds a pending request whose required resource profile is matched by the given slot, removes it from
+	 * the pending requests and returns its future.
+	 *
+	 * @param slotDescriptor The descriptor of the slot which became free
+	 * @return The future of the matching pending request, null if no pending request matches
+	 */
+	private FlinkCompletableFuture<SlotDescriptor> pollPendingRequest(final SlotDescriptor slotDescriptor) {
+		// look up the match first and remove it afterwards, so the map is never modified while iterating it
+		AllocationID matchingAllocation = null;
+		for (Map.Entry<AllocationID, Tuple2<SlotRequest, FlinkCompletableFuture<SlotDescriptor>>> entry : pendingRequests.entrySet()) {
+			if (slotDescriptor.getResourceProfile().isMatching(entry.getValue().f0.getResourceProfile())) {
+				matchingAllocation = entry.getKey();
+				break;
+			}
+		}
+
+		if (matchingAllocation == null) {
+			return null;
+		}
+		return pendingRequests.remove(matchingAllocation).f1;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Slot Releasing
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Release slot to TaskManager, called for finished tasks or canceled jobs. The slot is removed from the
+	 * book-keeping of this pool, no matter whether it is currently allocated or available.
+	 *
+	 * @param slot The slot needs to be released.
+	 */
+	public void releaseSlot(final Slot slot) {
+		checkNotNull(slot);
+
+		synchronized (lock) {
+			allocatedSlots.remove(slot);
+
+			// an allocated slot is usually not among the available ones; removing an unknown slot would fail
+			// when its TaskManager has no available slots at all, so only remove what is actually there
+			final SlotDescriptor descriptor = new SlotDescriptor(slot);
+			if (availableSlots.contains(descriptor)) {
+				availableSlots.remove(descriptor);
+			}
+			// TODO: send release request to task manager
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Slot Offering
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Slot offering by TaskManager with AllocationID. The AllocationID is originally generated by this pool and
+	 * transfer through the ResourceManager to TaskManager. We use it to distinguish the different allocation
+	 * we issued. Slot offering may be rejected if we find something mismatching or there is actually no pending
+	 * request waiting for this slot (maybe fulfilled by some other returned slot).
+	 *
+	 * @param allocationID   The allocation id the slot is offered for
+	 * @param slotDescriptor The offered slot descriptor
+	 * @return True if we accept the offering
+	 */
+	public boolean offerSlot(final AllocationID allocationID, final SlotDescriptor slotDescriptor) {
+		synchronized (lock) {
+			// check if this TaskManager is valid
+			final ResourceID resourceID = slotDescriptor.getTaskManagerLocation().getResourceID();
+			if (!registeredResources.contains(resourceID)) {
+				LOG.warn("Allocation[{}] Slot offering from unregistered TaskManager: {}",
+					allocationID, slotDescriptor);
+				return false;
+			}
+
+			// check whether this allocation has already been fulfilled by a slot
+			final Slot allocatedSlot = allocatedSlots.get(allocationID);
+			if (allocatedSlot != null) {
+				final SlotDescriptor allocatedSlotDescriptor = new SlotDescriptor(allocatedSlot);
+
+				// the very same slot offered again is accepted, a different slot for the same allocation is not
+				if (allocatedSlotDescriptor.equals(slotDescriptor)) {
+					LOG.debug("Allocation[{}] Duplicated slot offering: {}",
+						allocationID, slotDescriptor);
+					return true;
+				}
+				else {
+					LOG.info("Allocation[{}] Allocation had been fulfilled by slot {}, rejecting offered slot {}",
+						allocationID, allocatedSlotDescriptor, slotDescriptor);
+					return false;
+				}
+			}
+
+			// check whether we already have this slot in free pool
+			if (availableSlots.contains(slotDescriptor)) {
+				LOG.debug("Allocation[{}] Duplicated slot offering: {}",
+					allocationID, slotDescriptor);
+				return true;
+			}
+
+			// check whether we have request waiting for this slot
+			if (pendingRequests.containsKey(allocationID)) {
+				FlinkCompletableFuture<SlotDescriptor> future = pendingRequests.remove(allocationID).f1;
+				future.complete(slotDescriptor);
+				return true;
+			}
+
+			// unwanted slot, rejecting this offer
+			return false;
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Resource
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Registers a TaskManager at this pool. Only slots which come from a registered TaskManager are considered
+	 * valid. This also provides a way for us to keep "dead" or "abnormal" TaskManagers out of this pool.
+	 *
+	 * @param resourceID The id of the TaskManager
+	 */
+	public void registerResource(final ResourceID resourceID) {
+		synchronized (lock) {
+			registeredResources.add(resourceID);
+		}
+	}
+
+	/**
+	 * Unregister TaskManager from this pool, all the related slots will be released and tasks be canceled. Called
+	 * when we find some TaskManager becomes "dead" or "abnormal", and we decide to not using slots from it anymore.
+	 *
+	 * @param resourceID The id of the TaskManager
+	 */
+	public void releaseResource(final ResourceID resourceID) {
+		synchronized (lock) {
+			registeredResources.remove(resourceID);
+			availableSlots.removeByResource(resourceID);
+
+			// getSlotsByResource hands out a copy, so the loop is not affected by changes to the book-keeping
+			final Set<Slot> allocatedSlotsForResource = allocatedSlots.getSlotsByResource(resourceID);
+			for (Slot slot : allocatedSlotsForResource) {
+				slot.releaseSlot();
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  ResourceManager
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Connects this pool to a resource manager. From then on, slot requests which cannot be served by an
+	 * available slot are sent to the given gateway.
+	 *
+	 * @param resourceManagerLeaderId The leader id of the resource manager
+	 * @param resourceManagerGateway  The gateway to communicate with the resource manager
+	 */
+	public void setResourceManager(
+		final UUID resourceManagerLeaderId,
+		final ResourceManagerGateway resourceManagerGateway)
+	{
+		synchronized (lock) {
+			this.resourceManagerLeaderId = resourceManagerLeaderId;
+			this.resourceManagerGateway = resourceManagerGateway;
+		}
+	}
+
+	/**
+	 * Forgets the current resource manager. While disconnected, slot requests which cannot be served by an
+	 * available slot are failed immediately.
+	 */
+	public void disconnectResourceManager() {
+		synchronized (lock) {
+			this.resourceManagerLeaderId = null;
+			this.resourceManagerGateway = null;
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Helper classes
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Organize allocated slots from different points of view. The class does no locking itself.
+	 */
+	static class AllocatedSlots {
+
+		/** All allocated slots organized by TaskManager */
+		private final Map<ResourceID, Set<Slot>> allocatedSlotsByResource;
+
+		/** All allocated slots organized by Slot object */
+		private final Map<Slot, AllocationID> allocatedSlots;
+
+		/** The descriptor each allocated slot was created from, organized by Slot object */
+		private final Map<Slot, SlotDescriptor> allocatedSlotsWithDescriptor;
+
+		/** All allocated slots organized by AllocationID */
+		private final Map<AllocationID, Slot> allocatedSlotsById;
+
+		AllocatedSlots() {
+			this.allocatedSlotsByResource = new HashMap<>();
+			this.allocatedSlots = new HashMap<>();
+			this.allocatedSlotsWithDescriptor = new HashMap<>();
+			this.allocatedSlotsById = new HashMap<>();
+		}
+
+		/**
+		 * Add a new allocation
+		 *
+		 * @param allocationID The allocation id
+		 * @param descriptor   The descriptor the slot was created from
+		 * @param slot         The allocated slot
+		 */
+		void add(final AllocationID allocationID, final SlotDescriptor descriptor, final Slot slot) {
+			allocatedSlots.put(slot, allocationID);
+			allocatedSlotsById.put(allocationID, slot);
+			allocatedSlotsWithDescriptor.put(slot, descriptor);
+
+			final ResourceID resourceID = slot.getTaskManagerID();
+			Set<Slot> slotsForResource = allocatedSlotsByResource.get(resourceID);
+			if (slotsForResource == null) {
+				slotsForResource = new HashSet<>();
+				allocatedSlotsByResource.put(resourceID, slotsForResource);
+			}
+			slotsForResource.add(slot);
+		}
+
+		/**
+		 * Get allocated slot with allocation id
+		 *
+		 * @param allocationID The allocation id
+		 * @return The allocated slot, null if we can't find a match
+		 */
+		Slot get(final AllocationID allocationID) {
+			return allocatedSlotsById.get(allocationID);
+		}
+
+		/**
+		 * Check whether we have allocated this slot
+		 *
+		 * @param slot The slot needs to checked
+		 * @return True if we contains this slot
+		 */
+		boolean contains(final Slot slot) {
+			return allocatedSlots.containsKey(slot);
+		}
+
+		/**
+		 * Remove an allocation with slot.
+		 *
+		 * @param slot The slot needs to be removed
+		 * @return The descriptor of the removed slot, null if the slot is not known as allocated
+		 * @throws IllegalStateException if the slot has a descriptor but no allocation id registered
+		 */
+		SlotDescriptor remove(final Slot slot) {
+			final SlotDescriptor descriptor = allocatedSlotsWithDescriptor.remove(slot);
+			if (descriptor != null) {
+				final AllocationID allocationID = allocatedSlots.remove(slot);
+				if (allocationID != null) {
+					allocatedSlotsById.remove(allocationID);
+				} else {
+					throw new IllegalStateException("Bug: maps are inconsistent");
+				}
+
+				// drop the per-TaskManager entry as soon as its last slot is gone
+				final ResourceID resourceID = slot.getTaskManagerID();
+				final Set<Slot> slotsForResource = allocatedSlotsByResource.get(resourceID);
+				slotsForResource.remove(slot);
+				if (slotsForResource.isEmpty()) {
+					allocatedSlotsByResource.remove(resourceID);
+				}
+				
+				return descriptor;
+			} else {
+				return null;
+			}
+		}
+
+		/**
+		 * Get all allocated slot from same TaskManager.
+		 *
+		 * @param resourceID The id of the TaskManager
+		 * @return A copy of the set of slots which are allocated from the same TaskManager, empty if there are none
+		 */
+		Set<Slot> getSlotsByResource(final ResourceID resourceID) {
+			Set<Slot> slotsForResource = allocatedSlotsByResource.get(resourceID);
+			if (slotsForResource != null) {
+				return new HashSet<>(slotsForResource);
+			}
+			else {
+				return new HashSet<>();
+			}
+		}
+
+		@VisibleForTesting
+		boolean containResource(final ResourceID resourceID) {
+			return allocatedSlotsByResource.containsKey(resourceID);
+		}
+
+		@VisibleForTesting
+		int size() {
+			return allocatedSlots.size();
+		}
+	}
+
+	/**
+	 * Organize all available slots from different points of view.
+	 */
+	static class AvailableSlots {
+
+		/** All available slots organized by TaskManager */
+		private final Map<ResourceID, Set<SlotDescriptor>> availableSlotsByResource;
+
+		/** All available slots */
+		private final Set<SlotDescriptor> availableSlots;
+
+		AvailableSlots() {
+			this.availableSlotsByResource = new HashMap<>();
+			this.availableSlots = new HashSet<>();
+		}
+
+		/**
+		 * Add an available slot.
+		 *
+		 * @param descriptor The descriptor of the slot
+		 */
+		void add(final SlotDescriptor descriptor) {
+			availableSlots.add(descriptor);
+
+			final ResourceID resourceID = descriptor.getTaskManagerLocation().getResourceID();
+			Set<SlotDescriptor> slotsForResource = availableSlotsByResource.get(resourceID);
+			if (slotsForResource == null) {
+				slotsForResource = new HashSet<>();
+				availableSlotsByResource.put(resourceID, slotsForResource);
+			}
+			slotsForResource.add(descriptor);
+		}
+
+		/**
+		 * Check whether we have this slot
+		 *
+		 * @param slotDescriptor The descriptor of the slot
+		 * @return True if we contains this slot
+		 */
+		boolean contains(final SlotDescriptor slotDescriptor) {
+			return availableSlots.contains(slotDescriptor);
+		}
+
+		/**
+		 * Poll a slot which matches the required resource profile
+		 *
+		 * @param resourceProfile The required resource profile
+		 * @return Slot which matches the resource profile, null if we can't find a match
+		 */
+		SlotDescriptor poll(final ResourceProfile resourceProfile) {
+			for (SlotDescriptor slotDescriptor : availableSlots) {
+				if (slotDescriptor.getResourceProfile().isMatching(resourceProfile)) {
+					remove(slotDescriptor);
+					return slotDescriptor;
+				}
+			}
+			return null;
+		}
+
+		/**
+		 * Remove all available slots come from specified TaskManager.
+		 *
+		 * @param resourceID The id of the TaskManager
+		 */
+		void removeByResource(final ResourceID resourceID) {
+			final Set<SlotDescriptor> slotsForResource = availableSlotsByResource.remove(resourceID);
+			if (slotsForResource != null) {
+				for (SlotDescriptor slotDescriptor : slotsForResource) {
+					availableSlots.remove(slotDescriptor);
+				}
+			}
+		}
+
+		private void remove(final SlotDescriptor slotDescriptor) {
+			availableSlots.remove(slotDescriptor);
+
+			final ResourceID resourceID = slotDescriptor.getTaskManagerLocation().getResourceID();
+			final Set<SlotDescriptor> slotsForResource = checkNotNull(availableSlotsByResource.get(resourceID));
+			slotsForResource.remove(slotDescriptor);
+			if (slotsForResource.isEmpty()) {
+				availableSlotsByResource.remove(resourceID);
+			}
+		}
+
+		@VisibleForTesting
+		boolean containResource(final ResourceID resourceID) {
+			return availableSlotsByResource.containsKey(resourceID);
+		}
+
+		@VisibleForTesting
+		int size() {
+			return availableSlots.size();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fe999e03/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index 5da7827..9463bfe 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -652,8 +652,8 @@ object AkkaUtils {
   }
 
   def formatDurationParingErrorMessage: String = {
-    "Duration format must be \"val unit\", where 'val' is a number and 'unit' is " + 
-      "(d|day)|(h|hour)|(min|minute)|s|sec|second)|(ms|milli|millisecond)|"+
+    "Duration format must be \"val unit\", where 'val' is a number and 'unit' is " +
+      "(d|day)|(h|hour)|(min|minute)|s|sec|second)|(ms|milli|millisecond)|" +
       "(µs|micro|microsecond)|(ns|nano|nanosecond)"
   }
   

http://git-wip-us.apache.org/repos/asf/flink/blob/fe999e03/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java
new file mode 100644
index 0000000..655a3ea
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests the bookkeeping of {@link SlotPool.AllocatedSlots}: slots are looked up by
+ * allocation id and by TaskManager, and every lookup must reflect removals.
+ */
+public class AllocatedSlotsTest {
+
+	/**
+	 * Adds three slots spread over two TaskManagers and removes them one by one,
+	 * verifying all lookups after each step.
+	 */
+	@Test
+	public void testOperations() throws Exception {
+		SlotPool.AllocatedSlots allocatedSlots = new SlotPool.AllocatedSlots();
+
+		// first slot on resource1
+		final AllocationID allocation1 = new AllocationID();
+		final ResourceID resource1 = new ResourceID("resource1");
+		final Slot slot1 = createSlot(resource1);
+
+		allocatedSlots.add(allocation1, new SlotDescriptor(slot1), slot1);
+
+		assertTrue(allocatedSlots.contains(slot1));
+		assertTrue(allocatedSlots.containResource(resource1));
+
+		assertEquals(slot1, allocatedSlots.get(allocation1));
+		assertEquals(1, allocatedSlots.getSlotsByResource(resource1).size());
+		assertEquals(1, allocatedSlots.size());
+
+		// second slot on the same TaskManager
+		final AllocationID allocation2 = new AllocationID();
+		final Slot slot2 = createSlot(resource1);
+
+		allocatedSlots.add(allocation2, new SlotDescriptor(slot2), slot2);
+
+		assertTrue(allocatedSlots.contains(slot1));
+		assertTrue(allocatedSlots.contains(slot2));
+		assertTrue(allocatedSlots.containResource(resource1));
+
+		assertEquals(slot1, allocatedSlots.get(allocation1));
+		assertEquals(slot2, allocatedSlots.get(allocation2));
+		assertEquals(2, allocatedSlots.getSlotsByResource(resource1).size());
+		assertEquals(2, allocatedSlots.size());
+
+		// third slot on a different TaskManager
+		final AllocationID allocation3 = new AllocationID();
+		final ResourceID resource2 = new ResourceID("resource2");
+		final Slot slot3 = createSlot(resource2);
+
+		// the descriptor must describe the slot being registered (slot3, not slot2)
+		allocatedSlots.add(allocation3, new SlotDescriptor(slot3), slot3);
+
+		assertTrue(allocatedSlots.contains(slot1));
+		assertTrue(allocatedSlots.contains(slot2));
+		assertTrue(allocatedSlots.contains(slot3));
+		assertTrue(allocatedSlots.containResource(resource1));
+		assertTrue(allocatedSlots.containResource(resource2));
+
+		assertEquals(slot1, allocatedSlots.get(allocation1));
+		assertEquals(slot2, allocatedSlots.get(allocation2));
+		assertEquals(slot3, allocatedSlots.get(allocation3));
+		assertEquals(2, allocatedSlots.getSlotsByResource(resource1).size());
+		assertEquals(1, allocatedSlots.getSlotsByResource(resource2).size());
+		assertEquals(3, allocatedSlots.size());
+
+		// removing slot2 leaves resource1 with one remaining slot
+		allocatedSlots.remove(slot2);
+
+		assertTrue(allocatedSlots.contains(slot1));
+		assertFalse(allocatedSlots.contains(slot2));
+		assertTrue(allocatedSlots.contains(slot3));
+		assertTrue(allocatedSlots.containResource(resource1));
+		assertTrue(allocatedSlots.containResource(resource2));
+
+		assertEquals(slot1, allocatedSlots.get(allocation1));
+		assertNull(allocatedSlots.get(allocation2));
+		assertEquals(slot3, allocatedSlots.get(allocation3));
+		assertEquals(1, allocatedSlots.getSlotsByResource(resource1).size());
+		assertEquals(1, allocatedSlots.getSlotsByResource(resource2).size());
+		assertEquals(2, allocatedSlots.size());
+
+		// removing slot1 removes the last slot of resource1
+		allocatedSlots.remove(slot1);
+
+		assertFalse(allocatedSlots.contains(slot1));
+		assertFalse(allocatedSlots.contains(slot2));
+		assertTrue(allocatedSlots.contains(slot3));
+		assertFalse(allocatedSlots.containResource(resource1));
+		assertTrue(allocatedSlots.containResource(resource2));
+
+		assertNull(allocatedSlots.get(allocation1));
+		assertNull(allocatedSlots.get(allocation2));
+		assertEquals(slot3, allocatedSlots.get(allocation3));
+		assertEquals(0, allocatedSlots.getSlotsByResource(resource1).size());
+		assertEquals(1, allocatedSlots.getSlotsByResource(resource2).size());
+		assertEquals(1, allocatedSlots.size());
+
+		// removing slot3 empties the structure
+		allocatedSlots.remove(slot3);
+
+		assertFalse(allocatedSlots.contains(slot1));
+		assertFalse(allocatedSlots.contains(slot2));
+		assertFalse(allocatedSlots.contains(slot3));
+		assertFalse(allocatedSlots.containResource(resource1));
+		assertFalse(allocatedSlots.containResource(resource2));
+
+		assertNull(allocatedSlots.get(allocation1));
+		assertNull(allocatedSlots.get(allocation2));
+		assertNull(allocatedSlots.get(allocation3));
+		assertEquals(0, allocatedSlots.getSlotsByResource(resource1).size());
+		assertEquals(0, allocatedSlots.getSlotsByResource(resource2).size());
+		assertEquals(0, allocatedSlots.size());
+	}
+
+	/**
+	 * Creates a mocked slot that reports the given TaskManager id.
+	 *
+	 * @param resourceId The id returned by the mock's getTaskManagerID()
+	 * @return The mocked slot
+	 */
+	private Slot createSlot(final ResourceID resourceId) {
+		Slot slot = mock(Slot.class);
+		when(slot.getTaskManagerID()).thenReturn(resourceId);
+		return slot;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fe999e03/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AvailableSlotsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AvailableSlotsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AvailableSlotsTest.java
new file mode 100644
index 0000000..872810f
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AvailableSlotsTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link SlotPool.AvailableSlots}; also hosts the slot descriptor factories
+ * and resource profiles shared with other slot pool tests.
+ */
+public class AvailableSlotsTest {
+
+	static final ResourceProfile DEFAULT_TESTING_PROFILE = new ResourceProfile(1.0, 512);
+
+	static final ResourceProfile DEFAULT_TESTING_BIG_PROFILE = new ResourceProfile(2.0, 1024);
+
+	/**
+	 * Registers slots of two TaskManagers and removes them per TaskManager.
+	 */
+	@Test
+	public void testAddAndRemove() throws Exception {
+		final SlotPool.AvailableSlots pool = new SlotPool.AvailableSlots();
+
+		final ResourceID firstTaskManager = new ResourceID("resource1");
+		final ResourceID secondTaskManager = new ResourceID("resource2");
+
+		final SlotDescriptor firstSlot = createSlotDescriptor(firstTaskManager);
+		final SlotDescriptor secondSlot = createSlotDescriptor(firstTaskManager);
+		final SlotDescriptor thirdSlot = createSlotDescriptor(secondTaskManager);
+
+		pool.add(firstSlot);
+		pool.add(secondSlot);
+		pool.add(thirdSlot);
+
+		// everything that was added is visible
+		assertEquals(3, pool.size());
+		assertTrue(pool.contains(firstSlot));
+		assertTrue(pool.contains(secondSlot));
+		assertTrue(pool.contains(thirdSlot));
+		assertTrue(pool.containResource(firstTaskManager));
+		assertTrue(pool.containResource(secondTaskManager));
+
+		// dropping the first TaskManager removes exactly its two slots
+		pool.removeByResource(firstTaskManager);
+
+		assertEquals(1, pool.size());
+		assertFalse(pool.contains(firstSlot));
+		assertFalse(pool.contains(secondSlot));
+		assertTrue(pool.contains(thirdSlot));
+		assertFalse(pool.containResource(firstTaskManager));
+		assertTrue(pool.containResource(secondTaskManager));
+
+		// dropping the second TaskManager empties the pool
+		pool.removeByResource(secondTaskManager);
+
+		assertEquals(0, pool.size());
+		assertFalse(pool.contains(firstSlot));
+		assertFalse(pool.contains(secondSlot));
+		assertFalse(pool.contains(thirdSlot));
+		assertFalse(pool.containResource(firstTaskManager));
+		assertFalse(pool.containResource(secondTaskManager));
+	}
+
+	/**
+	 * Polling with the big profile finds nothing; polling with the default
+	 * profile returns the slot and removes it.
+	 */
+	@Test
+	public void testPollFreeSlot() {
+		final SlotPool.AvailableSlots pool = new SlotPool.AvailableSlots();
+
+		final ResourceID taskManager = new ResourceID("resource1");
+		final SlotDescriptor onlySlot = createSlotDescriptor(taskManager);
+
+		pool.add(onlySlot);
+
+		assertEquals(1, pool.size());
+		assertTrue(pool.contains(onlySlot));
+		assertTrue(pool.containResource(taskManager));
+
+		// no slot matches the big profile
+		assertNull(pool.poll(DEFAULT_TESTING_BIG_PROFILE));
+
+		// the default profile matches and the slot is taken out of the pool
+		assertEquals(onlySlot, pool.poll(DEFAULT_TESTING_PROFILE));
+		assertEquals(0, pool.size());
+		assertFalse(pool.contains(onlySlot));
+		assertFalse(pool.containResource(taskManager));
+	}
+
+	/** Creates a descriptor for a fresh job with the default profile and slot number 0. */
+	static SlotDescriptor createSlotDescriptor(final ResourceID resourceID) {
+		return createSlotDescriptor(resourceID, new JobID(), DEFAULT_TESTING_PROFILE, 0);
+	}
+
+	/** Creates a descriptor for the given job with the default profile and slot number 0. */
+	static SlotDescriptor createSlotDescriptor(final ResourceID resourceID, final JobID jobID) {
+		return createSlotDescriptor(resourceID, jobID, DEFAULT_TESTING_PROFILE, 0);
+	}
+
+	/** Creates a descriptor for the given job and profile with slot number 0. */
+	static SlotDescriptor createSlotDescriptor(
+		final ResourceID resourceID,
+		final JobID jobID,
+		final ResourceProfile resourceProfile)
+	{
+		return createSlotDescriptor(resourceID, jobID, resourceProfile, 0);
+	}
+
+	/**
+	 * Creates a descriptor backed by a mocked TaskManager location and a mocked gateway.
+	 *
+	 * @param resourceID The id reported by the mocked TaskManager location
+	 * @param jobID The job the slot is created for
+	 * @param resourceProfile The resource profile of the slot
+	 * @param slotNumber The slot number passed to the descriptor
+	 * @return The new slot descriptor
+	 */
+	static SlotDescriptor createSlotDescriptor(
+		final ResourceID resourceID,
+		final JobID jobID,
+		final ResourceProfile resourceProfile,
+		final int slotNumber)
+	{
+		final TaskManagerLocation mockedLocation = mock(TaskManagerLocation.class);
+		when(mockedLocation.getResourceID()).thenReturn(resourceID);
+
+		final ActorGateway mockedGateway = mock(ActorGateway.class);
+		return new SlotDescriptor(jobID, mockedLocation, slotNumber, resourceProfile, mockedGateway);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fe999e03/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
new file mode 100644
index 0000000..30cdbd6
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.BiFunction;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.util.TestLogger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.runtime.instance.AvailableSlotsTest.DEFAULT_TESTING_PROFILE;
+import static org.apache.flink.runtime.instance.AvailableSlotsTest.createSlotDescriptor;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link SlotPool}: slot allocation, slot offers and the release of resources,
+ * run against a mocked {@link ResourceManagerGateway}.
+ */
+public class SlotPoolTest extends TestLogger {
+
+	/** Thread pool handed to the slot pool; created per test and shut down in tearDown(). */
+	private ExecutorService executor;
+
+	private SlotPool slotPool;
+
+	private ResourceManagerGateway resourceManagerGateway;
+
+	/**
+	 * Creates a slot pool that is connected to a mocked resource manager whose
+	 * requestSlot call returns a mocked future.
+	 */
+	@Before
+	public void setUp() throws Exception {
+		this.executor = Executors.newFixedThreadPool(1);
+		this.slotPool = new SlotPool(executor);
+		this.resourceManagerGateway = mock(ResourceManagerGateway.class);
+		when(resourceManagerGateway
+			.requestSlot(any(UUID.class), any(UUID.class), any(SlotRequest.class), any(Time.class)))
+			.thenReturn(mock(Future.class));
+		slotPool.setResourceManager(UUID.randomUUID(), resourceManagerGateway);
+		slotPool.setJobManagerLeaderId(UUID.randomUUID());
+	}
+
+	/**
+	 * Shuts down the per-test thread pool so that its worker thread does not outlive the test.
+	 */
+	@After
+	public void tearDown() throws Exception {
+		// executor is null if setUp() failed before the pool was created
+		if (executor != null) {
+			executor.shutdownNow();
+			executor = null;
+		}
+	}
+
+	/**
+	 * A pending simple-slot request is completed by a matching slot offer.
+	 */
+	@Test
+	public void testAllocateSimpleSlot() throws Exception {
+		ResourceID resourceID = new ResourceID("resource");
+		slotPool.registerResource(resourceID);
+
+		JobID jobID = new JobID();
+		AllocationID allocationID = new AllocationID();
+		Future<SimpleSlot> future = slotPool.allocateSimpleSlot(jobID, DEFAULT_TESTING_PROFILE, allocationID);
+		assertFalse(future.isDone());
+		verify(resourceManagerGateway).requestSlot(any(UUID.class), any(UUID.class), any(SlotRequest.class), any(Time.class));
+
+		SlotDescriptor slotDescriptor = createSlotDescriptor(resourceID, jobID, DEFAULT_TESTING_PROFILE);
+		assertTrue(slotPool.offerSlot(allocationID, slotDescriptor));
+
+		SimpleSlot slot = future.get(1, TimeUnit.SECONDS);
+		assertTrue(future.isDone());
+		assertTrue(slot.isAlive());
+		assertEquals(resourceID, slot.getTaskManagerID());
+		assertEquals(jobID, slot.getJobID());
+		assertEquals(slotPool, slot.getOwner());
+	}
+
+	/**
+	 * A pending shared-slot request is completed by a matching slot offer, and the
+	 * resulting shared slot can hand out a sub slot.
+	 */
+	@Test
+	public void testAllocateSharedSlot() throws Exception {
+		ResourceID resourceID = new ResourceID("resource");
+		slotPool.registerResource(resourceID);
+
+		JobVertexID vid = new JobVertexID();
+		SlotSharingGroup sharingGroup = new SlotSharingGroup(vid);
+		SlotSharingGroupAssignment assignment = sharingGroup.getTaskAssignment();
+
+		JobID jobID = new JobID();
+		AllocationID allocationID = new AllocationID();
+		Future<SharedSlot> future = slotPool.allocateSharedSlot(jobID, DEFAULT_TESTING_PROFILE, assignment, allocationID);
+
+		assertFalse(future.isDone());
+		verify(resourceManagerGateway).requestSlot(any(UUID.class), any(UUID.class), any(SlotRequest.class), any(Time.class));
+
+		SlotDescriptor slotDescriptor = createSlotDescriptor(resourceID, jobID, DEFAULT_TESTING_PROFILE);
+		assertTrue(slotPool.offerSlot(allocationID, slotDescriptor));
+
+		SharedSlot slot = future.get(1, TimeUnit.SECONDS);
+		assertTrue(future.isDone());
+		assertTrue(slot.isAlive());
+		assertEquals(resourceID, slot.getTaskManagerID());
+		assertEquals(jobID, slot.getJobID());
+		assertEquals(slotPool, slot.getOwner());
+
+		SimpleSlot simpleSlot = slot.allocateSubSlot(vid);
+		assertNotNull(simpleSlot);
+		assertTrue(simpleSlot.isAlive());
+	}
+
+	/**
+	 * Without a connected resource manager, the allocation future completes exceptionally.
+	 */
+	@Test
+	public void testAllocateSlotWithoutResourceManager() throws Exception {
+		slotPool.disconnectResourceManager();
+		Future<SimpleSlot> future = slotPool.allocateSimpleSlot(new JobID(), DEFAULT_TESTING_PROFILE);
+		future.handleAsync(
+			new BiFunction<SimpleSlot, Throwable, Void>() {
+				@Override
+				public Void apply(SimpleSlot simpleSlot, Throwable throwable) {
+					assertNull(simpleSlot);
+					assertNotNull(throwable);
+					return null;
+				}
+			},
+			executor);
+		try {
+			future.get(1, TimeUnit.SECONDS);
+			fail("We expected a ExecutionException.");
+		} catch (ExecutionException expected) {
+			// we expect the exception
+		}
+	}
+
+	/**
+	 * A second pending request is fulfilled once the slot of the first request is released.
+	 */
+	@Test
+	public void testAllocationFulfilledByReturnedSlot() throws Exception {
+		ResourceID resourceID = new ResourceID("resource");
+		slotPool.registerResource(resourceID);
+
+		JobID jobID = new JobID();
+
+		AllocationID allocationID1 = new AllocationID();
+		Future<SimpleSlot> future1 = slotPool.allocateSimpleSlot(jobID, DEFAULT_TESTING_PROFILE, allocationID1);
+
+		AllocationID allocationID2 = new AllocationID();
+		Future<SimpleSlot> future2 = slotPool.allocateSimpleSlot(jobID, DEFAULT_TESTING_PROFILE, allocationID2);
+
+		assertFalse(future1.isDone());
+		assertFalse(future2.isDone());
+		verify(resourceManagerGateway, times(2))
+			.requestSlot(any(UUID.class), any(UUID.class), any(SlotRequest.class), any(Time.class));
+
+		SlotDescriptor slotDescriptor = createSlotDescriptor(resourceID, jobID, DEFAULT_TESTING_PROFILE);
+		assertTrue(slotPool.offerSlot(allocationID1, slotDescriptor));
+
+		SimpleSlot slot1 = future1.get(1, TimeUnit.SECONDS);
+		assertTrue(future1.isDone());
+		assertFalse(future2.isDone());
+
+		// return this slot to pool
+		slot1.releaseSlot();
+
+		// second allocation fulfilled by previous slot returning
+		SimpleSlot slot2 = future2.get(1, TimeUnit.SECONDS);
+		assertTrue(future2.isDone());
+
+		assertNotEquals(slot1, slot2);
+		assertTrue(slot1.isReleased());
+		assertTrue(slot2.isAlive());
+		assertEquals(slot1.getTaskManagerID(), slot2.getTaskManagerID());
+		assertEquals(slot1.getSlotNumber(), slot2.getSlotNumber());
+	}
+
+	/**
+	 * A request issued after a slot was released is served from the released slot.
+	 */
+	@Test
+	public void testAllocateWithFreeSlot() throws Exception {
+		ResourceID resourceID = new ResourceID("resource");
+		slotPool.registerResource(resourceID);
+
+		JobID jobID = new JobID();
+		AllocationID allocationID1 = new AllocationID();
+		Future<SimpleSlot> future1 = slotPool.allocateSimpleSlot(jobID, DEFAULT_TESTING_PROFILE, allocationID1);
+		assertFalse(future1.isDone());
+
+		SlotDescriptor slotDescriptor = createSlotDescriptor(resourceID, jobID, DEFAULT_TESTING_PROFILE);
+		assertTrue(slotPool.offerSlot(allocationID1, slotDescriptor));
+
+		SimpleSlot slot1 = future1.get(1, TimeUnit.SECONDS);
+		assertTrue(future1.isDone());
+
+		// return this slot to pool
+		slot1.releaseSlot();
+
+		AllocationID allocationID2 = new AllocationID();
+		Future<SimpleSlot> future2 = slotPool.allocateSimpleSlot(jobID, DEFAULT_TESTING_PROFILE, allocationID2);
+
+		// second allocation fulfilled by previous slot returning
+		SimpleSlot slot2 = future2.get(1, TimeUnit.SECONDS);
+		assertTrue(future2.isDone());
+
+		assertNotEquals(slot1, slot2);
+		assertTrue(slot1.isReleased());
+		assertTrue(slot2.isAlive());
+		assertEquals(slot1.getTaskManagerID(), slot2.getTaskManagerID());
+		assertEquals(slot1.getSlotNumber(), slot2.getSlotNumber());
+	}
+
+	/**
+	 * Exercises which slot offers the pool accepts and which it rejects.
+	 */
+	@Test
+	public void testOfferSlot() throws Exception {
+		ResourceID resourceID = new ResourceID("resource");
+		slotPool.registerResource(resourceID);
+
+		JobID jobID = new JobID();
+		AllocationID allocationID = new AllocationID();
+		Future<SimpleSlot> future = slotPool.allocateSimpleSlot(jobID, DEFAULT_TESTING_PROFILE, allocationID);
+		assertFalse(future.isDone());
+		verify(resourceManagerGateway).requestSlot(any(UUID.class), any(UUID.class), any(SlotRequest.class), any(Time.class));
+
+		// slot from unregistered resource
+		SlotDescriptor invalid = createSlotDescriptor(new ResourceID("unregistered"), jobID, DEFAULT_TESTING_PROFILE);
+		assertFalse(slotPool.offerSlot(allocationID, invalid));
+
+		SlotDescriptor slotDescriptor = createSlotDescriptor(resourceID, jobID, DEFAULT_TESTING_PROFILE);
+
+		// reject offering with mismatch allocation id
+		assertFalse(slotPool.offerSlot(new AllocationID(), slotDescriptor));
+
+		// accepted slot
+		assertTrue(slotPool.offerSlot(allocationID, slotDescriptor));
+		SimpleSlot slot = future.get(1, TimeUnit.SECONDS);
+		assertTrue(future.isDone());
+		assertTrue(slot.isAlive());
+
+		// conflict offer with using slot
+		SlotDescriptor conflict = createSlotDescriptor(resourceID, jobID, DEFAULT_TESTING_PROFILE);
+		assertFalse(slotPool.offerSlot(allocationID, conflict));
+
+		// duplicated offer with using slot
+		assertTrue(slotPool.offerSlot(allocationID, slotDescriptor));
+		assertTrue(future.isDone());
+		assertTrue(slot.isAlive());
+
+		// duplicated offer with free slot
+		slot.releaseSlot();
+		assertTrue(slot.isReleased());
+		assertTrue(slotPool.offerSlot(allocationID, slotDescriptor));
+	}
+
+	/**
+	 * Releasing a resource releases its allocated slot and does not fulfil a pending request.
+	 */
+	@Test
+	public void testReleaseResource() throws Exception {
+		ResourceID resourceID = new ResourceID("resource");
+		slotPool.registerResource(resourceID);
+
+		JobID jobID = new JobID();
+
+		AllocationID allocationID1 = new AllocationID();
+		Future<SimpleSlot> future1 = slotPool.allocateSimpleSlot(jobID, DEFAULT_TESTING_PROFILE, allocationID1);
+
+		AllocationID allocationID2 = new AllocationID();
+		Future<SimpleSlot> future2 = slotPool.allocateSimpleSlot(jobID, DEFAULT_TESTING_PROFILE, allocationID2);
+
+		SlotDescriptor slotDescriptor = createSlotDescriptor(resourceID, jobID, DEFAULT_TESTING_PROFILE);
+		assertTrue(slotPool.offerSlot(allocationID1, slotDescriptor));
+
+		SimpleSlot slot1 = future1.get(1, TimeUnit.SECONDS);
+		assertTrue(future1.isDone());
+		assertFalse(future2.isDone());
+
+		slotPool.releaseResource(resourceID);
+		assertTrue(slot1.isReleased());
+
+		// slot released and not usable, second allocation still not fulfilled
+		Thread.sleep(10);
+		assertFalse(future2.isDone());
+	}
+
+}