You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2019/07/12 19:05:36 UTC
[flink] 09/10: [FLINK-12765][jobmanager] Let some slot requests fail
if the sharing slot is oversubscribed
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit a587576ef513f1df5a3c19b0e01a9eb43ec6f80e
Author: Gao Yun <yu...@alibaba-inc.com>
AuthorDate: Sat Jun 22 00:13:56 2019 +0800
[FLINK-12765][jobmanager] Let some slot requests fail if the sharing slot is oversubscribed
---
.../runtime/jobmaster/slotpool/SchedulerImpl.java | 53 ++++-
.../SharedSlotOversubscribedException.java | 40 ++++
.../jobmaster/slotpool/SlotSharingManager.java | 89 ++++++++-
.../jobmaster/slotpool/SlotPoolCoLocationTest.java | 162 +++++++++++++++
.../slotpool/SlotPoolSlotSharingTest.java | 92 +++++++++
.../jobmaster/slotpool/SlotSharingManagerTest.java | 219 +++++++++++++++++++++
6 files changed, 640 insertions(+), 15 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.java
index 57c42b8..ef87dd1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.jobmaster.SlotContext;
import org.apache.flink.runtime.jobmaster.SlotInfo;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
@@ -145,24 +146,53 @@ public class SchedulerImpl implements Scheduler {
componentMainThreadExecutor.assertRunningInMainThread();
final CompletableFuture<LogicalSlot> allocationResultFuture = new CompletableFuture<>();
+ internalAllocateSlot(
+ allocationResultFuture,
+ slotRequestId,
+ scheduledUnit,
+ slotProfile,
+ allowQueuedScheduling,
+ allocationTimeout);
+ return allocationResultFuture;
+ }
+ private void internalAllocateSlot(
+ CompletableFuture<LogicalSlot> allocationResultFuture,
+ SlotRequestId slotRequestId,
+ ScheduledUnit scheduledUnit,
+ SlotProfile slotProfile,
+ boolean allowQueuedScheduling,
+ Time allocationTimeout) {
CompletableFuture<LogicalSlot> allocationFuture = scheduledUnit.getSlotSharingGroupId() == null ?
allocateSingleSlot(slotRequestId, slotProfile, allowQueuedScheduling, allocationTimeout) :
allocateSharedSlot(slotRequestId, scheduledUnit, slotProfile, allowQueuedScheduling, allocationTimeout);
allocationFuture.whenComplete((LogicalSlot slot, Throwable failure) -> {
if (failure != null) {
- cancelSlotRequest(
- slotRequestId,
- scheduledUnit.getSlotSharingGroupId(),
- failure);
- allocationResultFuture.completeExceptionally(failure);
+ Optional<SharedSlotOversubscribedException> sharedSlotOverAllocatedException =
+ ExceptionUtils.findThrowable(failure, SharedSlotOversubscribedException.class);
+ if (sharedSlotOverAllocatedException.isPresent() &&
+ sharedSlotOverAllocatedException.get().canRetry()) {
+
+ // Retry the allocation
+ internalAllocateSlot(
+ allocationResultFuture,
+ slotRequestId,
+ scheduledUnit,
+ slotProfile,
+ allowQueuedScheduling,
+ allocationTimeout);
+ } else {
+ cancelSlotRequest(
+ slotRequestId,
+ scheduledUnit.getSlotSharingGroupId(),
+ failure);
+ allocationResultFuture.completeExceptionally(failure);
+ }
} else {
allocationResultFuture.complete(slot);
}
});
-
- return allocationResultFuture;
}
@Override
@@ -354,7 +384,14 @@ public class SchedulerImpl implements Scheduler {
if (taskSlot != null) {
Preconditions.checkState(taskSlot instanceof SlotSharingManager.MultiTaskSlot);
- return SlotSharingManager.MultiTaskSlotLocality.of(((SlotSharingManager.MultiTaskSlot) taskSlot), Locality.LOCAL);
+
+ SlotSharingManager.MultiTaskSlot multiTaskSlot = (SlotSharingManager.MultiTaskSlot) taskSlot;
+
+ if (multiTaskSlot.mayHaveEnoughResourcesToFulfill(slotProfile.getResourceProfile())) {
+ return SlotSharingManager.MultiTaskSlotLocality.of(multiTaskSlot, Locality.LOCAL);
+ }
+
+ throw new NoResourceAvailableException("Not enough resources in the slot for all co-located tasks.");
} else {
// the slot may have been cancelled in the mean time
coLocationConstraint.setSlotRequestId(null);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SharedSlotOversubscribedException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SharedSlotOversubscribedException.java
new file mode 100644
index 0000000..8a61612
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SharedSlotOversubscribedException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+/**
+ * If a shared slot is over-allocated before it has been resolved,
+ * some requests will be rejected with this exception to ensure the
+ * total resources requested do not exceed the total resources. The
+ * rejected requests can be retried if canRetry is set.
+ */
+class SharedSlotOversubscribedException extends Exception {
+
+ /** Whether the requester can retry the request. */
+ private final boolean canRetry;
+
+ SharedSlotOversubscribedException(String message, boolean canRetry) {
+ super(message);
+ this.canRetry = canRetry;
+ }
+
+ boolean canRetry() {
+ return canRetry;
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
index a7afbac..2c81a91 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
@@ -41,6 +41,7 @@ import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.AbstractCollection;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -48,6 +49,7 @@ import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -356,9 +358,9 @@ public class SlotSharingManager {
CompletableFuture<? extends SlotContext> slotContextFuture,
@Nullable SlotRequestId allocatedSlotRequestId) {
super(slotRequestId, groupId);
+ Preconditions.checkNotNull(slotContextFuture);
this.parent = parent;
- this.slotContextFuture = Preconditions.checkNotNull(slotContextFuture);
this.allocatedSlotRequestId = allocatedSlotRequestId;
this.children = new HashMap<>(16);
@@ -366,12 +368,19 @@ public class SlotSharingManager {
this.reservedResources = ResourceProfile.ZERO;
- slotContextFuture.whenComplete(
- (SlotContext ignored, Throwable throwable) -> {
- if (throwable != null) {
- release(throwable);
- }
- });
+ this.slotContextFuture = slotContextFuture.handle((SlotContext slotContext, Throwable throwable) -> {
+ if (throwable != null) {
+ // If the underlying resource request failed, we currently fail all the requests
+ release(throwable);
+ throw new CompletionException(throwable);
+ }
+
+ if (parent == null) {
+ checkOversubscriptionAndReleaseChildren(slotContext);
+ }
+
+ return slotContext;
+ });
}
CompletableFuture<? extends SlotContext> getSlotContextFuture() {
@@ -511,6 +520,30 @@ public class SlotSharingManager {
}
/**
+ * Checks if the task slot may have enough resource to fulfill the specific
+ * request. If the underlying slot is not allocated, the check is skipped.
+ *
+ * @param resourceProfile The specific request to check.
+ * @return Whether the slot may be able to fulfill the request in the future.
+ */
+ boolean mayHaveEnoughResourcesToFulfill(ResourceProfile resourceProfile) {
+ if (!slotContextFuture.isDone()) {
+ return true;
+ }
+
+ MultiTaskSlot root = this;
+
+ while (root.parent != null) {
+ root = root.parent;
+ }
+
+ SlotContext slotContext = root.getSlotContextFuture().join();
+
+ return slotContext.getResourceProfile().isMatching(
+ resourceProfile.merge(root.getReservedResources()));
+ }
+
+ /**
* Releases the child with the given childGroupId.
*
* @param childGroupId identifying the child to release
@@ -548,6 +581,48 @@ public class SlotSharingManager {
}
}
+ private void checkOversubscriptionAndReleaseChildren(SlotContext slotContext) {
+ final ResourceProfile slotResources = slotContext.getResourceProfile();
+ final ArrayList<TaskSlot> childrenToEvict = new ArrayList<>();
+ ResourceProfile requiredResources = ResourceProfile.ZERO;
+
+ for (TaskSlot slot : children.values()) {
+ final ResourceProfile resourcesWithChild = requiredResources.merge(slot.getReservedResources());
+
+ if (slotResources.isMatching(resourcesWithChild)) {
+ requiredResources = resourcesWithChild;
+ } else {
+ childrenToEvict.add(slot);
+ }
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Not all requests are fulfilled due to over-allocated, number of requests is {}, " +
+ "number of evicted requests is {}, underlying allocated is {}, fulfilled is {}, " +
+ "evicted requests is {},",
+ children.size(),
+ childrenToEvict.size(),
+ slotContext.getResourceProfile(),
+ requiredResources,
+ childrenToEvict);
+ }
+
+ if (childrenToEvict.size() == children.size()) {
+ // Since the RM always returns a slot whose resources are larger than the requested
+ // ones, this situation only happens when we request a slot from the RM using the
+ // resource profile of a task that belongs to a CoLocationGroup. Similar to dealing
+ // with the failure of the underlying request, we currently fail all the requests
+ // directly.
+ release(new SharedSlotOversubscribedException(
+ "The allocated slot does not have enough resource for any task.", false));
+ } else {
+ for (TaskSlot taskSlot : childrenToEvict) {
+ taskSlot.release(new SharedSlotOversubscribedException(
+ "The allocated slot does not have enough resource for all the tasks.", true));
+ }
+ }
+ }
+
@Override
public String toString() {
String physicalSlotDescription;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolCoLocationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolCoLocationTest.java
index 44c9977..52cc3af 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolCoLocationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolCoLocationTest.java
@@ -41,6 +41,7 @@ import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ArrayBlockingQueue;
@@ -51,6 +52,7 @@ import java.util.concurrent.ExecutionException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
/**
* Test cases for {@link CoLocationConstraint} with the {@link SlotPoolImpl}.
@@ -158,4 +160,164 @@ public class SlotPoolCoLocationTest extends TestLogger {
assertEquals(logicalSlot21.getAllocationId(), logicalSlot22.getAllocationId());
assertNotEquals(logicalSlot11.getAllocationId(), logicalSlot21.getAllocationId());
}
+
+ @Test
+ public void testCoLocatedSlotRequestsFailBeforeResolved() throws ExecutionException, InterruptedException {
+ final ResourceProfile rp1 = new ResourceProfile(1.0, 100);
+ final ResourceProfile rp2 = new ResourceProfile(2.0, 200);
+ final ResourceProfile rp3 = new ResourceProfile(5.0, 500);
+
+ final ResourceProfile allocatedSlotRp = new ResourceProfile(3.0, 300);
+
+ final BlockingQueue<AllocationID> allocationIds = new ArrayBlockingQueue<>(1);
+
+ final TestingResourceManagerGateway testingResourceManagerGateway = slotPoolResource.getTestingResourceManagerGateway();
+
+ testingResourceManagerGateway.setRequestSlotConsumer(
+ (SlotRequest slotRequest) -> allocationIds.offer(slotRequest.getAllocationId()));
+
+ final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+
+ final SlotPool slotPoolGateway = slotPoolResource.getSlotPool();
+ slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID());
+
+ CoLocationGroup group = new CoLocationGroup();
+ CoLocationConstraint coLocationConstraint1 = group.getLocationConstraint(0);
+
+ final SlotSharingGroupId slotSharingGroupId = new SlotSharingGroupId();
+
+ JobVertexID jobVertexId1 = new JobVertexID();
+ JobVertexID jobVertexId2 = new JobVertexID();
+ JobVertexID jobVertexId3 = new JobVertexID();
+
+ final SlotProvider slotProvider = slotPoolResource.getSlotProvider();
+ CompletableFuture<LogicalSlot> logicalSlotFuture1 = slotProvider.allocateSlot(
+ new ScheduledUnit(
+ jobVertexId1,
+ slotSharingGroupId,
+ coLocationConstraint1),
+ true,
+ SlotProfile.noLocality(rp1),
+ TestingUtils.infiniteTime());
+
+ CompletableFuture<LogicalSlot> logicalSlotFuture2 = slotProvider.allocateSlot(
+ new ScheduledUnit(
+ jobVertexId2,
+ slotSharingGroupId,
+ coLocationConstraint1),
+ true,
+ SlotProfile.noLocality(rp2),
+ TestingUtils.infiniteTime());
+
+ CompletableFuture<LogicalSlot> logicalSlotFuture3 = slotProvider.allocateSlot(
+ new ScheduledUnit(
+ jobVertexId3,
+ slotSharingGroupId,
+ coLocationConstraint1),
+ true,
+ SlotProfile.noLocality(rp3),
+ TestingUtils.infiniteTime());
+
+ final AllocationID allocationId1 = allocationIds.take();
+
+ Collection<SlotOffer> slotOfferFuture1 = slotPoolGateway.offerSlots(
+ taskManagerLocation,
+ new SimpleAckingTaskManagerGateway(),
+ Collections.singletonList(new SlotOffer(
+ allocationId1,
+ 0,
+ allocatedSlotRp)));
+
+ assertFalse(slotOfferFuture1.isEmpty());
+
+ for (CompletableFuture<LogicalSlot> logicalSlotFuture : Arrays.asList(logicalSlotFuture1, logicalSlotFuture2, logicalSlotFuture3)) {
+ assertTrue(logicalSlotFuture.isDone() && logicalSlotFuture.isCompletedExceptionally());
+ logicalSlotFuture.whenComplete((LogicalSlot ignored, Throwable throwable) -> {
+ assertTrue(throwable instanceof SharedSlotOversubscribedException);
+ assertTrue(((SharedSlotOversubscribedException) throwable).canRetry());
+ });
+ }
+ }
+
+ @Test
+ public void testCoLocatedSlotRequestsFailAfterResolved() throws ExecutionException, InterruptedException {
+ final ResourceProfile rp1 = new ResourceProfile(1.0, 100);
+ final ResourceProfile rp2 = new ResourceProfile(2.0, 200);
+ final ResourceProfile rp3 = new ResourceProfile(5.0, 500);
+
+ final ResourceProfile allocatedSlotRp = new ResourceProfile(3.0, 300);
+
+ final BlockingQueue<AllocationID> allocationIds = new ArrayBlockingQueue<>(1);
+
+ final TestingResourceManagerGateway testingResourceManagerGateway = slotPoolResource.getTestingResourceManagerGateway();
+
+ testingResourceManagerGateway.setRequestSlotConsumer(
+ (SlotRequest slotRequest) -> allocationIds.offer(slotRequest.getAllocationId()));
+
+ final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+
+ final SlotPool slotPoolGateway = slotPoolResource.getSlotPool();
+ slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID());
+
+ CoLocationGroup group = new CoLocationGroup();
+ CoLocationConstraint coLocationConstraint1 = group.getLocationConstraint(0);
+
+ final SlotSharingGroupId slotSharingGroupId = new SlotSharingGroupId();
+
+ JobVertexID jobVertexId1 = new JobVertexID();
+ JobVertexID jobVertexId2 = new JobVertexID();
+ JobVertexID jobVertexId3 = new JobVertexID();
+
+ final SlotProvider slotProvider = slotPoolResource.getSlotProvider();
+ CompletableFuture<LogicalSlot> logicalSlotFuture1 = slotProvider.allocateSlot(
+ new ScheduledUnit(
+ jobVertexId1,
+ slotSharingGroupId,
+ coLocationConstraint1),
+ true,
+ SlotProfile.noLocality(rp1),
+ TestingUtils.infiniteTime());
+
+ CompletableFuture<LogicalSlot> logicalSlotFuture2 = slotProvider.allocateSlot(
+ new ScheduledUnit(
+ jobVertexId2,
+ slotSharingGroupId,
+ coLocationConstraint1),
+ true,
+ SlotProfile.noLocality(rp2),
+ TestingUtils.infiniteTime());
+
+ final AllocationID allocationId1 = allocationIds.take();
+
+ Collection<SlotOffer> slotOfferFuture1 = slotPoolGateway.offerSlots(
+ taskManagerLocation,
+ new SimpleAckingTaskManagerGateway(),
+ Collections.singletonList(new SlotOffer(
+ allocationId1,
+ 0,
+ allocatedSlotRp)));
+
+ assertFalse(slotOfferFuture1.isEmpty());
+
+ CompletableFuture<LogicalSlot> logicalSlotFuture3 = slotProvider.allocateSlot(
+ new ScheduledUnit(
+ jobVertexId3,
+ slotSharingGroupId,
+ coLocationConstraint1),
+ true,
+ SlotProfile.noLocality(rp3),
+ TestingUtils.infiniteTime());
+
+ LogicalSlot logicalSlot1 = logicalSlotFuture1.get();
+ LogicalSlot logicalSlot2 = logicalSlotFuture2.get();
+
+ assertEquals(allocationId1, logicalSlot1.getAllocationId());
+ assertEquals(allocationId1, logicalSlot2.getAllocationId());
+
+ assertTrue(logicalSlotFuture3.isDone() && logicalSlotFuture3.isCompletedExceptionally());
+ logicalSlotFuture3.whenComplete((LogicalSlot ignored, Throwable throwable) -> {
+ assertTrue(throwable instanceof SharedSlotOversubscribedException);
+ assertTrue(((SharedSlotOversubscribedException) throwable).canRetry());
+ });
+ }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSlotSharingTest.java
index f1447d0..f2b8fa7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSlotSharingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSlotSharingTest.java
@@ -331,4 +331,96 @@ public class SlotPoolSlotSharingTest extends TestLogger {
assertEquals(allocationId2, logicalSlot3.getAllocationId());
}
+ @Test
+ public void testRetryOnSharedSlotOverAllocated() throws InterruptedException, ExecutionException {
+ final ResourceProfile rp1 = new ResourceProfile(1.0, 100);
+ final ResourceProfile rp2 = new ResourceProfile(2.0, 200);
+ final ResourceProfile rp3 = new ResourceProfile(5.0, 500);
+
+ final ResourceProfile firstAllocatedSlotRp = new ResourceProfile(3.0, 300);
+ final ResourceProfile secondAllocatedSlotRp = new ResourceProfile(5.0, 500);
+
+ final BlockingQueue<AllocationID> allocationIds = new ArrayBlockingQueue<>(2);
+ final TestingResourceManagerGateway testingResourceManagerGateway = slotPoolResource.getTestingResourceManagerGateway();
+ testingResourceManagerGateway.setRequestSlotConsumer(
+ (SlotRequest slotRequest) -> allocationIds.offer(slotRequest.getAllocationId()));
+
+ final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+
+ final SlotPoolImpl slotPool = slotPoolResource.getSlotPool();
+ slotPool.registerTaskManager(taskManagerLocation.getResourceID());
+
+ final SlotSharingGroupId slotSharingGroupId = new SlotSharingGroupId();
+ final JobVertexID jobVertexId1 = new JobVertexID();
+ final JobVertexID jobVertexId2 = new JobVertexID();
+ final JobVertexID jobVertexId3 = new JobVertexID();
+
+ final SlotProvider slotProvider = slotPoolResource.getSlotProvider();
+ CompletableFuture<LogicalSlot> logicalSlotFuture1 = slotProvider.allocateSlot(
+ new ScheduledUnit(
+ jobVertexId1,
+ slotSharingGroupId,
+ null),
+ true,
+ SlotProfile.noLocality(rp1),
+ TestingUtils.infiniteTime());
+
+ CompletableFuture<LogicalSlot> logicalSlotFuture2 = slotProvider.allocateSlot(
+ new ScheduledUnit(
+ jobVertexId2,
+ slotSharingGroupId,
+ null),
+ true,
+ SlotProfile.noLocality(rp2),
+ TestingUtils.infiniteTime());
+
+ CompletableFuture<LogicalSlot> logicalSlotFuture3 = slotProvider.allocateSlot(
+ new ScheduledUnit(
+ jobVertexId3,
+ slotSharingGroupId,
+ null),
+ true,
+ SlotProfile.noLocality(rp3),
+ TestingUtils.infiniteTime());
+
+ assertFalse(logicalSlotFuture1.isDone());
+ assertFalse(logicalSlotFuture2.isDone());
+ assertFalse(logicalSlotFuture3.isDone());
+
+ final AllocationID allocationId1 = allocationIds.take();
+
+ // This should fulfill the first two requests.
+ boolean offerFuture = slotPool.offerSlot(
+ taskManagerLocation,
+ new SimpleAckingTaskManagerGateway(),
+ new SlotOffer(
+ allocationId1,
+ 0,
+ firstAllocatedSlotRp));
+
+ assertTrue(offerFuture);
+
+ LogicalSlot logicalSlot1 = logicalSlotFuture1.get();
+ LogicalSlot logicalSlot2 = logicalSlotFuture2.get();
+
+ assertEquals(allocationId1, logicalSlot1.getAllocationId());
+ assertEquals(allocationId1, logicalSlot2.getAllocationId());
+
+ // The third request will retry.
+ assertFalse(logicalSlotFuture3.isDone());
+ final AllocationID allocationId2 = allocationIds.take();
+
+ offerFuture = slotPool.offerSlot(
+ taskManagerLocation,
+ new SimpleAckingTaskManagerGateway(),
+ new SlotOffer(
+ allocationId2,
+ 1,
+ secondAllocatedSlotRp));
+
+ assertTrue(offerFuture);
+
+ LogicalSlot logicalSlot3 = logicalSlotFuture3.get();
+ assertEquals(allocationId2, logicalSlot3.getAllocationId());
+ }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java
index 0742ac2..9bc1d9b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmaster.slotpool;
import org.apache.flink.api.common.resources.GPUResource;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
@@ -27,11 +28,13 @@ import org.apache.flink.runtime.instance.SimpleSlotContext;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
import org.apache.flink.runtime.jobmanager.slots.DummySlotOwner;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotContext;
import org.apache.flink.runtime.jobmaster.SlotInfo;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
@@ -39,18 +42,28 @@ import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
+import javax.annotation.Nonnull;
+
+import java.net.InetAddress;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
/**
* Test cases for the {@link SlotSharingManager}.
@@ -605,4 +618,210 @@ public class SlotSharingManagerTest extends TestLogger {
firstChild.release(new Throwable("Release for testing"));
assertEquals(ResourceProfile.ZERO, unresolvedRootSlot.getReservedResources());
}
+
+ @Test
+ public void testHashEnoughResourceOfMultiTaskSlot() {
+ ResourceProfile rp1 = new ResourceProfile(1.0, 100);
+ ResourceProfile rp2 = new ResourceProfile(2.0, 200);
+ ResourceProfile allocatedSlotRp = new ResourceProfile(2.0, 200);
+
+ final TestingAllocatedSlotActions allocatedSlotActions = new TestingAllocatedSlotActions();
+
+ SlotSharingManager slotSharingManager = new SlotSharingManager(
+ SLOT_SHARING_GROUP_ID,
+ allocatedSlotActions,
+ SLOT_OWNER);
+
+ CompletableFuture<SlotContext> slotContextFuture = new CompletableFuture<>();
+
+ SlotSharingManager.MultiTaskSlot unresolvedRootSlot = slotSharingManager.createRootSlot(
+ new SlotRequestId(),
+ slotContextFuture,
+ new SlotRequestId());
+
+ SlotSharingManager.MultiTaskSlot multiTaskSlot =
+ unresolvedRootSlot.allocateMultiTaskSlot(new SlotRequestId(), new SlotSharingGroupId());
+
+ SlotSharingManager.SingleTaskSlot firstChild = multiTaskSlot.allocateSingleTaskSlot(
+ new SlotRequestId(),
+ rp1,
+ new SlotSharingGroupId(),
+ Locality.LOCAL);
+
+ assertThat(multiTaskSlot.mayHaveEnoughResourcesToFulfill(rp1), is(true));
+ assertThat(multiTaskSlot.mayHaveEnoughResourcesToFulfill(rp2), is(true));
+ assertThat(multiTaskSlot.mayHaveEnoughResourcesToFulfill(ResourceProfile.UNKNOWN), is(true));
+
+ slotContextFuture.complete(new AllocatedSlot(
+ new AllocationID(),
+ new TaskManagerLocation(new ResourceID("tm-X"), InetAddress.getLoopbackAddress(), 46),
+ 0,
+ allocatedSlotRp,
+ mock(TaskManagerGateway.class)));
+
+ assertThat(multiTaskSlot.mayHaveEnoughResourcesToFulfill(rp1), is(true));
+ assertThat(multiTaskSlot.mayHaveEnoughResourcesToFulfill(rp2), is(false));
+ assertThat(multiTaskSlot.mayHaveEnoughResourcesToFulfill(ResourceProfile.UNKNOWN), is(true));
+ }
+
+ @Test
+ public void testSlotAllocatedWithEnoughResource() {
+ SlotSharingResourceTestContext context = createResourceTestContext(new ResourceProfile(16.0, 1600));
+
+ // With enough resources, all the requests should be fulfilled.
+ for (SlotSharingManager.SingleTaskSlot singleTaskSlot : context.singleTaskSlotsInOrder) {
+ assertThat(singleTaskSlot.getLogicalSlotFuture().isDone(), is(true));
+ assertThat(singleTaskSlot.getLogicalSlotFuture().isCompletedExceptionally(), is(false));
+ }
+
+ // The multi-task slot for coLocation should be kept.
+ assertThat(context.slotSharingManager.getTaskSlot(context.coLocationTaskSlot.getSlotRequestId()), notNullValue());
+ }
+
+ @Test
+ public void testSlotOverAllocatedAndSingleSlotReleased() {
+ SlotSharingResourceTestContext context = createResourceTestContext(new ResourceProfile(7.0, 700));
+
+ // The two co-located requests and the third request are successful.
+ for (int i = 0; i < context.singleTaskSlotsInOrder.size(); ++i) {
+ SlotSharingManager.SingleTaskSlot singleTaskSlot = context.singleTaskSlotsInOrder.get(i);
+ assertThat(singleTaskSlot.getLogicalSlotFuture().isDone(), is(true));
+
+ if (i != 3) {
+ assertThat(singleTaskSlot.getLogicalSlotFuture().isCompletedExceptionally(), is(false));
+ } else {
+ assertThat(singleTaskSlot.getLogicalSlotFuture().isCompletedExceptionally(), is(true));
+ singleTaskSlot.getLogicalSlotFuture().whenComplete((LogicalSlot ignored, Throwable throwable) -> {
+ assertThat(throwable instanceof SharedSlotOversubscribedException, is(true));
+ assertThat(((SharedSlotOversubscribedException) throwable).canRetry(), is(true));
+ });
+ }
+ }
+
+ // The multi-task slot for coLocation should be kept.
+ assertThat(context.slotSharingManager.getTaskSlot(context.coLocationTaskSlot.getSlotRequestId()), notNullValue());
+ }
+
+ @Test
+ public void testSlotOverAllocatedAndMultiTaskSlotReleased() {
+ SlotSharingResourceTestContext context = createResourceTestContext(new ResourceProfile(3.0, 300));
+
+ // Only the third request is fulfilled.
+ for (int i = 0; i < context.singleTaskSlotsInOrder.size(); ++i) {
+ SlotSharingManager.SingleTaskSlot singleTaskSlot = context.singleTaskSlotsInOrder.get(i);
+ assertThat(singleTaskSlot.getLogicalSlotFuture().isDone(), is(true));
+
+ if (i == 2) {
+ assertThat(singleTaskSlot.getLogicalSlotFuture().isCompletedExceptionally(), is(false));
+ } else {
+ assertThat(singleTaskSlot.getLogicalSlotFuture().isCompletedExceptionally(), is(true));
+ singleTaskSlot.getLogicalSlotFuture().whenComplete((LogicalSlot ignored, Throwable throwable) -> {
+ assertThat(throwable instanceof SharedSlotOversubscribedException, is(true));
+ assertThat(((SharedSlotOversubscribedException) throwable).canRetry(), is(true));
+ });
+ }
+ }
+
+ // The multi-task slot for coLocation should not be kept.
+ assertThat(context.slotSharingManager.getTaskSlot(context.coLocationTaskSlot.getSlotRequestId()), nullValue());
+ }
+
+ @Test
+ public void testSlotOverAllocatedAndAllTaskSlotReleased() {
+ SlotSharingResourceTestContext context = createResourceTestContext(new ResourceProfile(2.0, 200));
+
+ // None of the requests is fulfilled.
+ for (int i = 0; i < context.singleTaskSlotsInOrder.size(); ++i) {
+ SlotSharingManager.SingleTaskSlot singleTaskSlot = context.singleTaskSlotsInOrder.get(i);
+ assertThat(singleTaskSlot.getLogicalSlotFuture().isDone(), is(true));
+
+ assertThat(singleTaskSlot.getLogicalSlotFuture().isCompletedExceptionally(), is(true));
+ singleTaskSlot.getLogicalSlotFuture().whenComplete((LogicalSlot ignored, Throwable throwable) -> {
+ assertThat(throwable instanceof SharedSlotOversubscribedException, is(true));
+
+ // Since no request is fulfilled, these requests will be failed and should not retry.
+ assertThat(((SharedSlotOversubscribedException) throwable).canRetry(), is(false));
+ });
+ }
+
+ // All the task slots should be removed.
+ assertThat(context.slotSharingManager.isEmpty(), is(true));
+ }
+
+ private SlotSharingResourceTestContext createResourceTestContext(ResourceProfile allocatedResourceProfile) {
+ ResourceProfile coLocationTaskRp = new ResourceProfile(2.0, 200);
+ ResourceProfile thirdChildRp = new ResourceProfile(3.0, 300);
+ ResourceProfile forthChildRp = new ResourceProfile(9.0, 900);
+
+ final TestingAllocatedSlotActions allocatedSlotActions = new TestingAllocatedSlotActions();
+
+ SlotSharingManager slotSharingManager = new SlotSharingManager(
+ SLOT_SHARING_GROUP_ID,
+ allocatedSlotActions,
+ SLOT_OWNER);
+
+ CompletableFuture<SlotContext> slotContextFuture = new CompletableFuture<>();
+
+ SlotSharingManager.MultiTaskSlot unresolvedRootSlot = slotSharingManager.createRootSlot(
+ new SlotRequestId(),
+ slotContextFuture,
+ new SlotRequestId());
+
+ SlotSharingManager.MultiTaskSlot coLocationTaskSlot = unresolvedRootSlot.allocateMultiTaskSlot(
+ new SlotRequestId(), new SlotSharingGroupId());
+
+ SlotSharingManager.SingleTaskSlot firstCoLocatedChild = coLocationTaskSlot.allocateSingleTaskSlot(
+ new SlotRequestId(),
+ coLocationTaskRp,
+ new SlotSharingGroupId(),
+ Locality.LOCAL);
+ SlotSharingManager.SingleTaskSlot secondCoLocatedChild = coLocationTaskSlot.allocateSingleTaskSlot(
+ new SlotRequestId(),
+ coLocationTaskRp,
+ new SlotSharingGroupId(),
+ Locality.LOCAL);
+
+ SlotSharingManager.SingleTaskSlot thirdChild = unresolvedRootSlot.allocateSingleTaskSlot(
+ new SlotRequestId(),
+ thirdChildRp,
+ new SlotSharingGroupId(),
+ Locality.LOCAL);
+
+ SlotSharingManager.SingleTaskSlot forthChild = unresolvedRootSlot.allocateSingleTaskSlot(
+ new SlotRequestId(),
+ forthChildRp,
+ new SlotSharingGroupId(),
+ Locality.LOCAL);
+
+ slotContextFuture.complete(new AllocatedSlot(
+ new AllocationID(),
+ new TaskManagerLocation(new ResourceID("tm-X"), InetAddress.getLoopbackAddress(), 46),
+ 0,
+ allocatedResourceProfile,
+ mock(TaskManagerGateway.class)));
+
+ return new SlotSharingResourceTestContext(
+ slotSharingManager,
+ coLocationTaskSlot,
+ Arrays.asList(firstCoLocatedChild, secondCoLocatedChild, thirdChild, forthChild));
+ }
+
+ /**
+ * A utility class that maintains the slot sharing hierarchy used for testing.
+ */
+ private class SlotSharingResourceTestContext {
+ final SlotSharingManager slotSharingManager;
+ final SlotSharingManager.MultiTaskSlot coLocationTaskSlot;
+ final List<SlotSharingManager.SingleTaskSlot> singleTaskSlotsInOrder;
+
+ SlotSharingResourceTestContext(
+ @Nonnull SlotSharingManager slotSharingManager,
+ @Nonnull SlotSharingManager.MultiTaskSlot coLocationTaskSlot,
+ @Nonnull List<SlotSharingManager.SingleTaskSlot> singleTaskSlotsInOrder) {
+
+ this.slotSharingManager = slotSharingManager;
+ this.coLocationTaskSlot = coLocationTaskSlot;
+ this.singleTaskSlotsInOrder = singleTaskSlotsInOrder;
+ }
+ }
}