You are viewing a plain-text version of this content. The canonical link for it was a hyperlink ("here") that is not preserved in this plain-text copy.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/12/14 16:42:27 UTC
[10/11] flink git commit: [FLINK-7956] [flip6] Add support for queued scheduling with slot sharing to SlotPool
http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
deleted file mode 100644
index 68f5be6..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ /dev/null
@@ -1,1219 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.jobmanager.scheduler.Locality;
-import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
-import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
-import org.apache.flink.runtime.jobmanager.slots.SlotAndLocality;
-import org.apache.flink.runtime.jobmanager.slots.SlotException;
-import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
-import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
-import org.apache.flink.runtime.jobmaster.JobMasterId;
-import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
-import org.apache.flink.runtime.resourcemanager.SlotRequest;
-import org.apache.flink.runtime.rpc.RpcEndpoint;
-import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.runtime.util.clock.Clock;
-import org.apache.flink.runtime.util.clock.SystemClock;
-import org.apache.flink.util.Preconditions;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.TimeoutException;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * The slot pool serves slot request issued by Scheduler or ExecutionGraph. It will will attempt to acquire new slots
- * from the ResourceManager when it cannot serve a slot request. If no ResourceManager is currently available,
- * or it gets a decline from the ResourceManager, or a request times out, it fails the slot request. The slot pool also
- * holds all the slots that were offered to it and accepted, and can thus provides registered free slots even if the
- * ResourceManager is down. The slots will only be released when they are useless, e.g. when the job is fully running
- * but we still have some free slots.
- * <p>
- * All the allocation or the slot offering will be identified by self generated AllocationID, we will use it to
- * eliminate ambiguities.
- *
- * TODO : Make pending requests location preference aware
- * TODO : Make pass location preferences to ResourceManager when sending a slot request
- */
-public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
-
- /** The log for the pool - shared also with the internal classes */
- static final Logger LOG = LoggerFactory.getLogger(SlotPool.class);
-
- // ------------------------------------------------------------------------
-
- private static final Time DEFAULT_SLOT_REQUEST_TIMEOUT = Time.minutes(5);
-
- private static final Time DEFAULT_RM_ALLOCATION_TIMEOUT = Time.minutes(10);
-
- private static final Time DEFAULT_RM_REQUEST_TIMEOUT = Time.seconds(10);
-
- // ------------------------------------------------------------------------
-
- private final JobID jobId;
-
- private final ProviderAndOwner providerAndOwner;
-
- /** All registered TaskManagers, slots will be accepted and used only if the resource is registered */
- private final HashSet<ResourceID> registeredTaskManagers;
-
- /** The book-keeping of all allocated slots */
- private final AllocatedSlots allocatedSlots;
-
- /** The book-keeping of all available slots */
- private final AvailableSlots availableSlots;
-
- /** All pending requests waiting for slots */
- private final DualKeyMap<SlotRequestID, AllocationID, PendingRequest> pendingRequests;
-
- /** The requests that are waiting for the resource manager to be connected */
- private final HashMap<SlotRequestID, PendingRequest> waitingForResourceManager;
-
- /** Timeout for request calls to the ResourceManager */
- private final Time resourceManagerRequestsTimeout;
-
- /** Timeout for allocation round trips (RM -> launch TM -> offer slot) */
- private final Time resourceManagerAllocationTimeout;
-
- private final Clock clock;
-
- /** the fencing token of the job manager */
- private JobMasterId jobMasterId;
-
- /** The gateway to communicate with resource manager */
- private ResourceManagerGateway resourceManagerGateway;
-
- private String jobManagerAddress;
-
- // ------------------------------------------------------------------------
-
- public SlotPool(RpcService rpcService, JobID jobId) {
- this(rpcService, jobId, SystemClock.getInstance(),
- DEFAULT_SLOT_REQUEST_TIMEOUT, DEFAULT_RM_ALLOCATION_TIMEOUT, DEFAULT_RM_REQUEST_TIMEOUT);
- }
-
- public SlotPool(
- RpcService rpcService,
- JobID jobId,
- Clock clock,
- Time slotRequestTimeout,
- Time resourceManagerAllocationTimeout,
- Time resourceManagerRequestTimeout) {
-
- super(rpcService);
-
- this.jobId = checkNotNull(jobId);
- this.clock = checkNotNull(clock);
- this.resourceManagerRequestsTimeout = checkNotNull(resourceManagerRequestTimeout);
- this.resourceManagerAllocationTimeout = checkNotNull(resourceManagerAllocationTimeout);
-
- this.registeredTaskManagers = new HashSet<>();
- this.allocatedSlots = new AllocatedSlots();
- this.availableSlots = new AvailableSlots();
- this.pendingRequests = new DualKeyMap<>(16);
- this.waitingForResourceManager = new HashMap<>(16);
-
- this.providerAndOwner = new ProviderAndOwner(getSelfGateway(SlotPoolGateway.class), slotRequestTimeout);
-
- this.jobMasterId = null;
- this.resourceManagerGateway = null;
- this.jobManagerAddress = null;
- }
-
- // ------------------------------------------------------------------------
- // Starting and Stopping
- // ------------------------------------------------------------------------
-
- @Override
- public void start() {
- throw new UnsupportedOperationException("Should never call start() without leader ID");
- }
-
- /**
- * Start the slot pool to accept RPC calls.
- *
- * @param jobMasterId The necessary leader id for running the job.
- * @param newJobManagerAddress for the slot requests which are sent to the resource manager
- */
- public void start(JobMasterId jobMasterId, String newJobManagerAddress) throws Exception {
- this.jobMasterId = checkNotNull(jobMasterId);
- this.jobManagerAddress = checkNotNull(newJobManagerAddress);
-
- // TODO - start should not throw an exception
- try {
- super.start();
- } catch (Exception e) {
- throw new RuntimeException("This should never happen", e);
- }
- }
-
- /**
- * Suspends this pool, meaning it has lost its authority to accept and distribute slots.
- */
- @Override
- public void suspend() {
- validateRunsInMainThread();
-
- // suspend this RPC endpoint
- stop();
-
- // do not accept any requests
- jobMasterId = null;
- resourceManagerGateway = null;
-
- // Clear (but not release!) the available slots. The TaskManagers should re-register them
- // at the new leader JobManager/SlotPool
- availableSlots.clear();
- allocatedSlots.clear();
- pendingRequests.clear();
- }
-
- // ------------------------------------------------------------------------
- // Getting PoolOwner and PoolProvider
- // ------------------------------------------------------------------------
-
- /**
- * Gets the slot owner implementation for this pool.
- *
- * <p>This method does not mutate state and can be called directly (no RPC indirection)
- *
- * @return The slot owner implementation for this pool.
- */
- public SlotOwner getSlotOwner() {
- return providerAndOwner;
- }
-
- /**
- * Gets the slot provider implementation for this pool.
- *
- * <p>This method does not mutate state and can be called directly (no RPC indirection)
- *
- * @return The slot provider implementation for this pool.
- */
- public SlotProvider getSlotProvider() {
- return providerAndOwner;
- }
-
- // ------------------------------------------------------------------------
- // Resource Manager Connection
- // ------------------------------------------------------------------------
-
- @Override
- public void connectToResourceManager(ResourceManagerGateway resourceManagerGateway) {
- this.resourceManagerGateway = checkNotNull(resourceManagerGateway);
-
- // work on all slots waiting for this connection
- for (PendingRequest pendingRequest : waitingForResourceManager.values()) {
- requestSlotFromResourceManager(resourceManagerGateway, pendingRequest);
- }
-
- // all sent off
- waitingForResourceManager.clear();
- }
-
- @Override
- public void disconnectResourceManager() {
- this.resourceManagerGateway = null;
- }
-
- // ------------------------------------------------------------------------
- // Slot Allocation
- // ------------------------------------------------------------------------
-
- @Override
- public CompletableFuture<LogicalSlot> allocateSlot(
- SlotRequestID requestId,
- ScheduledUnit task,
- ResourceProfile resources,
- Iterable<TaskManagerLocation> locationPreferences,
- Time timeout) {
-
- return internalAllocateSlot(requestId, task, resources, locationPreferences);
- }
-
- @Override
- public void returnAllocatedSlot(SlotRequestID slotRequestId) {
- final AllocatedSlot allocatedSlot = allocatedSlots.remove(slotRequestId);
-
- if (allocatedSlot != null) {
- if (allocatedSlot.releaseLogicalSlot()) {
- tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);
- } else {
- throw new RuntimeException("Could not release allocated slot " + allocatedSlot + '.');
- }
- } else {
- log.debug("There is no allocated slot with request id {}. Ignoring this request.", slotRequestId);
- }
- }
-
- @Override
- public CompletableFuture<Acknowledge> cancelSlotRequest(SlotRequestID slotRequestId) {
- final PendingRequest pendingRequest = removePendingRequest(slotRequestId);
-
- if (pendingRequest != null) {
- failPendingRequest(pendingRequest, new CancellationException("Allocation with request id" + slotRequestId + " cancelled."));
- } else {
- final AllocatedSlot allocatedSlot = allocatedSlots.get(slotRequestId);
-
- if (allocatedSlot != null) {
- LOG.info("Returning allocated slot {} because the corresponding allocation request {} was cancelled.", allocatedSlot, slotRequestId);
- // TODO: Avoid having to send another message to do the slot releasing (e.g. introduce Slot#cancelExecution) and directly return slot
- allocatedSlot.triggerLogicalSlotRelease();
- } else {
- LOG.debug("There was no slot allocation with {} to be cancelled.", slotRequestId);
- }
- }
-
- return CompletableFuture.completedFuture(Acknowledge.get());
- }
-
- CompletableFuture<LogicalSlot> internalAllocateSlot(
- SlotRequestID requestId,
- ScheduledUnit task,
- ResourceProfile resources,
- Iterable<TaskManagerLocation> locationPreferences) {
-
- // (1) do we have a slot available already?
- SlotAndLocality slotFromPool = availableSlots.poll(resources, locationPreferences);
- if (slotFromPool != null) {
- final AllocatedSlot allocatedSlot = slotFromPool.slot();
-
- final SimpleSlot simpleSlot;
- try {
- simpleSlot = allocatedSlot.allocateSimpleSlot(requestId, slotFromPool.locality());
- } catch (SlotException e) {
- availableSlots.add(allocatedSlot, clock.relativeTimeMillis());
-
- return FutureUtils.completedExceptionally(e);
- }
-
- allocatedSlots.add(requestId, allocatedSlot);
- return CompletableFuture.completedFuture(simpleSlot);
- }
-
- // we have to request a new allocated slot
- CompletableFuture<AllocatedSlot> allocatedSlotFuture = requestSlot(
- requestId,
- resources);
-
- return allocatedSlotFuture.thenApply(
- (AllocatedSlot allocatedSlot) -> {
- try {
- return allocatedSlot.allocateSimpleSlot(requestId, Locality.UNKNOWN);
- } catch (SlotException e) {
- throw new CompletionException("Could not allocate a logical simple slot from allocate slot " +
- allocatedSlot + '.', e);
- }
- });
- }
-
- /**
- * Checks whether there exists a pending request with the given allocation id and removes it
- * from the internal data structures.
- *
- * @param requestId identifying the pending request
- * @return pending request if there is one, otherwise null
- */
- @Nullable
- private PendingRequest removePendingRequest(SlotRequestID requestId) {
- PendingRequest result = waitingForResourceManager.remove(requestId);
-
- if (result != null) {
- // sanity check
- assert !pendingRequests.containsKeyA(requestId) : "A pending requests should only be part of either " +
- "the pendingRequests or waitingForResourceManager but not both.";
-
- return result;
- } else {
- return pendingRequests.removeKeyA(requestId);
- }
- }
-
- private CompletableFuture<AllocatedSlot> requestSlot(
- SlotRequestID slotRequestId,
- ResourceProfile resourceProfile) {
-
- final PendingRequest pendingRequest = new PendingRequest(
- slotRequestId,
- resourceProfile);
-
- if (resourceManagerGateway == null) {
- stashRequestWaitingForResourceManager(pendingRequest);
- } else {
- requestSlotFromResourceManager(resourceManagerGateway, pendingRequest);
- }
-
- return pendingRequest.getAllocatedSlotFuture();
- }
-
- private void requestSlotFromResourceManager(
- final ResourceManagerGateway resourceManagerGateway,
- final PendingRequest pendingRequest) {
-
- Preconditions.checkNotNull(resourceManagerGateway);
- Preconditions.checkNotNull(pendingRequest);
-
- LOG.info("Requesting slot with profile {} from resource manager (request = {}).", pendingRequest.getResourceProfile(), pendingRequest.getSlotRequestId());
-
- final AllocationID allocationId = new AllocationID();
-
- pendingRequests.put(pendingRequest.getSlotRequestId(), allocationId, pendingRequest);
-
- pendingRequest.getAllocatedSlotFuture().whenComplete(
- (value, throwable) -> {
- if (throwable != null) {
- resourceManagerGateway.cancelSlotRequest(allocationId);
- }
- });
-
- CompletableFuture<Acknowledge> rmResponse = resourceManagerGateway.requestSlot(
- jobMasterId,
- new SlotRequest(jobId, allocationId, pendingRequest.getResourceProfile(), jobManagerAddress),
- resourceManagerRequestsTimeout);
-
- CompletableFuture<Void> slotRequestProcessingFuture = rmResponse.thenAcceptAsync(
- (Acknowledge value) -> {
- slotRequestToResourceManagerSuccess(pendingRequest.getSlotRequestId());
- },
- getMainThreadExecutor());
-
- // on failure, fail the request future
- slotRequestProcessingFuture.whenCompleteAsync(
- (Void v, Throwable failure) -> {
- if (failure != null) {
- slotRequestToResourceManagerFailed(pendingRequest.getSlotRequestId(), failure);
- }
- },
- getMainThreadExecutor());
- }
-
- private void slotRequestToResourceManagerSuccess(final SlotRequestID requestId) {
- // a request is pending from the ResourceManager to a (future) TaskManager
- // we only add the watcher here in case that request times out
- scheduleRunAsync(new Runnable() {
- @Override
- public void run() {
- checkTimeoutSlotAllocation(requestId);
- }
- }, resourceManagerAllocationTimeout);
- }
-
- private void slotRequestToResourceManagerFailed(SlotRequestID slotRequestID, Throwable failure) {
- PendingRequest request = pendingRequests.removeKeyA(slotRequestID);
- if (request != null) {
- request.getAllocatedSlotFuture().completeExceptionally(new NoResourceAvailableException(
- "No pooled slot available and request to ResourceManager for new slot failed", failure));
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Unregistered slot request {} failed.", slotRequestID, failure);
- }
- }
- }
-
- private void checkTimeoutSlotAllocation(SlotRequestID slotRequestID) {
- PendingRequest request = pendingRequests.removeKeyA(slotRequestID);
- if (request != null) {
- failPendingRequest(request, new TimeoutException("Slot allocation request " + slotRequestID + " timed out"));
- }
- }
-
- private void failPendingRequest(PendingRequest pendingRequest, Exception e) {
- Preconditions.checkNotNull(pendingRequest);
- Preconditions.checkNotNull(e);
-
- if (!pendingRequest.getAllocatedSlotFuture().isDone()) {
- LOG.info("Failing pending request {}.", pendingRequest.getSlotRequestId());
- pendingRequest.getAllocatedSlotFuture().completeExceptionally(e);
- }
- }
-
- private void stashRequestWaitingForResourceManager(final PendingRequest pendingRequest) {
-
- LOG.info("Cannot serve slot request, no ResourceManager connected. " +
- "Adding as pending request {}", pendingRequest.getSlotRequestId());
-
- waitingForResourceManager.put(pendingRequest.getSlotRequestId(), pendingRequest);
-
- scheduleRunAsync(new Runnable() {
- @Override
- public void run() {
- checkTimeoutRequestWaitingForResourceManager(pendingRequest.getSlotRequestId());
- }
- }, resourceManagerRequestsTimeout);
- }
-
- private void checkTimeoutRequestWaitingForResourceManager(SlotRequestID slotRequestId) {
- PendingRequest request = waitingForResourceManager.remove(slotRequestId);
- if (request != null) {
- failPendingRequest(
- request,
- new NoResourceAvailableException("No slot available and no connection to Resource Manager established."));
- }
- }
-
- // ------------------------------------------------------------------------
- // Slot releasing & offering
- // ------------------------------------------------------------------------
-
- /**
- * Tries to fulfill with the given allocated slot a pending slot request or add the
- * allocated slot to the set of available slots if no matching request is available.
- *
- * @param allocatedSlot which shall be returned
- */
- private void tryFulfillSlotRequestOrMakeAvailable(AllocatedSlot allocatedSlot) {
- Preconditions.checkState(!allocatedSlot.isUsed(), "Provided slot is still in use.");
-
- final PendingRequest pendingRequest = pollMatchingPendingRequest(allocatedSlot);
-
- if (pendingRequest != null) {
- LOG.debug("Fulfilling pending request [{}] early with returned slot [{}]",
- pendingRequest.getSlotRequestId(), allocatedSlot.getAllocationId());
-
- allocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot);
- pendingRequest.getAllocatedSlotFuture().complete(allocatedSlot);
- } else {
- LOG.debug("Adding returned slot [{}] to available slots", allocatedSlot.getAllocationId());
- availableSlots.add(allocatedSlot, clock.relativeTimeMillis());
- }
- }
-
- private PendingRequest pollMatchingPendingRequest(final AllocatedSlot slot) {
- final ResourceProfile slotResources = slot.getResourceProfile();
-
- // try the requests sent to the resource manager first
- for (PendingRequest request : pendingRequests.values()) {
- if (slotResources.isMatching(request.getResourceProfile())) {
- pendingRequests.removeKeyA(request.getSlotRequestId());
- return request;
- }
- }
-
- // try the requests waiting for a resource manager connection next
- for (PendingRequest request : waitingForResourceManager.values()) {
- if (slotResources.isMatching(request.getResourceProfile())) {
- waitingForResourceManager.remove(request.getSlotRequestId());
- return request;
- }
- }
-
- // no request pending, or no request matches
- return null;
- }
-
- @Override
- public CompletableFuture<Collection<SlotOffer>> offerSlots(
- TaskManagerLocation taskManagerLocation,
- TaskManagerGateway taskManagerGateway,
- Collection<SlotOffer> offers) {
- validateRunsInMainThread();
-
- List<CompletableFuture<Optional<SlotOffer>>> acceptedSlotOffers = offers.stream().map(
- offer -> {
- CompletableFuture<Optional<SlotOffer>> acceptedSlotOffer = offerSlot(
- taskManagerLocation,
- taskManagerGateway,
- offer)
- .thenApply(
- (acceptedSlot) -> {
- if (acceptedSlot) {
- return Optional.of(offer);
- } else {
- return Optional.empty();
- }
- });
-
- return acceptedSlotOffer;
- }
- ).collect(Collectors.toList());
-
- CompletableFuture<Collection<Optional<SlotOffer>>> optionalSlotOffers = FutureUtils.combineAll(acceptedSlotOffers);
-
- CompletableFuture<Collection<SlotOffer>> resultingSlotOffers = optionalSlotOffers.thenApply(
- collection -> {
- Collection<SlotOffer> slotOffers = collection
- .stream()
- .flatMap(
- opt -> opt.map(Stream::of).orElseGet(Stream::empty))
- .collect(Collectors.toList());
-
- return slotOffers;
- });
-
- return resultingSlotOffers;
- }
-
- /**
- * Slot offering by TaskManager with AllocationID. The AllocationID is originally generated by this pool and
- * transfer through the ResourceManager to TaskManager. We use it to distinguish the different allocation
- * we issued. Slot offering may be rejected if we find something mismatching or there is actually no pending
- * request waiting for this slot (maybe fulfilled by some other returned slot).
- *
- * @param taskManagerLocation location from where the offer comes from
- * @param taskManagerGateway TaskManager gateway
- * @param slotOffer the offered slot
- * @return True if we accept the offering
- */
- @Override
- public CompletableFuture<Boolean> offerSlot(
- final TaskManagerLocation taskManagerLocation,
- final TaskManagerGateway taskManagerGateway,
- final SlotOffer slotOffer) {
- validateRunsInMainThread();
-
- // check if this TaskManager is valid
- final ResourceID resourceID = taskManagerLocation.getResourceID();
- final AllocationID allocationID = slotOffer.getAllocationId();
-
- if (!registeredTaskManagers.contains(resourceID)) {
- LOG.debug("Received outdated slot offering [{}] from unregistered TaskManager: {}",
- slotOffer.getAllocationId(), taskManagerLocation);
- return CompletableFuture.completedFuture(false);
- }
-
- // check whether we have already using this slot
- if (allocatedSlots.contains(allocationID) || availableSlots.contains(allocationID)) {
- LOG.debug("Received repeated offer for slot [{}]. Ignoring.", allocationID);
-
- // return true here so that the sender will get a positive acknowledgement to the retry
- // and mark the offering as a success
- return CompletableFuture.completedFuture(true);
- }
-
- final AllocatedSlot allocatedSlot = new AllocatedSlot(
- slotOffer.getAllocationId(),
- taskManagerLocation,
- slotOffer.getSlotIndex(),
- slotOffer.getResourceProfile(),
- taskManagerGateway,
- providerAndOwner);
-
- // check whether we have request waiting for this slot
- PendingRequest pendingRequest = pendingRequests.removeKeyB(allocationID);
- if (pendingRequest != null) {
- // we were waiting for this!
- allocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot);
- pendingRequest.getAllocatedSlotFuture().complete(allocatedSlot);
- }
- else {
- // we were actually not waiting for this:
- // - could be that this request had been fulfilled
- // - we are receiving the slots from TaskManagers after becoming leaders
- tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);
- }
-
- // we accepted the request in any case. slot will be released after it idled for
- // too long and timed out
- return CompletableFuture.completedFuture(true);
- }
-
-
- // TODO - periodic (every minute or so) catch slots that were lost (check all slots, if they have any task active)
-
- // TODO - release slots that were not used to the resource manager
-
- // ------------------------------------------------------------------------
- // Error Handling
- // ------------------------------------------------------------------------
-
- /**
- * Fail the specified allocation and release the corresponding slot if we have one.
- * This may triggered by JobManager when some slot allocation failed with timeout.
- * Or this could be triggered by TaskManager, when it finds out something went wrong with the slot,
- * and decided to take it back.
- *
- * @param allocationID Represents the allocation which should be failed
- * @param cause The cause of the failure
- */
- @Override
- public void failAllocation(final AllocationID allocationID, final Exception cause) {
- final PendingRequest pendingRequest = pendingRequests.removeKeyB(allocationID);
- if (pendingRequest != null) {
- // request was still pending
- failPendingRequest(pendingRequest, cause);
- }
- else if (availableSlots.tryRemove(allocationID)) {
- LOG.debug("Failed available slot [{}] with ", allocationID, cause);
- }
- else {
- AllocatedSlot allocatedSlot = allocatedSlots.remove(allocationID);
- if (allocatedSlot != null) {
- // release the slot.
- // since it is not in 'allocatedSlots' any more, it will be dropped o return'
- allocatedSlot.triggerLogicalSlotRelease();
- }
- else {
- LOG.debug("Outdated request to fail slot [{}] with ", allocationID, cause);
- }
- }
- // TODO: add some unit tests when the previous two are ready, the allocation may failed at any phase
- }
-
- // ------------------------------------------------------------------------
- // Resource
- // ------------------------------------------------------------------------
-
- /**
- * Register TaskManager to this pool, only those slots come from registered TaskManager will be considered valid.
- * Also it provides a way for us to keep "dead" or "abnormal" TaskManagers out of this pool.
- *
- * @param resourceID The id of the TaskManager
- * @return Future acknowledge if th operation was successful
- */
- @Override
- public CompletableFuture<Acknowledge> registerTaskManager(final ResourceID resourceID) {
- registeredTaskManagers.add(resourceID);
-
- return CompletableFuture.completedFuture(Acknowledge.get());
- }
-
- /**
- * Unregister TaskManager from this pool, all the related slots will be released and tasks be canceled. Called
- * when we find some TaskManager becomes "dead" or "abnormal", and we decide to not using slots from it anymore.
- *
- * @param resourceID The id of the TaskManager
- */
- @Override
- public CompletableFuture<Acknowledge> releaseTaskManager(final ResourceID resourceID) {
- if (registeredTaskManagers.remove(resourceID)) {
- availableSlots.removeAllForTaskManager(resourceID);
-
- final Set<AllocatedSlot> allocatedSlotsForResource = allocatedSlots.removeSlotsForTaskManager(resourceID);
- for (AllocatedSlot allocatedSlot : allocatedSlotsForResource) {
- allocatedSlot.triggerLogicalSlotRelease();
- // TODO: This is a work-around to mark the logical slot as released. We should split up the internalReturnSlot method to not poll pending requests
- allocatedSlot.releaseLogicalSlot();
- }
- }
-
- return CompletableFuture.completedFuture(Acknowledge.get());
- }
-
- // ------------------------------------------------------------------------
- // Methods for tests
- // ------------------------------------------------------------------------
-
- @VisibleForTesting
- AllocatedSlots getAllocatedSlots() {
- return allocatedSlots;
- }
-
- @VisibleForTesting
- AvailableSlots getAvailableSlots() {
- return availableSlots;
- }
-
- @VisibleForTesting
- DualKeyMap<SlotRequestID, AllocationID, PendingRequest> getPendingRequests() {
- return pendingRequests;
- }
-
- @VisibleForTesting
- Map<SlotRequestID, PendingRequest> getWaitingForResourceManager() {
- return waitingForResourceManager;
- }
-
- // ------------------------------------------------------------------------
- // Helper classes
- // ------------------------------------------------------------------------
-
- /**
- * Organize allocated slots from different points of view.
- */
- static class AllocatedSlots {
-
- /** All allocated slots organized by TaskManager's id */
- private final Map<ResourceID, Set<AllocatedSlot>> allocatedSlotsByTaskManager;
-
- /** All allocated slots organized by AllocationID */
- private final DualKeyMap<AllocationID, SlotRequestID, AllocatedSlot> allocatedSlotsById;
-
- AllocatedSlots() {
- this.allocatedSlotsByTaskManager = new HashMap<>(16);
- this.allocatedSlotsById = new DualKeyMap<>(16);
- }
-
- /**
- * Adds a new slot to this collection.
- *
- * @param allocatedSlot The allocated slot
- */
- void add(SlotRequestID slotRequestId, AllocatedSlot allocatedSlot) {
- allocatedSlotsById.put(allocatedSlot.getAllocationId(), slotRequestId, allocatedSlot);
-
- final ResourceID resourceID = allocatedSlot.getTaskManagerLocation().getResourceID();
-
- Set<AllocatedSlot> slotsForTaskManager = allocatedSlotsByTaskManager.computeIfAbsent(
- resourceID,
- resourceId -> new HashSet<>(4));
-
- slotsForTaskManager.add(allocatedSlot);
- }
-
- /**
- * Get allocated slot with allocation id
- *
- * @param allocationID The allocation id
- * @return The allocated slot, null if we can't find a match
- */
- AllocatedSlot get(final AllocationID allocationID) {
- return allocatedSlotsById.getKeyA(allocationID);
- }
-
- AllocatedSlot get(final SlotRequestID slotRequestId) {
- return allocatedSlotsById.getKeyB(slotRequestId);
- }
-
- /**
- * Check whether we have allocated this slot
- *
- * @param slotAllocationId The allocation id of the slot to check
- * @return True if we contains this slot
- */
- boolean contains(AllocationID slotAllocationId) {
- return allocatedSlotsById.containsKeyA(slotAllocationId);
- }
-
- /**
- * Removes the allocated slot specified by the provided slot allocation id.
- *
- * @param allocationID identifying the allocated slot to remove
- * @return The removed allocated slot or null.
- */
- @Nullable
- AllocatedSlot remove(final AllocationID allocationID) {
- AllocatedSlot allocatedSlot = allocatedSlotsById.removeKeyA(allocationID);
-
- if (allocatedSlot != null) {
- removeAllocatedSlot(allocatedSlot);
- }
-
- return allocatedSlot;
- }
-
- /**
- * Removes the allocated slot specified by the provided slot request id.
- *
- * @param slotRequestId identifying the allocated slot to remove
- * @return The removed allocated slot or null.
- */
- @Nullable
- AllocatedSlot remove(final SlotRequestID slotRequestId) {
- final AllocatedSlot allocatedSlot = allocatedSlotsById.removeKeyB(slotRequestId);
-
- if (allocatedSlot != null) {
- removeAllocatedSlot(allocatedSlot);
- }
-
- return allocatedSlot;
- }
-
- private void removeAllocatedSlot(final AllocatedSlot allocatedSlot) {
- Preconditions.checkNotNull(allocatedSlot);
- final ResourceID taskManagerId = allocatedSlot.getTaskManagerLocation().getResourceID();
- Set<AllocatedSlot> slotsForTM = allocatedSlotsByTaskManager.get(taskManagerId);
-
- slotsForTM.remove(allocatedSlot);
-
- if (slotsForTM.isEmpty()) {
- allocatedSlotsByTaskManager.remove(taskManagerId);
- }
- }
-
- /**
- * Get all allocated slot from same TaskManager.
- *
- * @param resourceID The id of the TaskManager
- * @return Set of slots which are allocated from the same TaskManager
- */
- Set<AllocatedSlot> removeSlotsForTaskManager(final ResourceID resourceID) {
- Set<AllocatedSlot> slotsForTaskManager = allocatedSlotsByTaskManager.remove(resourceID);
- if (slotsForTaskManager != null) {
- for (AllocatedSlot allocatedSlot : slotsForTaskManager) {
- allocatedSlotsById.removeKeyA(allocatedSlot.getAllocationId());
- }
- return slotsForTaskManager;
- }
- else {
- return Collections.emptySet();
- }
- }
-
- void clear() {
- allocatedSlotsById.clear();
- allocatedSlotsByTaskManager.clear();
- }
-
- @VisibleForTesting
- boolean containResource(final ResourceID resourceID) {
- return allocatedSlotsByTaskManager.containsKey(resourceID);
- }
-
- @VisibleForTesting
- int size() {
- return allocatedSlotsById.size();
- }
-
- @VisibleForTesting
- Set<AllocatedSlot> getSlotsForTaskManager(ResourceID resourceId) {
- if (allocatedSlotsByTaskManager.containsKey(resourceId)) {
- return allocatedSlotsByTaskManager.get(resourceId);
- } else {
- return Collections.emptySet();
- }
- }
- }
-
- // ------------------------------------------------------------------------
-
- /**
- * Organize all available slots from different points of view.
- */
- static class AvailableSlots {
-
- /** All available slots organized by TaskManager */
- private final HashMap<ResourceID, Set<AllocatedSlot>> availableSlotsByTaskManager;
-
- /** All available slots organized by host */
- private final HashMap<String, Set<AllocatedSlot>> availableSlotsByHost;
-
- /** The available slots, with the time when they were inserted */
- private final HashMap<AllocationID, SlotAndTimestamp> availableSlots;
-
- AvailableSlots() {
- this.availableSlotsByTaskManager = new HashMap<>();
- this.availableSlotsByHost = new HashMap<>();
- this.availableSlots = new HashMap<>();
- }
-
- /**
- * Adds an available slot.
- *
- * @param slot The slot to add
- */
- void add(final AllocatedSlot slot, final long timestamp) {
- checkNotNull(slot);
-
- SlotAndTimestamp previous = availableSlots.put(
- slot.getAllocationId(), new SlotAndTimestamp(slot, timestamp));
-
- if (previous == null) {
- final ResourceID resourceID = slot.getTaskManagerLocation().getResourceID();
- final String host = slot.getTaskManagerLocation().getFQDNHostname();
-
- Set<AllocatedSlot> slotsForTaskManager = availableSlotsByTaskManager.get(resourceID);
- if (slotsForTaskManager == null) {
- slotsForTaskManager = new HashSet<>();
- availableSlotsByTaskManager.put(resourceID, slotsForTaskManager);
- }
- slotsForTaskManager.add(slot);
-
- Set<AllocatedSlot> slotsForHost = availableSlotsByHost.get(host);
- if (slotsForHost == null) {
- slotsForHost = new HashSet<>();
- availableSlotsByHost.put(host, slotsForHost);
- }
- slotsForHost.add(slot);
- }
- else {
- throw new IllegalStateException("slot already contained");
- }
- }
-
- /**
- * Check whether we have this slot.
- */
- boolean contains(AllocationID slotId) {
- return availableSlots.containsKey(slotId);
- }
-
- /**
- * Poll a slot which matches the required resource profile. The polling tries to satisfy the
- * location preferences, by TaskManager and by host.
- *
- * @param resourceProfile The required resource profile.
- * @param locationPreferences The location preferences, in order to be checked.
- *
- * @return Slot which matches the resource profile, null if we can't find a match
- */
- SlotAndLocality poll(ResourceProfile resourceProfile, Iterable<TaskManagerLocation> locationPreferences) {
- // fast path if no slots are available
- if (availableSlots.isEmpty()) {
- return null;
- }
-
- boolean hadLocationPreference = false;
-
- if (locationPreferences != null) {
-
- // first search by TaskManager
- for (TaskManagerLocation location : locationPreferences) {
- hadLocationPreference = true;
-
- final Set<AllocatedSlot> onTaskManager = availableSlotsByTaskManager.get(location.getResourceID());
- if (onTaskManager != null) {
- for (AllocatedSlot candidate : onTaskManager) {
- if (candidate.getResourceProfile().isMatching(resourceProfile)) {
- remove(candidate.getAllocationId());
- return new SlotAndLocality(candidate, Locality.LOCAL);
- }
- }
- }
- }
-
- // now, search by host
- for (TaskManagerLocation location : locationPreferences) {
- final Set<AllocatedSlot> onHost = availableSlotsByHost.get(location.getFQDNHostname());
- if (onHost != null) {
- for (AllocatedSlot candidate : onHost) {
- if (candidate.getResourceProfile().isMatching(resourceProfile)) {
- remove(candidate.getAllocationId());
- return new SlotAndLocality(candidate, Locality.HOST_LOCAL);
- }
- }
- }
- }
- }
-
- // take any slot
- for (SlotAndTimestamp candidate : availableSlots.values()) {
- final AllocatedSlot slot = candidate.slot();
-
- if (slot.getResourceProfile().isMatching(resourceProfile)) {
- remove(slot.getAllocationId());
- return new SlotAndLocality(
- slot, hadLocationPreference ? Locality.NON_LOCAL : Locality.UNCONSTRAINED);
- }
- }
-
- // nothing available that matches
- return null;
- }
-
- /**
- * Remove all available slots come from specified TaskManager.
- *
- * @param taskManager The id of the TaskManager
- */
- void removeAllForTaskManager(final ResourceID taskManager) {
- // remove from the by-TaskManager view
- final Set<AllocatedSlot> slotsForTm = availableSlotsByTaskManager.remove(taskManager);
-
- if (slotsForTm != null && slotsForTm.size() > 0) {
- final String host = slotsForTm.iterator().next().getTaskManagerLocation().getFQDNHostname();
- final Set<AllocatedSlot> slotsForHost = availableSlotsByHost.get(host);
-
- // remove from the base set and the by-host view
- for (AllocatedSlot slot : slotsForTm) {
- availableSlots.remove(slot.getAllocationId());
- slotsForHost.remove(slot);
- }
-
- if (slotsForHost.isEmpty()) {
- availableSlotsByHost.remove(host);
- }
- }
- }
-
- boolean tryRemove(AllocationID slotId) {
- final SlotAndTimestamp sat = availableSlots.remove(slotId);
- if (sat != null) {
- final AllocatedSlot slot = sat.slot();
- final ResourceID resourceID = slot.getTaskManagerLocation().getResourceID();
- final String host = slot.getTaskManagerLocation().getFQDNHostname();
-
- final Set<AllocatedSlot> slotsForTm = availableSlotsByTaskManager.get(resourceID);
- final Set<AllocatedSlot> slotsForHost = availableSlotsByHost.get(host);
-
- slotsForTm.remove(slot);
- slotsForHost.remove(slot);
-
- if (slotsForTm.isEmpty()) {
- availableSlotsByTaskManager.remove(resourceID);
- }
- if (slotsForHost.isEmpty()) {
- availableSlotsByHost.remove(host);
- }
-
- return true;
- }
- else {
- return false;
- }
- }
-
- private void remove(AllocationID slotId) throws IllegalStateException {
- if (!tryRemove(slotId)) {
- throw new IllegalStateException("slot not contained");
- }
- }
-
- @VisibleForTesting
- boolean containsTaskManager(ResourceID resourceID) {
- return availableSlotsByTaskManager.containsKey(resourceID);
- }
-
- @VisibleForTesting
- int size() {
- return availableSlots.size();
- }
-
- @VisibleForTesting
- void clear() {
- availableSlots.clear();
- availableSlotsByTaskManager.clear();
- availableSlotsByHost.clear();
- }
- }
-
- // ------------------------------------------------------------------------
-
- /**
- * An implementation of the {@link SlotOwner} and {@link SlotProvider} interfaces
- * that delegates methods as RPC calls to the SlotPool's RPC gateway.
- */
- private static class ProviderAndOwner implements SlotOwner, SlotProvider {
-
- private final SlotPoolGateway gateway;
-
- private final Time timeout;
-
- ProviderAndOwner(SlotPoolGateway gateway, Time timeout) {
- this.gateway = gateway;
- this.timeout = timeout;
- }
-
- @Override
- public CompletableFuture<Boolean> returnAllocatedSlot(LogicalSlot slot) {
- gateway.returnAllocatedSlot(slot.getSlotRequestId());
- return CompletableFuture.completedFuture(true);
- }
-
- @Override
- public CompletableFuture<LogicalSlot> allocateSlot(
- ScheduledUnit task,
- boolean allowQueued,
- Collection<TaskManagerLocation> preferredLocations) {
-
- final SlotRequestID requestId = new SlotRequestID();
- CompletableFuture<LogicalSlot> slotFuture = gateway.allocateSlot(requestId, task, ResourceProfile.UNKNOWN, preferredLocations, timeout);
- slotFuture.whenComplete(
- (LogicalSlot slot, Throwable failure) -> {
- if (failure != null) {
- gateway.cancelSlotRequest(requestId);
- }
- });
- return slotFuture;
- }
- }
-
- // ------------------------------------------------------------------------
-
- /**
- * A pending request for a slot
- */
- private static class PendingRequest {
-
- private final SlotRequestID slotRequestId;
-
- private final ResourceProfile resourceProfile;
-
- private final CompletableFuture<AllocatedSlot> allocatedSlotFuture;
-
- PendingRequest(
- SlotRequestID slotRequestId,
- ResourceProfile resourceProfile) {
- this.slotRequestId = Preconditions.checkNotNull(slotRequestId);
- this.resourceProfile = Preconditions.checkNotNull(resourceProfile);
-
- allocatedSlotFuture = new CompletableFuture<>();
- }
-
- public SlotRequestID getSlotRequestId() {
- return slotRequestId;
- }
-
- public CompletableFuture<AllocatedSlot> getAllocatedSlotFuture() {
- return allocatedSlotFuture;
- }
-
- public ResourceProfile getResourceProfile() {
- return resourceProfile;
- }
- }
-
- // ------------------------------------------------------------------------
-
- /**
- * A slot, together with the timestamp when it was added
- */
- private static class SlotAndTimestamp {
-
- private final AllocatedSlot slot;
-
- private final long timestamp;
-
- SlotAndTimestamp(AllocatedSlot slot, long timestamp) {
- this.slot = slot;
- this.timestamp = timestamp;
- }
-
- public AllocatedSlot slot() {
- return slot;
- }
-
- public long timestamp() {
- return timestamp;
- }
-
- @Override
- public String toString() {
- return slot + " @ " + timestamp;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
deleted file mode 100644
index 103bc61..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
-import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
-import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
-import org.apache.flink.runtime.rpc.RpcGateway;
-import org.apache.flink.runtime.rpc.RpcTimeout;
-import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-
-import java.util.Collection;
-import java.util.concurrent.CompletableFuture;
-
-/**
- * The gateway for calls on the {@link SlotPool}.
- */
-public interface SlotPoolGateway extends RpcGateway {
-
- // ------------------------------------------------------------------------
- // shutdown
- // ------------------------------------------------------------------------
-
- void suspend();
-
- // ------------------------------------------------------------------------
- // resource manager connection
- // ------------------------------------------------------------------------
-
- /**
- * Connects the SlotPool to the given ResourceManager. After this method is called, the
- * SlotPool will be able to request resources from the given ResourceManager.
- *
- * @param resourceManagerGateway The RPC gateway for the resource manager.
- */
- void connectToResourceManager(ResourceManagerGateway resourceManagerGateway);
-
- /**
- * Disconnects the slot pool from its current Resource Manager. After this call, the pool will not
- * be able to request further slots from the Resource Manager, and all currently pending requests
- * to the resource manager will be canceled.
- *
- * <p>The slot pool will still be able to serve slots from its internal pool.
- */
- void disconnectResourceManager();
-
- // ------------------------------------------------------------------------
- // registering / un-registering TaskManagers and slots
- // ------------------------------------------------------------------------
-
- CompletableFuture<Acknowledge> registerTaskManager(ResourceID resourceID);
-
- CompletableFuture<Acknowledge> releaseTaskManager(ResourceID resourceID);
-
- CompletableFuture<Boolean> offerSlot(
- TaskManagerLocation taskManagerLocation,
- TaskManagerGateway taskManagerGateway,
- SlotOffer slotOffer);
-
- CompletableFuture<Collection<SlotOffer>> offerSlots(
- TaskManagerLocation taskManagerLocation,
- TaskManagerGateway taskManagerGateway,
- Collection<SlotOffer> offers);
-
- void failAllocation(AllocationID allocationID, Exception cause);
-
- // ------------------------------------------------------------------------
- // allocating and disposing slots
- // ------------------------------------------------------------------------
-
- CompletableFuture<LogicalSlot> allocateSlot(
- SlotRequestID requestId,
- ScheduledUnit task,
- ResourceProfile resources,
- Iterable<TaskManagerLocation> locationPreferences,
- @RpcTimeout Time timeout);
-
- void returnAllocatedSlot(SlotRequestID slotRequestId);
-
- /**
- * Cancel a slot allocation request.
- *
- * @param slotRequestId identifying the slot allocation request
- * @return Future acknowledge if the slot allocation has been cancelled
- */
- CompletableFuture<Acknowledge> cancelSlotRequest(SlotRequestID slotRequestId);
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java
deleted file mode 100644
index 98427c2..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-
-import java.util.Collection;
-import java.util.concurrent.CompletableFuture;
-
-/**
- * The slot provider is responsible for preparing slots for ready-to-run tasks.
- *
- * <p>It supports two allocating modes:
- * <ul>
- * <li>Immediate allocating: A request for a task slot immediately gets satisfied, we can call
- * {@link CompletableFuture#getNow(Object)} to get the allocated slot.</li>
- * <li>Queued allocating: A request for a task slot is queued and returns a future that will be
- * fulfilled as soon as a slot becomes available.</li>
- * </ul>
- */
-public interface SlotProvider {
-
- /**
- * Allocating slot with specific requirement.
- *
- * @param task The task to allocate the slot for
- * @param allowQueued Whether allow the task be queued if we do not have enough resource
- * @param preferredLocations preferred locations for the slot allocation
- * @return The future of the allocation
- */
- CompletableFuture<LogicalSlot> allocateSlot(
- ScheduledUnit task,
- boolean allowQueued,
- Collection<TaskManagerLocation> preferredLocations);
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotRequestID.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotRequestID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotRequestID.java
deleted file mode 100644
index 8e19944..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotRequestID.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import org.apache.flink.util.AbstractID;
-
-/**
- * Request ID identifying different slot requests.
- */
-public final class SlotRequestID extends AbstractID {
- private static final long serialVersionUID = -6072105912250154283L;
-
- public SlotRequestID(long lowerPart, long upperPart) {
- super(lowerPart, upperPart);
- }
-
- public SlotRequestID() {}
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
index 45b4a96..289762c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
@@ -18,28 +18,29 @@
package org.apache.flink.runtime.instance;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.AbstractID;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.FlinkException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
/**
* The SlotSharingGroupAssignment manages a set of shared slots, which are shared between
@@ -215,7 +216,7 @@ public class SlotSharingGroupAssignment {
// note that this does implicitly release the slot we have just added
// as well, because we release its last child slot. That is expected
// and desired.
- constraintGroupSlot.releaseInstanceSlot();
+ constraintGroupSlot.releaseSlot(new FlinkException("Could not create a sub slot in this shared slot."));
}
}
else {
@@ -273,7 +274,7 @@ public class SlotSharingGroupAssignment {
*/
public SimpleSlot getSlotForTask(JobVertexID vertexID, Iterable<TaskManagerLocation> locationPreferences) {
synchronized (lock) {
- Tuple2<SharedSlot, Locality> p = getSlotForTaskInternal(vertexID, locationPreferences, false);
+ Tuple2<SharedSlot, Locality> p = getSharedSlotForTask(vertexID, locationPreferences, false);
if (p != null) {
SharedSlot ss = p.f0;
@@ -324,7 +325,7 @@ public class SlotSharingGroupAssignment {
}
TaskManagerLocation location = previous.getTaskManagerLocation();
- Tuple2<SharedSlot, Locality> p = getSlotForTaskInternal(
+ Tuple2<SharedSlot, Locality> p = getSharedSlotForTask(
constraint.getGroupId(), Collections.singleton(location), true);
if (p == null) {
@@ -355,7 +356,7 @@ public class SlotSharingGroupAssignment {
// grab a new slot and initialize the constraint with that one.
// preferred locations are defined by the vertex
Tuple2<SharedSlot, Locality> p =
- getSlotForTaskInternal(constraint.getGroupId(), locationPreferences, false);
+ getSharedSlotForTask(constraint.getGroupId(), locationPreferences, false);
if (p == null) {
// could not get a shared slot for this co-location-group
return null;
@@ -382,9 +383,10 @@ public class SlotSharingGroupAssignment {
}
- private Tuple2<SharedSlot, Locality> getSlotForTaskInternal(
- AbstractID groupId, Iterable<TaskManagerLocation> preferredLocations, boolean localOnly)
- {
+ public Tuple2<SharedSlot, Locality> getSharedSlotForTask(
+ AbstractID groupId,
+ Iterable<TaskManagerLocation> preferredLocations,
+ boolean localOnly) {
// check if there is anything at all in this group assignment
if (allSlots.isEmpty()) {
return null;
@@ -507,7 +509,7 @@ public class SlotSharingGroupAssignment {
}
/**
- * Called from {@link org.apache.flink.runtime.instance.SharedSlot#releaseInstanceSlot()}.
+ * Called from {@link org.apache.flink.runtime.instance.SharedSlot#releaseSlot(Throwable)}.
*
* @param sharedSlot The slot to be released.
*/
@@ -517,10 +519,11 @@ public class SlotSharingGroupAssignment {
// we are releasing this slot
if (sharedSlot.hasChildren()) {
+ final FlinkException cause = new FlinkException("Releasing shared slot parent.");
// by simply releasing all children, we should eventually release this slot.
Set<Slot> children = sharedSlot.getSubSlots();
while (children.size() > 0) {
- children.iterator().next().releaseInstanceSlot();
+ children.iterator().next().releaseSlot(cause);
}
}
else {
http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupId.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupId.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupId.java
new file mode 100644
index 0000000..e5d4467
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupId.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import org.apache.flink.util.AbstractID;
+
+public class SlotSharingGroupId extends AbstractID {
+ private static final long serialVersionUID = 8837647978345422042L;
+
+ public SlotSharingGroupId(long lowerPart, long upperPart) {
+ super(lowerPart, upperPart);
+ }
+
+ public SlotSharingGroupId() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
index ffc1a7c..baa452f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
@@ -18,16 +18,20 @@
package org.apache.flink.runtime.jobmanager.scheduler;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.SharedSlot;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.AbstractID;
-import org.apache.flink.runtime.instance.Instance;
-
+import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
-import org.apache.flink.runtime.instance.SharedSlot;
-import static org.apache.flink.util.Preconditions.checkState;
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+
import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
/**
* A CoLocationConstraint manages the location of a set of tasks
@@ -43,12 +47,14 @@ public class CoLocationConstraint {
private volatile SharedSlot sharedSlot;
- private volatile ResourceID lockedLocation;
+ private volatile TaskManagerLocation lockedLocation;
+ private volatile SlotRequestId slotRequestId;
CoLocationConstraint(CoLocationGroup group) {
Preconditions.checkNotNull(group);
this.group = group;
+ this.slotRequestId = null;
}
// ------------------------------------------------------------------------
@@ -107,7 +113,7 @@ public class CoLocationConstraint {
*/
public TaskManagerLocation getLocation() {
if (lockedLocation != null) {
- return sharedSlot.getTaskManagerLocation();
+ return lockedLocation;
} else {
throw new IllegalStateException("Location not yet locked");
}
@@ -136,12 +142,12 @@ public class CoLocationConstraint {
this.sharedSlot = newSlot;
}
else if (newSlot != this.sharedSlot){
- if (lockedLocation != null && lockedLocation != newSlot.getTaskManagerID()) {
+ if (lockedLocation != null && !Objects.equals(lockedLocation, newSlot.getTaskManagerLocation())) {
throw new IllegalArgumentException(
"Cannot assign different location to a constraint whose location is locked.");
}
if (this.sharedSlot.isAlive()) {
- this.sharedSlot.releaseInstanceSlot();
+ this.sharedSlot.releaseSlot(new FlinkException("Setting new shared slot for co-location constraint."));
}
this.sharedSlot = newSlot;
@@ -159,7 +165,43 @@ public class CoLocationConstraint {
checkState(lockedLocation == null, "Location is already locked");
checkState(sharedSlot != null, "Cannot lock location without a slot.");
- lockedLocation = sharedSlot.getTaskManagerID();
+ lockedLocation = sharedSlot.getTaskManagerLocation();
+ }
+
+ /**
+ * Locks the location of this slot. The location can be locked only once
+ * and only after a shared slot has been assigned.
+ *
+ * <p>Note: This method exists for compatibility reasons with the Flip-6 SlotPool
+ *
+ * @param taskManagerLocation to lock this co-location constraint to
+ */
+ public void lockLocation(TaskManagerLocation taskManagerLocation) {
+ checkNotNull(taskManagerLocation);
+ checkState(lockedLocation == null, "Location is already locked.");
+
+ lockedLocation = taskManagerLocation;
+ }
+
+ /**
+ * Sets the slot request id of the currently assigned slot to the co-location constraint.
+ * All other tasks belonging to this co-location constraint will be deployed to the same slot.
+ *
+ * @param slotRequestId identifying the assigned slot for this co-location constraint
+ */
+ public void setSlotRequestId(@Nullable SlotRequestId slotRequestId) {
+ this.slotRequestId = slotRequestId;
+ }
+
+ /**
+ * Returns the currently assigned slot request id identifying the slot to which tasks
+ * belonging to this co-location constraint will be deployed to.
+ *
+ * @return Slot request id of the assigned slot or null if none
+ */
+ @Nullable
+ public SlotRequestId getSlotRequestId() {
+ return slotRequestId;
}
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
index 546f31f..e1c1657 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
@@ -44,8 +44,9 @@ public class NoResourceAvailableException extends JobException {
NoResourceAvailableException(ScheduledUnit task, int numInstances, int numSlotsTotal, int availableSlots) {
super(String.format("%s Task to schedule: < %s > with groupID < %s > in sharing group < %s >. Resources available to scheduler: Number of instances=%d, total number of slots=%d, available slots=%d",
BASE_MESSAGE, task.getTaskToExecute(),
- task.getLocationConstraint() == null ? task.getTaskToExecute().getVertex().getJobvertexId() : task.getLocationConstraint().getGroupId(),
- task.getSlotSharingGroup(),
+ // ScheduledUnit#getTaskToExecute() is @Nullable, so read the vertex id from the unit itself
+ task.getCoLocationConstraint() == null ? task.getJobVertexId() : task.getCoLocationConstraint().getGroupId(),
+ task.getSlotSharingGroupId(),
numInstances,
numSlotsTotal,
availableSlots));
http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduledUnit.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduledUnit.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduledUnit.java
index 7348c9d..903872b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduledUnit.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduledUnit.java
@@ -19,68 +19,108 @@
package org.apache.flink.runtime.jobmanager.scheduler;
import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.util.Preconditions;
+import javax.annotation.Nullable;
+
+/**
+ * ScheduledUnit contains the information necessary to allocate a slot for the given
+ * {@link JobVertexID}: the (optional) {@link Execution} to deploy, the (optional) id of the
+ * slot sharing group and the (optional) {@link CoLocationConstraint}.
+ */
public class ScheduledUnit {
-
+
+ /** Execution to deploy; {@code null} if the unit was created for a job vertex only. */
+ @Nullable
private final Execution vertexExecution;
-
- private final SlotSharingGroup sharingGroup;
-
- private final CoLocationConstraint locationConstraint;
+
+ /** Vertex for which a slot is requested; never {@code null}. */
+ private final JobVertexID jobVertexId;
+
+ /** Id of the slot sharing group, or {@code null} if none was given. */
+ @Nullable
+ private final SlotSharingGroupId slotSharingGroupId;
+
+ /** Co-location constraint, or {@code null} if none was given. */
+ @Nullable
+ private final CoLocationConstraint coLocationConstraint;
// --------------------------------------------------------------------------------------------
+ /**
+ * Creates a unit for the given execution without slot sharing group and co-location constraint.
+ *
+ * @param task execution to schedule; must not be {@code null}
+ */
public ScheduledUnit(Execution task) {
- Preconditions.checkNotNull(task);
-
- this.vertexExecution = task;
- this.sharingGroup = null;
- this.locationConstraint = null;
+ this(
+ Preconditions.checkNotNull(task),
+ task.getVertex().getJobvertexId(),
+ null,
+ null);
}
- public ScheduledUnit(Execution task, SlotSharingGroup sharingUnit) {
- Preconditions.checkNotNull(task);
-
- this.vertexExecution = task;
- this.sharingGroup = sharingUnit;
- this.locationConstraint = null;
+ /**
+ * Creates a unit for the given execution without co-location constraint.
+ *
+ * @param task execution to schedule; must not be {@code null}
+ * @param slotSharingGroupId id of the slot sharing group, or {@code null}
+ */
+ public ScheduledUnit(Execution task, @Nullable SlotSharingGroupId slotSharingGroupId) {
+ this(
+ Preconditions.checkNotNull(task),
+ task.getVertex().getJobvertexId(),
+ slotSharingGroupId,
+ null);
}
- public ScheduledUnit(Execution task, SlotSharingGroup sharingUnit, CoLocationConstraint locationConstraint) {
- Preconditions.checkNotNull(task);
- Preconditions.checkNotNull(sharingUnit);
- Preconditions.checkNotNull(locationConstraint);
-
+ /**
+ * Creates a unit for the given execution.
+ *
+ * @param task execution to schedule; must not be {@code null}
+ * @param slotSharingGroupId id of the slot sharing group, or {@code null}
+ * @param coLocationConstraint co-location constraint, or {@code null}
+ */
+ public ScheduledUnit(
+ Execution task,
+ @Nullable SlotSharingGroupId slotSharingGroupId,
+ @Nullable CoLocationConstraint coLocationConstraint) {
+ this(
+ Preconditions.checkNotNull(task),
+ task.getVertex().getJobvertexId(),
+ slotSharingGroupId,
+ coLocationConstraint);
+ }
+
+ /**
+ * Creates a unit that is not backed by an {@link Execution}; {@link #getTaskToExecute()}
+ * returns {@code null} for it.
+ *
+ * @param jobVertexId vertex for which a slot is requested; must not be {@code null}
+ * @param slotSharingGroupId id of the slot sharing group, or {@code null}
+ * @param coLocationConstraint co-location constraint, or {@code null}
+ */
+ public ScheduledUnit(
+ JobVertexID jobVertexId,
+ @Nullable SlotSharingGroupId slotSharingGroupId,
+ @Nullable CoLocationConstraint coLocationConstraint) {
+ this(
+ null,
+ jobVertexId,
+ slotSharingGroupId,
+ coLocationConstraint);
+ }
+
+ /**
+ * Creates a unit from all of its parts.
+ *
+ * @param task execution to schedule, or {@code null}
+ * @param jobVertexId vertex for which a slot is requested; must not be {@code null}
+ * @param slotSharingGroupId id of the slot sharing group, or {@code null}
+ * @param coLocationConstraint co-location constraint, or {@code null}
+ */
+ public ScheduledUnit(
+ @Nullable Execution task,
+ JobVertexID jobVertexId,
+ @Nullable SlotSharingGroupId slotSharingGroupId,
+ @Nullable CoLocationConstraint coLocationConstraint) {
+
this.vertexExecution = task;
- this.sharingGroup = sharingUnit;
- this.locationConstraint = locationConstraint;
+ this.jobVertexId = Preconditions.checkNotNull(jobVertexId);
+ this.slotSharingGroupId = slotSharingGroupId;
+ this.coLocationConstraint = coLocationConstraint;
}
// --------------------------------------------------------------------------------------------
+ /** Returns the id of the vertex for which a slot is requested; never {@code null}. */
public JobVertexID getJobVertexId() {
- return this.vertexExecution.getVertex().getJobvertexId();
+ return jobVertexId;
}
-
+
+ /** Returns the execution to deploy, or {@code null} if this unit has none. */
@Nullable
public Execution getTaskToExecute() {
return vertexExecution;
}
-
- public SlotSharingGroup getSlotSharingGroup() {
- return sharingGroup;
+
+ /** Returns the id of the slot sharing group, or {@code null} if none was given. */
+ @Nullable
+ public SlotSharingGroupId getSlotSharingGroupId() {
+ return slotSharingGroupId;
}
-
- public CoLocationConstraint getLocationConstraint() {
- return locationConstraint;
+
+ /** Returns the co-location constraint, or {@code null} if none was given. */
+ @Nullable
+ public CoLocationConstraint getCoLocationConstraint() {
+ return coLocationConstraint;
}
// --------------------------------------------------------------------------------------------
@Override
public String toString() {
- return "{task=" + vertexExecution.getVertexWithAttempt() + ", sharingUnit=" + sharingGroup +
- ", locationConstraint=" + locationConstraint + '}';
+ // vertexExecution is null for units created from a JobVertexID only; fall back to the vertex id
+ final Object task = vertexExecution != null ? vertexExecution.getVertexWithAttempt() : jobVertexId;
+ return "{task=" + task + ", sharingUnit=" + slotSharingGroupId +
+ ", locationConstraint=" + coLocationConstraint + '}';
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index a3c38e0..40fb760 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -18,20 +18,22 @@
package org.apache.flink.runtime.jobmanager.scheduler;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceDiedException;
import org.apache.flink.runtime.instance.InstanceListener;
-import org.apache.flink.runtime.instance.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.instance.SharedSlot;
import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.runtime.instance.SlotProvider;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.instance.SlotSharingGroupAssignment;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.commons.lang3.tuple.ImmutablePair;
@@ -39,6 +41,8 @@ import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
@@ -49,6 +53,7 @@ import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
@@ -177,7 +182,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
synchronized (globalLock) {
- SlotSharingGroup sharingUnit = task.getSlotSharingGroup();
+ SlotSharingGroup sharingUnit = vertex.getJobVertex().getSlotSharingGroup();
if (sharingUnit != null) {
@@ -189,7 +194,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
}
final SlotSharingGroupAssignment assignment = sharingUnit.getTaskAssignment();
- final CoLocationConstraint constraint = task.getLocationConstraint();
+ final CoLocationConstraint constraint = task.getCoLocationConstraint();
// sanity check that we do not use an externally forced location and a co-location constraint together
if (constraint != null && forceExternalLocation) {
@@ -274,7 +279,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
// if there is no slot from the group, or the new slot is local,
// then we use the new slot
if (slotFromGroup != null) {
- slotFromGroup.releaseInstanceSlot();
+ slotFromGroup.releaseSlot(null);
}
toUse = newSlot;
}
@@ -282,7 +287,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
// both are available and usable. neither is local. in that case, we may
// as well use the slot from the sharing group, to minimize the number of
// instances that the job occupies
- newSlot.releaseInstanceSlot();
+ newSlot.releaseSlot(null);
toUse = slotFromGroup;
}
@@ -299,10 +304,10 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
}
catch (Throwable t) {
if (slotFromGroup != null) {
- slotFromGroup.releaseInstanceSlot();
+ slotFromGroup.releaseSlot(t);
}
if (newSlot != null) {
- newSlot.releaseInstanceSlot();
+ newSlot.releaseSlot(t);
}
ExceptionUtils.rethrow(t, "An error occurred while allocating a slot in a sharing group");
@@ -444,7 +449,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
}
else {
// could not add and allocate the sub-slot, so release shared slot
- sharedSlot.releaseInstanceSlot();
+ sharedSlot.releaseSlot(new FlinkException("Could not allocate sub-slot."));
}
}
}
@@ -854,4 +859,19 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
return future;
}
}
+
+ // ------------------------------------------------------------------------
+ // Testing methods
+ // ------------------------------------------------------------------------
+
+ /**
+ * Returns the first instance in {@code allInstances} whose TaskManager id equals the given
+ * resource id.
+ *
+ * <p>NOTE(review): this iterates {@code allInstances} without holding {@code globalLock};
+ * confirm that callers (tests) never run it concurrently with instance registration.
+ *
+ * @param resourceId TaskManager resource id to look for
+ * @return the matching instance, or {@code null} if there is none
+ */
+ @VisibleForTesting
+ @Nullable
+ public Instance getInstance(ResourceID resourceId) {
+ Instance match = null;
+ for (Instance candidate : allInstances) {
+ if (Objects.equals(resourceId, candidate.getTaskManagerID())) {
+ match = candidate;
+ break;
+ }
+ }
+ return match;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java
index 0fa1362..86be9d4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java
@@ -23,6 +23,7 @@ import java.util.Set;
import java.util.TreeSet;
import org.apache.flink.runtime.instance.SlotSharingGroupAssignment;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobgraph.JobVertexID;
/**
@@ -39,7 +40,8 @@ public class SlotSharingGroup implements java.io.Serializable {
/** Mapping of tasks to subslots. This field is only needed inside the JobManager, and is not RPCed. */
private transient SlotSharingGroupAssignment taskAssignment;
-
+
+ /**
+ * Identifier of this slot sharing group, created together with the group instance.
+ * Unlike {@link #taskAssignment}, this field is not transient.
+ */
+ private final SlotSharingGroupId slotSharingGroupId = new SlotSharingGroupId();
public SlotSharingGroup() {}
@@ -62,8 +64,11 @@ public class SlotSharingGroup implements java.io.Serializable {
public Set<JobVertexID> getJobVertexIds() {
return Collections.unmodifiableSet(ids);
}
-
-
+
+ /**
+ * Returns the identifier of this slot sharing group.
+ *
+ * @return the id that was created together with this group
+ */
+ public SlotSharingGroupId getSlotSharingGroupId() {
+ return this.slotSharingGroupId;
+ }
+
public SlotSharingGroupAssignment getTaskAssignment() {
if (this.taskAssignment == null) {
this.taskAssignment = new SlotSharingGroupAssignment();
http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SimpleSlotContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SimpleSlotContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SimpleSlotContext.java
deleted file mode 100644
index a5b75d7..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SimpleSlotContext.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager.slots;
-
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.instance.SlotRequestID;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.util.Preconditions;
-
-/**
- * Simple implementation of the {@link SlotContext} interface for the legacy code.
- */
-public class SimpleSlotContext implements SlotContext {
-
- private final SlotRequestID slotRequestId;
-
- private final AllocationID allocationId;
-
- private final TaskManagerLocation taskManagerLocation;
-
- private final int physicalSlotNumber;
-
- private final TaskManagerGateway taskManagerGateway;
-
- public SimpleSlotContext(
- SlotRequestID slotRequestId,
- AllocationID allocationId,
- TaskManagerLocation taskManagerLocation,
- int physicalSlotNumber,
- TaskManagerGateway taskManagerGateway) {
- this.slotRequestId = Preconditions.checkNotNull(slotRequestId);
- this.allocationId = Preconditions.checkNotNull(allocationId);
- this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation);
- this.physicalSlotNumber = physicalSlotNumber;
- this.taskManagerGateway = Preconditions.checkNotNull(taskManagerGateway);
- }
-
- @Override
- public SlotRequestID getSlotRequestId() {
- return slotRequestId;
- }
-
- @Override
- public AllocationID getAllocationId() {
- return allocationId;
- }
-
- @Override
- public TaskManagerLocation getTaskManagerLocation() {
- return taskManagerLocation;
- }
-
- @Override
- public int getPhysicalSlotNumber() {
- return physicalSlotNumber;
- }
-
- @Override
- public TaskManagerGateway getTaskManagerGateway() {
- return taskManagerGateway;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotAndLocality.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotAndLocality.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotAndLocality.java
index 5ae057d..85871c8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotAndLocality.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotAndLocality.java
@@ -18,7 +18,7 @@
package org.apache.flink.runtime.jobmanager.slots;
-import org.apache.flink.runtime.instance.AllocatedSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -39,11 +39,11 @@ public class SlotAndLocality {
// ------------------------------------------------------------------------
- public AllocatedSlot slot() {
+ /** Returns the {@link AllocatedSlot} of this slot/locality pair. */
+ public AllocatedSlot getSlot() {
return slot;
}
- public Locality locality() {
+ /** Returns the {@link Locality} stored together with the slot. */
+ public Locality getLocality() {
return locality;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotContext.java
deleted file mode 100644
index 1e0317a..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotContext.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager.slots;
-
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.instance.Slot;
-import org.apache.flink.runtime.instance.SlotRequestID;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-
-/**
- * Interface for the context of a logical {@link Slot}. This context contains information
- * about the underlying allocated slot and how to communicate with the TaskManager on which
- * it was allocated.
- */
-public interface SlotContext {
-
- /**
- * Gets the slot request id under which the slot has been requested. This id uniquely identifies the logical slot.
- *
- * @return The id under which the slot has been requested
- */
- SlotRequestID getSlotRequestId();
-
- /**
- * Gets the id under which the slot has been allocated on the TaskManager. This id uniquely identifies the
- * physical slot.
- *
- * @return The id under whic teh slot has been allocated on the TaskManager
- */
- AllocationID getAllocationId();
-
- /**
- * Gets the location info of the TaskManager that offers this slot.
- *
- * @return The location info of the TaskManager that offers this slot
- */
- TaskManagerLocation getTaskManagerLocation();
-
- /**
- * Gets the number of the slot.
- *
- * @return The number of the slot on the TaskManager.
- */
- int getPhysicalSlotNumber();
-
- /**
- * Gets the actor gateway that can be used to send messages to the TaskManager.
- * <p>
- * This method should be removed once the new interface-based RPC abstraction is in place
- *
- * @return The gateway that can be used to send messages to the TaskManager.
- */
- TaskManagerGateway getTaskManagerGateway();
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotOwner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotOwner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotOwner.java
deleted file mode 100644
index bc1ced4..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotOwner.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager.slots;
-
-import org.apache.flink.runtime.instance.LogicalSlot;
-
-import java.util.concurrent.CompletableFuture;
-
-/**
- * Interface for components that hold slots and to which slots get released / recycled.
- */
-public interface SlotOwner {
-
- /**
- * Return the given slot to the slot owner.
- *
- * @param logicalSlot to return
- * @return Future which is completed with true if the slot could be returned, otherwise with false
- */
- CompletableFuture<Boolean> returnAllocatedSlot(LogicalSlot logicalSlot);
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0ef7fdde/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 324557f..7a2844d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -55,9 +55,8 @@ import org.apache.flink.runtime.heartbeat.HeartbeatManager;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.instance.LogicalSlot;
-import org.apache.flink.runtime.instance.SlotPool;
-import org.apache.flink.runtime.instance.SlotPoolGateway;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolGateway;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobGraph;