You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/10/21 12:22:16 UTC
[36/50] [abbrv] flink git commit: [FLINK-4339] [cluster management]
Implement Slot Pool core on JobManager side
[FLINK-4339] [cluster management] Implement Slot Pool core on JobManager side
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fe999e03
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fe999e03
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fe999e03
Branch: refs/heads/flip-6
Commit: fe999e038ebec1ea6e6fb35150daa53e01a0a391
Parents: 0df6a20
Author: Kurt Young <yk...@gmail.com>
Authored: Thu Oct 13 04:59:46 2016 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Oct 20 19:49:24 2016 +0200
----------------------------------------------------------------------
.../runtime/clusterframework/types/SlotID.java | 16 +-
.../flink/runtime/instance/SlotDescriptor.java | 161 +++++
.../apache/flink/runtime/instance/SlotPool.java | 675 +++++++++++++++++++
.../apache/flink/runtime/akka/AkkaUtils.scala | 4 +-
.../runtime/instance/AllocatedSlotsTest.java | 135 ++++
.../runtime/instance/AvailableSlotsTest.java | 123 ++++
.../flink/runtime/instance/SlotPoolTest.java | 297 ++++++++
7 files changed, 1403 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/fe999e03/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java
index e831a5d..237597b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java
@@ -33,11 +33,11 @@ public class SlotID implements ResourceIDRetrievable, Serializable {
private final ResourceID resourceId;
/** The numeric id for single slot */
- private final int slotId;
+ private final int slotNumber;
- public SlotID(ResourceID resourceId, int slotId) {
+ public SlotID(ResourceID resourceId, int slotNumber) {
this.resourceId = checkNotNull(resourceId, "ResourceID must not be null");
- this.slotId = slotId;
+ this.slotNumber = slotNumber;
}
// ------------------------------------------------------------------------
@@ -47,6 +47,10 @@ public class SlotID implements ResourceIDRetrievable, Serializable {
return resourceId;
}
+ public int getSlotNumber() {
+ return slotNumber;
+ }
+
// ------------------------------------------------------------------------
@Override
@@ -60,7 +64,7 @@ public class SlotID implements ResourceIDRetrievable, Serializable {
SlotID slotID = (SlotID) o;
- if (slotId != slotID.slotId) {
+ if (slotNumber != slotID.slotNumber) {
return false;
}
return resourceId.equals(slotID.resourceId);
@@ -69,13 +73,13 @@ public class SlotID implements ResourceIDRetrievable, Serializable {
@Override
public int hashCode() {
int result = resourceId.hashCode();
- result = 31 * result + slotId;
+ result = 31 * result + slotNumber;
return result;
}
@Override
public String toString() {
- return resourceId + "_" + slotId;
+ return resourceId + "_" + slotNumber;
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/fe999e03/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotDescriptor.java
new file mode 100644
index 0000000..be7cf96
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotDescriptor.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The description of a slot. TaskManagers offer one or more task slots, each of which defines a slice
+ * of their resources. This description contains static information about the slot, such as the
+ * location and numeric id of the slot, and the gateway used to communicate with the TaskManager
+ * which owns the slot.
+ */
+public class SlotDescriptor {
+
+ /** The ID of the job this slot belongs to. */
+ private final JobID jobID;
+
+ /** The location information of the TaskManager to which this slot belongs */
+ private final TaskManagerLocation taskManagerLocation;
+
+ /** The number of the slot on which the task is deployed */
+ private final int slotNumber;
+
+ /** The resource profile that the slot provides */
+ private final ResourceProfile resourceProfile;
+
+ /** TEMP until the new RPC is in place: The actor gateway to communicate with the TaskManager */
+ private final ActorGateway taskManagerActorGateway;
+
+ public SlotDescriptor(
+ final JobID jobID,
+ final TaskManagerLocation location,
+ final int slotNumber,
+ final ResourceProfile resourceProfile,
+ final ActorGateway actorGateway)
+ {
+ this.jobID = checkNotNull(jobID);
+ this.taskManagerLocation = checkNotNull(location);
+ this.slotNumber = slotNumber;
+ this.resourceProfile = checkNotNull(resourceProfile);
+ this.taskManagerActorGateway = checkNotNull(actorGateway);
+ }
+
+ public SlotDescriptor(final SlotDescriptor other) {
+ this.jobID = other.jobID;
+ this.taskManagerLocation = other.taskManagerLocation;
+ this.slotNumber = other.slotNumber;
+ this.resourceProfile = other.resourceProfile;
+ this.taskManagerActorGateway = other.taskManagerActorGateway;
+ }
+
+ // TODO - temporary workaround until we have the SlotDescriptor in the Slot
+ public SlotDescriptor(final Slot slot) {
+ this.jobID = slot.getJobID();
+ this.taskManagerLocation = slot.getTaskManagerLocation();
+ this.slotNumber = slot.getRootSlotNumber();
+ this.resourceProfile = new ResourceProfile(0, 0);
+ this.taskManagerActorGateway = slot.getTaskManagerActorGateway();
+ }
+
+ /**
+ * Returns the ID of the job this allocated slot belongs to.
+ *
+ * @return the ID of the job this allocated slot belongs to
+ */
+ public JobID getJobID() {
+ return jobID;
+ }
+
+ /**
+ * Gets the number of the slot.
+ *
+ * @return The number of the slot on the TaskManager.
+ */
+ public int getSlotNumber() {
+ return slotNumber;
+ }
+
+ /**
+ * Gets the resource profile of the slot.
+ *
+ * @return The resource profile of the slot.
+ */
+ public ResourceProfile getResourceProfile() {
+ return resourceProfile;
+ }
+
+ /**
+ * Gets the location info of the TaskManager that offers this slot.
+ *
+ * @return The location info of the TaskManager that offers this slot
+ */
+ public TaskManagerLocation getTaskManagerLocation() {
+ return taskManagerLocation;
+ }
+
+ /**
+ * Gets the actor gateway that can be used to send messages to the TaskManager.
+ * <p>
+ * This method should be removed once the new interface-based RPC abstraction is in place
+ *
+ * @return The actor gateway that can be used to send messages to the TaskManager.
+ */
+ public ActorGateway getTaskManagerActorGateway() {
+ return taskManagerActorGateway;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ SlotDescriptor that = (SlotDescriptor) o;
+
+ if (slotNumber != that.slotNumber) {
+ return false;
+ }
+ if (!jobID.equals(that.jobID)) {
+ return false;
+ }
+ return taskManagerLocation.equals(that.taskManagerLocation);
+
+ }
+
+ @Override
+ public int hashCode() {
+ int result = jobID.hashCode();
+ result = 31 * result + taskManagerLocation.hashCode();
+ result = 31 * result + slotNumber;
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return taskManagerLocation + " - " + slotNumber;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fe999e03/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
new file mode 100644
index 0000000..e7857c1
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -0,0 +1,675 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ApplyFunction;
+import org.apache.flink.runtime.concurrent.BiFunction;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestRejected;
+import org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestReply;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The slot pool serves slot requests issued by the Scheduler or ExecutionGraph. It will attempt to acquire new slots
+ * from the ResourceManager when it cannot serve a slot request. If no ResourceManager is currently available,
+ * or it gets a decline from the ResourceManager, or a request times out, it fails the slot request. The slot pool also
+ * holds all the slots that were offered to it and accepted, and can thus provide registered free slots even if the
+ * ResourceManager is down. The slots will only be released when they are useless, e.g. when the job is fully running
+ * but we still have some free slots.
+ * <p>
+ * Every allocation and every slot offering is identified by a self-generated AllocationID, which we use to
+ * eliminate ambiguities.
+ */
+public class SlotPool implements SlotOwner {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SlotPool.class);
+
+ private final Object lock = new Object();
+
+ /** The executor which is used to execute futures */
+ private final Executor executor;
+
+ /** All registered resources, slots will be accepted and used only if the resource is registered */
+ private final Set<ResourceID> registeredResources;
+
+ /** The book-keeping of all allocated slots */
+ private final AllocatedSlots allocatedSlots;
+
+ /** The book-keeping of all available slots */
+ private final AvailableSlots availableSlots;
+
+ /** All pending requests waiting for slots */
+ private final Map<AllocationID, Tuple2<SlotRequest, FlinkCompletableFuture<SlotDescriptor>>> pendingRequests;
+
+ /** Timeout of slot allocation */
+ private final Time timeout;
+
+ /** the leader id of job manager */
+ private UUID jobManagerLeaderId;
+
+ /** The leader id of resource manager */
+ private UUID resourceManagerLeaderId;
+
+ /** The gateway to communicate with resource manager */
+ private ResourceManagerGateway resourceManagerGateway;
+
+ public SlotPool(final Executor executor) {
+ this.executor = executor;
+ this.registeredResources = new HashSet<>();
+ this.allocatedSlots = new AllocatedSlots();
+ this.availableSlots = new AvailableSlots();
+ this.pendingRequests = new HashMap<>();
+ this.timeout = Time.of(5, TimeUnit.SECONDS);
+ }
+
+ public void setJobManagerLeaderId(final UUID jobManagerLeaderId) {
+ this.jobManagerLeaderId = jobManagerLeaderId;
+ }
+
+ // ------------------------------------------------------------------------
+ // Slot Allocation
+ // ------------------------------------------------------------------------
+
+ /**
+ * Try to allocate a simple slot with specified resource profile.
+ *
+ * @param jobID The job id which the slot allocated for
+ * @param resourceProfile The needed resource profile
+ * @return The future of allocated simple slot
+ */
+ public Future<SimpleSlot> allocateSimpleSlot(final JobID jobID, final ResourceProfile resourceProfile) {
+ return allocateSimpleSlot(jobID, resourceProfile, new AllocationID());
+ }
+
+
+ /**
+ * Try to allocate a simple slot with the specified resource profile and the specified allocation id. It's mainly
+ * for testing purposes since we need to specify whatever allocation id we want.
+ */
+ @VisibleForTesting
+ Future<SimpleSlot> allocateSimpleSlot(
+ final JobID jobID,
+ final ResourceProfile resourceProfile,
+ final AllocationID allocationID)
+ {
+ final FlinkCompletableFuture<SlotDescriptor> future = new FlinkCompletableFuture<>();
+
+ internalAllocateSlot(jobID, allocationID, resourceProfile, future);
+
+ final SlotOwner owner = this;
+ return future.thenApplyAsync(
+ new ApplyFunction<SlotDescriptor, SimpleSlot>() {
+ @Override
+ public SimpleSlot apply(SlotDescriptor descriptor) {
+ SimpleSlot slot = new SimpleSlot(
+ descriptor.getJobID(), SlotPool.this,
+ descriptor.getTaskManagerLocation(), descriptor.getSlotNumber(),
+ descriptor.getTaskManagerActorGateway());
+ synchronized (lock) {
+ // double validation since we are out of the lock protection after the slot is granted
+ if (registeredResources.contains(descriptor.getTaskManagerLocation().getResourceID())) {
+ LOG.info("Allocation[{}] Allocated simple slot: {} for job {}.", allocationID, slot, jobID);
+ allocatedSlots.add(allocationID, descriptor, slot);
+ }
+ else {
+ throw new RuntimeException("Resource was marked dead asynchronously.");
+ }
+ }
+ return slot;
+ }
+ },
+ executor
+ );
+ }
+
+
+ /**
+ * Try to allocate a shared slot with specified resource profile.
+ *
+ * @param jobID The job id which the slot allocated for
+ * @param resourceProfile The needed resource profile
+ * @param sharingGroupAssignment The slot sharing group of the vertex
+ * @return The future of allocated shared slot
+ */
+ public Future<SharedSlot> allocateSharedSlot(
+ final JobID jobID,
+ final ResourceProfile resourceProfile,
+ final SlotSharingGroupAssignment sharingGroupAssignment)
+ {
+ return allocateSharedSlot(jobID, resourceProfile, sharingGroupAssignment, new AllocationID());
+ }
+
+ /**
+ * Try to allocate a shared slot with the specified resource profile and the specified allocation id. It's mainly
+ * for testing purposes since we need to specify whatever allocation id we want.
+ */
+ @VisibleForTesting
+ Future<SharedSlot> allocateSharedSlot(
+ final JobID jobID,
+ final ResourceProfile resourceProfile,
+ final SlotSharingGroupAssignment sharingGroupAssignment,
+ final AllocationID allocationID)
+ {
+ final FlinkCompletableFuture<SlotDescriptor> future = new FlinkCompletableFuture<>();
+
+ internalAllocateSlot(jobID, allocationID, resourceProfile, future);
+
+ return future.thenApplyAsync(
+ new ApplyFunction<SlotDescriptor, SharedSlot>() {
+ @Override
+ public SharedSlot apply(SlotDescriptor descriptor) {
+ SharedSlot slot = new SharedSlot(
+ descriptor.getJobID(), SlotPool.this, descriptor.getTaskManagerLocation(),
+ descriptor.getSlotNumber(), descriptor.getTaskManagerActorGateway(),
+ sharingGroupAssignment);
+
+ synchronized (lock) {
+ // double validation since we are out of the lock protection after the slot is granted
+ if (registeredResources.contains(descriptor.getTaskManagerLocation().getResourceID())) {
+ LOG.info("Allocation[{}] Allocated shared slot: {} for job {}.", allocationID, slot, jobID);
+ allocatedSlots.add(allocationID, descriptor, slot);
+ }
+ else {
+ throw new RuntimeException("Resource was marked dead asynchronously.");
+ }
+ }
+ return slot;
+ }
+ },
+ executor
+ );
+ }
+
+ /**
+ * Internally allocate the slot with specified resource profile. We will first check whether we have some
+ * free slot which can meet the requirement already and allocate it immediately. Otherwise, we will try to
+ * allocate the slot from the resource manager.
+ */
+ private void internalAllocateSlot(
+ final JobID jobID,
+ final AllocationID allocationID,
+ final ResourceProfile resourceProfile,
+ final FlinkCompletableFuture<SlotDescriptor> future)
+ {
+ LOG.info("Allocation[{}] Allocating slot with {} for Job {}.", allocationID, resourceProfile, jobID);
+
+ synchronized (lock) {
+ // check whether we have any free slot which can match the required resource profile
+ SlotDescriptor freeSlot = availableSlots.poll(resourceProfile);
+ if (freeSlot != null) {
+ future.complete(freeSlot);
+ }
+ else {
+ if (resourceManagerGateway != null) {
+ LOG.info("Allocation[{}] No available slot exists, trying to allocate from resource manager.",
+ allocationID);
+ SlotRequest slotRequest = new SlotRequest(jobID, allocationID, resourceProfile);
+ pendingRequests.put(allocationID, new Tuple2<>(slotRequest, future));
+ resourceManagerGateway.requestSlot(jobManagerLeaderId, resourceManagerLeaderId, slotRequest, timeout)
+ .handleAsync(new BiFunction<RMSlotRequestReply, Throwable, Void>() {
+ @Override
+ public Void apply(RMSlotRequestReply slotRequestReply, Throwable throwable) {
+ if (throwable != null) {
+ future.completeExceptionally(
+ new Exception("Slot allocation from resource manager failed", throwable));
+ } else if (slotRequestReply instanceof RMSlotRequestRejected) {
+ future.completeExceptionally(
+ new Exception("Slot allocation rejected by resource manager"));
+ }
+ return null;
+ }
+ }, executor);
+ }
+ else {
+ LOG.warn("Allocation[{}] Resource manager not available right now.", allocationID);
+ future.completeExceptionally(new Exception("Resource manager not available right now."));
+ }
+ }
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Slot De-allocation
+ // ------------------------------------------------------------------------
+
+ /**
+ * Return the slot back to this pool without releasing it. It's mainly called by failed / cancelled tasks, and the
+ * slot can be reused by other pending requests if the resource profile matches.
+ *
+ * @param slot The slot that needs to be returned
+ * @return True if the returned slot has been accepted
+ */
+ @Override
+ public boolean returnAllocatedSlot(Slot slot) {
+ checkNotNull(slot);
+ checkArgument(!slot.isAlive(), "slot is still alive");
+ checkArgument(slot.getOwner() == this, "slot belongs to the wrong pool.");
+
+ if (slot.markReleased()) {
+ synchronized (lock) {
+ final SlotDescriptor slotDescriptor = allocatedSlots.remove(slot);
+ if (slotDescriptor != null) {
+ // check if this TaskManager is valid
+ if (!registeredResources.contains(slot.getTaskManagerID())) {
+ return false;
+ }
+
+ final FlinkCompletableFuture<SlotDescriptor> pendingRequest = pollPendingRequest(slotDescriptor);
+ if (pendingRequest != null) {
+ pendingRequest.complete(slotDescriptor);
+ }
+ else {
+ availableSlots.add(slotDescriptor);
+ }
+
+ return true;
+ }
+ else {
+ throw new IllegalArgumentException("Slot was not allocated from this pool.");
+ }
+ }
+ }
+ else {
+ return false;
+ }
+ }
+
+ private FlinkCompletableFuture<SlotDescriptor> pollPendingRequest(final SlotDescriptor slotDescriptor) {
+ for (Map.Entry<AllocationID, Tuple2<SlotRequest, FlinkCompletableFuture<SlotDescriptor>>> entry : pendingRequests.entrySet()) {
+ final Tuple2<SlotRequest, FlinkCompletableFuture<SlotDescriptor>> pendingRequest = entry.getValue();
+ if (slotDescriptor.getResourceProfile().isMatching(pendingRequest.f0.getResourceProfile())) {
+ pendingRequests.remove(entry.getKey());
+ return pendingRequest.f1;
+ }
+ }
+ return null;
+ }
+
+ // ------------------------------------------------------------------------
+ // Slot Releasing
+ // ------------------------------------------------------------------------
+
+ /**
+ * Release slot to TaskManager, called for finished tasks or canceled jobs.
+ *
+ * @param slot The slot that needs to be released.
+ */
+ public void releaseSlot(final Slot slot) {
+ synchronized (lock) {
+ allocatedSlots.remove(slot);
+ availableSlots.remove(new SlotDescriptor(slot));
+ // TODO: send release request to task manager
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Slot Offering
+ // ------------------------------------------------------------------------
+
+ /**
+ * Slot offering by a TaskManager with an AllocationID. The AllocationID is originally generated by this pool and
+ * transferred through the ResourceManager to the TaskManager. We use it to distinguish the different allocations
+ * we issued. A slot offering may be rejected if we find something mismatching or there is actually no pending
+ * request waiting for this slot (maybe fulfilled by some other returned slot).
+ *
+ * @param allocationID The allocation id of the slot offering
+ * @param slotDescriptor The offered slot descriptor
+ * @return True if we accept the offering
+ */
+ public boolean offerSlot(final AllocationID allocationID, final SlotDescriptor slotDescriptor) {
+ synchronized (lock) {
+ // check if this TaskManager is valid
+ final ResourceID resourceID = slotDescriptor.getTaskManagerLocation().getResourceID();
+ if (!registeredResources.contains(resourceID)) {
+ LOG.warn("Allocation[{}] Slot offering from unregistered TaskManager: {}",
+ allocationID, slotDescriptor);
+ return false;
+ }
+
+ // check whether we are already using this slot
+ final Slot allocatedSlot = allocatedSlots.get(allocationID);
+ if (allocatedSlot != null) {
+ final SlotDescriptor allocatedSlotDescriptor = new SlotDescriptor(allocatedSlot);
+
+ if (allocatedSlotDescriptor.equals(slotDescriptor)) {
+ LOG.debug("Allocation[{}] Duplicated slot offering: {}",
+ allocationID, slotDescriptor);
+ return true;
+ }
+ else {
+ LOG.info("Allocation[{}] Allocation had been fulfilled by slot {}, rejecting offered slot {}",
+ allocationID, allocatedSlotDescriptor, slotDescriptor);
+ return false;
+ }
+ }
+
+ // check whether we already have this slot in free pool
+ if (availableSlots.contains(slotDescriptor)) {
+ LOG.debug("Allocation[{}] Duplicated slot offering: {}",
+ allocationID, slotDescriptor);
+ return true;
+ }
+
+ // check whether we have a request waiting for this slot
+ if (pendingRequests.containsKey(allocationID)) {
+ FlinkCompletableFuture<SlotDescriptor> future = pendingRequests.remove(allocationID).f1;
+ future.complete(slotDescriptor);
+ return true;
+ }
+
+ // unwanted slot, rejecting this offer
+ return false;
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Resource
+ // ------------------------------------------------------------------------
+
+ /**
+ * Register a TaskManager with this pool. Only slots that come from a registered TaskManager will be considered valid.
+ * It also provides a way for us to keep "dead" or "abnormal" TaskManagers out of this pool.
+ *
+ * @param resourceID The id of the TaskManager
+ */
+ public void registerResource(final ResourceID resourceID) {
+ synchronized (lock) {
+ registeredResources.add(resourceID);
+ }
+ }
+
+ /**
+ * Unregister a TaskManager from this pool. All the related slots will be released and tasks will be canceled. Called
+ * when we find that some TaskManager has become "dead" or "abnormal" and we decide not to use slots from it anymore.
+ *
+ * @param resourceID The id of the TaskManager
+ */
+ public void releaseResource(final ResourceID resourceID) {
+ synchronized (lock) {
+ registeredResources.remove(resourceID);
+ availableSlots.removeByResource(resourceID);
+
+ final Set<Slot> allocatedSlotsForResource = allocatedSlots.getSlotsByResource(resourceID);
+ for (Slot slot : allocatedSlotsForResource) {
+ slot.releaseSlot();
+ }
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // ResourceManager
+ // ------------------------------------------------------------------------
+
+ public void setResourceManager(
+ final UUID resourceManagerLeaderId,
+ final ResourceManagerGateway resourceManagerGateway)
+ {
+ synchronized (lock) {
+ this.resourceManagerLeaderId = resourceManagerLeaderId;
+ this.resourceManagerGateway = resourceManagerGateway;
+ }
+ }
+
+ public void disconnectResourceManager() {
+ synchronized (lock) {
+ this.resourceManagerLeaderId = null;
+ this.resourceManagerGateway = null;
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Helper classes
+ // ------------------------------------------------------------------------
+
+ /**
+ * Organize allocated slots from different points of view.
+ */
+ static class AllocatedSlots {
+
+ /** All allocated slots organized by TaskManager */
+ private final Map<ResourceID, Set<Slot>> allocatedSlotsByResource;
+
+ /** All allocated slots organized by Slot object */
+ private final Map<Slot, AllocationID> allocatedSlots;
+
+ private final Map<Slot, SlotDescriptor> allocatedSlotsWithDescriptor;
+
+ /** All allocated slots organized by AllocationID */
+ private final Map<AllocationID, Slot> allocatedSlotsById;
+
+ AllocatedSlots() {
+ this.allocatedSlotsByResource = new HashMap<>();
+ this.allocatedSlots = new HashMap<>();
+ this.allocatedSlotsWithDescriptor = new HashMap<>();
+ this.allocatedSlotsById = new HashMap<>();
+ }
+
+ /**
+ * Add a new allocation
+ *
+ * @param allocationID The allocation id
+ * @param slot The allocated slot
+ */
+ void add(final AllocationID allocationID, final SlotDescriptor descriptor, final Slot slot) {
+ allocatedSlots.put(slot, allocationID);
+ allocatedSlotsById.put(allocationID, slot);
+ allocatedSlotsWithDescriptor.put(slot, descriptor);
+
+ final ResourceID resourceID = slot.getTaskManagerID();
+ Set<Slot> slotsForResource = allocatedSlotsByResource.get(resourceID);
+ if (slotsForResource == null) {
+ slotsForResource = new HashSet<>();
+ allocatedSlotsByResource.put(resourceID, slotsForResource);
+ }
+ slotsForResource.add(slot);
+ }
+
+ /**
+ * Get allocated slot with allocation id
+ *
+ * @param allocationID The allocation id
+ * @return The allocated slot, null if we can't find a match
+ */
+ Slot get(final AllocationID allocationID) {
+ return allocatedSlotsById.get(allocationID);
+ }
+
+ /**
+ * Check whether we have allocated this slot
+ *
+ * @param slot The slot that needs to be checked
+ * @return True if we contain this slot
+ */
+ boolean contains(final Slot slot) {
+ return allocatedSlots.containsKey(slot);
+ }
+
+ /**
+ * Remove an allocation with slot.
+ *
+ * @param slot The slot that needs to be removed
+ */
+ SlotDescriptor remove(final Slot slot) {
+ final SlotDescriptor descriptor = allocatedSlotsWithDescriptor.remove(slot);
+ if (descriptor != null) {
+ final AllocationID allocationID = allocatedSlots.remove(slot);
+ if (allocationID != null) {
+ allocatedSlotsById.remove(allocationID);
+ } else {
+ throw new IllegalStateException("Bug: maps are inconsistent");
+ }
+
+ final ResourceID resourceID = slot.getTaskManagerID();
+ final Set<Slot> slotsForResource = allocatedSlotsByResource.get(resourceID);
+ slotsForResource.remove(slot);
+ if (slotsForResource.isEmpty()) {
+ allocatedSlotsByResource.remove(resourceID);
+ }
+
+ return descriptor;
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * Get all allocated slots from the same TaskManager.
+ *
+ * @param resourceID The id of the TaskManager
+ * @return Set of slots which are allocated from the same TaskManager
+ */
+ Set<Slot> getSlotsByResource(final ResourceID resourceID) {
+ Set<Slot> slotsForResource = allocatedSlotsByResource.get(resourceID);
+ if (slotsForResource != null) {
+ return new HashSet<>(slotsForResource);
+ }
+ else {
+ return new HashSet<>();
+ }
+ }
+
+ @VisibleForTesting
+ boolean containResource(final ResourceID resourceID) {
+ return allocatedSlotsByResource.containsKey(resourceID);
+ }
+
+ @VisibleForTesting
+ int size() {
+ return allocatedSlots.size();
+ }
+ }
+
+ /**
+ * Organize all available slots from different points of view.
+ */
+ static class AvailableSlots {
+
+ /** All available slots organized by TaskManager */
+ private final Map<ResourceID, Set<SlotDescriptor>> availableSlotsByResource;
+
+ /** All available slots */
+ private final Set<SlotDescriptor> availableSlots;
+
+ AvailableSlots() {
+ this.availableSlotsByResource = new HashMap<>();
+ this.availableSlots = new HashSet<>();
+ }
+
+ /**
+ * Add an available slot.
+ *
+ * @param descriptor The descriptor of the slot
+ */
+ void add(final SlotDescriptor descriptor) {
+ availableSlots.add(descriptor);
+
+ final ResourceID resourceID = descriptor.getTaskManagerLocation().getResourceID();
+ Set<SlotDescriptor> slotsForResource = availableSlotsByResource.get(resourceID);
+ if (slotsForResource == null) {
+ slotsForResource = new HashSet<>();
+ availableSlotsByResource.put(resourceID, slotsForResource);
+ }
+ slotsForResource.add(descriptor);
+ }
+
+ /**
+ * Check whether we have this slot
+ *
+ * @param slotDescriptor The descriptor of the slot
+ * @return True if we contain this slot
+ */
+ boolean contains(final SlotDescriptor slotDescriptor) {
+ return availableSlots.contains(slotDescriptor);
+ }
+
+ /**
+ * Poll a slot which matches the required resource profile
+ *
+ * @param resourceProfile The required resource profile
+ * @return Slot which matches the resource profile, null if we can't find a match
+ */
+ SlotDescriptor poll(final ResourceProfile resourceProfile) {
+ for (SlotDescriptor slotDescriptor : availableSlots) {
+ if (slotDescriptor.getResourceProfile().isMatching(resourceProfile)) {
+ remove(slotDescriptor);
+ return slotDescriptor;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Remove all available slots that come from the specified TaskManager.
+ *
+ * @param resourceID The id of the TaskManager
+ */
+ void removeByResource(final ResourceID resourceID) {
+ final Set<SlotDescriptor> slotsForResource = availableSlotsByResource.remove(resourceID);
+ if (slotsForResource != null) {
+ for (SlotDescriptor slotDescriptor : slotsForResource) {
+ availableSlots.remove(slotDescriptor);
+ }
+ }
+ }
+
+ private void remove(final SlotDescriptor slotDescriptor) {
+ availableSlots.remove(slotDescriptor);
+
+ final ResourceID resourceID = slotDescriptor.getTaskManagerLocation().getResourceID();
+ final Set<SlotDescriptor> slotsForResource = checkNotNull(availableSlotsByResource.get(resourceID));
+ slotsForResource.remove(slotDescriptor);
+ if (slotsForResource.isEmpty()) {
+ availableSlotsByResource.remove(resourceID);
+ }
+ }
+
+ @VisibleForTesting
+ boolean containResource(final ResourceID resourceID) {
+ return availableSlotsByResource.containsKey(resourceID);
+ }
+
+ @VisibleForTesting
+ int size() {
+ return availableSlots.size();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fe999e03/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index 5da7827..9463bfe 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -652,8 +652,8 @@ object AkkaUtils {
}
def formatDurationParingErrorMessage: String = {
- "Duration format must be \"val unit\", where 'val' is a number and 'unit' is " +
- "(d|day)|(h|hour)|(min|minute)|s|sec|second)|(ms|milli|millisecond)|"+
+ "Duration format must be \"val unit\", where 'val' is a number and 'unit' is " +
+ "(d|day)|(h|hour)|(min|minute)|s|sec|second)|(ms|milli|millisecond)|" +
"(µs|micro|microsecond)|(ns|nano|nanosecond)"
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fe999e03/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java
new file mode 100644
index 0000000..655a3ea
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests the bookkeeping of {@code SlotPool.AllocatedSlots}.
+ */
+public class AllocatedSlotsTest {
+
+    /**
+     * Adds three slots that live on two resources and removes them again one at a time.
+     * After each step the lookups by slot, by allocation id and by resource, as well as
+     * the total size, are verified.
+     */
+    @Test
+    public void testOperations() throws Exception {
+        SlotPool.AllocatedSlots allocatedSlots = new SlotPool.AllocatedSlots();
+
+        // first slot, located on resource1
+        final AllocationID allocation1 = new AllocationID();
+        final ResourceID resource1 = new ResourceID("resource1");
+        final Slot slot1 = createSlot(resource1);
+
+        allocatedSlots.add(allocation1, new SlotDescriptor(slot1), slot1);
+
+        assertTrue(allocatedSlots.contains(slot1));
+        assertTrue(allocatedSlots.containResource(resource1));
+
+        assertEquals(slot1, allocatedSlots.get(allocation1));
+        assertEquals(1, allocatedSlots.getSlotsByResource(resource1).size());
+        assertEquals(1, allocatedSlots.size());
+
+        // second slot, also located on resource1
+        final AllocationID allocation2 = new AllocationID();
+        final Slot slot2 = createSlot(resource1);
+
+        allocatedSlots.add(allocation2, new SlotDescriptor(slot2), slot2);
+
+        assertTrue(allocatedSlots.contains(slot1));
+        assertTrue(allocatedSlots.contains(slot2));
+        assertTrue(allocatedSlots.containResource(resource1));
+
+        assertEquals(slot1, allocatedSlots.get(allocation1));
+        assertEquals(slot2, allocatedSlots.get(allocation2));
+        assertEquals(2, allocatedSlots.getSlotsByResource(resource1).size());
+        assertEquals(2, allocatedSlots.size());
+
+        // third slot, the only one located on resource2
+        final AllocationID allocation3 = new AllocationID();
+        final ResourceID resource2 = new ResourceID("resource2");
+        final Slot slot3 = createSlot(resource2);
+
+        // the descriptor has to be derived from the slot that is being added (slot3, not slot2)
+        allocatedSlots.add(allocation3, new SlotDescriptor(slot3), slot3);
+
+        assertTrue(allocatedSlots.contains(slot1));
+        assertTrue(allocatedSlots.contains(slot2));
+        assertTrue(allocatedSlots.contains(slot3));
+        assertTrue(allocatedSlots.containResource(resource1));
+        assertTrue(allocatedSlots.containResource(resource2));
+
+        assertEquals(slot1, allocatedSlots.get(allocation1));
+        assertEquals(slot2, allocatedSlots.get(allocation2));
+        assertEquals(slot3, allocatedSlots.get(allocation3));
+        assertEquals(2, allocatedSlots.getSlotsByResource(resource1).size());
+        assertEquals(1, allocatedSlots.getSlotsByResource(resource2).size());
+        assertEquals(3, allocatedSlots.size());
+
+        // removing slot2 leaves one slot on each resource
+        allocatedSlots.remove(slot2);
+
+        assertTrue(allocatedSlots.contains(slot1));
+        assertFalse(allocatedSlots.contains(slot2));
+        assertTrue(allocatedSlots.contains(slot3));
+        assertTrue(allocatedSlots.containResource(resource1));
+        assertTrue(allocatedSlots.containResource(resource2));
+
+        assertEquals(slot1, allocatedSlots.get(allocation1));
+        assertNull(allocatedSlots.get(allocation2));
+        assertEquals(slot3, allocatedSlots.get(allocation3));
+        assertEquals(1, allocatedSlots.getSlotsByResource(resource1).size());
+        assertEquals(1, allocatedSlots.getSlotsByResource(resource2).size());
+        assertEquals(2, allocatedSlots.size());
+
+        // removing slot1 empties resource1, which must no longer be reported as contained
+        allocatedSlots.remove(slot1);
+
+        assertFalse(allocatedSlots.contains(slot1));
+        assertFalse(allocatedSlots.contains(slot2));
+        assertTrue(allocatedSlots.contains(slot3));
+        assertFalse(allocatedSlots.containResource(resource1));
+        assertTrue(allocatedSlots.containResource(resource2));
+
+        assertNull(allocatedSlots.get(allocation1));
+        assertNull(allocatedSlots.get(allocation2));
+        assertEquals(slot3, allocatedSlots.get(allocation3));
+        assertEquals(0, allocatedSlots.getSlotsByResource(resource1).size());
+        assertEquals(1, allocatedSlots.getSlotsByResource(resource2).size());
+        assertEquals(1, allocatedSlots.size());
+
+        // removing slot3 leaves the structure completely empty
+        allocatedSlots.remove(slot3);
+
+        assertFalse(allocatedSlots.contains(slot1));
+        assertFalse(allocatedSlots.contains(slot2));
+        assertFalse(allocatedSlots.contains(slot3));
+        assertFalse(allocatedSlots.containResource(resource1));
+        assertFalse(allocatedSlots.containResource(resource2));
+
+        assertNull(allocatedSlots.get(allocation1));
+        assertNull(allocatedSlots.get(allocation2));
+        assertNull(allocatedSlots.get(allocation3));
+        assertEquals(0, allocatedSlots.getSlotsByResource(resource1).size());
+        assertEquals(0, allocatedSlots.getSlotsByResource(resource2).size());
+        assertEquals(0, allocatedSlots.size());
+    }
+
+    /**
+     * Creates a mocked {@code Slot} that reports the given resource id as its TaskManager id.
+     *
+     * @param resourceId The resource id the mocked slot should return
+     * @return The mocked slot
+     */
+    private Slot createSlot(final ResourceID resourceId) {
+        Slot slot = mock(Slot.class);
+        when(slot.getTaskManagerID()).thenReturn(resourceId);
+        return slot;
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fe999e03/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AvailableSlotsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AvailableSlotsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AvailableSlotsTest.java
new file mode 100644
index 0000000..872810f
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AvailableSlotsTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@code SlotPool.AvailableSlots}. The profile constants and the static
+ * {@code createSlotDescriptor} helpers are package-visible.
+ */
+public class AvailableSlotsTest {
+
+    static final ResourceProfile DEFAULT_TESTING_PROFILE = new ResourceProfile(1.0, 512);
+
+    static final ResourceProfile DEFAULT_TESTING_BIG_PROFILE = new ResourceProfile(2.0, 1024);
+
+    /**
+     * Registers two slots for one resource and one slot for another, then removes the
+     * slots resource by resource and checks what is left after each removal.
+     */
+    @Test
+    public void testAddAndRemove() throws Exception {
+        final SlotPool.AvailableSlots pool = new SlotPool.AvailableSlots();
+
+        final ResourceID firstResource = new ResourceID("resource1");
+        final ResourceID secondResource = new ResourceID("resource2");
+
+        final SlotDescriptor slotA = createSlotDescriptor(firstResource);
+        final SlotDescriptor slotB = createSlotDescriptor(firstResource);
+        final SlotDescriptor slotC = createSlotDescriptor(secondResource);
+
+        pool.add(slotA);
+        pool.add(slotB);
+        pool.add(slotC);
+
+        // everything that was added is visible
+        assertEquals(3, pool.size());
+        assertTrue(pool.contains(slotA));
+        assertTrue(pool.contains(slotB));
+        assertTrue(pool.contains(slotC));
+        assertTrue(pool.containResource(firstResource));
+        assertTrue(pool.containResource(secondResource));
+
+        // dropping the first resource takes both of its slots with it
+        pool.removeByResource(firstResource);
+
+        assertEquals(1, pool.size());
+        assertFalse(pool.contains(slotA));
+        assertFalse(pool.contains(slotB));
+        assertTrue(pool.contains(slotC));
+        assertFalse(pool.containResource(firstResource));
+        assertTrue(pool.containResource(secondResource));
+
+        // dropping the second resource leaves nothing behind
+        pool.removeByResource(secondResource);
+
+        assertEquals(0, pool.size());
+        assertFalse(pool.contains(slotA));
+        assertFalse(pool.contains(slotB));
+        assertFalse(pool.contains(slotC));
+        assertFalse(pool.containResource(firstResource));
+        assertFalse(pool.containResource(secondResource));
+    }
+
+    /**
+     * Polling with the big profile yields nothing for a slot created with the default
+     * profile, whereas polling with the default profile hands out that slot and removes it.
+     */
+    @Test
+    public void testPollFreeSlot() {
+        final SlotPool.AvailableSlots pool = new SlotPool.AvailableSlots();
+
+        final ResourceID resource = new ResourceID("resource1");
+        final SlotDescriptor onlySlot = createSlotDescriptor(resource);
+
+        pool.add(onlySlot);
+
+        assertEquals(1, pool.size());
+        assertTrue(pool.contains(onlySlot));
+        assertTrue(pool.containResource(resource));
+
+        final SlotDescriptor bigMatch = pool.poll(DEFAULT_TESTING_BIG_PROFILE);
+        assertNull(bigMatch);
+
+        final SlotDescriptor defaultMatch = pool.poll(DEFAULT_TESTING_PROFILE);
+        assertEquals(onlySlot, defaultMatch);
+        assertEquals(0, pool.size());
+        assertFalse(pool.contains(onlySlot));
+        assertFalse(pool.containResource(resource));
+    }
+
+    /** Creates a descriptor for a fresh job id, the default testing profile and slot number 0. */
+    static SlotDescriptor createSlotDescriptor(final ResourceID resourceID) {
+        return createSlotDescriptor(resourceID, new JobID(), DEFAULT_TESTING_PROFILE, 0);
+    }
+
+    /** Creates a descriptor for the given job with the default testing profile and slot number 0. */
+    static SlotDescriptor createSlotDescriptor(final ResourceID resourceID, final JobID jobID) {
+        return createSlotDescriptor(resourceID, jobID, DEFAULT_TESTING_PROFILE, 0);
+    }
+
+    /** Creates a descriptor for the given job and profile with slot number 0. */
+    static SlotDescriptor createSlotDescriptor(
+        final ResourceID resourceID,
+        final JobID jobID,
+        final ResourceProfile resourceProfile)
+    {
+        return createSlotDescriptor(resourceID, jobID, resourceProfile, 0);
+    }
+
+    /**
+     * Creates a descriptor whose mocked TaskManager location reports the given resource id
+     * and whose actor gateway is a plain mock.
+     */
+    static SlotDescriptor createSlotDescriptor(
+        final ResourceID resourceID,
+        final JobID jobID,
+        final ResourceProfile resourceProfile,
+        final int slotNumber)
+    {
+        final TaskManagerLocation taskManager = mock(TaskManagerLocation.class);
+        when(taskManager.getResourceID()).thenReturn(resourceID);
+
+        final ActorGateway gateway = mock(ActorGateway.class);
+        return new SlotDescriptor(jobID, taskManager, slotNumber, resourceProfile, gateway);
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fe999e03/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
new file mode 100644
index 0000000..30cdbd6
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.BiFunction;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.util.TestLogger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.runtime.instance.AvailableSlotsTest.DEFAULT_TESTING_PROFILE;
+import static org.apache.flink.runtime.instance.AvailableSlotsTest.createSlotDescriptor;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link SlotPool}, run against a mocked {@link ResourceManagerGateway}.
+ */
+public class SlotPoolTest extends TestLogger {
+
+    /** Thread pool handed to the slot pool; created per test and shut down in {@link #tearDown()}. */
+    private ExecutorService executor;
+
+    private SlotPool slotPool;
+
+    private ResourceManagerGateway resourceManagerGateway;
+
+    /**
+     * Creates a slot pool backed by a single-threaded executor and connects it to a mocked
+     * resource manager gateway whose {@code requestSlot} call returns a mocked future.
+     */
+    @Before
+    public void setUp() throws Exception {
+        this.executor = Executors.newFixedThreadPool(1);
+        this.slotPool = new SlotPool(executor);
+        this.resourceManagerGateway = mock(ResourceManagerGateway.class);
+        when(resourceManagerGateway
+            .requestSlot(any(UUID.class), any(UUID.class), any(SlotRequest.class), any(Time.class)))
+            .thenReturn(mock(Future.class));
+        slotPool.setResourceManager(UUID.randomUUID(), resourceManagerGateway);
+        slotPool.setJobManagerLeaderId(UUID.randomUUID());
+    }
+
+    /**
+     * Shuts down the executor created in {@link #setUp()} so that its worker thread does not
+     * outlive the test.
+     */
+    @After
+    public void tearDown() throws Exception {
+        // guard against a setUp() that failed before the executor was created
+        if (executor != null) {
+            executor.shutdownNow();
+        }
+    }
+
+    /**
+     * A simple slot request stays pending, triggers one slot request to the resource manager,
+     * and is completed once a slot is offered for its allocation id.
+     */
+    @Test
+    public void testAllocateSimpleSlot() throws Exception {
+        ResourceID resourceID = new ResourceID("resource");
+        slotPool.registerResource(resourceID);
+
+        JobID jobID = new JobID();
+        AllocationID allocationID = new AllocationID();
+        Future<SimpleSlot> future = slotPool.allocateSimpleSlot(jobID, DEFAULT_TESTING_PROFILE, allocationID);
+        assertFalse(future.isDone());
+        verify(resourceManagerGateway).requestSlot(any(UUID.class), any(UUID.class), any(SlotRequest.class), any(Time.class));
+
+        SlotDescriptor slotDescriptor = createSlotDescriptor(resourceID, jobID, DEFAULT_TESTING_PROFILE);
+        assertTrue(slotPool.offerSlot(allocationID, slotDescriptor));
+
+        SimpleSlot slot = future.get(1, TimeUnit.SECONDS);
+        assertTrue(future.isDone());
+        assertTrue(slot.isAlive());
+        assertEquals(resourceID, slot.getTaskManagerID());
+        assertEquals(jobID, slot.getJobID());
+        assertEquals(slotPool, slot.getOwner());
+    }
+
+    /**
+     * Same flow as {@link #testAllocateSimpleSlot()} for a shared slot; additionally a sub slot
+     * can be allocated from the resulting shared slot.
+     */
+    @Test
+    public void testAllocateSharedSlot() throws Exception {
+        ResourceID resourceID = new ResourceID("resource");
+        slotPool.registerResource(resourceID);
+
+        JobVertexID vid = new JobVertexID();
+        SlotSharingGroup sharingGroup = new SlotSharingGroup(vid);
+        SlotSharingGroupAssignment assignment = sharingGroup.getTaskAssignment();
+
+        JobID jobID = new JobID();
+        AllocationID allocationID = new AllocationID();
+        Future<SharedSlot> future = slotPool.allocateSharedSlot(jobID, DEFAULT_TESTING_PROFILE, assignment, allocationID);
+
+        assertFalse(future.isDone());
+        verify(resourceManagerGateway).requestSlot(any(UUID.class), any(UUID.class), any(SlotRequest.class), any(Time.class));
+
+        SlotDescriptor slotDescriptor = createSlotDescriptor(resourceID, jobID, DEFAULT_TESTING_PROFILE);
+        assertTrue(slotPool.offerSlot(allocationID, slotDescriptor));
+
+        SharedSlot slot = future.get(1, TimeUnit.SECONDS);
+        assertTrue(future.isDone());
+        assertTrue(slot.isAlive());
+        assertEquals(resourceID, slot.getTaskManagerID());
+        assertEquals(jobID, slot.getJobID());
+        assertEquals(slotPool, slot.getOwner());
+
+        SimpleSlot simpleSlot = slot.allocateSubSlot(vid);
+        assertNotNull(simpleSlot);
+        assertTrue(simpleSlot.isAlive());
+    }
+
+    /**
+     * After the resource manager has been disconnected, waiting on an allocation future
+     * is expected to fail with an {@link ExecutionException}.
+     */
+    @Test
+    public void testAllocateSlotWithoutResourceManager() throws Exception {
+        slotPool.disconnectResourceManager();
+        Future<SimpleSlot> future = slotPool.allocateSimpleSlot(new JobID(), DEFAULT_TESTING_PROFILE);
+        // NOTE(review): the assertions in this callback run via the executor and the future
+        // returned by handleAsync is not inspected, so a failure in here presumably does not
+        // fail the test; the try/catch below is the effective check. Confirm.
+        future.handleAsync(
+            new BiFunction<SimpleSlot, Throwable, Void>() {
+                @Override
+                public Void apply(SimpleSlot simpleSlot, Throwable throwable) {
+                    assertNull(simpleSlot);
+                    assertNotNull(throwable);
+                    return null;
+                }
+            },
+            executor);
+        try {
+            future.get(1, TimeUnit.SECONDS);
+            fail("We expected a ExecutionException.");
+        } catch (ExecutionException ex) {
+            // we expect the exception
+        }
+    }
+
+    /**
+     * With two pending requests and only one offered slot, the second request is completed
+     * once the first slot has been released back to the pool.
+     */
+    @Test
+    public void testAllocationFulfilledByReturnedSlot() throws Exception {
+        ResourceID resourceID = new ResourceID("resource");
+        slotPool.registerResource(resourceID);
+
+        JobID jobID = new JobID();
+
+        AllocationID allocationID1 = new AllocationID();
+        Future<SimpleSlot> future1 = slotPool.allocateSimpleSlot(jobID, DEFAULT_TESTING_PROFILE, allocationID1);
+
+        AllocationID allocationID2 = new AllocationID();
+        Future<SimpleSlot> future2 = slotPool.allocateSimpleSlot(jobID, DEFAULT_TESTING_PROFILE, allocationID2);
+
+        assertFalse(future1.isDone());
+        assertFalse(future2.isDone());
+        verify(resourceManagerGateway, times(2))
+            .requestSlot(any(UUID.class), any(UUID.class), any(SlotRequest.class), any(Time.class));
+
+        SlotDescriptor slotDescriptor = createSlotDescriptor(resourceID, jobID, DEFAULT_TESTING_PROFILE);
+        assertTrue(slotPool.offerSlot(allocationID1, slotDescriptor));
+
+        SimpleSlot slot1 = future1.get(1, TimeUnit.SECONDS);
+        assertTrue(future1.isDone());
+        assertFalse(future2.isDone());
+
+        // return this slot to pool
+        slot1.releaseSlot();
+
+        // second allocation fulfilled by previous slot returning
+        SimpleSlot slot2 = future2.get(1, TimeUnit.SECONDS);
+        assertTrue(future2.isDone());
+
+        assertNotEquals(slot1, slot2);
+        assertTrue(slot1.isReleased());
+        assertTrue(slot2.isAlive());
+        assertEquals(slot1.getTaskManagerID(), slot2.getTaskManagerID());
+        assertEquals(slot1.getSlotNumber(), slot2.getSlotNumber());
+    }
+
+    /**
+     * A request issued after a slot has been released back to the pool is completed
+     * with a slot on the same TaskManager and with the same slot number.
+     */
+    @Test
+    public void testAllocateWithFreeSlot() throws Exception {
+        ResourceID resourceID = new ResourceID("resource");
+        slotPool.registerResource(resourceID);
+
+        JobID jobID = new JobID();
+        AllocationID allocationID1 = new AllocationID();
+        Future<SimpleSlot> future1 = slotPool.allocateSimpleSlot(jobID, DEFAULT_TESTING_PROFILE, allocationID1);
+        assertFalse(future1.isDone());
+
+        SlotDescriptor slotDescriptor = createSlotDescriptor(resourceID, jobID, DEFAULT_TESTING_PROFILE);
+        assertTrue(slotPool.offerSlot(allocationID1, slotDescriptor));
+
+        SimpleSlot slot1 = future1.get(1, TimeUnit.SECONDS);
+        assertTrue(future1.isDone());
+
+        // return this slot to pool
+        slot1.releaseSlot();
+
+        AllocationID allocationID2 = new AllocationID();
+        Future<SimpleSlot> future2 = slotPool.allocateSimpleSlot(jobID, DEFAULT_TESTING_PROFILE, allocationID2);
+
+        // second allocation fulfilled by previous slot returning
+        SimpleSlot slot2 = future2.get(1, TimeUnit.SECONDS);
+        assertTrue(future2.isDone());
+
+        assertNotEquals(slot1, slot2);
+        assertTrue(slot1.isReleased());
+        assertTrue(slot2.isAlive());
+        assertEquals(slot1.getTaskManagerID(), slot2.getTaskManagerID());
+        assertEquals(slot1.getSlotNumber(), slot2.getSlotNumber());
+    }
+
+    /**
+     * Exercises the accept/reject decisions of {@code offerSlot}: offers from an unregistered
+     * resource, with an unknown allocation id, conflicting offers and duplicated offers.
+     */
+    @Test
+    public void testOfferSlot() throws Exception {
+        ResourceID resourceID = new ResourceID("resource");
+        slotPool.registerResource(resourceID);
+
+        JobID jobID = new JobID();
+        AllocationID allocationID = new AllocationID();
+        Future<SimpleSlot> future = slotPool.allocateSimpleSlot(jobID, DEFAULT_TESTING_PROFILE, allocationID);
+        assertFalse(future.isDone());
+        verify(resourceManagerGateway).requestSlot(any(UUID.class), any(UUID.class), any(SlotRequest.class), any(Time.class));
+
+        // slot from unregistered resource
+        SlotDescriptor invalid = createSlotDescriptor(new ResourceID("unregistered"), jobID, DEFAULT_TESTING_PROFILE);
+        assertFalse(slotPool.offerSlot(allocationID, invalid));
+
+        SlotDescriptor slotDescriptor = createSlotDescriptor(resourceID, jobID, DEFAULT_TESTING_PROFILE);
+
+        // reject offering with mismatch allocation id
+        assertFalse(slotPool.offerSlot(new AllocationID(), slotDescriptor));
+
+        // accepted slot
+        assertTrue(slotPool.offerSlot(allocationID, slotDescriptor));
+        SimpleSlot slot = future.get(1, TimeUnit.SECONDS);
+        assertTrue(future.isDone());
+        assertTrue(slot.isAlive());
+
+        // conflict offer with using slot
+        SlotDescriptor conflict = createSlotDescriptor(resourceID, jobID, DEFAULT_TESTING_PROFILE);
+        assertFalse(slotPool.offerSlot(allocationID, conflict));
+
+        // duplicated offer with using slot
+        assertTrue(slotPool.offerSlot(allocationID, slotDescriptor));
+        assertTrue(future.isDone());
+        assertTrue(slot.isAlive());
+
+        // duplicated offer with free slot
+        slot.releaseSlot();
+        assertTrue(slot.isReleased());
+        assertTrue(slotPool.offerSlot(allocationID, slotDescriptor));
+    }
+
+    /**
+     * Releasing a resource releases the slot allocated from it, and that slot is not used
+     * to complete another pending request.
+     */
+    @Test
+    public void testReleaseResource() throws Exception {
+        ResourceID resourceID = new ResourceID("resource");
+        slotPool.registerResource(resourceID);
+
+        JobID jobID = new JobID();
+
+        AllocationID allocationID1 = new AllocationID();
+        Future<SimpleSlot> future1 = slotPool.allocateSimpleSlot(jobID, DEFAULT_TESTING_PROFILE, allocationID1);
+
+        AllocationID allocationID2 = new AllocationID();
+        Future<SimpleSlot> future2 = slotPool.allocateSimpleSlot(jobID, DEFAULT_TESTING_PROFILE, allocationID2);
+
+        SlotDescriptor slotDescriptor = createSlotDescriptor(resourceID, jobID, DEFAULT_TESTING_PROFILE);
+        assertTrue(slotPool.offerSlot(allocationID1, slotDescriptor));
+
+        SimpleSlot slot1 = future1.get(1, TimeUnit.SECONDS);
+        assertTrue(future1.isDone());
+        assertFalse(future2.isDone());
+
+        slotPool.releaseResource(resourceID);
+        assertTrue(slot1.isReleased());
+
+        // slot released and not usable, second allocation still not fulfilled
+        Thread.sleep(10);
+        assertFalse(future2.isDone());
+    }
+
+}