You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2019/07/10 09:36:08 UTC
[flink] 07/16: [FLINK-12763][runtime] SlotManager fails
unfulfillable slot requests if it is set to do so.
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 973afee77ea6b22a286889b4d46cc750cd11e617
Author: Xintong Song <to...@gmail.com>
AuthorDate: Sat Jul 6 16:50:59 2019 +0800
[FLINK-12763][runtime] SlotManager fails unfulfillable slot requests if it is set to do so.
---
.../resourcemanager/slotmanager/SlotManager.java | 47 +++++++++++++++++
.../slotmanager/SlotManagerTest.java | 59 ++++++++++++++++++++++
2 files changed, 106 insertions(+)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index d85aec5..320612a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -123,6 +123,14 @@ public class SlotManager implements AutoCloseable {
/** Release task executor only when each produced result partition is either consumed or failed. */
private final boolean waitResultConsumedBeforeRelease;
+ /**
+ * If true, fail unfulfillable slot requests immediately. Otherwise, allow unfulfillable requests to remain pending.
+ *
+ * A slot request is considered unfulfillable if it can be fulfilled by neither a slot that is already registered
+ * (including allocated ones) nor a pending slot that the {@link ResourceActions} can allocate.
+ */
+ private boolean failUnfulfillableRequest = false;
+
public SlotManager(
ScheduledExecutor scheduledExecutor,
Time taskManagerRequestTimeout,
@@ -462,6 +470,28 @@ public class SlotManager implements AutoCloseable {
}
}
+ public void setFailUnfulfillableRequest(boolean failUnfulfillableRequest) {
+ if (!this.failUnfulfillableRequest && failUnfulfillableRequest) {
+ // fail unfulfillable pending requests
+ Iterator<Map.Entry<AllocationID, PendingSlotRequest>> slotRequestIterator = pendingSlotRequests.entrySet().iterator();
+ while (slotRequestIterator.hasNext()) {
+ PendingSlotRequest pendingSlotRequest = slotRequestIterator.next().getValue();
+ if (pendingSlotRequest.getAssignedPendingTaskManagerSlot() != null) {
+ continue;
+ }
+ if (!isFulfillableByRegisteredSlots(pendingSlotRequest.getResourceProfile())) {
+ resourceActions.notifyAllocationFailure(
+ pendingSlotRequest.getJobId(),
+ pendingSlotRequest.getAllocationId(),
+ new ResourceManagerException("Could not fulfill slot request " + pendingSlotRequest.getAllocationId() + ". "
+ + "Requested resource profile (" + pendingSlotRequest.getResourceProfile() + ") is unfulfillable.")
+ );
+ }
+ }
+ }
+ this.failUnfulfillableRequest = failUnfulfillableRequest;
+ }
+
// ---------------------------------------------------------------------------------------------
// Behaviour methods
// ---------------------------------------------------------------------------------------------
@@ -720,6 +750,14 @@ public class SlotManager implements AutoCloseable {
}
pendingTaskManagerSlotOptional.ifPresent(pendingTaskManagerSlot -> assignPendingTaskManagerSlot(pendingSlotRequest, pendingTaskManagerSlot));
+ if (!pendingTaskManagerSlotOptional.isPresent()) {
+ // request can not be fulfilled by any free slot or pending slot that can be allocated,
+ // check whether it can be fulfilled by allocated slots
+ boolean fulfillable = isFulfillableByRegisteredSlots(pendingSlotRequest.getResourceProfile());
+ if (!fulfillable && failUnfulfillableRequest) {
+ throw new ResourceManagerException("Requested resource profile (" + pendingSlotRequest.getResourceProfile() + ") is unfulfillable.");
+ }
+ }
}
}
@@ -733,6 +771,15 @@ public class SlotManager implements AutoCloseable {
return Optional.empty();
}
+ private boolean isFulfillableByRegisteredSlots(ResourceProfile resourceProfile) {
+ for (TaskManagerSlot slot : slots.values()) {
+ if (slot.getResourceProfile().isMatching(resourceProfile)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
private Optional<PendingTaskManagerSlot> allocateResource(ResourceProfile resourceProfile) throws ResourceManagerException {
final Collection<ResourceProfile> requestedSlots = resourceActions.allocateResource(resourceProfile);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index c358866..07427d4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -58,6 +58,7 @@ import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -1486,6 +1487,64 @@ public class SlotManagerTest extends TestLogger {
}
}
+ /**
+ * Tests that SlotManager fails unfulfillable slot requests properly.
+ */
+ @Test
+ public void testFailUnfulfillableSlotRequests() throws Exception {
+ final JobID jobId = new JobID();
+ final ResourceProfile registeredSlotFulfillableProfile = new ResourceProfile(2.0, 100);
+ final ResourceProfile pendingSlotFulfillableProfile = new ResourceProfile(1.0, 200);
+ final ResourceProfile unfulfillableProfile = new ResourceProfile(2.0, 200);
+
+ final List<AllocationID> notifiedAllocationFailures = new ArrayList<>();
+ final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
+ final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder()
+ .setAllocateResourceFunction((resourceProfile) ->
+ pendingSlotFulfillableProfile.isMatching(resourceProfile) ?
+ Collections.singleton(pendingSlotFulfillableProfile) : Collections.emptyList())
+ .setNotifyAllocationFailureConsumer(tuple3 -> notifiedAllocationFailures.add(tuple3.f1)).build();
+
+ final ResourceID resourceID = ResourceID.generate();
+ final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
+ final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway);
+ final SlotReport slotReport =
+ new SlotReport(Collections.singleton(new SlotStatus(new SlotID(resourceID, 0), registeredSlotFulfillableProfile)));
+
+ try (final SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) {
+ slotManager.registerTaskManager(taskExecutorConnection, slotReport);
+
+ // initially, no request should fail
+ SlotRequest slotRequest1 = new SlotRequest(jobId, new AllocationID(), registeredSlotFulfillableProfile, "foobar");
+ SlotRequest slotRequest2 = new SlotRequest(jobId, new AllocationID(), pendingSlotFulfillableProfile, "foobar");
+ SlotRequest slotRequest3 = new SlotRequest(jobId, new AllocationID(), unfulfillableProfile, "foobar");
+ assertTrue(slotManager.registerSlotRequest(slotRequest1));
+ assertTrue(slotManager.registerSlotRequest(slotRequest2));
+ assertTrue(slotManager.registerSlotRequest(slotRequest3));
+ assertEquals(0, notifiedAllocationFailures.size());
+
+ // set fail unfulfillable request, pending request 3 should fail
+ slotManager.setFailUnfulfillableRequest(true);
+ assertEquals(1, notifiedAllocationFailures.size());
+ assertEquals(slotRequest3.getAllocationId(), notifiedAllocationFailures.get(0));
+
+ // request again, request 3 should fail
+ slotRequest1 = new SlotRequest(jobId, new AllocationID(), registeredSlotFulfillableProfile, "foobar");
+ slotRequest2 = new SlotRequest(jobId, new AllocationID(), pendingSlotFulfillableProfile, "foobar");
+ slotRequest3 = new SlotRequest(jobId, new AllocationID(), unfulfillableProfile, "foobar");
+ assertTrue(slotManager.registerSlotRequest(slotRequest1));
+ assertTrue(slotManager.registerSlotRequest(slotRequest2));
+ Exception exception = null;
+ try {
+ slotManager.registerSlotRequest(slotRequest3);
+ } catch (Exception e) {
+ exception = e;
+ }
+ assertNotNull(exception);
+ assertEquals(1, notifiedAllocationFailures.size());
+ }
+ }
+
private static FunctionWithException<ResourceProfile, Collection<ResourceProfile>, ResourceManagerException> convert(FunctionWithException<ResourceProfile, Integer, ResourceManagerException> function) {
return (ResourceProfile resourceProfile) -> {
final int slots = function.apply(resourceProfile);