You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/10/02 21:58:25 UTC
[20/50] [abbrv] flink git commit: [FLINK-4347][cluster management]
Implement SlotManager core
[FLINK-4347][cluster management] Implement SlotManager core
This closes #2388
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1c9f9d42
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1c9f9d42
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1c9f9d42
Branch: refs/heads/flip-6
Commit: 1c9f9d42319434458524d5dd840df5a0007b06c9
Parents: e6b270d
Author: Kurt Young <yk...@gmail.com>
Authored: Thu Aug 18 15:48:30 2016 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sun Oct 2 23:44:43 2016 +0200
----------------------------------------------------------------------
.../runtime/clusterframework/SlotManager.java | 525 ++++++++++++++++++
.../clusterframework/types/ResourceID.java | 4 +-
.../clusterframework/types/ResourceProfile.java | 5 +
.../clusterframework/types/ResourceSlot.java | 66 +++
.../runtime/clusterframework/types/SlotID.java | 14 +-
.../rpc/resourcemanager/SlotRequest.java | 51 +-
.../runtime/rpc/taskexecutor/SlotReport.java | 56 ++
.../runtime/rpc/taskexecutor/SlotStatus.java | 129 +++++
.../clusterframework/SlotManagerTest.java | 540 +++++++++++++++++++
9 files changed, 1382 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1c9f9d42/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/SlotManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/SlotManager.java
new file mode 100644
index 0000000..cc140a1
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/SlotManager.java
@@ -0,0 +1,525 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.rpc.taskexecutor.SlotReport;
+import org.apache.flink.runtime.rpc.taskexecutor.SlotStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * SlotManager is responsible for receiving slot requests and doing slot allocations. It allows to request
+ * slots from registered TaskManagers and issues container allocation requests in case there are not
+ * enough available slots. Besides, it syncs its slot allocation with the TaskManagers' slot reports
+ * (heartbeats).
+ * <p>
+ * The main operation principles of SlotManager are:
+ * <ul>
+ * <li>1. All slot allocation status should be synced with TaskManager, which is the ground truth.</li>
+ * <li>2. All slots that have registered must be tracked, either by free pool or allocated pool.</li>
+ * <li>3. All slot requests will be handled by best efforts, there is no guarantee that one request will be
+ * fulfilled in time or correctly allocated. Conflicts, timeouts or other special errors may happen; they should
+ * be handled outside SlotManager. SlotManager will make each decision based on the information it currently
+ * holds.</li>
+ * </ul>
+ * <b>IMPORTANT:</b> This class is <b>Not Thread-safe</b>.
+ */
+public abstract class SlotManager {
+
+	private static final Logger LOG = LoggerFactory.getLogger(SlotManager.class);
+
+	/** Gateway to communicate with ResourceManager */
+	private final ResourceManagerGateway resourceManagerGateway;
+
+	/** All registered slots, including free and allocated slots, grouped by their TaskManager */
+	private final Map<ResourceID, Map<SlotID, ResourceSlot>> registeredSlots;
+
+	/** All pending slot requests waiting for available slots, in arrival order */
+	private final Map<AllocationID, SlotRequest> pendingSlotRequests;
+
+	/** All free slots that can be used to be allocated */
+	private final Map<SlotID, ResourceSlot> freeSlots;
+
+	/** All allocations, we can lookup allocations either by SlotID or AllocationID */
+	private final AllocationMap allocationMap;
+
+	public SlotManager(ResourceManagerGateway resourceManagerGateway) {
+		this.resourceManagerGateway = checkNotNull(resourceManagerGateway);
+		this.registeredSlots = new HashMap<>(16);
+		this.pendingSlotRequests = new LinkedHashMap<>(16);
+		this.freeSlots = new HashMap<>(16);
+		this.allocationMap = new AllocationMap();
+	}
+
+	// ------------------------------------------------------------------------
+	//  slot managements
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Request a slot with requirements, we may either fulfill the request or keep it pending. Trigger container
+	 * allocation if we don't have enough resource. If we have a free slot which can match the request, record
+	 * this allocation and forward the request to TaskManager through ResourceManager (we want this done by
+	 * RPC's main thread to avoid race condition).
+	 *
+	 * @param request The detailed request of the slot
+	 */
+	public void requestSlot(final SlotRequest request) {
+		if (isRequestDuplicated(request)) {
+			LOG.warn("Duplicated slot request, AllocationID:{}", request.getAllocationId());
+			return;
+		}
+
+		// try to fulfil the request with current free slots
+		ResourceSlot slot = chooseSlotToUse(request, freeSlots);
+		if (slot != null) {
+			LOG.info("Assigning SlotID({}) to AllocationID({}), JobID:{}", slot.getSlotId(),
+				request.getAllocationId(), request.getJobId());
+
+			// record this allocation in bookkeeping
+			allocationMap.addAllocation(slot.getSlotId(), request.getAllocationId());
+
+			// remove selected slot from free pool
+			freeSlots.remove(slot.getSlotId());
+
+			// TODO: send slot request to TaskManager
+		} else {
+			LOG.info("Cannot fulfil slot request, try to allocate a new container for it, " +
+				"AllocationID:{}, JobID:{}", request.getAllocationId(), request.getJobId());
+			allocateContainer(request.getResourceProfile());
+			pendingSlotRequests.put(request.getAllocationId(), request);
+		}
+	}
+
+	/**
+	 * Sync slot status with TaskManager's SlotReport, one reported slot at a time.
+	 *
+	 * @param slotReport The report listing the status of the TaskManager's slots
+	 */
+	public void updateSlotStatus(final SlotReport slotReport) {
+		for (SlotStatus slotStatus : slotReport.getSlotsStatus()) {
+			updateSlotStatus(slotStatus);
+		}
+	}
+
+	/**
+	 * The slot request to TaskManager may be either failed by rpc communication (timeout, network error, etc.)
+	 * or really rejected by TaskManager. We shall retry this request by:
+	 * <ul>
+	 * <li>1. verify and clear all the previous allocate information for this request
+	 * <li>2. try to request slot again
+	 * </ul>
+	 * <p>
+	 * This may cause some duplicate allocation, e.g. the slot request to TaskManager is successful but the response
+	 * is lost somehow, so we may request a slot in another TaskManager, this causes two slots assigned to one request,
+	 * but it can be taken care of by rejecting registration at JobManager.
+	 *
+	 * @param originalRequest The original slot request
+	 * @param slotId          The target SlotID
+	 */
+	public void handleSlotRequestFailedAtTaskManager(final SlotRequest originalRequest, final SlotID slotId) {
+		final AllocationID originalAllocationId = originalRequest.getAllocationId();
+		LOG.info("Slot request failed at TaskManager, SlotID:{}, AllocationID:{}, JobID:{}",
+			slotId, originalAllocationId, originalRequest.getJobId());
+
+		// verify the allocation info before we do anything
+		if (freeSlots.containsKey(slotId)) {
+			// this slot is currently empty, no need to de-allocate it from our allocations
+			LOG.info("Original slot is somehow empty, retrying this request");
+
+			// before retry, we should double check whether this request was allocated by some other ways
+			if (!allocationMap.isAllocated(originalAllocationId)) {
+				requestSlot(originalRequest);
+			} else {
+				LOG.info("The failed request has somehow been allocated, SlotID:{}",
+					allocationMap.getSlotID(originalAllocationId));
+			}
+		} else if (allocationMap.isAllocated(slotId)) {
+			final AllocationID currentAllocationId = allocationMap.getAllocationID(slotId);
+
+			// check whether we have an agreement on whom this slot belongs to
+			if (originalAllocationId.equals(currentAllocationId)) {
+				LOG.info("De-allocate this request and retry");
+				allocationMap.removeAllocation(currentAllocationId);
+
+				// put this slot back to free pool
+				ResourceSlot slot = checkNotNull(getRegisteredSlot(slotId));
+				freeSlots.put(slotId, slot);
+
+				// retry the request
+				requestSlot(originalRequest);
+			} else {
+				// the slot is taken by someone else, no need to de-allocate it from our allocations
+				LOG.info("Original slot is taken by someone else, current AllocationID:{}", currentAllocationId);
+
+				// before retry, we should double check whether this request was allocated by some other ways
+				if (!allocationMap.isAllocated(originalAllocationId)) {
+					requestSlot(originalRequest);
+				} else {
+					LOG.info("The failed request is somehow been allocated, SlotID:{}",
+						allocationMap.getSlotID(originalAllocationId));
+				}
+			}
+		} else {
+			LOG.error("BUG! {} is neither in free pool nor in allocated pool", slotId);
+		}
+	}
+
+	/**
+	 * Callback for TaskManager failures. In case that a TaskManager fails, we have to clean up all its slots.
+	 *
+	 * @param resourceId The ResourceID of the TaskManager
+	 */
+	public void notifyTaskManagerFailure(final ResourceID resourceId) {
+		LOG.info("Resource:{} been notified failure", resourceId);
+		final Map<SlotID, ResourceSlot> slotIdsToRemove = registeredSlots.remove(resourceId);
+		if (slotIdsToRemove != null) {
+			for (SlotID slotId : slotIdsToRemove.keySet()) {
+				LOG.info("Removing Slot:{} upon resource failure", slotId);
+				if (freeSlots.containsKey(slotId)) {
+					freeSlots.remove(slotId);
+				} else if (allocationMap.isAllocated(slotId)) {
+					allocationMap.removeAllocation(slotId);
+				} else {
+					LOG.error("BUG! {} is neither in free pool nor in allocated pool", slotId);
+				}
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  internal behaviors
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Update slot status based on TaskManager's report. There are mainly two situations when we receive the report:
+	 * <ul>
+	 * <li>1. The slot is newly registered.</li>
+	 * <li>2. The slot has registered, it contains its current status.</li>
+	 * </ul>
+	 * <p>
+	 * Regarding 1: It's fairly simple, we just record this slot's status, and trigger schedule if slot is empty.
+	 * <p>
+	 * Regarding 2: It will cause some weird situation since we may have some time-gap on how the slot's status really
+	 * is. We may have some updates on the slot's allocation, but it isn't reflected by TaskManager's heartbeat yet,
+	 * and we may make some wrong decision if we cannot guarantee we have the exact status about all the slots. So
+	 * the principle here is: We always trust TaskManager's heartbeat, we will correct our information based on that
+	 * and take next action based on the diff between our information and heartbeat status.
+	 *
+	 * @param reportedStatus Reported slot status
+	 */
+	void updateSlotStatus(final SlotStatus reportedStatus) {
+		final SlotID slotId = reportedStatus.getSlotID();
+		final AllocationID reportedAllocationId = reportedStatus.getAllocationID();
+		final ResourceSlot slot = new ResourceSlot(slotId, reportedStatus.getProfiler());
+
+		if (registerNewSlot(slot)) {
+			// we have a newly registered slot
+			LOG.info("New slot appeared, SlotID:{}, AllocationID:{}", slotId, reportedAllocationId);
+
+			if (reportedAllocationId != null) {
+				// slot in use, record this in bookkeeping
+				recordReportedAllocation(slotId, reportedAllocationId);
+			} else {
+				handleFreeSlot(slot);
+			}
+		} else if (reportedAllocationId != null) {
+			// slot exists and is reported in use; check whether we also thought this slot is in use
+			if (allocationMap.isAllocated(slotId)) {
+				// we also think that slot is in use, check whether the AllocationID matches
+				final AllocationID currentAllocationId = allocationMap.getAllocationID(slotId);
+
+				if (!reportedAllocationId.equals(currentAllocationId)) {
+					LOG.info("Slot allocation info mismatch! SlotID:{}, current:{}, reported:{}",
+						slotId, currentAllocationId, reportedAllocationId);
+
+					// seems we have a disagreement about the slot assignments, trust the TaskManager
+					recordReportedAllocation(slotId, reportedAllocationId);
+				}
+			} else {
+				LOG.info("Slot allocation info mismatch! SlotID:{}, current:null, reported:{}",
+					slotId, reportedAllocationId);
+
+				// we thought the slot is free, should correct this information
+				recordReportedAllocation(slotId, reportedAllocationId);
+
+				// remove this slot from free slots pool
+				freeSlots.remove(slotId);
+			}
+		} else {
+			// slot exists and is reported empty; check whether we also thought this slot is empty
+			if (allocationMap.isAllocated(slotId)) {
+				LOG.info("Slot allocation info mismatch! SlotID:{}, current:{}, reported:null",
+					slotId, allocationMap.getAllocationID(slotId));
+
+				// we thought the slot is in use, correct it
+				allocationMap.removeAllocation(slotId);
+
+				// we have a free slot!
+				handleFreeSlot(slot);
+			} else if (!freeSlots.containsKey(slotId)) {
+				// The slot is tracked by neither pool, e.g. because its allocation was re-recorded on another
+				// slot by a TaskManager report. The TaskManager says it is empty, so make it usable again.
+				LOG.info("Slot {} is reported empty but is tracked neither as free nor as allocated, " +
+					"treating it as a free slot", slotId);
+				handleFreeSlot(slot);
+			}
+		}
+	}
+
+	/**
+	 * Records an allocation reported by a TaskManager. Since the TaskManager is the ground truth, a pending
+	 * request with the same AllocationID is considered fulfilled and dropped, so it cannot be allocated twice.
+	 *
+	 * @param slotId       The slot the TaskManager reported as allocated
+	 * @param allocationId The allocation the TaskManager reported for that slot
+	 */
+	private void recordReportedAllocation(final SlotID slotId, final AllocationID allocationId) {
+		if (pendingSlotRequests.remove(allocationId) != null) {
+			LOG.info("Pending slot request has been fulfilled according to TaskManager's report, AllocationID:{}",
+				allocationId);
+		}
+		allocationMap.addAllocation(slotId, allocationId);
+	}
+
+	/**
+	 * When we have a free slot, try to fulfill the pending request first. If any request can be fulfilled,
+	 * record this allocation in bookkeeping and send slot request to TaskManager, else we just add this slot
+	 * to the free pool.
+	 *
+	 * @param freeSlot The free slot
+	 */
+	private void handleFreeSlot(final ResourceSlot freeSlot) {
+		SlotRequest chosenRequest = chooseRequestToFulfill(freeSlot, pendingSlotRequests);
+
+		if (chosenRequest != null) {
+			pendingSlotRequests.remove(chosenRequest.getAllocationId());
+
+			LOG.info("Assigning SlotID({}) to AllocationID({}), JobID:{}", freeSlot.getSlotId(),
+				chosenRequest.getAllocationId(), chosenRequest.getJobId());
+			allocationMap.addAllocation(freeSlot.getSlotId(), chosenRequest.getAllocationId());
+
+			// TODO: send slot request to TaskManager
+		} else {
+			freeSlots.put(freeSlot.getSlotId(), freeSlot);
+		}
+	}
+
+	/**
+	 * Check whether the request is duplicated. We use AllocationID to identify slot request, for each
+	 * formerly received slot request, it is either in pending list or already been allocated.
+	 *
+	 * @param request The slot request
+	 * @return <tt>true</tt> if the request is duplicated
+	 */
+	private boolean isRequestDuplicated(final SlotRequest request) {
+		final AllocationID allocationId = request.getAllocationId();
+		return pendingSlotRequests.containsKey(allocationId)
+			|| allocationMap.isAllocated(allocationId);
+	}
+
+	/**
+	 * Try to register slot, and tell if this slot is newly registered. An already registered slot is
+	 * replaced by the given instance.
+	 *
+	 * @param slot The ResourceSlot which will be checked and registered
+	 * @return <tt>true</tt> if we meet a new slot
+	 */
+	private boolean registerNewSlot(final ResourceSlot slot) {
+		final SlotID slotId = slot.getSlotId();
+		final ResourceID resourceId = slotId.getResourceID();
+		if (!registeredSlots.containsKey(resourceId)) {
+			registeredSlots.put(resourceId, new HashMap<SlotID, ResourceSlot>());
+		}
+		return registeredSlots.get(resourceId).put(slotId, slot) == null;
+	}
+
+	/**
+	 * Looks up a registered slot.
+	 *
+	 * @param slotId The slot to look up
+	 * @return The registered slot, or <tt>null</tt> if it is unknown
+	 */
+	private ResourceSlot getRegisteredSlot(final SlotID slotId) {
+		final ResourceID resourceId = slotId.getResourceID();
+		if (!registeredSlots.containsKey(resourceId)) {
+			return null;
+		}
+		return registeredSlots.get(resourceId).get(slotId);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Framework specific behavior
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Choose a slot to use among all free slots, the behavior is framework specified.
+	 *
+	 * @param request   The slot request
+	 * @param freeSlots All slots which can be used
+	 * @return The slot we choose to use, <tt>null</tt> if we did not find a match
+	 */
+	protected abstract ResourceSlot chooseSlotToUse(final SlotRequest request,
+		final Map<SlotID, ResourceSlot> freeSlots);
+
+	/**
+	 * Choose a pending request to fulfill when we have a free slot, the behavior is framework specified.
+	 *
+	 * @param offeredSlot     The free slot
+	 * @param pendingRequests All the pending slot requests
+	 * @return The chosen SlotRequest, <tt>null</tt> if we did not find a match
+	 */
+	protected abstract SlotRequest chooseRequestToFulfill(final ResourceSlot offeredSlot,
+		final Map<AllocationID, SlotRequest> pendingRequests);
+
+	/**
+	 * The framework specific code for allocating a container for specified resource profile.
+	 *
+	 * @param resourceProfile The resource profile
+	 */
+	protected abstract void allocateContainer(final ResourceProfile resourceProfile);
+
+
+	// ------------------------------------------------------------------------
+	//  Helper classes
+	// ------------------------------------------------------------------------
+
+	/**
+	 * We maintain all the allocations with SlotID and AllocationID. We are able to get or remove the allocation info
+	 * either by SlotID or AllocationID. Both internal maps are always kept as exact inverses of each other.
+	 */
+	private static class AllocationMap {
+
+		/** All allocated slots (by SlotID) */
+		private final Map<SlotID, AllocationID> allocatedSlots;
+
+		/** All allocated slots (by AllocationID), it is an inverse view of allocatedSlots */
+		private final Map<AllocationID, SlotID> allocatedSlotsByAllocationId;
+
+		AllocationMap() {
+			this.allocatedSlots = new HashMap<>(16);
+			this.allocatedSlotsByAllocationId = new HashMap<>(16);
+		}
+
+		/**
+		 * Add an allocation. Any previous allocation of the slot, and any previous slot of the allocation,
+		 * is dropped first so that both views stay consistent.
+		 *
+		 * @param slotId       The slot id
+		 * @param allocationId The allocation id
+		 */
+		void addAllocation(final SlotID slotId, final AllocationID allocationId) {
+			removeAllocation(slotId);
+			removeAllocation(allocationId);
+			allocatedSlots.put(slotId, allocationId);
+			allocatedSlotsByAllocationId.put(allocationId, slotId);
+		}
+
+		/**
+		 * De-allocation with slot id
+		 *
+		 * @param slotId The slot id
+		 */
+		void removeAllocation(final SlotID slotId) {
+			final AllocationID allocationId = allocatedSlots.remove(slotId);
+			if (allocationId != null) {
+				allocatedSlotsByAllocationId.remove(allocationId);
+			}
+		}
+
+		/**
+		 * De-allocation with allocation id
+		 *
+		 * @param allocationId The allocation id
+		 */
+		void removeAllocation(final AllocationID allocationId) {
+			final SlotID slotId = allocatedSlotsByAllocationId.remove(allocationId);
+			if (slotId != null) {
+				allocatedSlots.remove(slotId);
+			}
+		}
+
+		/**
+		 * Check whether allocation exists by slot id
+		 *
+		 * @param slotId The slot id
+		 * @return true if the allocation exists
+		 */
+		boolean isAllocated(final SlotID slotId) {
+			return allocatedSlots.containsKey(slotId);
+		}
+
+		/**
+		 * Check whether allocation exists by allocation id
+		 *
+		 * @param allocationId The allocation id
+		 * @return true if the allocation exists
+		 */
+		boolean isAllocated(final AllocationID allocationId) {
+			return allocatedSlotsByAllocationId.containsKey(allocationId);
+		}
+
+		AllocationID getAllocationID(final SlotID slotId) {
+			return allocatedSlots.get(slotId);
+		}
+
+		SlotID getSlotID(final AllocationID allocationId) {
+			return allocatedSlotsByAllocationId.get(allocationId);
+		}
+
+		public int size() {
+			return allocatedSlots.size();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Testing utilities
+	// ------------------------------------------------------------------------
+
+	@VisibleForTesting
+	boolean isAllocated(final SlotID slotId) {
+		return allocationMap.isAllocated(slotId);
+	}
+
+	@VisibleForTesting
+	boolean isAllocated(final AllocationID allocationId) {
+		return allocationMap.isAllocated(allocationId);
+	}
+
+	/**
+	 * Add free slots directly to the free pool, this will not trigger pending requests allocation
+	 *
+	 * @param slot The resource slot
+	 */
+	@VisibleForTesting
+	void addFreeSlot(final ResourceSlot slot) {
+		final ResourceID resourceId = slot.getResourceID();
+		final SlotID slotId = slot.getSlotId();
+
+		if (!registeredSlots.containsKey(resourceId)) {
+			registeredSlots.put(resourceId, new HashMap<SlotID, ResourceSlot>());
+		}
+		registeredSlots.get(resourceId).put(slot.getSlotId(), slot);
+		freeSlots.put(slotId, slot);
+	}
+
+	@VisibleForTesting
+	int getAllocatedSlotCount() {
+		return allocationMap.size();
+	}
+
+	@VisibleForTesting
+	int getFreeSlotCount() {
+		return freeSlots.size();
+	}
+
+	@VisibleForTesting
+	int getPendingRequestCount() {
+		return pendingSlotRequests.size();
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1c9f9d42/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceID.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceID.java
index 8cf9ccb..6b8a037 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceID.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceID.java
@@ -63,9 +63,7 @@ public final class ResourceID implements ResourceIDRetrievable, Serializable {
@Override
public String toString() {
- return "ResourceID{" +
- "resourceId='" + resourceId + '\'' +
- '}';
+ // Returns the bare id string; SlotID#toString concatenates this value directly.
+ return resourceId;
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/1c9f9d42/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
index cbe709f..ff1c4bf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
@@ -40,6 +40,11 @@ public class ResourceProfile implements Serializable {
this.memoryInMB = memoryInMB;
}
+ /**
+ * Creates a copy of the given resource profile, taking over its cpu cores and memory (in MB).
+ *
+ * @param other The resource profile to copy; it is dereferenced directly and must not be null
+ */
+ public ResourceProfile(ResourceProfile other) {
+ this.cpuCores = other.cpuCores;
+ this.memoryInMB = other.memoryInMB;
+ }
+
/**
* Get the cpu cores needed
* @return The cpu cores, 1.0 means a full cpu thread
http://git-wip-us.apache.org/repos/asf/flink/blob/1c9f9d42/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceSlot.java
new file mode 100644
index 0000000..8a6db5f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceSlot.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.types;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * ResourceManager-side view of a single slot offered by a TaskManager. It couples the slot's unique
+ * {@link SlotID} with the {@link ResourceProfile} the slot provides, so that the slot can be compared
+ * against the profile of a resource request.
+ */
+public class ResourceSlot implements ResourceIDRetrievable, Serializable {
+
+	private static final long serialVersionUID = -5853720153136840674L;
+
+	/** Unique identification of this slot; it also carries the owning ResourceID */
+	private final SlotID slotId;
+
+	/** Resources this slot provides */
+	private final ResourceProfile resourceProfile;
+
+	public ResourceSlot(SlotID slotId, ResourceProfile resourceProfile) {
+		this.slotId = checkNotNull(slotId);
+		this.resourceProfile = checkNotNull(resourceProfile);
+	}
+
+	public SlotID getSlotId() {
+		return slotId;
+	}
+
+	public ResourceProfile getResourceProfile() {
+		return resourceProfile;
+	}
+
+	/** The ResourceID of the TaskManager owning this slot, taken from the slot's SlotID. */
+	@Override
+	public ResourceID getResourceID() {
+		final SlotID id = getSlotId();
+		return id.getResourceID();
+	}
+
+	/**
+	 * Tells whether this slot's resource profile satisfies the given requirement.
+	 *
+	 * @param required The resource profile that is asked for
+	 * @return true if this slot's profile matches the requirement
+	 */
+	public boolean isMatchingRequirement(ResourceProfile required) {
+		final ResourceProfile offered = getResourceProfile();
+		return offered.isMatching(required);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1c9f9d42/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java
index d1b072d..e831a5d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java
@@ -75,9 +75,15 @@ public class SlotID implements ResourceIDRetrievable, Serializable {
@Override
public String toString() {
- return "SlotID{" +
- "resourceId=" + resourceId +
- ", slotId=" + slotId +
- '}';
+ // Renders as "<resourceId>_<slotId>", e.g. "<resourceId>_0" for ids created by generate().
+ return resourceId + "_" + slotId;
+ }
+
+ /**
+ * Generate a random slot id: a freshly generated ResourceID combined with 0 as the slot id
+ * within that resource.
+ *
+ * @return A slot id whose ResourceID is randomly generated.
+ */
+ public static SlotID generate() {
+ return new SlotID(ResourceID.generate(), 0);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1c9f9d42/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotRequest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotRequest.java
index d8fe268..74c7c39 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotRequest.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotRequest.java
@@ -18,8 +18,57 @@
package org.apache.flink.runtime.rpc.resourcemanager;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+
import java.io.Serializable;
-public class SlotRequest implements Serializable{
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This describes the requirement of the slot, mainly used by JobManager requesting slot from ResourceManager.
+ * The AllocationID uniquely identifies the request (SlotManager keys pending requests and allocations by it).
+ */
+public class SlotRequest implements Serializable {
+
private static final long serialVersionUID = -6586877187990445986L;
+
+ /** The JobID of the job the slot is requested for */
+ private final JobID jobId;
+
+ /** The unique identification of this request */
+ private final AllocationID allocationId;
+
+ /** The resource profile of the required slot */
+ private final ResourceProfile resourceProfile;
+
+ /**
+ * Creates a slot request. Every argument is passed through checkNotNull and must not be null.
+ *
+ * @param jobId The job the slot is requested for
+ * @param allocationId The unique identification of this request
+ * @param resourceProfile The resource profile the slot has to satisfy
+ */
+ public SlotRequest(JobID jobId, AllocationID allocationId, ResourceProfile resourceProfile) {
+ this.jobId = checkNotNull(jobId);
+ this.allocationId = checkNotNull(allocationId);
+ this.resourceProfile = checkNotNull(resourceProfile);
+ }
+
+ /**
+ * Get the JobID of the job the slot is requested for.
+ * @return The job id
+ */
+ public JobID getJobId() {
+ return jobId;
+ }
+
+ /**
+ * Get the unique identification of this request.
+ * @return the allocation id
+ */
+ public AllocationID getAllocationId() {
+ return allocationId;
+ }
+
+ /**
+ * Get the resource profile of the desired slot.
+ * @return The resource profile
+ */
+ public ResourceProfile getResourceProfile() {
+ return resourceProfile;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1c9f9d42/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotReport.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotReport.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotReport.java
new file mode 100644
index 0000000..c372ecb
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotReport.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.taskexecutor;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+
+import java.io.Serializable;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A report about the current status of all slots of the TaskExecutor, describing
+ * which slots are available and allocated, and what jobs (JobManagers) the allocated slots
+ * have been allocated to.
+ */
+public class SlotReport implements Serializable {
+
+ private static final long serialVersionUID = -3150175198722481689L;
+
+ /** The status of each slot of the TaskManager */
+ private final List<SlotStatus> slotsStatus;
+
+ /** The resource id which identifies the TaskManager */
+ private final ResourceID resourceID;
+
+ /**
+ * Creates a new report of the slots of a TaskManager.
+ *
+ * @param slotsStatus The status of each slot of the reporting TaskManager, must not be null
+ * @param resourceID The resource id of the reporting TaskManager, must not be null
+ */
+ public SlotReport(final List<SlotStatus> slotsStatus, final ResourceID resourceID) {
+ this.slotsStatus = checkNotNull(slotsStatus);
+ this.resourceID = checkNotNull(resourceID);
+ }
+
+ /**
+ * Get the status of each slot of the TaskManager.
+ *
+ * <p>The returned list is the instance passed to the constructor, it is not copied.
+ *
+ * @return The slots status
+ */
+ public List<SlotStatus> getSlotsStatus() {
+ return slotsStatus;
+ }
+
+ /**
+ * Get the resource id which identifies the reporting TaskManager.
+ *
+ * @return The resource id
+ */
+ public ResourceID getResourceID() {
+ return resourceID;
+ }
+
+ @Override
+ public String toString() {
+ return "SlotReport{" +
+ "resourceID=" + resourceID +
+ ", slotsStatus=" + slotsStatus +
+ '}';
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1c9f9d42/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotStatus.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotStatus.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotStatus.java
new file mode 100644
index 0000000..e8e2084
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotStatus.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.taskexecutor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Describes the current status of a slot located in a TaskManager: the slot id, its resource
+ * profile and, if the slot is allocated, the allocation id and the job it is allocated to.
+ */
+public class SlotStatus implements Serializable {
+
+ private static final long serialVersionUID = 5099191707339664493L;
+
+ /** slotID to identify a slot */
+ private final SlotID slotID;
+
+ /** the resource profile of the slot */
+ private final ResourceProfile profiler;
+
+ /** if the slot is allocated, allocationID identifies its allocation; else, allocationID is null */
+ private final AllocationID allocationID;
+
+ /** if the slot is allocated, jobID identifies the job this slot is allocated to; else, jobID is null */
+ private final JobID jobID;
+
+ /**
+ * Creates the status of a slot which is not allocated.
+ *
+ * @param slotID The id of the slot, must not be null
+ * @param profiler The resource profile of the slot, must not be null
+ */
+ public SlotStatus(SlotID slotID, ResourceProfile profiler) {
+ this(slotID, profiler, null, null);
+ }
+
+ /**
+ * Creates the status of a slot.
+ *
+ * @param slotID The id of the slot, must not be null
+ * @param profiler The resource profile of the slot, must not be null
+ * @param allocationID The allocation id if the slot is allocated, otherwise null
+ * @param jobID The id of the job the slot is allocated to if it is allocated, otherwise null
+ */
+ public SlotStatus(SlotID slotID, ResourceProfile profiler, AllocationID allocationID, JobID jobID) {
+ this.slotID = checkNotNull(slotID, "slotID cannot be null");
+ this.profiler = checkNotNull(profiler, "profile cannot be null");
+ this.allocationID = allocationID;
+ this.jobID = jobID;
+ }
+
+ /**
+ * Get the unique identification of this slot
+ *
+ * @return The slot id
+ */
+ public SlotID getSlotID() {
+ return slotID;
+ }
+
+ /**
+ * Get the resource profile of this slot
+ *
+ * @return The resource profile
+ */
+ public ResourceProfile getProfiler() {
+ return profiler;
+ }
+
+ /**
+ * Get the allocation id of this slot
+ *
+ * @return The allocation id if this slot is allocated, otherwise null
+ */
+ public AllocationID getAllocationID() {
+ return allocationID;
+ }
+
+ /**
+ * Get the id of the job this slot is allocated to
+ *
+ * @return The job id if this slot is allocated, otherwise null
+ */
+ public JobID getJobID() {
+ return jobID;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ SlotStatus that = (SlotStatus) o;
+
+ return slotID.equals(that.slotID) &&
+ profiler.equals(that.profiler) &&
+ Objects.equals(allocationID, that.allocationID) &&
+ Objects.equals(jobID, that.jobID);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = slotID.hashCode();
+ result = 31 * result + profiler.hashCode();
+ result = 31 * result + Objects.hashCode(allocationID);
+ result = 31 * result + Objects.hashCode(jobID);
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "SlotStatus{" +
+ "slotID=" + slotID +
+ ", profiler=" + profiler +
+ ", allocationID=" + allocationID +
+ ", jobID=" + jobID +
+ '}';
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1c9f9d42/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/SlotManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/SlotManagerTest.java
new file mode 100644
index 0000000..2ee280f
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/SlotManagerTest.java
@@ -0,0 +1,540 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.rpc.taskexecutor.SlotStatus;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+public class SlotManagerTest {
+
+ private static final double DEFAULT_TESTING_CPU_CORES = 1.0;
+
+ private static final long DEFAULT_TESTING_MEMORY = 512;
+
+ private static final ResourceProfile DEFAULT_TESTING_PROFILE =
+ new ResourceProfile(DEFAULT_TESTING_CPU_CORES, DEFAULT_TESTING_MEMORY);
+
+ private static final ResourceProfile DEFAULT_TESTING_BIG_PROFILE =
+ new ResourceProfile(DEFAULT_TESTING_CPU_CORES * 2, DEFAULT_TESTING_MEMORY * 2);
+
+ private ResourceManagerGateway resourceManagerGateway;
+
+ @Before
+ public void setUp() {
+ resourceManagerGateway = mock(ResourceManagerGateway.class);
+ }
+
+ /**
+ * Tests that a request is kept pending and a new container is allocated when there are no free slots
+ */
+ @Test
+ public void testRequestSlotWithoutFreeSlot() {
+ TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+ slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
+
+ assertEquals(0, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertEquals(1, slotManager.getPendingRequestCount());
+ assertEquals(1, slotManager.getAllocatedContainers().size());
+ assertEquals(DEFAULT_TESTING_PROFILE, slotManager.getAllocatedContainers().get(0));
+ }
+
+ /**
+ * Tests that there are some free slots when we request, and the request is fulfilled immediately
+ */
+ @Test
+ public void testRequestSlotWithFreeSlot() {
+ TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+
+ directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 1);
+ assertEquals(1, slotManager.getFreeSlotCount());
+
+ slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertEquals(0, slotManager.getPendingRequestCount());
+ assertEquals(0, slotManager.getAllocatedContainers().size());
+ }
+
+ /**
+ * Tests that there are some free slots when we request, but none of them are suitable
+ */
+ @Test
+ public void testRequestSlotWithoutSuitableSlot() {
+ TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+
+ directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 2);
+ assertEquals(2, slotManager.getFreeSlotCount());
+
+ slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_BIG_PROFILE));
+ assertEquals(0, slotManager.getAllocatedSlotCount());
+ assertEquals(2, slotManager.getFreeSlotCount());
+ assertEquals(1, slotManager.getPendingRequestCount());
+ assertEquals(1, slotManager.getAllocatedContainers().size());
+ assertEquals(DEFAULT_TESTING_BIG_PROFILE, slotManager.getAllocatedContainers().get(0));
+ }
+
+ /**
+ * Tests that duplicated slot requests are ignored
+ */
+ @Test
+ public void testDuplicatedSlotRequest() {
+ TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+ directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 1);
+
+ SlotRequest request1 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
+ SlotRequest request2 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_BIG_PROFILE);
+
+ slotManager.requestSlot(request1);
+ slotManager.requestSlot(request2);
+ slotManager.requestSlot(request2);
+ slotManager.requestSlot(request1);
+
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertEquals(1, slotManager.getPendingRequestCount());
+ assertEquals(1, slotManager.getAllocatedContainers().size());
+ assertEquals(DEFAULT_TESTING_BIG_PROFILE, slotManager.getAllocatedContainers().get(0));
+ }
+
+ /**
+ * Tests that multiple slot requests are fulfilled by matching free slots, the rest stay pending
+ */
+ @Test
+ public void testRequestMultipleSlots() {
+ TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+ directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 5);
+
+ // request 3 normal slots
+ for (int i = 0; i < 3; ++i) {
+ slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
+ }
+
+ // request 2 big slots
+ for (int i = 0; i < 2; ++i) {
+ slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_BIG_PROFILE));
+ }
+
+ // request 1 normal slot again
+ slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
+
+ assertEquals(4, slotManager.getAllocatedSlotCount());
+ assertEquals(1, slotManager.getFreeSlotCount());
+ assertEquals(2, slotManager.getPendingRequestCount());
+ assertEquals(2, slotManager.getAllocatedContainers().size());
+ assertEquals(DEFAULT_TESTING_BIG_PROFILE, slotManager.getAllocatedContainers().get(0));
+ assertEquals(DEFAULT_TESTING_BIG_PROFILE, slotManager.getAllocatedContainers().get(1));
+ }
+
+ /**
+ * Tests that a new slot appeared in SlotReport, and we used it to fulfill a pending request
+ */
+ @Test
+ public void testNewlyAppearedFreeSlotFulfillPendingRequest() {
+ TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+ slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
+ assertEquals(1, slotManager.getPendingRequestCount());
+
+ SlotID slotId = SlotID.generate();
+ SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE);
+ slotManager.updateSlotStatus(slotStatus);
+
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertEquals(0, slotManager.getPendingRequestCount());
+ assertTrue(slotManager.isAllocated(slotId));
+ }
+
+ /**
+ * Tests that a new slot appeared in SlotReport, but we have no pending request
+ */
+ @Test
+ public void testNewlyAppearedFreeSlot() {
+ TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+ SlotID slotId = SlotID.generate();
+ SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE);
+ slotManager.updateSlotStatus(slotStatus);
+
+ assertEquals(0, slotManager.getAllocatedSlotCount());
+ assertEquals(1, slotManager.getFreeSlotCount());
+ }
+
+ /**
+ * Tests that a new slot appeared in SlotReport, but it's not suitable for any of the pending requests
+ */
+ @Test
+ public void testNewlyAppearedFreeSlotNotMatchPendingRequests() {
+ TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+ slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_BIG_PROFILE));
+ assertEquals(1, slotManager.getPendingRequestCount());
+
+ SlotID slotId = SlotID.generate();
+ SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE);
+ slotManager.updateSlotStatus(slotStatus);
+
+ assertEquals(0, slotManager.getAllocatedSlotCount());
+ assertEquals(1, slotManager.getFreeSlotCount());
+ assertEquals(1, slotManager.getPendingRequestCount());
+ assertFalse(slotManager.isAllocated(slotId));
+ }
+
+ /**
+ * Tests that a new slot appeared in SlotReport, and it's reported as being used by some job
+ */
+ @Test
+ public void testNewlyAppearedInUseSlot() {
+ TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+
+ SlotID slotId = SlotID.generate();
+ SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE, new AllocationID(), new JobID());
+ slotManager.updateSlotStatus(slotStatus);
+
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertTrue(slotManager.isAllocated(slotId));
+ }
+
+ /**
+ * Tests that we had a slot in-use, and it's confirmed by SlotReport
+ */
+ @Test
+ public void testExistingInUseSlotUpdateStatus() {
+ TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+ SlotRequest request = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
+ slotManager.requestSlot(request);
+
+ // make this slot in use
+ SlotID slotId = SlotID.generate();
+ SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE);
+ slotManager.updateSlotStatus(slotStatus);
+
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertTrue(slotManager.isAllocated(slotId));
+
+ // slot status is confirmed
+ SlotStatus slotStatus2 = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE,
+ request.getAllocationId(), request.getJobId());
+ slotManager.updateSlotStatus(slotStatus2);
+
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertTrue(slotManager.isAllocated(slotId));
+ }
+
+ /**
+ * Tests that we had a slot in-use, but it's empty according to the SlotReport
+ */
+ @Test
+ public void testExistingInUseSlotAdjustedToEmpty() {
+ TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+ SlotRequest request1 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
+ slotManager.requestSlot(request1);
+
+ // make this slot in use
+ SlotID slotId = SlotID.generate();
+ SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE);
+ slotManager.updateSlotStatus(slotStatus);
+
+ // another request pending
+ SlotRequest request2 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
+ slotManager.requestSlot(request2);
+
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertEquals(1, slotManager.getPendingRequestCount());
+ assertTrue(slotManager.isAllocated(slotId));
+ assertTrue(slotManager.isAllocated(request1.getAllocationId()));
+
+ // but slot is reported empty again, request2 will be fulfilled, request1 will be missing
+ slotManager.updateSlotStatus(slotStatus);
+
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertEquals(0, slotManager.getPendingRequestCount());
+ assertTrue(slotManager.isAllocated(slotId));
+ assertTrue(slotManager.isAllocated(request2.getAllocationId()));
+ }
+
+ /**
+ * Tests that we had a slot in use, and it's also reported in use by TaskManager, but the allocation
+ * information didn't match.
+ */
+ @Test
+ public void testExistingInUseSlotWithDifferentAllocationInfo() {
+ TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+ SlotRequest request = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
+ slotManager.requestSlot(request);
+
+ // make this slot in use
+ SlotID slotId = SlotID.generate();
+ SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE);
+ slotManager.updateSlotStatus(slotStatus);
+
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertEquals(0, slotManager.getPendingRequestCount());
+ assertTrue(slotManager.isAllocated(slotId));
+ assertTrue(slotManager.isAllocated(request.getAllocationId()));
+
+ SlotStatus slotStatus2 = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE, new AllocationID(), new JobID());
+ // update slot status with different allocation info
+ slotManager.updateSlotStatus(slotStatus2);
+
+ // original request is missing and won't be allocated
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertEquals(0, slotManager.getPendingRequestCount());
+ assertTrue(slotManager.isAllocated(slotId));
+ assertFalse(slotManager.isAllocated(request.getAllocationId()));
+ assertTrue(slotManager.isAllocated(slotStatus2.getAllocationID()));
+ }
+
+ /**
+ * Tests that we had a free slot, and it's confirmed by SlotReport
+ */
+ @Test
+ public void testExistingEmptySlotUpdateStatus() {
+ TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+ ResourceSlot slot = new ResourceSlot(SlotID.generate(), DEFAULT_TESTING_PROFILE);
+ slotManager.addFreeSlot(slot);
+
+ SlotStatus slotStatus = new SlotStatus(slot.getSlotId(), DEFAULT_TESTING_PROFILE);
+ slotManager.updateSlotStatus(slotStatus);
+
+ assertEquals(0, slotManager.getAllocatedSlotCount());
+ assertEquals(1, slotManager.getFreeSlotCount());
+ assertEquals(0, slotManager.getPendingRequestCount());
+ }
+
+ /**
+ * Tests that we had a free slot, and it's reported in-use by TaskManager
+ */
+ @Test
+ public void testExistingEmptySlotAdjustedToInUse() {
+ TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+ ResourceSlot slot = new ResourceSlot(SlotID.generate(), DEFAULT_TESTING_PROFILE);
+ slotManager.addFreeSlot(slot);
+
+ SlotStatus slotStatus = new SlotStatus(slot.getSlotId(), DEFAULT_TESTING_PROFILE,
+ new AllocationID(), new JobID());
+ slotManager.updateSlotStatus(slotStatus);
+
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertEquals(0, slotManager.getPendingRequestCount());
+ assertTrue(slotManager.isAllocated(slot.getSlotId()));
+ }
+
+ /**
+ * Tests that we did some allocation but it failed / was rejected by TaskManager, the request will be retried
+ */
+ @Test
+ public void testSlotAllocationFailedAtTaskManager() {
+ TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+ ResourceSlot slot = new ResourceSlot(SlotID.generate(), DEFAULT_TESTING_PROFILE);
+ slotManager.addFreeSlot(slot);
+
+ SlotRequest request = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
+ slotManager.requestSlot(request);
+
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertEquals(0, slotManager.getPendingRequestCount());
+ assertTrue(slotManager.isAllocated(slot.getSlotId()));
+
+ slotManager.handleSlotRequestFailedAtTaskManager(request, slot.getSlotId());
+
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertEquals(0, slotManager.getPendingRequestCount());
+ assertTrue(slotManager.isAllocated(request.getAllocationId()));
+ }
+
+ /**
+ * Tests that we did some allocation but it failed / was rejected by TaskManager, and the slot is occupied by another request
+ */
+ @Test
+ public void testSlotAllocationFailedAtTaskManagerOccupiedByOther() {
+ TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+ ResourceSlot slot = new ResourceSlot(SlotID.generate(), DEFAULT_TESTING_PROFILE);
+ slotManager.addFreeSlot(slot);
+
+ SlotRequest request = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
+ slotManager.requestSlot(request);
+
+ // slot is set empty by heartbeat
+ SlotStatus slotStatus = new SlotStatus(slot.getSlotId(), slot.getResourceProfile());
+ slotManager.updateSlotStatus(slotStatus);
+
+ // another request took this slot
+ SlotRequest request2 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
+ slotManager.requestSlot(request2);
+
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertEquals(0, slotManager.getPendingRequestCount());
+ assertFalse(slotManager.isAllocated(request.getAllocationId()));
+ assertTrue(slotManager.isAllocated(request2.getAllocationId()));
+
+ // original request should be put back to pending
+ slotManager.handleSlotRequestFailedAtTaskManager(request, slot.getSlotId());
+
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertEquals(1, slotManager.getPendingRequestCount());
+ assertFalse(slotManager.isAllocated(request.getAllocationId()));
+ assertTrue(slotManager.isAllocated(request2.getAllocationId()));
+ }
+
+ /** Tests that all slots (free and allocated) of a failed TaskManager are removed, and unknown TaskManagers are ignored */
+ @Test
+ public void testNotifyTaskManagerFailure() {
+ TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway);
+
+ ResourceID resource1 = ResourceID.generate();
+ ResourceID resource2 = ResourceID.generate();
+
+ ResourceSlot slot11 = new ResourceSlot(new SlotID(resource1, 1), DEFAULT_TESTING_PROFILE);
+ ResourceSlot slot12 = new ResourceSlot(new SlotID(resource1, 2), DEFAULT_TESTING_PROFILE);
+ ResourceSlot slot21 = new ResourceSlot(new SlotID(resource2, 1), DEFAULT_TESTING_PROFILE);
+ ResourceSlot slot22 = new ResourceSlot(new SlotID(resource2, 2), DEFAULT_TESTING_PROFILE);
+
+ slotManager.addFreeSlot(slot11);
+ slotManager.addFreeSlot(slot21);
+
+ slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
+ slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
+
+ assertEquals(2, slotManager.getAllocatedSlotCount());
+ assertEquals(0, slotManager.getFreeSlotCount());
+ assertEquals(0, slotManager.getPendingRequestCount());
+
+ slotManager.addFreeSlot(slot12);
+ slotManager.addFreeSlot(slot22);
+
+ assertEquals(2, slotManager.getAllocatedSlotCount());
+ assertEquals(2, slotManager.getFreeSlotCount());
+ assertEquals(0, slotManager.getPendingRequestCount());
+
+ slotManager.notifyTaskManagerFailure(resource2);
+
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(1, slotManager.getFreeSlotCount());
+ assertEquals(0, slotManager.getPendingRequestCount());
+
+ // notify the failure of a non-existent resource
+ slotManager.notifyTaskManagerFailure(ResourceID.generate());
+
+ assertEquals(1, slotManager.getAllocatedSlotCount());
+ assertEquals(1, slotManager.getFreeSlotCount());
+ assertEquals(0, slotManager.getPendingRequestCount());
+ }
+
+ // ------------------------------------------------------------------------
+ // testing utilities
+ // ------------------------------------------------------------------------
+
+ private void directlyProvideFreeSlots(
+ final SlotManager slotManager,
+ final ResourceProfile resourceProfile,
+ final int freeSlotNum)
+ {
+ for (int i = 0; i < freeSlotNum; ++i) {
+ slotManager.addFreeSlot(new ResourceSlot(SlotID.generate(), new ResourceProfile(resourceProfile)));
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // testing classes
+ // ------------------------------------------------------------------------
+
+ private static class TestingSlotManager extends SlotManager {
+
+ private final List<ResourceProfile> allocatedContainers;
+
+ TestingSlotManager(ResourceManagerGateway resourceManagerGateway) {
+ super(resourceManagerGateway);
+ this.allocatedContainers = new LinkedList<>();
+ }
+
+ /**
+ * Choose the first slot (in iteration order of freeSlots) which matches the requirement
+ *
+ * @param request The slot request
+ * @param freeSlots All slots which can be used
+ * @return The chosen slot or null if cannot find a match
+ */
+ @Override
+ protected ResourceSlot chooseSlotToUse(SlotRequest request, Map<SlotID, ResourceSlot> freeSlots) {
+ for (ResourceSlot slot : freeSlots.values()) {
+ if (slot.isMatchingRequirement(request.getResourceProfile())) {
+ return slot;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Choose the first pending request (in iteration order) whose requirement the offered slot matches
+ *
+ * @param offeredSlot The free slot
+ * @param pendingRequests All the pending slot requests
+ * @return The chosen request or null if cannot find a match
+ */
+ @Override
+ protected SlotRequest chooseRequestToFulfill(ResourceSlot offeredSlot,
+ Map<AllocationID, SlotRequest> pendingRequests)
+ {
+ for (SlotRequest pendingRequest : pendingRequests.values()) {
+ if (offeredSlot.isMatchingRequirement(pendingRequest.getResourceProfile())) {
+ return pendingRequest;
+ }
+ }
+ return null;
+ }
+
+ @Override
+ protected void allocateContainer(ResourceProfile resourceProfile) {
+ allocatedContainers.add(resourceProfile);
+ }
+
+ List<ResourceProfile> getAllocatedContainers() {
+ return allocatedContainers;
+ }
+ }
+}