You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/23 20:22:48 UTC
[29/52] [abbrv] flink git commit: [FLINK-4987] Harden SlotPool on JobMaster
[FLINK-4987] Harden SlotPool on JobMaster
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a1ba9f11
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a1ba9f11
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a1ba9f11
Branch: refs/heads/master
Commit: a1ba9f1126270d53168394211a99e354aa2cf20d
Parents: 8730e20
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Oct 21 16:51:34 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Dec 23 20:54:25 2016 +0100
----------------------------------------------------------------------
.../clusterframework/types/ResourceProfile.java | 18 +-
.../org/apache/flink/runtime/instance/Slot.java | 16 +
.../flink/runtime/instance/SlotDescriptor.java | 162 ---
.../apache/flink/runtime/instance/SlotPool.java | 1026 +++++++++++-------
.../flink/runtime/instance/SlotPoolGateway.java | 95 ++
.../runtime/jobmanager/scheduler/Locality.java | 26 +-
.../scheduler/NoResourceAvailableException.java | 6 +-
.../runtime/jobmanager/slots/AllocatedSlot.java | 58 +-
.../jobmanager/slots/PooledSlotProvider.java | 73 --
.../jobmanager/slots/SlotAndLocality.java | 55 +
.../flink/runtime/jobmaster/JobMaster.java | 102 +-
.../apache/flink/runtime/rpc/RpcEndpoint.java | 18 +-
.../apache/flink/runtime/util/clock/Clock.java | 40 +
.../flink/runtime/util/clock/SystemClock.java | 57 +
.../types/ResourceProfileTest.java | 5 +
.../runtime/instance/AllocatedSlotsTest.java | 270 ++---
.../runtime/instance/AvailableSlotsTest.java | 247 +++--
.../flink/runtime/instance/SlotPoolTest.java | 596 +++++-----
.../runtime/minicluster/MiniClusterITCase.java | 13 +-
19 files changed, 1650 insertions(+), 1233 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a1ba9f11/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
index 7a25de1..ddc7547 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
@@ -18,14 +18,21 @@
package org.apache.flink.runtime.clusterframework.types;
+import javax.annotation.Nonnull;
import java.io.Serializable;
/**
* Describe the resource profile of the slot, either when requiring or offering it. The profile can be
* checked whether it can match another profile's requirement, and furthermore we may calculate a matching
* score to decide which profile we should choose when we have lots of candidate slots.
+ *
+ * <p>Resource Profiles have a total ordering, defined by comparing these fields in sequence:
+ * <ol>
+ * <li>Memory size (in MB)</li>
+ * <li>CPU cores (compared via {@link Double#compare(double, double)})</li>
+ * </ol>
*/
-public class ResourceProfile implements Serializable {
+public class ResourceProfile implements Serializable, Comparable<ResourceProfile> {
private static final long serialVersionUID = 1L;
@@ -90,11 +97,18 @@ public class ResourceProfile implements Serializable {
return cpuCores >= required.getCpuCores() && memoryInMB >= required.getMemoryInMB();
}
+ @Override
+ public int compareTo(@Nonnull ResourceProfile other) {
+ int cmp1 = Long.compare(this.memoryInMB, other.memoryInMB);
+ int cmp2 = Double.compare(this.cpuCores, other.cpuCores);
+ return (cmp1 != 0) ? cmp1 : cmp2;
+ }
+
// ------------------------------------------------------------------------
@Override
public int hashCode() {
- long cpuBits = Double.doubleToLongBits(cpuCores);
+ long cpuBits = Double.doubleToRawLongBits(cpuCores);
return (int) (cpuBits ^ (cpuBits >>> 32) ^ memoryInMB ^ (memoryInMB >> 32));
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a1ba9f11/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
index 8f8b897..d6d8f12 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
@@ -350,6 +350,22 @@ public abstract class Slot {
// Utilities
// --------------------------------------------------------------------------------------------
+ /**
+ * Slots must always hash based on reference identity.
+ */
+ @Override
+ public final int hashCode() {
+ return super.hashCode();
+ }
+
+ /**
+ * Slots must always compare by referential equality (reference identity).
+ */
+ @Override
+ public final boolean equals(Object obj) {
+ return this == obj;
+ }
+
@Override
public String toString() {
return hierarchy() + " - " + getTaskManagerLocation() + " - " + getStateName(status);
http://git-wip-us.apache.org/repos/asf/flink/blob/a1ba9f11/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotDescriptor.java
deleted file mode 100644
index 47ce422..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotDescriptor.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * The description of slots, TaskManagers offer one or more task slots, which define a slice of
- * their resources. This description will contain some static information about the slot, such
- * as the location and numeric id of the slot, rpc gateway to communicate with the TaskManager which
- * owns the slot.
- */
-public class SlotDescriptor {
-
- /** The ID of the job this slice belongs to. */
- private final JobID jobID;
-
- /** The location information of the TaskManager to which this slot belongs */
- private final TaskManagerLocation taskManagerLocation;
-
- /** The number of the slot on which the task is deployed */
- private final int slotNumber;
-
- /** The resource profile of the slot provides */
- private final ResourceProfile resourceProfile;
-
- /** TEMP until the new RPC is in place: The actor gateway to communicate with the TaskManager */
- private final TaskManagerGateway taskManagerGateway;
-
- public SlotDescriptor(
- final JobID jobID,
- final TaskManagerLocation location,
- final int slotNumber,
- final ResourceProfile resourceProfile,
- final TaskManagerGateway taskManagerGateway)
- {
- this.jobID = checkNotNull(jobID);
- this.taskManagerLocation = checkNotNull(location);
- this.slotNumber = slotNumber;
- this.resourceProfile = checkNotNull(resourceProfile);
- this.taskManagerGateway = checkNotNull(taskManagerGateway);
- }
-
- public SlotDescriptor(final SlotDescriptor other) {
- this.jobID = other.jobID;
- this.taskManagerLocation = other.taskManagerLocation;
- this.slotNumber = other.slotNumber;
- this.resourceProfile = other.resourceProfile;
- this.taskManagerGateway = other.taskManagerGateway;
- }
-
- // TODO - temporary workaround until we have the SlotDesriptor in the Slot
- public SlotDescriptor(final Slot slot) {
- this.jobID = slot.getJobID();
- this.taskManagerLocation = slot.getTaskManagerLocation();
- this.slotNumber = slot.getRootSlotNumber();
- this.resourceProfile = new ResourceProfile(0, 0);
- this.taskManagerGateway = slot.getTaskManagerGateway();
- }
-
- /**
- * Returns the ID of the job this allocated slot belongs to.
- *
- * @return the ID of the job this allocated slot belongs to
- */
- public JobID getJobID() {
- return jobID;
- }
-
- /**
- * Gets the number of the slot.
- *
- * @return The number of the slot on the TaskManager.
- */
- public int getSlotNumber() {
- return slotNumber;
- }
-
- /**
- * Gets the resource profile of the slot.
- *
- * @return The resource profile of the slot.
- */
- public ResourceProfile getResourceProfile() {
- return resourceProfile;
- }
-
- /**
- * Gets the location info of the TaskManager that offers this slot.
- *
- * @return The location info of the TaskManager that offers this slot
- */
- public TaskManagerLocation getTaskManagerLocation() {
- return taskManagerLocation;
- }
-
- /**
- * Gets the actor gateway that can be used to send messages to the TaskManager.
- * <p>
- * This method should be removed once the new interface-based RPC abstraction is in place
- *
- * @return The actor gateway that can be used to send messages to the TaskManager.
- */
- public TaskManagerGateway getTaskManagerGateway() {
- return taskManagerGateway;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- SlotDescriptor that = (SlotDescriptor) o;
-
- if (slotNumber != that.slotNumber) {
- return false;
- }
- if (!jobID.equals(that.jobID)) {
- return false;
- }
- return taskManagerLocation.equals(that.taskManagerLocation);
-
- }
-
- @Override
- public int hashCode() {
- int result = jobID.hashCode();
- result = 31 * result + taskManagerLocation.hashCode();
- result = 31 * result + slotNumber;
- return result;
- }
-
- @Override
- public String toString() {
- return taskManagerLocation + " - " + slotNumber;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a1ba9f11/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index 44df29b..5a3a321 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -25,25 +25,41 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.AcceptFunction;
import org.apache.flink.runtime.concurrent.ApplyFunction;
-import org.apache.flink.runtime.concurrent.BiFunction;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
+import org.apache.flink.runtime.jobmanager.slots.SlotAndLocality;
import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestRegistered;
import org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestRejected;
import org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestReply;
+import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcMethod;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.StartStoppable;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.util.clock.Clock;
+import org.apache.flink.runtime.util.clock.SystemClock;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -58,18 +74,33 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* <p>
* All the allocation or the slot offering will be identified by self generated AllocationID, we will use it to
* eliminate ambiguities.
+ *
+ * TODO : Make pending requests location preference aware
+ * TODO : Pass location preferences to the ResourceManager when sending a slot request
*/
-public class SlotPool implements SlotOwner {
+public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
+
+ /** The log for the pool, also shared with the internal classes. */
+ static final Logger LOG = LoggerFactory.getLogger(SlotPool.class);
+
+ // ------------------------------------------------------------------------
- private static final Logger LOG = LoggerFactory.getLogger(SlotPool.class);
+ private static final Time DEFAULT_SLOT_REQUEST_TIMEOUT = Time.minutes(5);
+
+ private static final Time DEFAULT_RM_ALLOCATION_TIMEOUT = Time.minutes(10);
+
+ private static final Time DEFAULT_RM_REQUEST_TIMEOUT = Time.seconds(10);
+
+ // ------------------------------------------------------------------------
private final Object lock = new Object();
- /** The executor which is used to execute futures */
- private final Executor executor;
+ private final JobID jobId;
- /** All registered resources, slots will be accepted and used only if the resource is registered */
- private final Set<ResourceID> registeredResources;
+ private final ProviderAndOwner providerAndOwner;
+
+ /** All registered TaskManagers; slots are accepted and used only if their TaskManager is registered. */
+ private final HashSet<ResourceID> registeredTaskManagers;
/** The book-keeping of all allocated slots */
private final AllocatedSlots allocatedSlots;
@@ -78,10 +109,15 @@ public class SlotPool implements SlotOwner {
private final AvailableSlots availableSlots;
/** All pending requests waiting for slots */
- private final Map<AllocationID, Tuple2<SlotRequest, FlinkCompletableFuture<SlotDescriptor>>> pendingRequests;
+ private final HashMap<AllocationID, PendingRequest> pendingRequests;
+
+ /** Timeout for request calls to the ResourceManager */
+ private final Time resourceManagerRequestsTimeout;
- /** Timeout of slot allocation */
- private final Time timeout;
+ /** Timeout for allocation round trips (RM -> launch TM -> offer slot) */
+ private final Time resourceManagerAllocationTimeout;
+
+ private final Clock clock; // time source; the two-argument constructor passes SystemClock.getInstance()
/** the leader id of job manager */
private UUID jobManagerLeaderId;
@@ -92,177 +128,238 @@ public class SlotPool implements SlotOwner {
/** The gateway to communicate with resource manager */
private ResourceManagerGateway resourceManagerGateway;
- public SlotPool(final Executor executor) {
- this.executor = executor;
- this.registeredResources = new HashSet<>();
+ // ------------------------------------------------------------------------
+
+ public SlotPool(RpcService rpcService, JobID jobId) {
+ this(rpcService, jobId, SystemClock.getInstance(),
+ DEFAULT_SLOT_REQUEST_TIMEOUT, DEFAULT_RM_ALLOCATION_TIMEOUT, DEFAULT_RM_REQUEST_TIMEOUT);
+ }
+
+ public SlotPool(
+ RpcService rpcService,
+ JobID jobId,
+ Clock clock,
+ Time slotRequestTimeout,
+ Time resourceManagerAllocationTimeout,
+ Time resourceManagerRequestTimeout) {
+
+ super(rpcService);
+
+ this.jobId = checkNotNull(jobId);
+ this.clock = checkNotNull(clock);
+ this.resourceManagerRequestsTimeout = checkNotNull(resourceManagerRequestTimeout);
+ this.resourceManagerAllocationTimeout = checkNotNull(resourceManagerAllocationTimeout);
+
+ this.registeredTaskManagers = new HashSet<>();
this.allocatedSlots = new AllocatedSlots();
this.availableSlots = new AvailableSlots();
this.pendingRequests = new HashMap<>();
- this.timeout = Time.of(5, TimeUnit.SECONDS);
- }
- public void setJobManagerLeaderId(final UUID jobManagerLeaderId) {
- this.jobManagerLeaderId = jobManagerLeaderId;
+ this.providerAndOwner = new ProviderAndOwner(getSelf(), slotRequestTimeout);
}
// ------------------------------------------------------------------------
- // Slot Allocation
+ // Starting and Stopping
// ------------------------------------------------------------------------
+ @Override
+ public void start() {
+ throw new UnsupportedOperationException("Should never call start() without leader ID");
+ }
+
/**
- * Try to allocate a simple slot with specified resource profile.
+ * Start the slot pool to accept RPC calls.
*
- * @param jobID The job id which the slot allocated for
- * @param resourceProfile The needed resource profile
- * @return The future of allocated simple slot
+ * @param jobManagerLeaderId The necessary leader id for running the job.
*/
- public Future<SimpleSlot> allocateSimpleSlot(final JobID jobID, final ResourceProfile resourceProfile) {
- return allocateSimpleSlot(jobID, resourceProfile, new AllocationID());
- }
+ public void start(UUID jobManagerLeaderId) {
+ this.jobManagerLeaderId = jobManagerLeaderId;
+ // TODO - start should not throw an exception
+ try {
+ super.start();
+ } catch (Exception e) {
+ throw new RuntimeException("This should never happen", e);
+ }
+ }
/**
- * Try to allocate a simple slot with specified resource profile and specified allocation id. It's mainly
- * for testing purpose since we need to specify whatever allocation id we want.
+ * Suspends this pool, meaning it has lost its authority to accept and distribute slots.
*/
- @VisibleForTesting
- Future<SimpleSlot> allocateSimpleSlot(
- final JobID jobID,
- final ResourceProfile resourceProfile,
- final AllocationID allocationID)
- {
- final FlinkCompletableFuture<SlotDescriptor> future = new FlinkCompletableFuture<>();
-
- internalAllocateSlot(jobID, allocationID, resourceProfile, future);
-
- return future.thenApplyAsync(
- new ApplyFunction<SlotDescriptor, SimpleSlot>() {
- @Override
- public SimpleSlot apply(SlotDescriptor descriptor) {
- SimpleSlot slot = new SimpleSlot(
- descriptor.getJobID(), SlotPool.this,
- descriptor.getTaskManagerLocation(), descriptor.getSlotNumber(),
- descriptor.getTaskManagerGateway());
- synchronized (lock) {
- // double validation since we are out of the lock protection after the slot is granted
- if (registeredResources.contains(descriptor.getTaskManagerLocation().getResourceID())) {
- LOG.info("Allocation[{}] Allocated simple slot: {} for job {}.", allocationID, slot, jobID);
- allocatedSlots.add(allocationID, descriptor, slot);
- }
- else {
- throw new RuntimeException("Resource was marked dead asynchronously.");
- }
- }
- return slot;
- }
- },
- executor
- );
+ @RpcMethod
+ public void suspend() {
+ validateRunsInMainThread();
+
+ // suspend this RPC endpoint
+ ((StartStoppable) getSelf()).stop();
+
+ // do not accept any requests
+ jobManagerLeaderId = null;
+ resourceManagerLeaderId = null;
+ resourceManagerGateway = null;
+
+ // Clear (but do not release!) the available and allocated slots, and drop pending requests.
+ // The TaskManagers should re-register their slots at the new leader JobManager/SlotPool.
+ availableSlots.clear();
+ allocatedSlots.clear();
+ pendingRequests.clear(); // NOTE(review): the futures of the dropped pending requests are not completed here
}
+ // ------------------------------------------------------------------------
+ // Getting SlotOwner and SlotProvider
+ // ------------------------------------------------------------------------
/**
- * Try to allocate a shared slot with specified resource profile.
- *
- * @param jobID The job id which the slot allocated for
- * @param resourceProfile The needed resource profile
- * @param sharingGroupAssignment The slot sharing group of the vertex
- * @return The future of allocated shared slot
+ * Gets the slot owner implementation for this pool.
+ *
+ * <p>This method does not mutate state and can be called directly (no RPC indirection).
+ *
+ * @return The slot owner implementation for this pool.
*/
- public Future<SharedSlot> allocateSharedSlot(
- final JobID jobID,
- final ResourceProfile resourceProfile,
- final SlotSharingGroupAssignment sharingGroupAssignment)
- {
- return allocateSharedSlot(jobID, resourceProfile, sharingGroupAssignment, new AllocationID());
+ public SlotOwner getSlotOwner() {
+ return providerAndOwner;
}
/**
- * Try to allocate a shared slot with specified resource profile and specified allocation id. It's mainly
- * for testing purpose since we need to specify whatever allocation id we want.
+ * Gets the slot provider implementation for this pool.
+ *
+ * <p>This method does not mutate state and can be called directly (no RPC indirection).
+ *
+ * @return The slot provider implementation for this pool.
*/
- @VisibleForTesting
- Future<SharedSlot> allocateSharedSlot(
- final JobID jobID,
- final ResourceProfile resourceProfile,
- final SlotSharingGroupAssignment sharingGroupAssignment,
- final AllocationID allocationID)
- {
- final FlinkCompletableFuture<SlotDescriptor> future = new FlinkCompletableFuture<>();
-
- internalAllocateSlot(jobID, allocationID, resourceProfile, future);
-
- return future.thenApplyAsync(
- new ApplyFunction<SlotDescriptor, SharedSlot>() {
- @Override
- public SharedSlot apply(SlotDescriptor descriptor) {
- SharedSlot slot = new SharedSlot(
- descriptor.getJobID(), SlotPool.this, descriptor.getTaskManagerLocation(),
- descriptor.getSlotNumber(), descriptor.getTaskManagerGateway(),
- sharingGroupAssignment);
-
- synchronized (lock) {
- // double validation since we are out of the lock protection after the slot is granted
- if (registeredResources.contains(descriptor.getTaskManagerLocation().getResourceID())) {
- LOG.info("Allocation[{}] Allocated shared slot: {} for job {}.", allocationID, slot, jobID);
- allocatedSlots.add(allocationID, descriptor, slot);
- }
- else {
- throw new RuntimeException("Resource was marked dead asynchronously.");
- }
- }
- return slot;
- }
- },
- executor
- );
+ public SlotProvider getSlotProvider() {
+ return providerAndOwner;
}
- /**
- * Internally allocate the slot with specified resource profile. We will first check whether we have some
- * free slot which can meet the requirement already and allocate it immediately. Otherwise, we will try to
- * allocation the slot from resource manager.
- */
- private void internalAllocateSlot(
- final JobID jobID,
- final AllocationID allocationID,
- final ResourceProfile resourceProfile,
- final FlinkCompletableFuture<SlotDescriptor> future)
- {
- LOG.info("Allocation[{}] Allocating slot with {} for Job {}.", allocationID, resourceProfile, jobID);
-
- synchronized (lock) {
- // check whether we have any free slot which can match the required resource profile
- SlotDescriptor freeSlot = availableSlots.poll(resourceProfile);
- if (freeSlot != null) {
- future.complete(freeSlot);
- }
- else {
- if (resourceManagerGateway != null) {
- LOG.info("Allocation[{}] No available slot exists, trying to allocate from resource manager.",
- allocationID);
- SlotRequest slotRequest = new SlotRequest(jobID, allocationID, resourceProfile);
- pendingRequests.put(allocationID, new Tuple2<>(slotRequest, future));
- resourceManagerGateway.requestSlot(jobManagerLeaderId, resourceManagerLeaderId, slotRequest, timeout)
- .handleAsync(new BiFunction<RMSlotRequestReply, Throwable, Void>() {
- @Override
- public Void apply(RMSlotRequestReply slotRequestReply, Throwable throwable) {
- if (throwable != null) {
- future.completeExceptionally(
- new Exception("Slot allocation from resource manager failed", throwable));
- } else if (slotRequestReply instanceof RMSlotRequestRejected) {
- future.completeExceptionally(
- new Exception("Slot allocation rejected by resource manager"));
- }
- return null;
- }
- }, executor);
+ // ------------------------------------------------------------------------
+ // Resource Manager Connection
+ // ------------------------------------------------------------------------
+
+ @RpcMethod
+ public void connectToResourceManager(UUID resourceManagerLeaderId, ResourceManagerGateway resourceManagerGateway) {
+ this.resourceManagerLeaderId = checkNotNull(resourceManagerLeaderId);
+ this.resourceManagerGateway = checkNotNull(resourceManagerGateway);
+ }
+
+ @RpcMethod
+ public void disconnectResourceManager() {
+ this.resourceManagerLeaderId = null;
+ this.resourceManagerGateway = null;
+ }
+
+ // ------------------------------------------------------------------------
+ // Slot Allocation
+ // ------------------------------------------------------------------------
+
+ @RpcMethod
+ public Future<SimpleSlot> allocateSlot(
+ ScheduledUnit task,
+ ResourceProfile resources,
+ Iterable<TaskManagerLocation> locationPreferences) {
+
+ return internalAllocateSlot(task, resources, locationPreferences);
+ }
+
+ @RpcMethod
+ public void returnAllocatedSlot(Slot slot) {
+ internalReturnAllocatedSlot(slot);
+ }
+
+
+ Future<SimpleSlot> internalAllocateSlot(
+ ScheduledUnit task,
+ ResourceProfile resources,
+ Iterable<TaskManagerLocation> locationPreferences) {
+
+ // (1) do we have a slot available already?
+ SlotAndLocality slotFromPool = availableSlots.poll(resources, locationPreferences);
+ if (slotFromPool != null) {
+ SimpleSlot slot = createSimpleSlot(slotFromPool.slot(), slotFromPool.locality());
+ allocatedSlots.add(slot);
+ return FlinkCompletableFuture.completed(slot);
+ }
+
+ // (2) no slot available, and no resource manager connection
+ if (resourceManagerGateway == null) {
+ return FlinkCompletableFuture.completedExceptionally(
+ new NoResourceAvailableException("not connected to ResourceManager and no slot available"));
+
+ }
+
+ // (3) we have a resource manager connection, so let's ask it for more resources
+ final FlinkCompletableFuture<SimpleSlot> future = new FlinkCompletableFuture<>();
+ final AllocationID allocationID = new AllocationID();
+
+ LOG.info("Requesting slot with profile {} from resource manager (request = {}).", resources, allocationID);
+
+ pendingRequests.put(allocationID, new PendingRequest(allocationID, future, resources));
+
+ Future<RMSlotRequestReply> rmResponse = resourceManagerGateway.requestSlot(
+ resourceManagerLeaderId, jobManagerLeaderId, // NOTE(review): reversed vs. the removed call requestSlot(jobManagerLeaderId, resourceManagerLeaderId, ...); confirm against ResourceManagerGateway#requestSlot
+ new SlotRequest(jobId, allocationID, resources),
+ resourceManagerRequestsTimeout);
+
+ // on success, let the slot pool know
+ rmResponse.thenAcceptAsync(new AcceptFunction<RMSlotRequestReply>() {
+ @Override
+ public void accept(RMSlotRequestReply reply) {
+ if (reply.getAllocationID() != null && reply.getAllocationID().equals(allocationID)) {
+ if (reply instanceof RMSlotRequestRegistered) {
+ slotRequestToResourceManagerSuccess(allocationID);
+ }
+ else if (reply instanceof RMSlotRequestRejected) {
+ slotRequestToResourceManagerFailed(allocationID,
+ new Exception("ResourceManager rejected slot request"));
+ }
+ else {
+ slotRequestToResourceManagerFailed(allocationID,
+ new Exception("Unknown ResourceManager response: " + reply));
+ }
}
else {
- LOG.warn("Allocation[{}] Resource manager not available right now.", allocationID);
- future.completeExceptionally(new Exception("Resource manager not available right now."));
+ future.completeExceptionally(new Exception(String.format(
+ "Bug: ResourceManager response had wrong AllocationID. Request: %s , Response: %s",
+ allocationID, reply.getAllocationID()))); // NOTE(review): unlike slotRequestToResourceManagerFailed, this path does not remove the entry from pendingRequests
}
}
+ }, getMainThreadExecutor());
+
+ // on failure, fail the request future
+ rmResponse.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
+
+ @Override
+ public Void apply(Throwable failure) {
+ slotRequestToResourceManagerFailed(allocationID, failure);
+ return null;
+ }
+ }, getMainThreadExecutor());
+
+ return future;
+ }
+
+ private void slotRequestToResourceManagerSuccess(final AllocationID allocationID) {
+ // a request is pending from the ResourceManager to a (future) TaskManager
+ // we only add the watcher here in case that request times out
+ scheduleRunAsync(new Runnable() {
+ @Override
+ public void run() {
+ checkTimeoutSlotAllocation(allocationID);
+ }
+ }, resourceManagerAllocationTimeout);
+ }
+
+ private void slotRequestToResourceManagerFailed(AllocationID allocationID, Throwable failure) {
+ PendingRequest request = pendingRequests.remove(allocationID);
+ if (request != null) {
+ request.future().completeExceptionally(new NoResourceAvailableException(
+ "No pooled slot available and request to ResourceManager for new slot failed", failure));
+ }
+ }
+
+ private void checkTimeoutSlotAllocation(AllocationID allocationID) {
+ PendingRequest request = pendingRequests.remove(allocationID);
+ if (request != null && !request.future().isDone()) {
+ request.future().completeExceptionally(new TimeoutException("Slot allocation request timed out"));
}
}
@@ -275,123 +372,123 @@ public class SlotPool implements SlotOwner {
* slot can be reused by other pending requests if the resource profile matches.n
*
* @param slot The slot needs to be returned
- * @return True if the returning slot been accepted
*/
- @Override
- public boolean returnAllocatedSlot(Slot slot) {
+ private void internalReturnAllocatedSlot(Slot slot) {
checkNotNull(slot);
checkArgument(!slot.isAlive(), "slot is still alive");
- checkArgument(slot.getOwner() == this, "slot belongs to the wrong pool.");
+ checkArgument(slot.getOwner() == providerAndOwner, "slot belongs to the wrong pool.");
+ // markReleased() is an atomic check-and-set operation, so that the slot is guaranteed
+ // to be returned only once
if (slot.markReleased()) {
- synchronized (lock) {
- final SlotDescriptor slotDescriptor = allocatedSlots.remove(slot);
- if (slotDescriptor != null) {
- // check if this TaskManager is valid
- if (!registeredResources.contains(slot.getTaskManagerID())) {
- return false;
- }
-
- final FlinkCompletableFuture<SlotDescriptor> pendingRequest = pollPendingRequest(slotDescriptor);
- if (pendingRequest != null) {
- pendingRequest.complete(slotDescriptor);
- }
- else {
- availableSlots.add(slotDescriptor);
- }
-
- return true;
+ if (allocatedSlots.remove(slot)) {
+ // this slot allocation is still valid, use the slot to fulfill another request
+ // or make it available again
+ final AllocatedSlot taskManagerSlot = slot.getAllocatedSlot();
+ final PendingRequest pendingRequest = pollMatchingPendingRequest(taskManagerSlot);
+
+ if (pendingRequest != null) {
+ LOG.debug("Fulfilling pending request [{}] early with returned slot [{}]",
+ pendingRequest.allocationID(), taskManagerSlot.getSlotAllocationId());
+
+ // the handed-out slot must be tracked in 'allocatedSlots' (as offerSlot() does);
+ // otherwise it is dropped instead of reused when it is returned again, and it can
+ // neither be failed nor released via its TaskManager. It is registered before the
+ // future is completed, so that callbacks already see a consistent pool.
+ final SimpleSlot newSlot = createSimpleSlot(taskManagerSlot, Locality.UNKNOWN);
+ allocatedSlots.add(newSlot);
+ pendingRequest.future().complete(newSlot);
}
else {
- throw new IllegalArgumentException("Slot was not allocated from this pool.");
+ LOG.debug("Adding returned slot [{}] to available slots", taskManagerSlot.getSlotAllocationId());
+ availableSlots.add(taskManagerSlot, clock.relativeTimeMillis());
}
}
- }
- else {
- return false;
+ else {
+ LOG.debug("Returned slot's allocation has been failed. Dropping slot.");
+ }
}
}
- private FlinkCompletableFuture<SlotDescriptor> pollPendingRequest(final SlotDescriptor slotDescriptor) {
- for (Map.Entry<AllocationID, Tuple2<SlotRequest, FlinkCompletableFuture<SlotDescriptor>>> entry : pendingRequests.entrySet()) {
- final Tuple2<SlotRequest, FlinkCompletableFuture<SlotDescriptor>> pendingRequest = entry.getValue();
- if (slotDescriptor.getResourceProfile().isMatching(pendingRequest.f0.getResourceProfile())) {
- pendingRequests.remove(entry.getKey());
- return pendingRequest.f1;
+ /**
+ * Removes and returns a pending request that the given slot can fulfill, i.e. one whose
+ * resource profile is matched by the slot's resource profile.
+ *
+ * @param slot The slot that should fulfill a pending request
+ * @return The removed matching request, or null if no pending request matches
+ */
+ private PendingRequest pollMatchingPendingRequest(final AllocatedSlot slot) {
+ final ResourceProfile slotResources = slot.getResourceProfile();
+
+ for (PendingRequest request : pendingRequests.values()) {
+ if (slotResources.isMatching(request.resourceProfile())) {
+ // removing from the map inside the loop is fine only because we return right
+ // away and never advance the iterator again
+ pendingRequests.remove(request.allocationID());
+ return request;
}
}
+
+ // no request pending, or no request matches
return null;
}
- /**
- * Release slot to TaskManager, called for finished tasks or canceled jobs.
- *
- * @param slot The slot needs to be released.
- */
- public void releaseSlot(final Slot slot) {
- synchronized (lock) {
- allocatedSlots.remove(slot);
- availableSlots.remove(new SlotDescriptor(slot));
- // TODO: send release request to task manager
+ /**
+ * Offers multiple slots at once. Each offered {@link AllocatedSlot} is passed to
+ * {@link #offerSlot(AllocatedSlot)}, and the {@link SlotOffer}s whose slot was accepted are returned.
+ *
+ * @param offers The offered slots, each paired with its corresponding SlotOffer
+ * @return The accepted slot offers (an empty list if none was accepted)
+ */
+ @RpcMethod
+ public Iterable<SlotOffer> offerSlots(Iterable<Tuple2<AllocatedSlot, SlotOffer>> offers) {
+ validateRunsInMainThread();
+
+ final ArrayList<SlotOffer> result = new ArrayList<>();
+ for (Tuple2<AllocatedSlot, SlotOffer> offer : offers) {
+ if (offerSlot(offer.f0)) {
+ result.add(offer.f1);
+ }
}
- }
+ return result.isEmpty() ? Collections.<SlotOffer>emptyList() : result;
+ }
+
/**
- * Slot offering by TaskManager with AllocationID. The AllocationID is originally generated by this pool and
- * transfer through the ResourceManager to TaskManager. We use it to distinguish the different allocation
- * we issued. Slot offering may be rejected if we find something mismatching or there is actually no pending
- * request waiting for this slot (maybe fulfilled by some other returned slot).
+ * Slot offering by TaskManager with AllocationID. The AllocationID is originally generated by this pool and
+ * transferred through the ResourceManager to the TaskManager. We use it to distinguish the different allocations
+ * we issued. An offer is rejected only if it comes from a TaskManager that is not registered with this pool.
+ * Repeated offers are acknowledged again, and slots for which no request is pending any more (for example
+ * because the request was fulfilled by a returned slot) are added to the available slots.
*
- * @param allocationID The allocation id of the lo
- * @param slotDescriptor The offered slot descriptor
+ * @param slot The offered slot
- * @return True if we accept the offering
+ * @return True if we accept the offering, false if the offering TaskManager is not registered
*/
- public boolean offerSlot(final AllocationID allocationID, final SlotDescriptor slotDescriptor) {
- synchronized (lock) {
- // check if this TaskManager is valid
- final ResourceID resourceID = slotDescriptor.getTaskManagerLocation().getResourceID();
- if (!registeredResources.contains(resourceID)) {
- LOG.warn("Allocation[{}] Slot offering from unregistered TaskManager: {}",
- allocationID, slotDescriptor);
- return false;
- }
+ @RpcMethod
+ public boolean offerSlot(final AllocatedSlot slot) {
+ validateRunsInMainThread();
- // check whether we have already using this slot
- final Slot allocatedSlot = allocatedSlots.get(allocationID);
- if (allocatedSlot != null) {
- final SlotDescriptor allocatedSlotDescriptor = new SlotDescriptor(allocatedSlot);
+ // check if this TaskManager is valid
+ final ResourceID resourceID = slot.getTaskManagerId();
+ final AllocationID allocationID = slot.getSlotAllocationId();
- if (allocatedSlotDescriptor.equals(slotDescriptor)) {
- LOG.debug("Allocation[{}] Duplicated slot offering: {}",
- allocationID, slotDescriptor);
- return true;
- }
- else {
- LOG.info("Allocation[{}] Allocation had been fulfilled by slot {}, rejecting offered slot {}",
- allocationID, allocatedSlotDescriptor, slotDescriptor);
- return false;
- }
- }
+ if (!registeredTaskManagers.contains(resourceID)) {
+ LOG.debug("Received outdated slot offering [{}] from unregistered TaskManager: {}",
+ slot.getSlotAllocationId(), slot);
+ return false;
+ }
- // check whether we already have this slot in free pool
- if (availableSlots.contains(slotDescriptor)) {
- LOG.debug("Allocation[{}] Duplicated slot offering: {}",
- allocationID, slotDescriptor);
- return true;
- }
+ // check whether we are already using this slot (either allocated or available)
+ if (allocatedSlots.contains(allocationID) || availableSlots.contains(allocationID)) {
+ LOG.debug("Received repeated offer for slot [{}]. Ignoring.", allocationID);
- // check whether we have request waiting for this slot
- if (pendingRequests.containsKey(allocationID)) {
- FlinkCompletableFuture<SlotDescriptor> future = pendingRequests.remove(allocationID).f1;
- future.complete(slotDescriptor);
- return true;
- }
+ // return true here so that the sender will get a positive acknowledgement to the retry
+ // and mark the offering as a success
+ return true;
+ }
- // unwanted slot, rejecting this offer
- return false;
+ // check whether we have request waiting for this slot
+ PendingRequest pendingRequest = pendingRequests.remove(allocationID);
+ if (pendingRequest != null) {
+ // we were waiting for this!
+ // track the slot as allocated before completing the future, so that any code
+ // triggered by the completion already sees the slot in 'allocatedSlots'
+ SimpleSlot resultSlot = createSimpleSlot(slot, Locality.UNKNOWN);
+ allocatedSlots.add(resultSlot);
+ pendingRequest.future().complete(resultSlot);
+ }
+ else {
+ // we were actually not waiting for this:
+ // - could be that this request had been fulfilled
+ // - we are receiving the slots from TaskManagers after becoming leaders
+ availableSlots.add(slot, clock.relativeTimeMillis());
}
+
+ // we accepted the offer in any case. NOTE: releasing slots that stay idle for too long
+ // is not implemented yet (see the TODOs below this method)
+ return true;
}
+
+ // TODO - periodic (every minute or so) catch slots that were lost (check all slots, if they have any task active)
+
+ // TODO - release slots that were not used to the resource manager
+
// ------------------------------------------------------------------------
// Error Handling
// ------------------------------------------------------------------------
@@ -405,24 +502,29 @@ public class SlotPool implements SlotOwner {
* @param allocationID Represents the allocation which should be failed
* @param cause The cause of the failure
*/
+ @RpcMethod
public void failAllocation(final AllocationID allocationID, final Exception cause) {
- synchronized (lock) {
- // 1. check whether the allocation still pending
- Tuple2<SlotRequest, FlinkCompletableFuture<SlotDescriptor>> pendingRequest =
- pendingRequests.get(allocationID);
- if (pendingRequest != null) {
- pendingRequest.f1.completeExceptionally(cause);
- return;
+ final PendingRequest pendingRequest = pendingRequests.remove(allocationID);
+ if (pendingRequest != null) {
+ // request was still pending
+ LOG.debug("Failed pending request [{}] with ", allocationID, cause);
+ pendingRequest.future().completeExceptionally(cause);
+ }
+ else if (availableSlots.tryRemove(allocationID)) {
+ // the slot was available (not in use); it is simply removed from the pool
+ LOG.debug("Failed available slot [{}] with ", allocationID, cause);
+ }
+ else {
+ Slot slot = allocatedSlots.remove(allocationID);
+ if (slot != null) {
+ // release the slot.
+ // since it is not in 'allocatedSlots' any more, it will be dropped on return
+ slot.releaseSlot();
+ }
+ else {
+ LOG.debug("Outdated request to fail slot [{}] with ", allocationID, cause);
}
-
- // 2. check whether we have a free slot corresponding to this allocation id
- // TODO: add allocation id to slot descriptor, so we can remove it by allocation id
-
- // 3. check whether we have a in-use slot corresponding to this allocation id
- // TODO: needs mechanism to release the in-use Slot but don't return it back to this pool
-
- // TODO: add some unit tests when the previous two are ready, the allocation may failed at any phase
}
+ // TODO: add unit tests for failing an allocation in each phase (pending, available, allocated)
}
// ------------------------------------------------------------------------
@@ -435,10 +537,9 @@ public class SlotPool implements SlotOwner {
*
* @param resourceID The id of the TaskManager
*/
- public void registerResource(final ResourceID resourceID) {
- synchronized (lock) {
- registeredResources.add(resourceID);
- }
+ @RpcMethod
+ public void registerTaskManager(final ResourceID resourceID) {
+ // only slots offered by registered TaskManagers are accepted in offerSlot()
+ registeredTaskManagers.add(resourceID);
}
/**
@@ -447,12 +548,12 @@ public class SlotPool implements SlotOwner {
*
* @param resourceID The id of the TaskManager
*/
- public void releaseResource(final ResourceID resourceID) {
- synchronized (lock) {
- registeredResources.remove(resourceID);
- availableSlots.removeByResource(resourceID);
+ @RpcMethod
+ public void releaseTaskManager(final ResourceID resourceID) {
+ // TaskManagers that are not registered (any more) are ignored
+ if (registeredTaskManagers.remove(resourceID)) {
+ availableSlots.removeAllForTaskManager(resourceID);
- final Set<Slot> allocatedSlotsForResource = allocatedSlots.getSlotsByResource(resourceID);
+ // the slots are removed from 'allocatedSlots' before being released, so when they come
+ // back to this pool they are dropped instead of becoming available again
+ final Set<Slot> allocatedSlotsForResource = allocatedSlots.removeSlotsForTaskManager(resourceID);
for (Slot slot : allocatedSlotsForResource) {
slot.releaseSlot();
}
@@ -460,24 +561,15 @@ public class SlotPool implements SlotOwner {
}
// ------------------------------------------------------------------------
- // ResourceManager
+ // Utilities
// ------------------------------------------------------------------------
- public void setResourceManager(
- final UUID resourceManagerLeaderId,
- final ResourceManagerGateway resourceManagerGateway)
- {
- synchronized (lock) {
- this.resourceManagerLeaderId = resourceManagerLeaderId;
- this.resourceManagerGateway = resourceManagerGateway;
- }
- }
-
- public void disconnectResourceManager() {
- synchronized (lock) {
- this.resourceManagerLeaderId = null;
- this.resourceManagerGateway = null;
+ /**
+ * Wraps the given TaskManager slot into a {@link SimpleSlot} whose owner is this pool's
+ * {@code providerAndOwner} (which forwards returned slots to the pool's gateway).
+ *
+ * @param slot The allocated slot to wrap
+ * @param locality The locality to set on the created slot, or null to leave it unset
+ * @return The newly created SimpleSlot
+ */
+ private SimpleSlot createSimpleSlot(AllocatedSlot slot, Locality locality) {
+ SimpleSlot result = new SimpleSlot(slot, providerAndOwner, slot.getSlotNumber());
+ if (locality != null) {
+ result.setLocality(locality);
}
+ return result;
}
// ------------------------------------------------------------------------
@@ -487,45 +579,34 @@ public class SlotPool implements SlotOwner {
/**
* Organize allocated slots from different points of view.
*/
- static class AllocatedSlots {
+ private static class AllocatedSlots {
/** All allocated slots organized by TaskManager's id */
- private final Map<ResourceID, Set<Slot>> allocatedSlotsByResource;
-
- /** All allocated slots organized by Slot object */
- private final Map<Slot, AllocationID> allocatedSlots;
-
- /** All allocated slot descriptors organized by Slot object */
- private final Map<Slot, SlotDescriptor> allocatedSlotsWithDescriptor;
+ private final Map<ResourceID, Set<Slot>> allocatedSlotsByTaskManager;
/** All allocated slots organized by AllocationID */
private final Map<AllocationID, Slot> allocatedSlotsById;
AllocatedSlots() {
- this.allocatedSlotsByResource = new HashMap<>();
- this.allocatedSlots = new HashMap<>();
- this.allocatedSlotsWithDescriptor = new HashMap<>();
+ this.allocatedSlotsByTaskManager = new HashMap<>();
this.allocatedSlotsById = new HashMap<>();
}
/**
- * Add a new allocation
+ * Adds a new slot to this collection.
*
- * @param allocationID The allocation id
- * @param slot The allocated slot
+ * @param slot The allocated slot
*/
- void add(final AllocationID allocationID, final SlotDescriptor descriptor, final Slot slot) {
- allocatedSlots.put(slot, allocationID);
- allocatedSlotsById.put(allocationID, slot);
- allocatedSlotsWithDescriptor.put(slot, descriptor);
+ void add(Slot slot) {
+ allocatedSlotsById.put(slot.getAllocatedSlot().getSlotAllocationId(), slot);
final ResourceID resourceID = slot.getTaskManagerID();
- Set<Slot> slotsForResource = allocatedSlotsByResource.get(resourceID);
- if (slotsForResource == null) {
- slotsForResource = new HashSet<>();
- allocatedSlotsByResource.put(resourceID, slotsForResource);
+ Set<Slot> slotsForTaskManager = allocatedSlotsByTaskManager.get(resourceID);
+ if (slotsForTaskManager == null) {
+ slotsForTaskManager = new HashSet<>();
+ allocatedSlotsByTaskManager.put(resourceID, slotsForTaskManager);
}
- slotsForResource.add(slot);
+ slotsForTaskManager.add(slot);
}
/**
@@ -541,11 +622,11 @@ public class SlotPool implements SlotOwner {
/**
* Check whether we have allocated this slot
*
- * @param slot The slot needs to checked
+ * @param slotAllocationId The allocation id of the slot to check
* @return True if we contains this slot
*/
- boolean contains(final Slot slot) {
- return allocatedSlots.containsKey(slot);
+ boolean contains(AllocationID slotAllocationId) {
+ return allocatedSlotsById.containsKey(slotAllocationId);
}
/**
@@ -553,25 +634,27 @@ public class SlotPool implements SlotOwner {
*
* @param slot The slot needs to be removed
*/
- SlotDescriptor remove(final Slot slot) {
- final SlotDescriptor descriptor = allocatedSlotsWithDescriptor.remove(slot);
- if (descriptor != null) {
- final AllocationID allocationID = allocatedSlots.remove(slot);
- if (allocationID != null) {
- allocatedSlotsById.remove(allocationID);
- } else {
- throw new IllegalStateException("Bug: maps are inconsistent");
- }
+ boolean remove(final Slot slot) {
+ return remove(slot.getAllocatedSlot().getSlotAllocationId()) != null;
+ }
- final ResourceID resourceID = slot.getTaskManagerID();
- final Set<Slot> slotsForResource = allocatedSlotsByResource.get(resourceID);
- slotsForResource.remove(slot);
- if (slotsForResource.isEmpty()) {
- allocatedSlotsByResource.remove(resourceID);
+ /**
+ * Removes the slot with the given allocation ID from all views of this collection.
+ *
+ * @param slotId The allocation ID of the slot to be removed
+ * @return The removed slot, or null if no slot with that allocation ID was contained
+ */
+ Slot remove(final AllocationID slotId) {
+ Slot slot = allocatedSlotsById.remove(slotId);
+ if (slot != null) {
+ final ResourceID taskManagerId = slot.getTaskManagerID();
+ Set<Slot> slotsForTM = allocatedSlotsByTaskManager.get(taskManagerId);
+ slotsForTM.remove(slot);
+ if (slotsForTM.isEmpty()) {
+ // drop the empty set, so that containResource() does not report a TaskManager
+ // that no longer has any allocated slots, and the map does not keep growing
+ allocatedSlotsByTaskManager.remove(taskManagerId);
}
-
- return descriptor;
- } else {
+ return slot;
+ }
+ else {
return null;
}
}
@@ -582,119 +665,326 @@ public class SlotPool implements SlotOwner {
* @param resourceID The id of the TaskManager
* @return Set of slots which are allocated from the same TaskManager
*/
- Set<Slot> getSlotsByResource(final ResourceID resourceID) {
- Set<Slot> slotsForResource = allocatedSlotsByResource.get(resourceID);
- if (slotsForResource != null) {
- return new HashSet<>(slotsForResource);
+ Set<Slot> removeSlotsForTaskManager(final ResourceID resourceID) {
+ Set<Slot> slotsForTaskManager = allocatedSlotsByTaskManager.remove(resourceID);
+ if (slotsForTaskManager != null) {
+ for (Slot slot : slotsForTaskManager) {
+ allocatedSlotsById.remove(slot.getAllocatedSlot().getSlotAllocationId());
+ }
+ return slotsForTaskManager;
}
else {
- return new HashSet<>();
+ return Collections.emptySet();
}
}
+ void clear() {
+ allocatedSlotsById.clear();
+ allocatedSlotsByTaskManager.clear();
+ }
+
@VisibleForTesting
boolean containResource(final ResourceID resourceID) {
- return allocatedSlotsByResource.containsKey(resourceID);
+ return allocatedSlotsByTaskManager.containsKey(resourceID);
}
@VisibleForTesting
int size() {
- return allocatedSlots.size();
+ return allocatedSlotsById.size();
}
}
+ // ------------------------------------------------------------------------
+
/**
* Organize all available slots from different points of view.
*/
- static class AvailableSlots {
+ private static class AvailableSlots {
/** All available slots organized by TaskManager */
- private final Map<ResourceID, Set<SlotDescriptor>> availableSlotsByResource;
+ private final HashMap<ResourceID, Set<AllocatedSlot>> availableSlotsByTaskManager;
+
+ /** All available slots organized by host */
+ private final HashMap<String, Set<AllocatedSlot>> availableSlotsByHost;
- /** All available slots */
- private final Set<SlotDescriptor> availableSlots;
+ /** The available slots, with the time when they were inserted */
+ private final HashMap<AllocationID, SlotAndTimestamp> availableSlots;
AvailableSlots() {
- this.availableSlotsByResource = new HashMap<>();
- this.availableSlots = new HashSet<>();
+ this.availableSlotsByTaskManager = new HashMap<>();
+ this.availableSlotsByHost = new HashMap<>();
+ this.availableSlots = new HashMap<>();
}
/**
- * Add an available slot.
+ * Adds an available slot.
*
- * @param descriptor The descriptor of the slot
+ * @param slot The slot to add
+ * @param timestamp The time at which the slot is added; stored together with the slot
+ * @throws IllegalStateException If a slot with the same allocation ID is already contained
*/
- void add(final SlotDescriptor descriptor) {
- availableSlots.add(descriptor);
-
- final ResourceID resourceID = descriptor.getTaskManagerLocation().getResourceID();
- Set<SlotDescriptor> slotsForResource = availableSlotsByResource.get(resourceID);
- if (slotsForResource == null) {
- slotsForResource = new HashSet<>();
- availableSlotsByResource.put(resourceID, slotsForResource);
+ void add(final AllocatedSlot slot, final long timestamp) {
+ checkNotNull(slot);
+
+ final AllocationID slotId = slot.getSlotAllocationId();
+
+ // check before inserting anything, so that a rejected duplicate leaves the base map
+ // and the by-TaskManager / by-host views untouched and consistent with each other
+ if (availableSlots.containsKey(slotId)) {
+ throw new IllegalStateException("slot already contained");
+ }
+
+ availableSlots.put(slotId, new SlotAndTimestamp(slot, timestamp));
+
+ final ResourceID resourceID = slot.getTaskManagerLocation().getResourceID();
+ final String host = slot.getTaskManagerLocation().getFQDNHostname();
+
+ Set<AllocatedSlot> slotsForTaskManager = availableSlotsByTaskManager.get(resourceID);
+ if (slotsForTaskManager == null) {
+ slotsForTaskManager = new HashSet<>();
+ availableSlotsByTaskManager.put(resourceID, slotsForTaskManager);
+ }
+ slotsForTaskManager.add(slot);
+
+ Set<AllocatedSlot> slotsForHost = availableSlotsByHost.get(host);
+ if (slotsForHost == null) {
+ slotsForHost = new HashSet<>();
+ availableSlotsByHost.put(host, slotsForHost);
}
- slotsForResource.add(descriptor);
+ slotsForHost.add(slot);
}
/**
- * Check whether we have this slot
- *
- * @param slotDescriptor The descriptor of the slot
- * @return True if we contains this slot
+ * Check whether we have this slot.
*/
- boolean contains(final SlotDescriptor slotDescriptor) {
- return availableSlots.contains(slotDescriptor);
+ boolean contains(AllocationID slotId) {
+ return availableSlots.containsKey(slotId);
}
/**
- * Poll a slot which matches the required resource profile
+ * Poll a slot which matches the required resource profile. The polling tries to satisfy the
+ * location preferences, by TaskManager and by host.
*
- * @param resourceProfile The required resource profile
+ * @param resourceProfile The required resource profile.
+ * @param locationPreferences The location preferences, in order to be checked.
+ *
* @return Slot which matches the resource profile, null if we can't find a match
*/
- SlotDescriptor poll(final ResourceProfile resourceProfile) {
- for (SlotDescriptor slotDescriptor : availableSlots) {
- if (slotDescriptor.getResourceProfile().isMatching(resourceProfile)) {
- remove(slotDescriptor);
- return slotDescriptor;
+ SlotAndLocality poll(ResourceProfile resourceProfile, Iterable<TaskManagerLocation> locationPreferences) {
+ // fast path if no slots are available
+ if (availableSlots.isEmpty()) {
+ return null;
+ }
+
+ boolean hadLocationPreference = false;
+
+ if (locationPreferences != null) {
+
+ // first search by TaskManager
+ for (TaskManagerLocation location : locationPreferences) {
+ hadLocationPreference = true;
+
+ final Set<AllocatedSlot> onTaskManager = availableSlotsByTaskManager.get(location.getResourceID());
+ if (onTaskManager != null) {
+ for (AllocatedSlot candidate : onTaskManager) {
+ if (candidate.getResourceProfile().isMatching(resourceProfile)) {
+ remove(candidate.getSlotAllocationId());
+ return new SlotAndLocality(candidate, Locality.LOCAL);
+ }
+ }
+ }
+ }
+
+ // now, search by host
+ for (TaskManagerLocation location : locationPreferences) {
+ final Set<AllocatedSlot> onHost = availableSlotsByHost.get(location.getFQDNHostname());
+ if (onHost != null) {
+ for (AllocatedSlot candidate : onHost) {
+ if (candidate.getResourceProfile().isMatching(resourceProfile)) {
+ remove(candidate.getSlotAllocationId());
+ return new SlotAndLocality(candidate, Locality.HOST_LOCAL);
+ }
+ }
+ }
+ }
+ }
+
+ // take any slot
+ for (SlotAndTimestamp candidate : availableSlots.values()) {
+ final AllocatedSlot slot = candidate.slot();
+
+ if (slot.getResourceProfile().isMatching(resourceProfile)) {
+ remove(slot.getSlotAllocationId());
+ return new SlotAndLocality(
+ slot, hadLocationPreference ? Locality.NON_LOCAL : Locality.UNCONSTRAINED);
}
}
+
+ // nothing available that matches
return null;
}
/**
* Remove all available slots come from specified TaskManager.
*
- * @param resourceID The id of the TaskManager
+ * @param taskManager The id of the TaskManager
*/
- void removeByResource(final ResourceID resourceID) {
- final Set<SlotDescriptor> slotsForResource = availableSlotsByResource.remove(resourceID);
- if (slotsForResource != null) {
- for (SlotDescriptor slotDescriptor : slotsForResource) {
- availableSlots.remove(slotDescriptor);
+ void removeAllForTaskManager(final ResourceID taskManager) {
+ // remove from the by-TaskManager view
+ final Set<AllocatedSlot> slotsForTm = availableSlotsByTaskManager.remove(taskManager);
+
+ if (slotsForTm != null && slotsForTm.size() > 0) {
+ // assumes all slots of one TaskManager report the same host
+ final String host = slotsForTm.iterator().next().getTaskManagerLocation().getFQDNHostname();
+ final Set<AllocatedSlot> slotsForHost = availableSlotsByHost.get(host);
+
+ // remove from the base set and the by-host view
+ for (AllocatedSlot slot : slotsForTm) {
+ availableSlots.remove(slot.getSlotAllocationId());
+ slotsForHost.remove(slot);
+ }
+
+ if (slotsForHost.isEmpty()) {
+ availableSlotsByHost.remove(host);
}
}
}
- private void remove(final SlotDescriptor slotDescriptor) {
- availableSlots.remove(slotDescriptor);
+ /**
+ * Removes the slot with the given allocation ID from all views, if it is contained.
+ *
+ * @param slotId The allocation ID of the slot to remove
+ * @return True if the slot was contained and has been removed, false otherwise
+ */
+ boolean tryRemove(AllocationID slotId) {
+ final SlotAndTimestamp sat = availableSlots.remove(slotId);
+ if (sat != null) {
+ final AllocatedSlot slot = sat.slot();
+ final ResourceID resourceID = slot.getTaskManagerLocation().getResourceID();
+ final String host = slot.getTaskManagerLocation().getFQDNHostname();
+
+ final Set<AllocatedSlot> slotsForTm = availableSlotsByTaskManager.get(resourceID);
+ final Set<AllocatedSlot> slotsForHost = availableSlotsByHost.get(host);
+
+ slotsForTm.remove(slot);
+ slotsForHost.remove(slot);
+
+ if (slotsForTm.isEmpty()) {
+ availableSlotsByTaskManager.remove(resourceID);
+ }
+ if (slotsForHost.isEmpty()) {
+ availableSlotsByHost.remove(host);
+ }
+
+ return true;
+ }
+ else {
+ return false;
+ }
+ }
- final ResourceID resourceID = slotDescriptor.getTaskManagerLocation().getResourceID();
- final Set<SlotDescriptor> slotsForResource = checkNotNull(availableSlotsByResource.get(resourceID));
- slotsForResource.remove(slotDescriptor);
- if (slotsForResource.isEmpty()) {
- availableSlotsByResource.remove(resourceID);
+ private void remove(AllocationID slotId) throws IllegalStateException {
+ if (!tryRemove(slotId)) {
+ throw new IllegalStateException("slot not contained");
}
}
@VisibleForTesting
- boolean containResource(final ResourceID resourceID) {
- return availableSlotsByResource.containsKey(resourceID);
+ boolean containsTaskManager(ResourceID resourceID) {
+ return availableSlotsByTaskManager.containsKey(resourceID);
}
@VisibleForTesting
int size() {
return availableSlots.size();
}
+
+ @VisibleForTesting
+ void clear() {
+ availableSlots.clear();
+ availableSlotsByTaskManager.clear();
+ availableSlotsByHost.clear();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * An implementation of the {@link SlotOwner} and {@link SlotProvider} interfaces
+ * that delegates methods as RPC calls to the SlotPool's RPC gateway.
+ */
+ private static class ProviderAndOwner implements SlotOwner, SlotProvider {
+
+ /** The gateway through which all calls are dispatched to the SlotPool */
+ private final SlotPoolGateway gateway;
+
+ /** The RPC timeout passed along with slot allocation requests */
+ private final Time timeout;
+
+ ProviderAndOwner(SlotPoolGateway gateway, Time timeout) {
+ this.gateway = gateway;
+ this.timeout = timeout;
+ }
+
+ @Override
+ public boolean returnAllocatedSlot(Slot slot) {
+ // the return is only dispatched to the pool here; its outcome is not awaited,
+ // so the slot is always reported as accepted
+ gateway.returnAllocatedSlot(slot);
+ return true;
+ }
+
+ @Override
+ public Future<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued) {
+ // NOTE(review): 'allowQueued' is not used, and neither a resource profile nor
+ // location preferences are passed on (UNKNOWN profile, empty preference list)
+ return gateway.allocateSlot(
+ task, ResourceProfile.UNKNOWN, Collections.<TaskManagerLocation>emptyList(), timeout);
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * A pending request for a slot: the allocation ID under which the slot was requested, the
+ * future to complete with the slot, and the resource profile the slot must match.
+ */
+ private static class PendingRequest {
+
+ /** The allocation ID identifying this request */
+ private final AllocationID allocationID;
+
+ /** The future that is completed with the slot (or failed) once the request is resolved */
+ private final FlinkCompletableFuture<SimpleSlot> future;
+
+ /** The resources that a slot must match to fulfill this request */
+ private final ResourceProfile resourceProfile;
+
+ PendingRequest(
+ AllocationID allocationID,
+ FlinkCompletableFuture<SimpleSlot> future,
+ ResourceProfile resourceProfile) {
+ this.allocationID = allocationID;
+ this.future = future;
+ this.resourceProfile = resourceProfile;
+ }
+
+ public AllocationID allocationID() {
+ return allocationID;
+ }
+
+ public FlinkCompletableFuture<SimpleSlot> future() {
+ return future;
+ }
+
+ public ResourceProfile resourceProfile() {
+ return resourceProfile;
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * A slot, together with the timestamp when it was added to the available slots.
+ */
+ private static class SlotAndTimestamp {
+
+ /** The available slot */
+ private final AllocatedSlot slot;
+
+ /** The timestamp that was passed to {@code AvailableSlots#add} for this slot */
+ private final long timestamp;
+
+ SlotAndTimestamp(
+ AllocatedSlot slot,
+ long timestamp) {
+ this.slot = slot;
+ this.timestamp = timestamp;
+ }
+
+ public AllocatedSlot slot() {
+ return slot;
+ }
+
+ public long timestamp() {
+ return timestamp;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a1ba9f11/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
new file mode 100644
index 0000000..42942ca
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcTimeout;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import java.util.UUID;
+
+/**
+ * The gateway for calls on the {@link SlotPool}.
+ */
+public interface SlotPoolGateway extends RpcGateway {
+
+ // ------------------------------------------------------------------------
+ // shutdown
+ // ------------------------------------------------------------------------
+
+ /**
+ * Suspends the slot pool. NOTE(review): the exact semantics are defined by the SlotPool
+ * implementation; confirm there before relying on them.
+ */
+ void suspend();
+
+ // ------------------------------------------------------------------------
+ // resource manager connection
+ // ------------------------------------------------------------------------
+
+ /**
+ * Connects the SlotPool to the given ResourceManager. After this method is called, the
+ * SlotPool will be able to request resources from the given ResourceManager.
+ *
+ * @param resourceManagerLeaderId The leader session ID of the resource manager.
+ * @param resourceManagerGateway The RPC gateway for the resource manager.
+ */
+ void connectToResourceManager(UUID resourceManagerLeaderId, ResourceManagerGateway resourceManagerGateway);
+
+ /**
+ * Disconnects the slot pool from its current Resource Manager. After this call, the pool will not
+ * be able to request further slots from the Resource Manager, and all currently pending requests
+ * to the resource manager will be canceled.
+ *
+ * <p>The slot pool will still be able to serve slots from its internal pool.
+ */
+ void disconnectResourceManager();
+
+ // ------------------------------------------------------------------------
+ // registering / un-registering TaskManagers and slots
+ // ------------------------------------------------------------------------
+
+ /**
+ * Registers a TaskManager with the pool. Only slots offered by registered TaskManagers are accepted.
+ *
+ * @param resourceID The ID of the TaskManager
+ */
+ void registerTaskManager(ResourceID resourceID);
+
+ /**
+ * Unregisters a TaskManager: its available slots are removed from the pool and the slots
+ * allocated from it are released.
+ *
+ * @param resourceID The ID of the TaskManager
+ */
+ void releaseTaskManager(ResourceID resourceID);
+
+ /**
+ * Offers a single slot to the pool.
+ *
+ * @param slot The offered slot
+ * @return Future that completes with true if the pool accepted the slot
+ */
+ Future<Boolean> offerSlot(AllocatedSlot slot);
+
+ /**
+ * Offers multiple slots to the pool.
+ *
+ * @param offers The offered slots, each paired with its corresponding SlotOffer
+ * @return Future that completes with the SlotOffers whose slot was accepted
+ */
+ Future<Iterable<SlotOffer>> offerSlots(Iterable<Tuple2<AllocatedSlot, SlotOffer>> offers);
+
+ /**
+ * Fails the allocation with the given ID, whether it is still a pending request, an available
+ * slot, or an allocated slot.
+ *
+ * @param allocationID The allocation to fail
+ * @param cause The cause of the failure
+ */
+ void failAllocation(AllocationID allocationID, Exception cause);
+
+ // ------------------------------------------------------------------------
+ // allocating and disposing slots
+ // ------------------------------------------------------------------------
+
+ /**
+ * Requests a slot for the given task.
+ *
+ * @param task The task to allocate a slot for
+ * @param resources The resources the slot must provide
+ * @param locationPreferences The preferred TaskManager locations, in order
+ * @param timeout The RPC timeout for this call
+ * @return Future that completes with the allocated slot
+ */
+ Future<SimpleSlot> allocateSlot(
+ ScheduledUnit task,
+ ResourceProfile resources,
+ Iterable<TaskManagerLocation> locationPreferences,
+ @RpcTimeout Time timeout);
+
+ /**
+ * Returns a released slot to the pool, which either uses it to fulfill a pending request or
+ * makes it available again (or drops it if its allocation has been failed).
+ *
+ * @param slot The released slot
+ */
+ void returnAllocatedSlot(Slot slot);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a1ba9f11/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Locality.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Locality.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Locality.java
index ec6e9b1..0ef2482 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Locality.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Locality.java
@@ -19,19 +19,19 @@
package org.apache.flink.runtime.jobmanager.scheduler;
public enum Locality {
-
- /**
- * No constraint existed on the task placement.
- */
+
+ /** No constraint existed on the task placement. */
UNCONSTRAINED,
-
- /**
- * The task was scheduled respecting its locality preferences.
- */
+
+ /** The task was scheduled into the same TaskManager as requested. */
LOCAL,
-
- /**
- * The task was scheduled to a destination not included in its locality preferences.
- */
- NON_LOCAL
+
+ /** The task was scheduled onto the same host as requested. */
+ HOST_LOCAL,
+
+ /** The task was scheduled to a destination not included in its locality preferences. */
+ NON_LOCAL,
+
+ /** No locality information was provided; it is unknown whether the locality was respected. */
+ UNKNOWN
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a1ba9f11/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
index e45747b..546f31f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
@@ -54,7 +54,17 @@ public class NoResourceAvailableException extends JobException {
public NoResourceAvailableException(String message) {
super(message);
}
-
+
+ /**
+ * Creates a new exception with the given detail message and cause.
+ *
+ * @param message The detail message.
+ * @param cause The exception that caused this exception.
+ */
+ public NoResourceAvailableException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
// --------------------------------------------------------------------------------------------
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/a1ba9f11/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
index 9419ab4..f477c49 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
@@ -20,7 +20,9 @@ package org.apache.flink.runtime.jobmanager.slots;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -54,6 +56,9 @@ public class AllocatedSlot {
/** TEMP until the new RPC is in place: The actor gateway to communicate with the TaskManager */
private final TaskManagerGateway taskManagerGateway;
+ /** RPC gateway to the TaskManager holding this slot; null if the slot was created with a TaskManagerGateway. */
+ private final TaskExecutorGateway taskExecutorGateway;
+
/** The number of the slot on the TaskManager to which slot belongs. Purely informational. */
private final int slotNumber;
@@ -73,15 +78,28 @@ public class AllocatedSlot {
this.slotNumber = slotNumber;
this.resourceProfile = checkNotNull(resourceProfile);
this.taskManagerGateway = checkNotNull(taskManagerGateway);
+ this.taskExecutorGateway = null;
}
- public AllocatedSlot(AllocatedSlot other) {
- this.slotAllocationId = other.slotAllocationId;
- this.jobID = other.jobID;
- this.taskManagerLocation = other.taskManagerLocation;
- this.slotNumber = other.slotNumber;
- this.resourceProfile = other.resourceProfile;
- this.taskManagerGateway = other.taskManagerGateway;
+ /**
+ * Creates an allocated slot backed by the RPC {@link TaskExecutorGateway}.
+ * Slots created this way carry no legacy {@link TaskManagerGateway}; that field
+ * is set to {@code null}. All object arguments must be non-null.
+ */
+ public AllocatedSlot(
+ AllocationID slotAllocationId,
+ JobID jobID,
+ TaskManagerLocation location,
+ int slotNumber,
+ ResourceProfile resourceProfile,
+ TaskExecutorGateway taskExecutorGateway) {
+ this.slotAllocationId = checkNotNull(slotAllocationId);
+ this.jobID = checkNotNull(jobID);
+ this.taskManagerLocation = checkNotNull(location);
+ this.slotNumber = slotNumber;
+ this.resourceProfile = checkNotNull(resourceProfile);
+ this.taskManagerGateway = null;
+ this.taskExecutorGateway = checkNotNull(taskExecutorGateway);
}
// ------------------------------------------------------------------------
@@ -96,6 +109,17 @@ public class AllocatedSlot {
}
/**
+ * Gets the ID of the TaskManager on which this slot was allocated.
+ *
+ * <p>This is equivalent to {@code getTaskManagerLocation().getResourceID()}.
+ *
+ * @return This slot's TaskManager's ID.
+ */
+ public ResourceID getTaskManagerId() {
+ return getTaskManagerLocation().getResourceID();
+ }
+
+ /**
* Returns the ID of the job this allocated slot belongs to.
*
* @return the ID of the job this allocated slot belongs to
@@ -142,8 +166,33 @@ public class AllocatedSlot {
return taskManagerGateway;
}
+ /**
+ * Gets the RPC gateway of the TaskManager that holds this slot.
+ *
+ * @return The TaskExecutorGateway, or {@code null} if this slot was created with a TaskManagerGateway.
+ */
+ public TaskExecutorGateway getTaskExecutorGateway() {
+ return taskExecutorGateway;
+ }
+
// ------------------------------------------------------------------------
+ /**
+ * This always returns the identity hash code, consistent with {@link #equals(Object)}.
+ */
+ @Override
+ public final int hashCode() {
+ return super.hashCode();
+ }
+
+ /**
+ * This always checks based on reference equality, consistent with {@link #hashCode()}.
+ */
+ @Override
+ public final boolean equals(Object obj) {
+ return this == obj;
+ }
+
@Override
public String toString() {
return "AllocatedSlot " + slotAllocationId + " @ " + taskManagerLocation + " - " + slotNumber;
http://git-wip-us.apache.org/repos/asf/flink/blob/a1ba9f11/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/PooledSlotProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/PooledSlotProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/PooledSlotProvider.java
deleted file mode 100644
index 5655fc2..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/PooledSlotProvider.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager.slots;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
-import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.runtime.instance.SlotPool;
-import org.apache.flink.runtime.instance.SlotProvider;
-import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
-import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
-
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A simple pool based slot provider with {@link SlotPool} as the underlying storage.
- */
-public class PooledSlotProvider implements SlotProvider {
-
- /** The pool which holds all the slots. */
- private final SlotPool slotPool;
-
- /** The timeout for allocation. */
- private final Time timeout;
-
- public PooledSlotProvider(final SlotPool slotPool, final Time timeout) {
- this.slotPool = slotPool;
- this.timeout = timeout;
- }
-
- @Override
- public Future<SimpleSlot> allocateSlot(ScheduledUnit task,
- boolean allowQueued) throws NoResourceAvailableException
- {
- checkNotNull(task);
-
- final JobID jobID = task.getTaskToExecute().getVertex().getJobId();
- final Future<SimpleSlot> future = slotPool.allocateSimpleSlot(jobID, ResourceProfile.UNKNOWN);
- try {
- final SimpleSlot slot = future.get(timeout.getSize(), timeout.getUnit());
- return FlinkCompletableFuture.completed(slot);
- } catch (InterruptedException e) {
- throw new NoResourceAvailableException("Could not allocate a slot because it's interrupted.");
- } catch (ExecutionException e) {
- throw new NoResourceAvailableException("Could not allocate a slot because some error occurred " +
- "during allocation, " + e.getMessage());
- } catch (TimeoutException e) {
- throw new NoResourceAvailableException("Could not allocate a slot within time limit: " + timeout);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a1ba9f11/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotAndLocality.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotAndLocality.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotAndLocality.java
new file mode 100644
index 0000000..3fe5346
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotAndLocality.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.slots;
+
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A combination of a {@link AllocatedSlot} and a {@link Locality}.
+ */
+public class SlotAndLocality {
+
+ /** The allocated slot. Never null. */
+ private final AllocatedSlot slot;
+
+ /** The locality of the slot. Never null. */
+ private final Locality locality;
+
+ /**
+ * Creates a new combination of the given slot and locality.
+ *
+ * @param slot The allocated slot; must not be null.
+ * @param locality The locality of the slot; must not be null.
+ * @throws NullPointerException If either argument is null.
+ */
+ public SlotAndLocality(AllocatedSlot slot, Locality locality) {
+ this.slot = checkNotNull(slot);
+ this.locality = checkNotNull(locality);
+ }
+
+ // ------------------------------------------------------------------------
+
+ /** Returns the allocated slot. */
+ public AllocatedSlot slot() {
+ return slot;
+ }
+
+ /** Returns the locality of the slot. */
+ public Locality locality() {
+ return locality;
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public String toString() {
+ return "Slot: " + slot + " (" + locality + ')';
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a1ba9f11/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 0b3b68e..a620390 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -53,8 +53,8 @@ import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.LeaderIdMismatchException;
import org.apache.flink.runtime.instance.Slot;
-import org.apache.flink.runtime.instance.SlotDescriptor;
import org.apache.flink.runtime.instance.SlotPool;
+import org.apache.flink.runtime.instance.SlotPoolGateway;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -62,7 +62,7 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.OnCompletionActions;
import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
-import org.apache.flink.runtime.jobmanager.slots.PooledSlotProvider;
+import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
import org.apache.flink.runtime.jobmaster.message.ClassloadingProps;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -91,19 +91,18 @@ import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.util.SerializedThrowable;
import org.apache.flink.util.InstantiationUtil;
+
import org.slf4j.Logger;
import javax.annotation.Nullable;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
-import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -160,7 +159,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
private final SlotPool slotPool;
- private final Time allocationTimeout;
+ private final SlotPoolGateway slotPoolGateway;
private volatile UUID leaderSessionID;
@@ -249,8 +248,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
// register self as job status change listener
executionGraph.registerJobStatusListener(new JobManagerJobStatusListener());
- this.slotPool = new SlotPool(executorService);
- this.allocationTimeout = Time.of(5, TimeUnit.SECONDS);
+ this.slotPool = new SlotPool(rpcService, jobGraph.getJobID());
+ this.slotPoolGateway = slotPool.getSelf();
this.registeredTaskManagers = new HashMap<>(4);
}
@@ -272,10 +271,6 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
*/
public void start(final UUID leaderSessionID) throws Exception {
if (LEADER_ID_UPDATER.compareAndSet(this, null, leaderSessionID)) {
-
- // make sure the slot pool now accepts messages for this leader
- slotPool.setJobManagerLeaderId(leaderSessionID);
-
// make sure we receive RPC and async calls
super.start();
@@ -305,8 +300,18 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
@RpcMethod
public void startJobExecution() {
+ // double check that the leader status did not change
+ if (leaderSessionID == null) {
+ log.info("Aborting job startup - JobManager lost leader status");
+ return;
+ }
+
log.info("Starting execution of job {} ({})", jobGraph.getName(), jobGraph.getJobID());
+ // start the slot pool and make sure it now accepts messages for this leader
+ log.debug("Starting SlotPool component");
+ slotPool.start(leaderSessionID);
+
try {
// job is ready to go, try to establish connection with resource manager
// - activate leader retrieval for the resource manager
@@ -328,7 +333,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
@Override
public void run() {
try {
- executionGraph.scheduleForExecution(new PooledSlotProvider(slotPool, allocationTimeout));
+ executionGraph.scheduleForExecution(slotPool.getSlotProvider());
}
catch (Throwable t) {
executionGraph.fail(t);
@@ -353,27 +358,26 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
return;
}
- // receive no more messages until started again, should be called before we clear self leader id
- ((StartStoppable) getSelf()).stop();
-
+ // not leader any more - should not accept any leader messages any more
leaderSessionID = null;
- slotPool.setJobManagerLeaderId(null);
- executionGraph.suspend(cause);
- // disconnect from resource manager:
try {
resourceManagerLeaderRetriever.stop();
- } catch (Exception e) {
- log.warn("Failed to stop resource manager leader retriever when suspending.", e);
+ } catch (Throwable t) {
+ log.warn("Failed to stop resource manager leader retriever when suspending.", t);
}
- closeResourceManagerConnection();
- // TODO: in the future, the slot pool should not release the resources, so that
- // TODO: the TaskManagers offer the resources to the new leader
- for (ResourceID taskManagerId : registeredTaskManagers.keySet()) {
- slotPool.releaseResource(taskManagerId);
- }
- registeredTaskManagers.clear();
+ // tell the execution graph (JobManager is still processing messages here)
+ executionGraph.suspend(cause);
+
+ // receive no more RPC and async calls until started again; note that the leader id was already cleared above
+ ((StartStoppable) getSelf()).stop();
+
+ // the slot pool stops receiving messages and clears its pooled slots
+ slotPoolGateway.suspend();
+
+ // disconnect from resource manager:
+ closeResourceManagerConnection();
}
//----------------------------------------------------------------------------------------------
@@ -452,6 +456,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
}
}
+ @RpcMethod
public ExecutionState requestPartitionState(
final UUID leaderSessionID,
final IntermediateDataSetID intermediateResultId,
@@ -624,9 +629,11 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
}
@RpcMethod
- public Iterable<SlotOffer> offerSlots(final ResourceID taskManagerId,
- final Iterable<SlotOffer> slots, final UUID leaderId) throws Exception
- {
+ public Future<Iterable<SlotOffer>> offerSlots(
+ final ResourceID taskManagerId,
+ final Iterable<SlotOffer> slots,
+ final UUID leaderId) throws Exception {
+
- validateLeaderSessionId(leaderSessionID);
+ validateLeaderSessionId(leaderId);
Tuple2<TaskManagerLocation, TaskExecutorGateway> taskManager = registeredTaskManagers.get(taskManagerId);
@@ -634,20 +641,22 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
throw new Exception("Unknown TaskManager " + taskManagerId);
}
- final Set<SlotOffer> acceptedSlotOffers = new HashSet<>(4);
+ final JobID jid = jobGraph.getJobID();
+ final TaskManagerLocation taskManagerLocation = taskManager.f0;
+ final TaskExecutorGateway taskExecutorGateway = taskManager.f1;
+
+ final ArrayList<Tuple2<AllocatedSlot, SlotOffer>> slotsAndOffers = new ArrayList<>();
+
for (SlotOffer slotOffer : slots) {
- final SlotDescriptor slotDescriptor = new SlotDescriptor(
- jobGraph.getJobID(),
- taskManager.f0,
- slotOffer.getSlotIndex(),
- slotOffer.getResourceProfile(),
- null); // TODO: replace the actor gateway with the new rpc gateway, it's ready (taskManager.f1)
- if (slotPool.offerSlot(slotOffer.getAllocationId(), slotDescriptor)) {
- acceptedSlotOffers.add(slotOffer);
- }
+ final AllocatedSlot slot = new AllocatedSlot(
+ slotOffer.getAllocationId(), jid, taskManagerLocation,
+ slotOffer.getSlotIndex(), slotOffer.getResourceProfile(),
+ taskExecutorGateway);
+
+ slotsAndOffers.add(new Tuple2<>(slot, slotOffer));
}
- return acceptedSlotOffers;
+ return slotPoolGateway.offerSlots(slotsAndOffers);
}
@RpcMethod
@@ -662,7 +671,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
throw new Exception("Unknown TaskManager " + taskManagerId);
}
- slotPool.failAllocation(allocationId, cause);
+ slotPoolGateway.failAllocation(allocationId, cause);
}
@RpcMethod
@@ -708,7 +717,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
return new RegistrationResponse.Decline("Invalid leader session id");
}
- slotPool.registerResource(taskManagerId);
+ slotPoolGateway.registerTaskManager(taskManagerId);
registeredTaskManagers.put(taskManagerId, Tuple2.of(taskManagerLocation, taskExecutorGateway));
return new JMTMRegistrationSuccess(taskManagerId, libraryCacheManager.getBlobServerPort());
}
@@ -840,7 +849,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
log.info("JobManager successfully registered at ResourceManager, leader id: {}.",
success.getResourceManagerLeaderId());
- slotPool.setResourceManager(
+ slotPoolGateway.connectToResourceManager(
success.getResourceManagerLeaderId(), resourceManagerConnection.getTargetGateway());
}
}
@@ -852,7 +861,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
resourceManagerConnection.close();
resourceManagerConnection = null;
}
- slotPool.disconnectResourceManager();
+
+ slotPoolGateway.disconnectResourceManager();
}
private void validateLeaderSessionId(UUID leaderSessionID) throws LeaderIdMismatchException {
http://git-wip-us.apache.org/repos/asf/flink/blob/a1ba9f11/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index b971b96..f30e345 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -25,6 +25,7 @@ import org.apache.flink.util.ReflectionUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nonnull;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
@@ -207,6 +208,17 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
* @param runnable Runnable to be executed
* @param delay The delay after which the runnable will be executed
*/
+ protected void scheduleRunAsync(Runnable runnable, Time delay) {
+ scheduleRunAsync(runnable, delay.getSize(), delay.getUnit());
+ }
+
+ /**
+ * Execute the runnable in the main thread of the underlying RPC endpoint, with
+ * a delay expressed in the given time unit.
+ *
+ * @param runnable Runnable to be executed
+ * @param delay The delay, measured in {@code unit}, after which the runnable will be executed
+ */
protected void scheduleRunAsync(Runnable runnable, long delay, TimeUnit unit) {
((MainThreadExecutable) self).scheduleRunAsync(runnable, unit.toMillis(delay));
}
@@ -255,7 +267,7 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
/**
* Executor which executes runnables in the main thread context.
*/
- private class MainThreadExecutor implements Executor {
+ private static class MainThreadExecutor implements Executor {
private final MainThreadExecutable gateway;
@@ -264,7 +276,7 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
}
@Override
- public void execute(Runnable runnable) {
+ public void execute(@Nonnull Runnable runnable) {
gateway.runAsync(runnable);
}
}
@@ -277,7 +289,7 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
private Class<C> determineSelfGatewayType() {
// determine self gateway type
- Class c = getClass();
+ Class<?> c = getClass();
Class<C> determinedSelfGatewayType;
do {
determinedSelfGatewayType = ReflectionUtil.getTemplateType1(c);