You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/03 12:10:13 UTC
[06/50] [abbrv] flink git commit: [FLINK-5810] [flip-6] Introduce a hardened slot manager
http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index f055971..31edbf3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -21,519 +21,893 @@ package org.apache.flink.runtime.resourcemanager.slotmanager;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.TaskManagerSlot;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.concurrent.BiFunction;
+import org.apache.flink.runtime.concurrent.CompletableFuture;
import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerServices;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.resourcemanager.SlotRequest;
-import org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestRegistered;
-import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRejected;
-import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply;
-import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorRegistration;
+import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
+import org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
/**
- * SlotManager is responsible for receiving slot requests and do slot allocations. It allows to request
- * slots from registered TaskManagers and issues container allocation requests in case of there are not
- * enough available slots. Besides, it should sync its slot allocation with TaskManager's heartbeat.
- * <p>
- * The main operation principle of SlotManager is:
- * <ul>
- * <li>1. All slot allocation status should be synced with TaskManager, which is the ground truth.</li>
- * <li>2. All slots that have registered must be tracked, either by free pool or allocated pool.</li>
- * <li>3. All slot requests will be handled by best efforts, there is no guarantee that one request will be
- * fulfilled in time or correctly allocated. Conflicts or timeout or some special error will happen, it should
- * be handled outside SlotManager. SlotManager will make each decision based on the information it currently
- * holds.</li>
- * </ul>
- * <b>IMPORTANT:</b> This class is <b>Not Thread-safe</b>.
+ * The slot manager is responsible for maintaining a view on all registered task manager slots,
+ * their allocation and all pending slot requests. Whenever a new slot is registered or an
+ * allocated slot is freed, it tries to fulfill another pending slot request. Whenever there
+ * are not enough slots available the slot manager will notify the resource manager about it via
+ * {@link ResourceManagerActions#allocateResource(ResourceProfile)}.
+ *
+ * In order to free resources and avoid resource leaks, idle task managers (task managers whose
+ * slots are currently unused) and unfulfilled pending slot requests time out, triggering their
+ * release and failure, respectively.
*/
-public abstract class SlotManager {
+public class SlotManager implements AutoCloseable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SlotManager.class);
+
+ /** Scheduled executor for timeouts */
+ private final ScheduledExecutor scheduledExecutor;
+
+ /** Timeout for slot requests to the task manager */
+ private final Time taskManagerRequestTimeout;
+
+ /** Timeout after which an allocation is discarded */
+ private final Time slotRequestTimeout;
+
+ /** Timeout after which an unused TaskManager is released */
+ private final Time taskManagerTimeout;
+
+ /** Map for all registered slots */
+ private final HashMap<SlotID, TaskManagerSlot> slots;
- protected final Logger LOG = LoggerFactory.getLogger(getClass());
+ /** Index of all currently free slots */
+ private final LinkedHashMap<SlotID, TaskManagerSlot> freeSlots;
- /** The Resource allocation provider */
- protected final ResourceManagerServices rmServices;
+ /** All currently registered task managers */
+ private final HashMap<InstanceID, TaskManagerRegistration> taskManagerRegistrations;
- /** All registered task managers with ResourceID and gateway. */
- private final Map<ResourceID, TaskExecutorRegistration> taskManagers;
+ /** Map of fulfilled and active allocations for request deduplication purposes */
+ private final HashMap<AllocationID, SlotID> fulfilledSlotRequests;
- /** All registered slots, including free and allocated slots */
- private final Map<ResourceID, Map<SlotID, ResourceSlot>> registeredSlots;
+ /** Map of pending/unfulfilled slot allocation requests */
+ private final HashMap<AllocationID, PendingSlotRequest> pendingSlotRequests;
- /** All pending slot requests, waiting available slots to fulfil */
- private final Map<AllocationID, SlotRequest> pendingSlotRequests;
+ /** Leader id of the containing component */
+ private UUID leaderId;
- /** All free slots that can be used to be allocated */
- private final Map<SlotID, ResourceSlot> freeSlots;
+ /** Executor for future callbacks which have to be "synchronized" */
+ private Executor mainThreadExecutor;
- /** All allocations, we can lookup allocations either by SlotID or AllocationID */
- private final AllocationMap allocationMap;
+ /** Callbacks for resource (de-)allocations */
+ private ResourceManagerActions resourceManagerActions;
- private final Time timeout;
+ /** True iff the component has been started */
+ private boolean started;
- public SlotManager(ResourceManagerServices rmServices) {
- this.rmServices = checkNotNull(rmServices);
- this.registeredSlots = new HashMap<>(16);
- this.pendingSlotRequests = new LinkedHashMap<>(16);
- this.freeSlots = new HashMap<>(16);
- this.allocationMap = new AllocationMap();
- this.taskManagers = new HashMap<>();
- this.timeout = Time.seconds(10);
+ public SlotManager(
+ ScheduledExecutor scheduledExecutor,
+ Time taskManagerRequestTimeout,
+ Time slotRequestTimeout,
+ Time taskManagerTimeout) {
+ this.scheduledExecutor = Preconditions.checkNotNull(scheduledExecutor);
+ this.taskManagerRequestTimeout = Preconditions.checkNotNull(taskManagerRequestTimeout);
+ this.slotRequestTimeout = Preconditions.checkNotNull(slotRequestTimeout);
+ this.taskManagerTimeout = Preconditions.checkNotNull(taskManagerTimeout);
+
+ slots = new HashMap<>(16);
+ freeSlots = new LinkedHashMap<>(16);
+ taskManagerRegistrations = new HashMap<>(4);
+ fulfilledSlotRequests = new HashMap<>(16);
+ pendingSlotRequests = new HashMap<>(16);
+
+ leaderId = null;
+ resourceManagerActions = null;
+ started = false;
}
- // ------------------------------------------------------------------------
- // slot managements
- // ------------------------------------------------------------------------
+ // ---------------------------------------------------------------------------------------------
+ // Component lifecycle methods
+ // ---------------------------------------------------------------------------------------------
/**
- * Request a slot with requirements, we may either fulfill the request or pending it. Trigger container
- * allocation if we don't have enough resource. If we have free slot which can match the request, record
- * this allocation and forward the request to TaskManager through ResourceManager (we want this done by
- * RPC's main thread to avoid race condition).
+ * Starts the slot manager with the given leader id and resource manager actions.
*
- * @param request The detailed request of the slot
- * @return RMSlotRequestRegistered The confirmation message to be send to the caller
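+ * @param newLeaderId to use for communication with the task managers
+ * @param newMainThreadExecutor to use for running future callbacks and timeouts in the main thread
+ * @param newResourceManagerActions to use for resource (de-)allocations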
+ */
+ public void start(UUID newLeaderId, Executor newMainThreadExecutor, ResourceManagerActions newResourceManagerActions) {
+ leaderId = Preconditions.checkNotNull(newLeaderId);
+ mainThreadExecutor = Preconditions.checkNotNull(newMainThreadExecutor);
+ resourceManagerActions = Preconditions.checkNotNull(newResourceManagerActions);
+
+ started = true;
+ }
+
+ /**
+ * Suspends the component. This clears the internal state of the slot manager.
*/
- public RMSlotRequestRegistered requestSlot(final SlotRequest request) {
- final AllocationID allocationId = request.getAllocationId();
- if (isRequestDuplicated(request)) {
- LOG.warn("Duplicated slot request, AllocationID:{}", allocationId);
- return new RMSlotRequestRegistered(allocationId);
+ public void suspend() {
+ for (PendingSlotRequest pendingSlotRequest : pendingSlotRequests.values()) {
+ cancelPendingSlotRequest(pendingSlotRequest);
}
- // try to fulfil the request with current free slots
- final ResourceSlot slot = chooseSlotToUse(request, freeSlots);
- if (slot != null) {
- LOG.info("Assigning SlotID({}) to AllocationID({}), JobID:{}", slot.getSlotId(),
- allocationId, request.getJobId());
+ pendingSlotRequests.clear();
+
+ ArrayList<InstanceID> registeredTaskManagers = new ArrayList<>(taskManagerRegistrations.keySet());
+
+ for (InstanceID registeredTaskManager : registeredTaskManagers) {
+ unregisterTaskManager(registeredTaskManager);
+ }
- // record this allocation in bookkeeping
- allocationMap.addAllocation(slot.getSlotId(), allocationId);
- // remove selected slot from free pool
- freeSlots.remove(slot.getSlotId());
+ leaderId = null;
+ resourceManagerActions = null;
+ started = false;
+ }
+
+ /**
+ * Closes the slot manager.
+ *
+ * @throws Exception if the close operation fails
+ */
+ @Override
+ public void close() throws Exception {
+ suspend();
+ }
+
+ // ---------------------------------------------------------------------------------------------
+ // Public API
+ // ---------------------------------------------------------------------------------------------
+
+ /**
+ * Requests a slot with the respective resource profile.
+ *
+ * @param slotRequest specifying the requested slot specs
+ * @return true if the slot request was registered; false if the request is a duplicate
+ * @throws SlotManagerException if the slot request failed (e.g. not enough resources left)
+ */
+ public boolean registerSlotRequest(SlotRequest slotRequest) throws SlotManagerException {
+ checkInit();
- sendSlotRequest(slot, request);
+ if (checkDuplicateRequest(slotRequest.getAllocationId())) {
+ LOG.debug("Ignoring a duplicate slot request with allocation id {}.", slotRequest.getAllocationId());
+
+ return false;
} else {
- LOG.info("Cannot fulfil slot request, try to allocate a new container for it, " +
- "AllocationID:{}, JobID:{}", allocationId, request.getJobId());
- Preconditions.checkState(rmServices != null,
- "Attempted to allocate resources but no ResourceManagerServices set.");
- rmServices.allocateResource(request.getResourceProfile());
- pendingSlotRequests.put(allocationId, request);
+ PendingSlotRequest pendingSlotRequest = new PendingSlotRequest(slotRequest);
+
+ pendingSlotRequests.put(slotRequest.getAllocationId(), pendingSlotRequest);
+
+ try {
+ internalRequestSlot(pendingSlotRequest);
+ } catch (ResourceManagerException e) {
+ // requesting the slot failed --> remove pending slot request
+ pendingSlotRequests.remove(slotRequest.getAllocationId());
+
+ throw new SlotManagerException("Could not fulfill slot request " + slotRequest.getAllocationId() + '.', e);
+ }
+
+ return true;
}
+ }
+
+ /**
+ * Cancels and removes a pending slot request with the given allocation id. If there is no such
+ * pending request, then nothing is done.
+ *
+ * @param allocationId identifying the pending slot request
+ * @return True if a pending slot request was found; otherwise false
+ */
+ public boolean unregisterSlotRequest(AllocationID allocationId) {
+ checkInit();
+
+ PendingSlotRequest pendingSlotRequest = pendingSlotRequests.remove(allocationId);
+
+ if (null != pendingSlotRequest) {
+ cancelPendingSlotRequest(pendingSlotRequest);
+
+ return true;
+ } else {
+ LOG.debug("No pending slot request with allocation id {} found.", allocationId);
- return new RMSlotRequestRegistered(allocationId);
+ return false;
+ }
}
/**
- * Notifies the SlotManager that a slot is available again after being allocated.
- * @param slotID slot id of available slot
+ * Registers a new task manager at the slot manager. This will make the task manager's slots
+ * known and, thus, available for allocation.
+ *
+ * @param taskExecutorConnection for the new task manager
+ * @param initialSlotReport for the new task manager
*/
- public void notifySlotAvailable(ResourceID resourceID, SlotID slotID) {
- if (!allocationMap.isAllocated(slotID)) {
- throw new IllegalStateException("Slot was not previously allocated but " +
- "TaskManager reports it as available again");
+ public void registerTaskManager(final TaskExecutorConnection taskExecutorConnection, SlotReport initialSlotReport) {
+ checkInit();
+
+ // we identify task managers by their instance id
+ if (!taskManagerRegistrations.containsKey(taskExecutorConnection.getInstanceID())) {
+ TaskManagerRegistration taskManagerRegistration = new TaskManagerRegistration(taskExecutorConnection);
+ taskManagerRegistrations.put(taskExecutorConnection.getInstanceID(), taskManagerRegistration);
}
- allocationMap.removeAllocation(slotID);
- final Map<SlotID, ResourceSlot> slots = registeredSlots.get(resourceID);
- ResourceSlot freeSlot = slots.get(slotID);
- if (freeSlot == null) {
- throw new IllegalStateException("Slot was not registered with SlotManager but " +
- "TaskManager reported it to be available.");
+
+ reportSlotStatus(taskExecutorConnection.getInstanceID(), initialSlotReport);
+ }
+
+ /**
+ * Unregisters the task manager identified by the given instance id and its associated slots
+ * from the slot manager.
+ *
+ * @param instanceId identifying the task manager to unregister
+ * @return True if there existed a registered task manager with the given instance id
+ */
+ public boolean unregisterTaskManager(InstanceID instanceId) {
+ checkInit();
+
+ TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.remove(instanceId);
+
+ if (null != taskManagerRegistration) {
+ removeSlots(taskManagerRegistration.getSlots());
+
+ taskManagerRegistration.cancelTimeout();
+
+ return true;
+ } else {
+ LOG.debug("There is no task manager registered with instance ID {}. Ignoring this message.", instanceId);
+
+ return false;
}
- handleFreeSlot(freeSlot);
}
/**
- * The slot request to TaskManager may be either failed by rpc communication (timeout, network error, etc.)
- * or really rejected by TaskManager. We shall retry this request by:
- * <ul>
- * <li>1. verify and clear all the previous allocate information for this request
- * <li>2. try to request slot again
- * </ul>
- * <p>
- * This may cause some duplicate allocation, e.g. the slot request to TaskManager is successful but the response
- * is lost somehow, so we may request a slot in another TaskManager, this causes two slots assigned to one request,
- * but it can be taken care of by rejecting registration at JobManager.
+ * Reports the current slot allocations for a task manager identified by the given instance id.
*
- * @param originalRequest The original slot request
- * @param slotId The target SlotID
+ * @param instanceId identifying the task manager for which to report the slot status
+ * @param slotReport containing the status for all of its slots
+ * @return true if the slot status has been updated successfully, otherwise false
*/
- void handleSlotRequestFailedAtTaskManager(final SlotRequest originalRequest, final SlotID slotId) {
- final AllocationID originalAllocationId = originalRequest.getAllocationId();
- LOG.info("Slot request failed at TaskManager, SlotID:{}, AllocationID:{}, JobID:{}",
- slotId, originalAllocationId, originalRequest.getJobId());
-
- if (allocationMap.isAllocated(slotId)) {
- final AllocationID expectedAllocationId = allocationMap.getAllocationID(slotId);
-
- // check whether we have an agreement on whom this slot belongs to
- if (originalAllocationId.equals(expectedAllocationId)) {
- LOG.info("De-allocate this request and retry");
- allocationMap.removeAllocation(expectedAllocationId);
- pendingSlotRequests.put(originalRequest.getAllocationId(), originalRequest);
- ResourceSlot slot = checkNotNull(getRegisteredSlot(slotId));
- // treat this slot as empty and retry with a different request
- handleFreeSlot(slot);
- } else {
- LOG.error("Slot request failed for slot {} with allocation id {}:" +
- " Allocation id did not match the expected allocation id {}.",
- slotId, originalAllocationId, expectedAllocationId);
+ public boolean reportSlotStatus(InstanceID instanceId, SlotReport slotReport) {
+ checkInit();
+
+ TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceId);
+
+ if (null != taskManagerRegistration) {
+ ArrayList<SlotID> slotsToRemove = new ArrayList<>(taskManagerRegistration.getSlots());
+ boolean idle = true;
+
+ for (SlotStatus slotStatus : slotReport) {
+ if (slotsToRemove.remove(slotStatus.getSlotID())) {
+ // slot which was already registered
+ updateSlot(slotStatus.getSlotID(), slotStatus.getAllocationID());
+ } else {
+ // new slot
+ registerSlot(
+ taskManagerRegistration,
+ slotStatus.getSlotID(),
+ slotStatus.getAllocationID(),
+ slotStatus.getResourceProfile(),
+ taskManagerRegistration.getTaskManagerConnection());
+ }
+
+ TaskManagerSlot slot = slots.get(slotStatus.getSlotID());
+
+ idle &= slot.isFree();
+ }
+
+ // remove the slots for which we haven't received a slot status message
+ removeSlots(slotsToRemove);
+
+ if (idle) {
+ // no slot of this task manager is being used --> register timer to free this resource
+ registerTaskManagerTimeout(taskManagerRegistration);
}
+
+ return true;
} else {
- LOG.error("Slot request failed for slot {} with allocation id {}: " +
- "Slot was not previously registered.",
- slotId, originalAllocationId);
+ LOG.debug("Received slot report for unknown task manager with instance id {}. Ignoring this report.", instanceId);
+
+ return false;
}
}
/**
- * Registers a TaskExecutor
- * @param resourceID TaskExecutor's ResourceID
- * @param registration TaskExecutor's registration
- * @param slotReport TaskExecutor's free and allocated slots
+ * Frees the given slot from the given allocation. If the slot is still allocated by the given
+ * allocation id, then the slot will be marked as free and will be subject to new slot requests.
+ *
+ * @param slotId identifying the slot to free
+ * @param allocationId with which the slot is presumably allocated
*/
- public void registerTaskExecutor(
- ResourceID resourceID,
- TaskExecutorRegistration registration,
- SlotReport slotReport) {
+ public void freeSlot(SlotID slotId, AllocationID allocationId) {
+ checkInit();
- if (taskManagers.get(resourceID) != null) {
- notifyTaskManagerFailure(resourceID);
- }
+ TaskManagerSlot slot = slots.get(slotId);
+
+ if (null != slot) {
+ if (slot.isAllocated()) {
+ if (Objects.equals(allocationId, slot.getAllocationId())) {
+ // free the slot
+ slot.setAllocationId(null);
+ fulfilledSlotRequests.remove(allocationId);
- this.taskManagers.put(resourceID, registration);
+ if (slot.isFree()) {
+ handleFreeSlot(slot);
+ }
- for (SlotStatus slotStatus : slotReport.getSlotsStatus()) {
- final SlotID slotId = slotStatus.getSlotID();
+ TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(slot.getInstanceId());
- final TaskExecutorRegistration taskExecutorRegistration = taskManagers.get(slotId.getResourceID());
- if (taskExecutorRegistration == null) {
- LOG.info("Received SlotStatus but ResourceID {} is unknown to the SlotManager",
- slotId.getResourceID());
- return;
+ if (null != taskManagerRegistration && !anySlotUsed(taskManagerRegistration.getSlots())) {
+ registerTaskManagerTimeout(taskManagerRegistration);
+ }
+ } else {
+ LOG.debug("Received request to free slot {} with expected allocation id {}, " +
+ "but actual allocation id {} differs. Ignoring the request.", slotId, allocationId, slot.getAllocationId());
+ }
+ } else {
+ LOG.debug("Slot {} has not been allocated.", allocationId);
}
+ } else {
+ LOG.debug("Trying to free a slot {} which has not been registered. Ignoring this message.", slotId);
+ }
+ }
- final ResourceSlot slot = new ResourceSlot(slotId, slotStatus.getProfiler(), taskExecutorRegistration);
+ // ---------------------------------------------------------------------------------------------
+ // Behaviour methods
+ // ---------------------------------------------------------------------------------------------
- registerNewSlot(slot);
- LOG.info("New slot appeared, SlotID:{}, AllocationID:{}", slotId, slotStatus.getAllocationID());
+ /**
+ * Finds a matching slot request for a given resource profile. If there is no such request,
+ * the method returns null.
+ *
+ * Note: If you want to change the behaviour of the slot manager wrt slot allocation and
+ * request fulfillment, then you should override this method.
+ *
+ * @param slotResourceProfile defining the resources of an available slot
+ * @return A matching slot request which can be deployed in a slot with the given resource
+ * profile. Null if there is no such slot request pending.
+ */
+ protected PendingSlotRequest findMatchingRequest(ResourceProfile slotResourceProfile) {
- if (slotStatus.getAllocationID() != null) {
- // slot in use, record this in bookkeeping
- allocationMap.addAllocation(slotId, slotStatus.getAllocationID());
- } else {
- handleFreeSlot(slot);
+ for (PendingSlotRequest pendingSlotRequest : pendingSlotRequests.values()) {
+ if (!pendingSlotRequest.isAssigned() && slotResourceProfile.isMatching(pendingSlotRequest.getResourceProfile())) {
+ return pendingSlotRequest;
}
}
+
+ return null;
}
/**
- * Callback for TaskManager failures. In case that a TaskManager fails, we have to clean up all its slots.
+ * Finds a matching slot for a given resource profile. A matching slot has at least as many
+ * resources available as the given resource profile. If there is no such slot available, then
+ * the method returns null.
*
- * @param resourceId The ResourceID of the TaskManager
+ * Note: If you want to change the behaviour of the slot manager wrt slot allocation and
+ * request fulfillment, then you should override this method.
+ *
+ * @param requestResourceProfile specifying the resource requirements for a slot request
+ * @return A matching slot which fulfills the given resource profile. Null if there is no such
+ * slot available.
*/
- public void notifyTaskManagerFailure(final ResourceID resourceId) {
- LOG.info("Resource:{} been notified failure", resourceId);
- taskManagers.remove(resourceId);
- final Map<SlotID, ResourceSlot> slotIdsToRemove = registeredSlots.remove(resourceId);
- if (slotIdsToRemove != null) {
- for (SlotID slotId : slotIdsToRemove.keySet()) {
- LOG.info("Removing Slot: {} upon resource failure", slotId);
- if (freeSlots.containsKey(slotId)) {
- freeSlots.remove(slotId);
- } else if (allocationMap.isAllocated(slotId)) {
- allocationMap.removeAllocation(slotId);
- } else {
- LOG.error("BUG! {} is neither in free pool nor in allocated pool", slotId);
- }
+ protected TaskManagerSlot findMatchingSlot(ResourceProfile requestResourceProfile) {
+ Iterator<Map.Entry<SlotID, TaskManagerSlot>> iterator = freeSlots.entrySet().iterator();
+
+ while (iterator.hasNext()) {
+ TaskManagerSlot taskManagerSlot = iterator.next().getValue();
+
+ // sanity check
+ Preconditions.checkState(taskManagerSlot.isFree());
+
+ if (taskManagerSlot.getResourceProfile().isMatching(requestResourceProfile)) {
+ iterator.remove();
+ return taskManagerSlot;
}
}
+
+ return null;
}
- // ------------------------------------------------------------------------
- // internal behaviors
- // ------------------------------------------------------------------------
+ // ---------------------------------------------------------------------------------------------
+ // Internal slot operations
+ // ---------------------------------------------------------------------------------------------
/**
- * When we have a free slot, try to fulfill the pending request first. If any request can be fulfilled,
- * record this allocation in bookkeeping and send slot request to TaskManager, else we just add this slot
- * to the free pool.
+ * Registers a slot for the given task manager at the slot manager. The slot is identified by
+ * the given slot id. The given resource profile defines the available resources for the slot.
+ * The task manager connection can be used to communicate with the task manager.
*
- * @param freeSlot The free slot
+ * @param taskManagerRegistration Task manager for which to register the given slot
+ * @param slotId identifying the slot on the task manager
+ * @param allocationId which is currently deployed in the slot
+ * @param resourceProfile of the slot
+ * @param taskManagerConnection to communicate with the remote task manager
*/
- private void handleFreeSlot(final ResourceSlot freeSlot) {
- SlotRequest chosenRequest = chooseRequestToFulfill(freeSlot, pendingSlotRequests);
+ private void registerSlot(
+ TaskManagerRegistration taskManagerRegistration,
+ SlotID slotId,
+ AllocationID allocationId,
+ ResourceProfile resourceProfile,
+ TaskExecutorConnection taskManagerConnection) {
+
+ Preconditions.checkNotNull(taskManagerRegistration);
+
+ TaskManagerSlot slot = new TaskManagerSlot(
+ slotId,
+ resourceProfile,
+ taskManagerConnection,
+ allocationId);
+
+ slots.put(slotId, slot);
+
+ taskManagerRegistration.addSlot(slotId);
+
+ if (slot.isFree()) {
+ handleFreeSlot(slot);
+ }
- if (chosenRequest != null) {
- final AllocationID allocationId = chosenRequest.getAllocationId();
- final SlotRequest slotRequest = pendingSlotRequests.remove(allocationId);
+ if (slot.isAllocated()) {
+ fulfilledSlotRequests.put(slot.getAllocationId(), slotId);
+ }
+ }
+
+ /**
+ * Updates a slot with the given allocation id.
+ *
+ * @param slotId to update
+ * @param allocationId specifying the current allocation of the slot
+ */
+ private void updateSlot(SlotID slotId, AllocationID allocationId) {
+ TaskManagerSlot slot = slots.get(slotId);
+
+ if (null != slot) {
+ // we assume the given allocation id to be the ground truth (coming from the TM)
+ slot.setAllocationId(allocationId);
+
+ if (null != allocationId) {
+ if (slot.hasPendingSlotRequest()){
+ // we have a pending slot request --> check whether we have to reject it
+ PendingSlotRequest pendingSlotRequest = slot.getAssignedSlotRequest();
+
+ if (Objects.equals(pendingSlotRequest.getAllocationId(), allocationId)) {
+ // we can cancel the slot request because it has been fulfilled
+ cancelPendingSlotRequest(pendingSlotRequest);
+
+ // remove the pending slot request, since it has been completed
+ pendingSlotRequests.remove(pendingSlotRequest.getAllocationId());
+ } else {
+ // this will try to find a new slot for the request
+ rejectPendingSlotRequest(
+ pendingSlotRequest,
+ new Exception("Task manager reported slot " + slotId + " being already allocated."));
+ }
+
+ slot.setAssignedSlotRequest(null);
+ }
- LOG.info("Assigning SlotID({}) to AllocationID({}), JobID:{}", freeSlot.getSlotId(),
- allocationId, chosenRequest.getJobId());
- allocationMap.addAllocation(freeSlot.getSlotId(), allocationId);
+ fulfilledSlotRequests.put(allocationId, slotId);
- sendSlotRequest(freeSlot, slotRequest);
+ TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(slot.getInstanceId());
+
+ if (null != taskManagerRegistration) {
+ // disable any registered time out for the task manager
+ taskManagerRegistration.cancelTimeout();
+ }
+ }
} else {
- freeSlots.put(freeSlot.getSlotId(), freeSlot);
+ LOG.debug("Trying to update unknown slot with slot id {}.", slotId);
}
}
- private void sendSlotRequest(final ResourceSlot freeSlot, final SlotRequest slotRequest) {
+ /**
+ * Tries to allocate a slot for the given slot request. If there is no slot available, the
+ * resource manager is informed to allocate more resources and a timeout for the request is
+ * registered.
+ *
+ * @param pendingSlotRequest to allocate a slot for
+ * @throws ResourceManagerException if the resource manager cannot allocate more resource
+ */
+ private void internalRequestSlot(PendingSlotRequest pendingSlotRequest) throws ResourceManagerException {
+ TaskManagerSlot taskManagerSlot = findMatchingSlot(pendingSlotRequest.getResourceProfile());
- final AllocationID allocationID = slotRequest.getAllocationId();
- final TaskExecutorRegistration registration = freeSlot.getTaskExecutorRegistration();
- final Future<TMSlotRequestReply> slotRequestReplyFuture =
- registration.getTaskExecutorGateway()
- .requestSlot(
- freeSlot.getSlotId(),
- slotRequest.getJobId(),
- allocationID,
- "foobar", // TODO: set proper JM address
- rmServices.getLeaderID(),
- timeout);
+ if (taskManagerSlot != null) {
+ allocateSlot(taskManagerSlot, pendingSlotRequest);
+ } else {
+ final UUID timeoutIdentifier = UUID.randomUUID();
+ final AllocationID allocationId = pendingSlotRequest.getAllocationId();
+
+ // register timeout for slot request
+ ScheduledFuture<?> timeoutFuture = scheduledExecutor.schedule(new Runnable() {
+ @Override
+ public void run() {
+ mainThreadExecutor.execute(new Runnable() {
+ @Override
+ public void run() {
+ timeoutSlotRequest(allocationId, timeoutIdentifier);
+ }
+ });
+ }
+ }, slotRequestTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+ pendingSlotRequest.registerTimeout(timeoutFuture, timeoutIdentifier);
- slotRequestReplyFuture.handleAsync(new BiFunction<TMSlotRequestReply, Throwable, Void>() {
+ resourceManagerActions.allocateResource(pendingSlotRequest.getResourceProfile());
+ }
+ }
+
+ /**
+ * Allocates the given slot for the given slot request. This entails sending a registration
+ * message to the task manager and handling failures.
+ *
+ * @param taskManagerSlot to allocate for the given slot request
+ * @param pendingSlotRequest to allocate the given slot for
+ */
+ private void allocateSlot(TaskManagerSlot taskManagerSlot, PendingSlotRequest pendingSlotRequest) {
+ TaskExecutorConnection taskExecutorConnection = taskManagerSlot.getTaskManagerConnection();
+ TaskExecutorGateway gateway = taskExecutorConnection.getTaskExecutorGateway();
+
+ final CompletableFuture<Acknowledge> completableFuture = new FlinkCompletableFuture<>();
+ final AllocationID allocationId = pendingSlotRequest.getAllocationId();
+ final SlotID slotId = taskManagerSlot.getSlotId();
+
+ taskManagerSlot.setAssignedSlotRequest(pendingSlotRequest);
+ pendingSlotRequest.setRequestFuture(completableFuture);
+
+ // RPC call to the task manager
+ Future<Acknowledge> requestFuture = gateway.requestSlot(
+ slotId,
+ pendingSlotRequest.getJobId(),
+ allocationId,
+ pendingSlotRequest.getTargetAddress(),
+ leaderId,
+ taskManagerRequestTimeout);
+
+ requestFuture.handle(new BiFunction<Acknowledge, Throwable, Void>() {
+ @Override
+ public Void apply(Acknowledge acknowledge, Throwable throwable) {
+ if (acknowledge != null) {
+ completableFuture.complete(acknowledge);
+ } else {
+ completableFuture.completeExceptionally(throwable);
+ }
+
+ return null;
+ }
+ });
+
+ completableFuture.handleAsync(new BiFunction<Acknowledge, Throwable, Void>() {
@Override
- public Void apply(TMSlotRequestReply slotRequestReply, Throwable throwable) {
- TaskExecutorRegistration current = taskManagers.get(slotRequestReply.getResourceID());
- if (current != null && current.getInstanceID().equals(slotRequestReply.getInstanceID())) {
- if (throwable != null || slotRequestReply instanceof TMSlotRequestRejected) {
- handleSlotRequestFailedAtTaskManager(slotRequest, freeSlot.getSlotId());
+ public Void apply(Acknowledge acknowledge, Throwable throwable) {
+ if (acknowledge != null) {
+ updateSlot(slotId, allocationId);
+ } else {
+ if (throwable instanceof SlotOccupiedException) {
+ SlotOccupiedException exception = (SlotOccupiedException) throwable;
+ updateSlot(slotId, exception.getAllocationId());
} else {
- LOG.debug("Successfully registered slot {} ", freeSlot.getSlotId());
+ removeSlotRequestFromSlot(slotId, allocationId);
+ }
+
+ if (!(throwable instanceof CancellationException)) {
+ handleFailedSlotRequest(slotId, allocationId, throwable);
+ } else {
+ LOG.debug("Slot allocation request {} has been cancelled.", allocationId, throwable);
}
- } else {
- LOG.debug("Discarding message from obsolete TaskExecutor with InstanceID {}",
- slotRequestReply.getInstanceID());
}
+
return null;
}
- }, rmServices.getMainThreadExecutor());
+ }, mainThreadExecutor);
}
/**
- * Check whether the request is duplicated. We use AllocationID to identify slot request, for each
- * formerly received slot request, it is either in pending list or already been allocated.
+ * Handles a free slot. It first tries to find a pending slot request matching the slot's
+ * resource profile (as decided by findMatchingRequest) and allocates the slot for it.
+ * If there is no such request, then it will add the slot to the set of free slots.
*
- * @param request The slot request
- * @return <tt>true</tt> if the request is duplicated
+ * @param freeSlot to find a new slot request for
*/
- private boolean isRequestDuplicated(final SlotRequest request) {
- final AllocationID allocationId = request.getAllocationId();
- return pendingSlotRequests.containsKey(allocationId)
- || allocationMap.isAllocated(allocationId);
+ private void handleFreeSlot(TaskManagerSlot freeSlot) {
+ PendingSlotRequest pendingSlotRequest = findMatchingRequest(freeSlot.getResourceProfile());
+
+ if (null != pendingSlotRequest) {
+ allocateSlot(freeSlot, pendingSlotRequest);
+ } else {
+ // No pending request matches: keep the slot, keyed by its slot id, for later requests.
+ freeSlots.put(freeSlot.getSlotId(), freeSlot);
+ }
}
/**
- * Registers a new slot with the SlotManager.
+ * Removes the given set of slots from the slot manager by calling removeSlot for each of them.
+ *
+ * <p>NOTE(review): removeSlot also removes each id from its task manager registration's slot
+ * set. If the given iterable is that very set and the registration is still registered, the
+ * set is modified while being iterated; confirm that callers pass a copy or unregister first.
*
- * @param slot The ResourceSlot which will be registered
+ * @param slotsToRemove identifying the slots to remove from the slot manager
*/
- private void registerNewSlot(final ResourceSlot slot) {
- final SlotID slotId = slot.getSlotId();
- final ResourceID resourceId = slotId.getResourceID();
- if (!registeredSlots.containsKey(resourceId)) {
- registeredSlots.put(resourceId, new HashMap<SlotID, ResourceSlot>());
+ private void removeSlots(Iterable<SlotID> slotsToRemove) {
+ for (SlotID slotId : slotsToRemove) {
+ removeSlot(slotId);
}
- registeredSlots.get(resourceId).put(slotId, slot);
}
- private ResourceSlot getRegisteredSlot(final SlotID slotId) {
- final ResourceID resourceId = slotId.getResourceID();
- if (!registeredSlots.containsKey(resourceId)) {
- return null;
+ /**
+ * Removes the given slot from the slot manager.
+ *
+ * <p>If the slot is registered, it is dropped from the registered and the free slots, a slot
+ * request still assigned to it is rejected (which triggers a new allocation attempt), the
+ * fulfilled slot request stored under the slot's allocation id is forgotten and the slot is
+ * removed from its task manager registration. Unknown slot ids are only logged.
+ *
+ * @param slotId identifying the slot to remove
+ */
+ private void removeSlot(SlotID slotId) {
+ TaskManagerSlot slot = slots.remove(slotId);
+
+ if (null != slot) {
+ freeSlots.remove(slotId);
+
+ if (slot.hasPendingSlotRequest()) {
+ // reject the pending slot request --> triggering a new allocation attempt
+ rejectPendingSlotRequest(
+ slot.getAssignedSlotRequest(),
+ new Exception("The assigned slot " + slot.getSlotId() + " was removed."));
+ }
+
+ // Forget the fulfilled slot request (if any) recorded under this slot's allocation id.
+ AllocationID oldAllocationId = slot.getAllocationId();
+
+ fulfilledSlotRequests.remove(oldAllocationId);
+
+ TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(slot.getInstanceId());
+
+ if (null != taskManagerRegistration) {
+ taskManagerRegistration.removeSlot(slotId);
+ }
+ } else {
+ LOG.debug("There was no slot registered with slot id {}.", slotId);
}
- return registeredSlots.get(resourceId).get(slotId);
}
- // ------------------------------------------------------------------------
- // Framework specific behavior
- // ------------------------------------------------------------------------
+ // ---------------------------------------------------------------------------------------------
+ // Internal request handling methods
+ // ---------------------------------------------------------------------------------------------
/**
- * Choose a slot to use among all free slots, the behavior is framework specified.
+ * Removes a pending slot request identified by the given allocation id from a slot identified
+ * by the given slot id.
+ *
+ * <p>The assignment is only cleared if it still belongs to the given allocation id. If the slot
+ * is free afterwards, it is handed to handleFreeSlot; if none of the slots of its task manager
+ * is used any more, an idle timeout is registered for that task manager.
*
- * @param request The slot request
- * @param freeSlots All slots which can be used
- * @return The slot we choose to use, <tt>null</tt> if we did not find a match
+ * @param slotId identifying the slot
+ * @param allocationId identifying the presumably assigned pending slot request
*/
- protected abstract ResourceSlot chooseSlotToUse(final SlotRequest request,
- final Map<SlotID, ResourceSlot> freeSlots);
+ private void removeSlotRequestFromSlot(SlotID slotId, AllocationID allocationId) {
+ TaskManagerSlot taskManagerSlot = slots.get(slotId);
+
+ if (null != taskManagerSlot) {
+ if (taskManagerSlot.hasPendingSlotRequest() && Objects.equals(allocationId, taskManagerSlot.getAssignedSlotRequest().getAllocationId())) {
+ taskManagerSlot.setAssignedSlotRequest(null);
+ }
+
+ if (taskManagerSlot.isFree()) {
+ handleFreeSlot(taskManagerSlot);
+ }
+
+ // Checked after handleFreeSlot, which may have assigned the slot to another request.
+ TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(taskManagerSlot.getInstanceId());
+
+ if (null != taskManagerRegistration && !anySlotUsed(taskManagerRegistration.getSlots())) {
+ registerTaskManagerTimeout(taskManagerRegistration);
+ }
+ } else {
+ LOG.debug("There was no slot with {} registered. Probably this slot has been already freed.", slotId);
+ }
+ }
/**
- * Choose a pending request to fulfill when we have a free slot, the behavior is framework specified.
+ * Handles a failed slot request. The slot manager tries to find a new slot fulfilling
+ * the resource requirements for the failed slot request.
+ *
+ * <p>If the request is still pending, its request future is cleared and a new allocation
+ * attempt is started via internalRequestSlot. If that attempt cannot be started, the request
+ * is dropped and the allocation failure is reported via the resource manager actions. If the
+ * request is no longer pending, only a debug message is logged.
*
- * @param offeredSlot The free slot
- * @param pendingRequests All the pending slot requests
- * @return The chosen SlotRequest, <tt>null</tt> if we did not find a match
+ * @param slotId identifying the slot which was assigned to the slot request before
+ * @param allocationId identifying the failed slot request
+ * @param cause of the failure
*/
- protected abstract SlotRequest chooseRequestToFulfill(final ResourceSlot offeredSlot,
- final Map<AllocationID, SlotRequest> pendingRequests);
+ private void handleFailedSlotRequest(SlotID slotId, AllocationID allocationId, Throwable cause) {
+ PendingSlotRequest pendingSlotRequest = pendingSlotRequests.get(allocationId);
+
+ LOG.debug("Slot request with allocation id {} failed for slot {}.", allocationId, slotId, cause);
+
+ if (null != pendingSlotRequest) {
+ // The request future belonged to the failed attempt; clear it before retrying.
+ pendingSlotRequest.setRequestFuture(null);
+
+ try {
+ internalRequestSlot(pendingSlotRequest);
+ } catch (ResourceManagerException e) {
+ // No new attempt could be started: give up on the request and report the failure.
+ pendingSlotRequests.remove(allocationId);
- // ------------------------------------------------------------------------
- // Helper classes
- // ------------------------------------------------------------------------
+ resourceManagerActions.notifyAllocationFailure(
+ pendingSlotRequest.getJobId(),
+ allocationId,
+ e);
+ }
+ } else {
+ LOG.debug("There was no pending slot request with allocation id {}. Probably the request has been fulfilled or cancelled.", allocationId);
+ }
+ }
/**
- * We maintain all the allocations with SlotID and AllocationID. We are able to get or remove the allocation info
- * either by SlotID or AllocationID.
+ * Rejects the pending slot request by failing the request future with a
+ * {@link SlotAllocationException}.
+ *
+ * <p>If no request future is set (no request has been sent yet), only a debug message is
+ * logged.
+ *
+ * @param pendingSlotRequest to reject
+ * @param cause of the rejection
*/
- private static class AllocationMap {
+ private void rejectPendingSlotRequest(PendingSlotRequest pendingSlotRequest, Exception cause) {
+ CompletableFuture<Acknowledge> request = pendingSlotRequest.getRequestFuture();
- /** All allocated slots (by SlotID) */
- private final Map<SlotID, AllocationID> allocatedSlots;
+ if (null != request) {
+ // Failing the future hands the rejection to the completion handlers registered on it.
+ request.completeExceptionally(new SlotAllocationException(cause));
+ } else {
+ LOG.debug("Cannot reject pending slot request {}, since no request has been sent.", pendingSlotRequest.getAllocationId());
+ }
+ }
- /** All allocated slots (by AllocationID), it'a a inverse view of allocatedSlots */
- private final Map<AllocationID, SlotID> allocatedSlotsByAllocationId;
+ /**
+ * Cancels the given slot request: its timeout is cancelled and, if a request future is set,
+ * that future is cancelled without interrupting ({@code cancel(false)}).
+ *
+ * @param pendingSlotRequest to cancel
+ */
+ private void cancelPendingSlotRequest(PendingSlotRequest pendingSlotRequest) {
+ pendingSlotRequest.cancelTimeout();
- AllocationMap() {
- this.allocatedSlots = new HashMap<>(16);
- this.allocatedSlotsByAllocationId = new HashMap<>(16);
- }
+ CompletableFuture<Acknowledge> request = pendingSlotRequest.getRequestFuture();
- /**
- * Add a allocation
- *
- * @param slotId The slot id
- * @param allocationId The allocation id
- */
- void addAllocation(final SlotID slotId, final AllocationID allocationId) {
- allocatedSlots.put(slotId, allocationId);
- allocatedSlotsByAllocationId.put(allocationId, slotId);
+ if (null != request) {
+ request.cancel(false);
}
+ }
+
+ // ---------------------------------------------------------------------------------------------
+ // Internal timeout methods
+ // ---------------------------------------------------------------------------------------------
+
+ /**
+ * Handles the expiry of a task manager's idle timeout. The task manager is unregistered and
+ * its resource released only if the timeout is still the current one (same identifier) and
+ * none of its slots is in use. In every other case the registration stays untouched.
+ *
+ * @param instanceId identifying the task manager whose timeout expired
+ * @param timeoutIdentifier of the timeout that expired
+ */
+ private void timeoutTaskManager(InstanceID instanceId, UUID timeoutIdentifier) {
+ // Look the registration up without removing it: it must stay registered while its slots
+ // are in use, and unregisterTaskManager has to find it in order to clean it up.
+ // NOTE(review): assumes unregisterTaskManager removes the registration itself — confirm.
+ TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceId);
- /**
- * De-allocation with slot id
- *
- * @param slotId The slot id
- */
- void removeAllocation(final SlotID slotId) {
- if (allocatedSlots.containsKey(slotId)) {
- final AllocationID allocationId = allocatedSlots.get(slotId);
- allocatedSlots.remove(slotId);
- allocatedSlotsByAllocationId.remove(allocationId);
+ if (null != taskManagerRegistration) {
+ if (Objects.equals(timeoutIdentifier, taskManagerRegistration.getTimeoutIdentifier())) {
+ if (anySlotUsed(taskManagerRegistration.getSlots())) {
+ LOG.debug("Cannot release the task manager with instance id {}, because some " +
+ "of its slots are still being used.", instanceId);
+ } else {
+ unregisterTaskManager(instanceId);
+
+ resourceManagerActions.releaseResource(instanceId);
+ }
+ } else {
+ LOG.debug("Expected timeout identifier {} differs from the task manager's " +
+ "timeout identifier {}. Ignoring the task manager timeout call.",
+ timeoutIdentifier, taskManagerRegistration.getTimeoutIdentifier());
}
+ } else {
+ LOG.debug("Could not find a registered task manager with instance id {}. Ignoring the task manager timeout call.", instanceId);
}
+ }
- /**
- * De-allocation with allocation id
- *
- * @param allocationId The allocation id
- */
- void removeAllocation(final AllocationID allocationId) {
- if (allocatedSlotsByAllocationId.containsKey(allocationId)) {
- SlotID slotId = allocatedSlotsByAllocationId.get(allocationId);
- allocatedSlotsByAllocationId.remove(allocationId);
- allocatedSlots.remove(slotId);
+ /**
+ * Handles the expiry of a pending slot request's timeout. If the timeout is still the current
+ * one (same identifier) and the request has not been assigned to a slot, the request is
+ * removed and its allocation failure is reported with a {@link TimeoutException}. In every
+ * other case the pending request is kept.
+ *
+ * @param allocationId identifying the pending slot request
+ * @param timeoutIdentifier of the timeout that expired
+ */
+ private void timeoutSlotRequest(AllocationID allocationId, UUID timeoutIdentifier) {
+ PendingSlotRequest pendingSlotRequest = pendingSlotRequests.get(allocationId);
+
+ if (null != pendingSlotRequest) {
+ if (Objects.equals(timeoutIdentifier, pendingSlotRequest.getTimeoutIdentifier())) {
+ if (!pendingSlotRequest.isAssigned()) {
+ pendingSlotRequests.remove(allocationId);
+
+ resourceManagerActions.notifyAllocationFailure(
+ pendingSlotRequest.getJobId(),
+ allocationId,
+ new TimeoutException("The allocation could not be fulfilled in time."));
+ } else {
+ // Keep the assigned request registered: handleFailedSlotRequest looks it up in
+ // pendingSlotRequests to retry it if the outstanding allocation attempt fails.
+ LOG.debug("Cannot fail pending slot request {} because it has been assigned.", allocationId);
+ }
+ } else {
+ LOG.debug("Expected timeout identifier {} differs from the pending slot request's " +
+ "timeout identifier {}. Ignoring the slot request timeout call.",
+ timeoutIdentifier, pendingSlotRequest.getTimeoutIdentifier());
}
+ } else {
+ LOG.debug("Could not find pending slot request with allocation id {}. Ignoring the slot request timeout call.", allocationId);
}
+ }
- /**
- * Check whether allocation exists by slot id
- *
- * @param slotId The slot id
- * @return true if the allocation exists
- */
- boolean isAllocated(final SlotID slotId) {
- return allocatedSlots.containsKey(slotId);
- }
+ // ---------------------------------------------------------------------------------------------
+ // Internal utility methods
+ // ---------------------------------------------------------------------------------------------
- /**
- * Check whether allocation exists by allocation id
- *
- * @param allocationId The allocation id
- * @return true if the allocation exists
- */
- boolean isAllocated(final AllocationID allocationId) {
- return allocatedSlotsByAllocationId.containsKey(allocationId);
- }
+ /**
+ * Checks whether a slot request with the given allocation id is already known, i.e. whether it
+ * is still pending or has already been fulfilled.
+ *
+ * @param allocationId of the slot request to check
+ * @return {@code true} if the allocation id is among the pending or the fulfilled slot requests
+ */
+ private boolean checkDuplicateRequest(AllocationID allocationId) {
+ if (pendingSlotRequests.containsKey(allocationId)) {
+ return true;
+ }
+
+ return fulfilledSlotRequests.containsKey(allocationId);
+ }
- AllocationID getAllocationID(final SlotID slotId) {
- return allocatedSlots.get(slotId);
- }
+ /**
+ * Checks whether any of the given slots is in use.
+ *
+ * <p>A slot counts as used if it is registered with the slot manager and not free. Slot ids
+ * without a registered slot are ignored, and a {@code null} iterable is treated as "no slot
+ * used".
+ *
+ * @param slotsToCheck ids of the slots to check; may be {@code null}
+ * @return {@code true} if at least one registered slot among them is not free
+ */
+ private boolean anySlotUsed(Iterable<SlotID> slotsToCheck) {
- SlotID getSlotID(final AllocationID allocationId) {
- return allocatedSlotsByAllocationId.get(allocationId);
- }
+ if (null != slotsToCheck) {
+ boolean idle = true;
- public int size() {
- return allocatedSlots.size();
- }
+ for (SlotID slotId : slotsToCheck) {
+ TaskManagerSlot taskManagerSlot = slots.get(slotId);
- public void clear() {
- allocatedSlots.clear();
- allocatedSlotsByAllocationId.clear();
+ if (null != taskManagerSlot) {
+ idle &= taskManagerSlot.isFree();
+ }
+ }
+
+ return !idle;
+ } else {
+ return false;
}
}
- /**
- * Clears the state of the SlotManager after leadership revokal
- */
- public void clearState() {
- taskManagers.clear();
- registeredSlots.clear();
- pendingSlotRequests.clear();
- freeSlots.clear();
- allocationMap.clear();
- }
+ /**
+ * Registers an idle timeout for the given task manager, which expires after the configured
+ * task manager timeout.
+ *
+ * <p>The scheduled task hands the expiry over to the main thread executor, where
+ * timeoutTaskManager only acts if the registration still carries the timeout identifier
+ * generated here. Registering the timeout cancels a previously registered one
+ * (TaskManagerRegistration#registerTimeout).
+ *
+ * @param taskManagerRegistration to register the timeout for
+ */
+ private void registerTaskManagerTimeout(final TaskManagerRegistration taskManagerRegistration) {
+ final UUID timeoutIdentifier = UUID.randomUUID();
- // ------------------------------------------------------------------------
- // Testing utilities
- // ------------------------------------------------------------------------
+ ScheduledFuture<?> timeoutFuture = scheduledExecutor.schedule(new Runnable() {
+ @Override
+ public void run() {
+ mainThreadExecutor.execute(new Runnable() {
+ @Override
+ public void run() {
+ timeoutTaskManager(taskManagerRegistration.getInstanceId(), timeoutIdentifier);
+ }
+ });
+ }
+ }, taskManagerTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
- @VisibleForTesting
- boolean isAllocated(final SlotID slotId) {
- return allocationMap.isAllocated(slotId);
+ taskManagerRegistration.registerTimeout(timeoutFuture, timeoutIdentifier);
}
- @VisibleForTesting
- boolean isAllocated(final AllocationID allocationId) {
- return allocationMap.isAllocated(allocationId);
+ /**
+ * Ensures that the slot manager has been started.
+ *
+ * @throws IllegalStateException if the slot manager has not been started
+ */
+ private void checkInit() {
+ Preconditions.checkState(started, "The slot manager has not been started.");
}
- /**
- * Add free slots directly to the free pool, this will not trigger pending requests allocation
- *
- * @param slot The resource slot
- */
- @VisibleForTesting
- void addFreeSlot(final ResourceSlot slot) {
- final ResourceID resourceId = slot.getResourceID();
- final SlotID slotId = slot.getSlotId();
+ // ---------------------------------------------------------------------------------------------
+ // Testing methods
+ // ---------------------------------------------------------------------------------------------
- if (!registeredSlots.containsKey(resourceId)) {
- registeredSlots.put(resourceId, new HashMap<SlotID, ResourceSlot>());
- }
- registeredSlots.get(resourceId).put(slot.getSlotId(), slot);
- freeSlots.put(slotId, slot);
+ /** Returns the registered slot with the given slot id, or {@code null} if there is none. */
+ @VisibleForTesting
+ TaskManagerSlot getSlot(SlotID slotId) {
+ return slots.get(slotId);
}
+ /** Returns the number of slots currently registered with the slot manager. */
@VisibleForTesting
- int getAllocatedSlotCount() {
- return allocationMap.size();
+ int getNumberRegisteredSlots() {
+ return slots.size();
}
+ /** Returns the pending slot request with the given allocation id, or {@code null} if there is none. */
@VisibleForTesting
- int getFreeSlotCount() {
- return freeSlots.size();
+ PendingSlotRequest getSlotRequest(AllocationID allocationId) {
+ return pendingSlotRequests.get(allocationId);
}
@VisibleForTesting
- int getPendingRequestCount() {
- return pendingSlotRequests.size();
+ boolean hasTimeoutRegistered(InstanceID instanceId) {
+ TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceId);
+
+ if (null != taskManagerRegistration) {
+ return taskManagerRegistration.getTimeoutIdentifier() != null;
+ } else {
+ return false;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
new file mode 100644
index 0000000..d21c251
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException;
+import org.apache.flink.util.Preconditions;
+import scala.concurrent.duration.Duration;
+
+/**
+ * Configuration of the {@link SlotManager}: the timeout for requests sent to task managers, the
+ * timeout for pending slot requests and the idle timeout for registered task managers.
+ */
+public class SlotManagerConfiguration {
+
+ /** Timeout for requests sent to task managers. */
+ private final Time taskManagerRequestTimeout;
+ /** Timeout for pending slot requests. */
+ private final Time slotRequestTimeout;
+ /** Idle timeout for registered task managers. */
+ private final Time taskManagerTimeout;
+
+ /**
+ * Creates a slot manager configuration.
+ *
+ * @param taskManagerRequestTimeout timeout for requests sent to task managers
+ * @param slotRequestTimeout timeout for pending slot requests
+ * @param taskManagerTimeout idle timeout for registered task managers
+ * @throws NullPointerException if any of the timeouts is null
+ */
+ public SlotManagerConfiguration(
+ Time taskManagerRequestTimeout,
+ Time slotRequestTimeout,
+ Time taskManagerTimeout) {
+ this.taskManagerRequestTimeout = Preconditions.checkNotNull(taskManagerRequestTimeout);
+ this.slotRequestTimeout = Preconditions.checkNotNull(slotRequestTimeout);
+ this.taskManagerTimeout = Preconditions.checkNotNull(taskManagerTimeout);
+ }
+
+ public Time getTaskManagerRequestTimeout() {
+ return taskManagerRequestTimeout;
+ }
+
+ public Time getSlotRequestTimeout() {
+ return slotRequestTimeout;
+ }
+
+ public Time getTaskManagerTimeout() {
+ return taskManagerTimeout;
+ }
+
+ /**
+ * Creates a slot manager configuration from the given configuration. All three timeouts are
+ * set to the value of {@link ConfigConstants#AKKA_ASK_TIMEOUT} (defaulting to
+ * {@link ConfigConstants#DEFAULT_AKKA_ASK_TIMEOUT}).
+ *
+ * @param configuration to read the timeout from
+ * @return slot manager configuration using the configured timeout for all timeouts
+ * @throws ConfigurationException if the timeout value cannot be parsed or is not finite
+ */
+ public static SlotManagerConfiguration fromConfiguration(Configuration configuration) throws ConfigurationException {
+ ConfigOption<String> timeoutOption = ConfigOptions
+ .key(ConfigConstants.AKKA_ASK_TIMEOUT)
+ .defaultValue(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT);
+
+ final String strTimeout = configuration.getString(timeoutOption);
+ final Time timeout;
+
+ try {
+ timeout = Time.milliseconds(Duration.apply(strTimeout).toMillis());
+ } catch (IllegalArgumentException e) {
+ // Duration.apply throws a NumberFormatException (a subclass) for malformed values and
+ // toMillis throws an IllegalArgumentException for infinite durations such as "Inf".
+ throw new ConfigurationException("Could not parse the resource manager's timeout " +
+ "value '" + strTimeout + "' of option " + timeoutOption.key() + '.', e);
+ }
+
+ return new SlotManagerConfiguration(timeout, timeout, timeout);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerException.java
new file mode 100644
index 0000000..c322c81
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+
+/**
+ * {@link ResourceManagerException} raised by the slot manager.
+ */
+public class SlotManagerException extends ResourceManagerException {
+
+ private static final long serialVersionUID = -3723028616920379071L;
+
+ /**
+ * Creates a slot manager exception with the given message and cause.
+ *
+ * @param message describing the failure
+ * @param cause of the failure
+ */
+ public SlotManagerException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ /**
+ * Creates a slot manager exception with the given message.
+ *
+ * @param message describing the failure
+ */
+ public SlotManagerException(String message) {
+ super(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerFactory.java
deleted file mode 100644
index b4e9c99..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerFactory.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.runtime.resourcemanager.slotmanager;
-
-import org.apache.flink.runtime.resourcemanager.ResourceManagerServices;
-
-/**
- * Factory to create a SlotManager and provide it with dependencies.
- */
-public interface SlotManagerFactory {
-
- /**
- * Creates a SlotManager and provides it with ResourceManager services.
- */
- SlotManager create(ResourceManagerServices rmServices);
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java
new file mode 100644
index 0000000..8e23dbb
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ScheduledFuture;
+
+/**
+ * Book-keeping for a task manager registered at the slot manager: its connection, the ids of its
+ * slots and an optional idle timeout (scheduled future plus identifier).
+ */
+public class TaskManagerRegistration {
+
+ /** Connection to the registered task manager. */
+ private final TaskExecutorConnection taskManagerConnection;
+
+ /** Ids of the slots belonging to this task manager. */
+ private final Set<SlotID> slots = new HashSet<>(4);
+
+ /** Identifier of the most recently registered timeout; reset to null by cancelTimeout. */
+ private UUID timeoutIdentifier;
+
+ /** Future of the most recently registered timeout; reset to null by cancelTimeout. */
+ private ScheduledFuture<?> timeoutFuture;
+
+ public TaskManagerRegistration(TaskExecutorConnection taskManagerConnection) {
+ this.taskManagerConnection = Preconditions.checkNotNull(taskManagerConnection);
+ }
+
+ public TaskExecutorConnection getTaskManagerConnection() {
+ return taskManagerConnection;
+ }
+
+ public InstanceID getInstanceId() {
+ return taskManagerConnection.getInstanceID();
+ }
+
+ public UUID getTimeoutIdentifier() {
+ return timeoutIdentifier;
+ }
+
+ /** Returns the (mutable, internal) set of slot ids of this task manager. */
+ public Set<SlotID> getSlots() {
+ return slots;
+ }
+
+ public boolean removeSlot(SlotID slotId) {
+ return slots.remove(slotId);
+ }
+
+ public void addSlot(SlotID slotId) {
+ slots.add(slotId);
+ }
+
+ /**
+ * Cancels the registered timeout, if any, without interrupting it and forgets its future and
+ * identifier. Does nothing if no timeout future is registered.
+ */
+ public void cancelTimeout() {
+ if (timeoutFuture == null) {
+ return;
+ }
+
+ timeoutFuture.cancel(false);
+ timeoutFuture = null;
+ timeoutIdentifier = null;
+ }
+
+ /**
+ * Replaces the registered timeout: a previously registered one is cancelled first.
+ *
+ * @param newTimeoutFuture future of the new timeout
+ * @param newTimeoutIdentifier identifier of the new timeout
+ */
+ public void registerTimeout(ScheduledFuture<?> newTimeoutFuture, UUID newTimeoutIdentifier) {
+ cancelTimeout();
+
+ timeoutFuture = newTimeoutFuture;
+ timeoutIdentifier = newTimeoutIdentifier;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotReport.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotReport.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotReport.java
index 54adce6..7a9da28 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotReport.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotReport.java
@@ -19,8 +19,9 @@
package org.apache.flink.runtime.taskexecutor;
import java.io.Serializable;
+import java.util.Collection;
import java.util.Collections;
-import java.util.List;
+import java.util.Iterator;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -29,12 +30,12 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* which slots are available and allocated, and what jobs (JobManagers) the allocated slots
* have been allocated to.
*/
-public class SlotReport implements Serializable {
+public class SlotReport implements Serializable, Iterable<SlotStatus> {
private static final long serialVersionUID = -3150175198722481689L;
/** The slots status of the TaskManager */
- private final List<SlotStatus> slotsStatus;
+ private final Collection<SlotStatus> slotsStatus;
public SlotReport() {
this(Collections.<SlotStatus>emptyList());
@@ -44,12 +45,16 @@ public class SlotReport implements Serializable {
this(Collections.singletonList(slotStatus));
}
- public SlotReport(final List<SlotStatus> slotsStatus) {
+ public SlotReport(final Collection<SlotStatus> slotsStatus) {
this.slotsStatus = checkNotNull(slotsStatus);
}
- public List<SlotStatus> getSlotsStatus() {
+ public Collection<SlotStatus> getSlotsStatus() {
return slotsStatus;
}
+ @Override
+ public Iterator<SlotStatus> iterator() {
+ return slotsStatus.iterator();
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java
index 0f57bb1..a3bc4a6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java
@@ -38,7 +38,7 @@ public class SlotStatus implements Serializable {
private final SlotID slotID;
/** the resource profile of the slot */
- private final ResourceProfile profiler;
+ private final ResourceProfile resourceProfile;
/** if the slot is allocated, allocationId identify its allocation; else, allocationId is null */
private final AllocationID allocationID;
@@ -46,16 +46,17 @@ public class SlotStatus implements Serializable {
/** if the slot is allocated, jobId identify which job this slot is allocated to; else, jobId is null */
private final JobID jobID;
- public SlotStatus(SlotID slotID, ResourceProfile profiler) {
- this(slotID, profiler, null, null);
+ public SlotStatus(SlotID slotID, ResourceProfile resourceProfile) {
+ this(slotID, resourceProfile, null, null);
}
public SlotStatus(
- SlotID slotID, ResourceProfile profiler,
+ SlotID slotID,
+ ResourceProfile resourceProfile,
JobID jobID,
AllocationID allocationID) {
this.slotID = checkNotNull(slotID, "slotID cannot be null");
- this.profiler = checkNotNull(profiler, "profile cannot be null");
+ this.resourceProfile = checkNotNull(resourceProfile, "profile cannot be null");
this.allocationID = allocationID;
this.jobID = jobID;
}
@@ -74,8 +75,8 @@ public class SlotStatus implements Serializable {
*
* @return The resource profile
*/
- public ResourceProfile getProfiler() {
- return profiler;
+ public ResourceProfile getResourceProfile() {
+ return resourceProfile;
}
/**
@@ -110,7 +111,7 @@ public class SlotStatus implements Serializable {
if (!slotID.equals(that.slotID)) {
return false;
}
- if (!profiler.equals(that.profiler)) {
+ if (!resourceProfile.equals(that.resourceProfile)) {
return false;
}
if (allocationID != null ? !allocationID.equals(that.allocationID) : that.allocationID != null) {
@@ -123,7 +124,7 @@ public class SlotStatus implements Serializable {
@Override
public int hashCode() {
int result = slotID.hashCode();
- result = 31 * result + profiler.hashCode();
+ result = 31 * result + resourceProfile.hashCode();
result = 31 * result + (allocationID != null ? allocationID.hashCode() : 0);
result = 31 * result + (jobID != null ? jobID.hashCode() : 0);
return result;
http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index d04cabb..5b8c8ee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -61,8 +61,6 @@ import org.apache.flink.runtime.registration.RegistrationConnectionListener;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
-import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRegistered;
-import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcMethod;
@@ -71,6 +69,7 @@ import org.apache.flink.runtime.rpc.RpcServiceUtils;
import org.apache.flink.runtime.taskexecutor.exceptions.CheckpointException;
import org.apache.flink.runtime.taskexecutor.exceptions.PartitionException;
import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
+import org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException;
import org.apache.flink.runtime.taskexecutor.exceptions.TaskException;
import org.apache.flink.runtime.taskexecutor.exceptions.TaskSubmissionException;
import org.apache.flink.runtime.taskexecutor.rpc.RpcCheckpointResponder;
@@ -99,6 +98,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
+import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
@@ -315,7 +315,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
throw new TaskSubmissionException(message);
}
- if (!jobManagerConnection.getLeaderId().equals(jobManagerLeaderId)) {
+ if (!Objects.equals(jobManagerConnection.getLeaderId(), jobManagerLeaderId)) {
final String message = "Rejecting the task submission because the job manager leader id " +
jobManagerLeaderId + " does not match the expected job manager leader id " +
jobManagerConnection.getLeaderId() + '.';
@@ -355,30 +355,30 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
PartitionProducerStateChecker partitionStateChecker = jobManagerConnection.getPartitionStateChecker();
Task task = new Task(
- jobInformation,
- taskInformation,
- tdd.getExecutionAttemptId(),
- tdd.getAllocationId(),
- tdd.getSubtaskIndex(),
- tdd.getAttemptNumber(),
- tdd.getProducedPartitions(),
- tdd.getInputGates(),
- tdd.getTargetSlotNumber(),
- tdd.getTaskStateHandles(),
- memoryManager,
- ioManager,
- networkEnvironment,
- broadcastVariableManager,
- taskManagerActions,
- inputSplitProvider,
- checkpointResponder,
- libraryCache,
- fileCache,
- taskManagerConfiguration,
- taskMetricGroup,
- resultPartitionConsumableNotifier,
- partitionStateChecker,
- getRpcService().getExecutor());
+ jobInformation,
+ taskInformation,
+ tdd.getExecutionAttemptId(),
+ tdd.getAllocationId(),
+ tdd.getSubtaskIndex(),
+ tdd.getAttemptNumber(),
+ tdd.getProducedPartitions(),
+ tdd.getInputGates(),
+ tdd.getTargetSlotNumber(),
+ tdd.getTaskStateHandles(),
+ memoryManager,
+ ioManager,
+ networkEnvironment,
+ broadcastVariableManager,
+ taskManagerActions,
+ inputSplitProvider,
+ checkpointResponder,
+ libraryCache,
+ fileCache,
+ taskManagerConfiguration,
+ taskMetricGroup,
+ resultPartitionConsumableNotifier,
+ partitionStateChecker,
+ getRpcService().getExecutor());
log.info("Received task {}.", task.getTaskInfo().getTaskNameWithSubtasks());
@@ -561,7 +561,6 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
// ----------------------------------------------------------------------
/**
- * /**
* Requests a slot from the TaskManager
*
* @param slotId identifying the requested slot
@@ -573,12 +572,14 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
* @return answer to the slot request
*/
@RpcMethod
- public TMSlotRequestReply requestSlot(
+ public Acknowledge requestSlot(
final SlotID slotId,
final JobID jobId,
final AllocationID allocationId,
final String targetAddress,
final UUID rmLeaderId) throws SlotAllocationException {
+ // TODO: Filter invalid requests from the resource manager by using the instance/registration Id
+
log.info("Receive slot request {} for job {} from resource manager with leader id {}.",
allocationId, jobId, rmLeaderId);
@@ -608,7 +609,8 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
final String message = "The slot " + slotId + " has already been allocated for a different job.";
log.info(message);
- throw new SlotAllocationException(message);
+
+ throw new SlotOccupiedException(message, taskSlotTable.getCurrentAllocation(slotId.getSlotNumber()));
}
if (jobManagerTable.contains(jobId)) {
@@ -635,7 +637,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
}
}
- return new TMSlotRequestRegistered(resourceManagerConnection.getRegistrationId(), getResourceID(), allocationId);
+ return Acknowledge.get();
}
// ----------------------------------------------------------------------
@@ -1046,7 +1048,8 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
resourceManagerGateway.notifySlotAvailable(
resourceManagerConnection.getTargetLeaderId(),
resourceManagerConnection.getRegistrationId(),
- new SlotID(getResourceID(), freedSlotIndex));
+ new SlotID(getResourceID(), freedSlotIndex),
+ allocationId);
}
} catch (SlotNotFoundException e) {
log.debug("Could not free slot for allocation id {}.", allocationId, e);
@@ -1079,7 +1082,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
private boolean isJobManagerConnectionValid(JobID jobId, UUID leaderId) {
JobManagerConnection jmConnection = jobManagerTable.get(jobId);
- return jmConnection != null && jmConnection.getLeaderId().equals(leaderId);
+ return jmConnection != null && Objects.equals(jmConnection.getLeaderId(), leaderId);
}
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
index 2bbf0e6..bedf8ec 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -25,7 +25,6 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.PartitionInfo;
@@ -51,7 +50,7 @@ public interface TaskExecutorGateway extends RpcGateway {
* @throws SlotAllocationException if the slot allocation fails
* @return answer to the slot request
*/
- Future<TMSlotRequestReply> requestSlot(
+ Future<Acknowledge> requestSlot(
SlotID slotId,
JobID jobId,
AllocationID allocationId,
http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotOccupiedException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotOccupiedException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotOccupiedException.java
new file mode 100644
index 0000000..93e67a8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotOccupiedException.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.exceptions;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Exception which signals that a slot could not be allocated because it is
+ * already occupied by another allocation.
+ *
+ * <p>The {@link AllocationID} of the allocation currently occupying the slot is
+ * available via {@link #getAllocationId()}; it is never null.
+ */
+public class SlotOccupiedException extends SlotAllocationException {
+ private static final long serialVersionUID = -3986333914244338888L;
+
+ /** Allocation id of the allocation which currently occupies the slot. */
+ private final AllocationID allocationId;
+
+ /**
+ * Creates a new exception with the given detail message.
+ *
+ * @param message describing why the slot allocation failed
+ * @param allocationId of the allocation currently occupying the slot; must not be null
+ * @throws NullPointerException if allocationId is null
+ */
+ public SlotOccupiedException(String message, AllocationID allocationId) {
+ super(message);
+ this.allocationId = Preconditions.checkNotNull(allocationId, "allocationId");
+ }
+
+ /**
+ * Creates a new exception with the given detail message and cause.
+ *
+ * @param message describing why the slot allocation failed
+ * @param cause of the failure
+ * @param allocationId of the allocation currently occupying the slot; must not be null
+ * @throws NullPointerException if allocationId is null
+ */
+ public SlotOccupiedException(String message, Throwable cause, AllocationID allocationId) {
+ super(message, cause);
+ this.allocationId = Preconditions.checkNotNull(allocationId, "allocationId");
+ }
+
+ /**
+ * Creates a new exception with the given cause.
+ *
+ * @param cause of the failure
+ * @param allocationId of the allocation currently occupying the slot; must not be null
+ * @throws NullPointerException if allocationId is null
+ */
+ public SlotOccupiedException(Throwable cause, AllocationID allocationId) {
+ super(cause);
+ this.allocationId = Preconditions.checkNotNull(allocationId, "allocationId");
+ }
+
+ /**
+ * Returns the id of the allocation which currently occupies the slot.
+ *
+ * @return allocation id occupying the slot; never null
+ */
+ public AllocationID getAllocationId() {
+ return allocationId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
index 081d8f8..5c51c7c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
@@ -509,6 +509,16 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
return new TaskIterator(jobId);
}
+ /**
+ * Returns the allocation id currently held by the task slot at the given index.
+ *
+ * @param index of the task slot to look up; this method performs no bounds check of its own
+ * @return Allocation id of the specified slot if allocated; otherwise null
+ */
+ public AllocationID getCurrentAllocation(int index) {
+ return taskSlots.get(index).getAllocationId();
+ }
+
// ---------------------------------------------------------------------
// TimeoutListener methods
// ---------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
index e4e20b9..41c2e16 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
@@ -49,7 +49,7 @@ import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
-import org.apache.flink.runtime.resourcemanager.TestingSlotManagerFactory;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.rpc.TestingSerialRpcService;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
@@ -391,10 +391,14 @@ public class ResourceManagerTest extends TestLogger {
final ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class);
final HeartbeatServices heartbeatServices = new TestingHeartbeatServices(heartbeatInterval, heartbeatTimeout, scheduledExecutor);
- final TestingSlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory();
final MetricRegistry metricRegistry = mock(MetricRegistry.class);
final JobLeaderIdService jobLeaderIdService = mock(JobLeaderIdService.class);
final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
+ final SlotManager slotManager = new SlotManager(
+ rpcService.getScheduledExecutor(),
+ TestingUtils.infiniteTime(),
+ TestingUtils.infiniteTime(),
+ TestingUtils.infiniteTime());
try {
final StandaloneResourceManager resourceManager = new StandaloneResourceManager(
@@ -404,7 +408,7 @@ public class ResourceManagerTest extends TestLogger {
resourceManagerConfiguration,
highAvailabilityServices,
heartbeatServices,
- slotManagerFactory,
+ slotManager,
metricRegistry,
jobLeaderIdService,
testingFatalErrorHandler);
http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
index 89fd22f..b444640 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
@@ -80,7 +80,7 @@ public class SlotPoolRpcTest {
Time.days(1), Time.days(1),
Time.milliseconds(100) // this is the timeout for the request tested here
);
- pool.start(UUID.randomUUID());
+ pool.start(UUID.randomUUID(), "foobar");
Future<SimpleSlot> future = pool.allocateSlot(mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null);
http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
index b4149b2..cf95461 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
@@ -78,7 +78,9 @@ public class SlotPoolTest extends TestLogger {
mainThreadValidatorUtil.enterMainThread();
- slotPool.start(UUID.randomUUID());
+ final String jobManagerAddress = "foobar";
+
+ slotPool.start(UUID.randomUUID(), jobManagerAddress);
this.resourceManagerGateway = mock(ResourceManagerGateway.class);
when(resourceManagerGateway
http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
index 49bc570..986f848 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
@@ -25,9 +25,10 @@ import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerConfiguration;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.TestingSerialRpcService;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
@@ -57,13 +58,17 @@ public class ResourceManagerHATest extends TestLogger {
Time.seconds(5L),
Time.seconds(5L));
- ResourceManagerRuntimeServicesConfiguration resourceManagerRuntimeServicesConfiguration = new ResourceManagerRuntimeServicesConfiguration(Time.seconds(5L));
+ ResourceManagerRuntimeServicesConfiguration resourceManagerRuntimeServicesConfiguration = new ResourceManagerRuntimeServicesConfiguration(
+ Time.seconds(5L),
+ new SlotManagerConfiguration(
+ TestingUtils.infiniteTime(),
+ TestingUtils.infiniteTime(),
+ TestingUtils.infiniteTime()));
ResourceManagerRuntimeServices resourceManagerRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
resourceManagerRuntimeServicesConfiguration,
highAvailabilityServices,
rpcService.getScheduledExecutor());
- SlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory();
MetricRegistry metricRegistry = mock(MetricRegistry.class);
TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
@@ -76,7 +81,7 @@ public class ResourceManagerHATest extends TestLogger {
resourceManagerConfiguration,
highAvailabilityServices,
heartbeatServices,
- slotManagerFactory,
+ resourceManagerRuntimeServices.getSlotManager(),
metricRegistry,
resourceManagerRuntimeServices.getJobLeaderIdService(),
testingFatalErrorHandler);
http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
index 6a151ac..4836f74 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
@@ -31,10 +31,11 @@ import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.TestingSerialRpcService;
import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.TestLogger;
import org.junit.After;
@@ -240,12 +241,17 @@ public class ResourceManagerJobMasterTest extends TestLogger {
ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(
Time.seconds(5L),
Time.seconds(5L));
- SlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory();
MetricRegistry metricRegistry = mock(MetricRegistry.class);
JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
highAvailabilityServices,
rpcService.getScheduledExecutor(),
Time.minutes(5L));
+
+ final SlotManager slotManager = new SlotManager(
+ rpcService.getScheduledExecutor(),
+ TestingUtils.infiniteTime(),
+ TestingUtils.infiniteTime(),
+ TestingUtils.infiniteTime());
ResourceManager resourceManager = new StandaloneResourceManager(
rpcService,
@@ -254,7 +260,7 @@ public class ResourceManagerJobMasterTest extends TestLogger {
resourceManagerConfiguration,
highAvailabilityServices,
heartbeatServices,
- slotManagerFactory,
+ slotManager,
metricRegistry,
jobLeaderIdService,
fatalErrorHandler);