You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2019/07/10 09:36:08 UTC

[flink] 07/16: [FLINK-12763][runtime] SlotManager fails unfulfillable slot requests if it is set to do so.

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 973afee77ea6b22a286889b4d46cc750cd11e617
Author: Xintong Song <to...@gmail.com>
AuthorDate: Sat Jul 6 16:50:59 2019 +0800

    [FLINK-12763][runtime] SlotManager fails unfulfillable slot requests if it is set to do so.
---
 .../resourcemanager/slotmanager/SlotManager.java   | 47 +++++++++++++++++
 .../slotmanager/SlotManagerTest.java               | 59 ++++++++++++++++++++++
 2 files changed, 106 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index d85aec5..320612a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -123,6 +123,14 @@ public class SlotManager implements AutoCloseable {
 	/** Release task executor only when each produced result partition is either consumed or failed. */
 	private final boolean waitResultConsumedBeforeRelease;
 
+	/**
+	 * If true, fail unfulfillable slot requests immediately. Otherwise, allow unfulfillable requests to stay pending.
+	 *
+	 * <p>A slot request is considered unfulfillable if it can be fulfilled by neither a slot that is already registered
+	 * (including allocated ones) nor a pending slot that the {@link ResourceActions} can allocate.
+	 */
+	private boolean failUnfulfillableRequest = false;
+
 	public SlotManager(
 			ScheduledExecutor scheduledExecutor,
 			Time taskManagerRequestTimeout,
@@ -462,6 +470,28 @@ public class SlotManager implements AutoCloseable {
 		}
 	}
 
+	/**
+	 * Sets whether unfulfillable slot requests fail immediately. When switched from false to true, each pending request
+	 * with no assigned pending slot and no matching registered slot is removed and reported as an allocation failure.
+	 */
+	public void setFailUnfulfillableRequest(boolean failUnfulfillableRequest) {
+		if (!this.failUnfulfillableRequest && failUnfulfillableRequest) {
+			Iterator<Map.Entry<AllocationID, PendingSlotRequest>> slotRequestIterator = pendingSlotRequests.entrySet().iterator();
+			while (slotRequestIterator.hasNext()) {
+				PendingSlotRequest pendingSlotRequest = slotRequestIterator.next().getValue();
+				if (pendingSlotRequest.getAssignedPendingTaskManagerSlot() == null
+						&& !isFulfillableByRegisteredSlots(pendingSlotRequest.getResourceProfile())) {
+					slotRequestIterator.remove(); // a request reported as failed must not be kept as pending
+					resourceActions.notifyAllocationFailure(
+						pendingSlotRequest.getJobId(), pendingSlotRequest.getAllocationId(),
+						new ResourceManagerException("Could not fulfill slot request " + pendingSlotRequest.getAllocationId() + ". "
+							+ "Requested resource profile (" + pendingSlotRequest.getResourceProfile() + ") is unfulfillable."));
+				}
+			}
+		}
+		this.failUnfulfillableRequest = failUnfulfillableRequest;
+	}
+
 	// ---------------------------------------------------------------------------------------------
 	// Behaviour methods
 	// ---------------------------------------------------------------------------------------------
@@ -720,6 +750,14 @@ public class SlotManager implements AutoCloseable {
 			}
 
 			pendingTaskManagerSlotOptional.ifPresent(pendingTaskManagerSlot -> assignPendingTaskManagerSlot(pendingSlotRequest, pendingTaskManagerSlot));
+			if (!pendingTaskManagerSlotOptional.isPresent()) {
+				// request can not be fulfilled by any free slot or pending slot that can be allocated,
+				// check whether it can be fulfilled by allocated slots
+				boolean fulfillable = isFulfillableByRegisteredSlots(pendingSlotRequest.getResourceProfile());
+				if (!fulfillable && failUnfulfillableRequest) {
+					throw new ResourceManagerException("Requested resource profile (" + pendingSlotRequest.getResourceProfile() + ") is unfulfillable.");
+				}
+			}
 		}
 	}
 
@@ -733,6 +771,15 @@ public class SlotManager implements AutoCloseable {
 		return Optional.empty();
 	}
 
+	/**
+	 * Checks whether any registered slot has a resource profile that matches the given resource profile.
+	 */
+	private boolean isFulfillableByRegisteredSlots(ResourceProfile resourceProfile) {
+		return slots.values().stream()
+			.map(TaskManagerSlot::getResourceProfile)
+			.anyMatch(slotProfile -> slotProfile.isMatching(resourceProfile));
+	}
+
 	private Optional<PendingTaskManagerSlot> allocateResource(ResourceProfile resourceProfile) throws ResourceManagerException {
 		final Collection<ResourceProfile> requestedSlots = resourceActions.allocateResource(resourceProfile);
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index c358866..07427d4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -58,6 +58,7 @@ import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -1486,6 +1487,64 @@ public class SlotManagerTest extends TestLogger {
 		}
 	}
 
+	/**
+	 * Tests that the SlotManager fails unfulfillable slot requests once it is set to do so.
+	 */
+	@Test
+	public void testFailUnfulfillableSlotRequests() throws Exception {
+		final JobID jobId = new JobID();
+		final ResourceProfile registeredSlotFulfillableProfile = new ResourceProfile(2.0, 100);
+		final ResourceProfile pendingSlotFulfillableProfile = new ResourceProfile(1.0, 200);
+		final ResourceProfile unfulfillableProfile = new ResourceProfile(2.0, 200);
+
+		final List<AllocationID> failedAllocations = new ArrayList<>();
+		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
+		final ResourceActions resourceActions = new TestingResourceActionsBuilder()
+			.setAllocateResourceFunction(requestedProfile -> pendingSlotFulfillableProfile.isMatching(requestedProfile)
+				? Collections.singleton(pendingSlotFulfillableProfile)
+				: Collections.emptyList())
+			.setNotifyAllocationFailureConsumer(failure -> failedAllocations.add(failure.f1)).build();
+
+		final ResourceID taskExecutorId = ResourceID.generate();
+		final SlotStatus registeredSlot = new SlotStatus(new SlotID(taskExecutorId, 0), registeredSlotFulfillableProfile);
+		final SlotReport slotReport = new SlotReport(Collections.singleton(registeredSlot));
+		final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
+		final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(taskExecutorId, taskExecutorGateway);
+
+		try (final SlotManager slotManager = createSlotManager(resourceManagerId, resourceActions)) {
+			slotManager.registerTaskManager(taskExecutorConnection, slotReport);
+
+			// with the flag unset, all three requests are accepted and no allocation failure is reported
+			final SlotRequest registeredSlotRequest = new SlotRequest(jobId, new AllocationID(), registeredSlotFulfillableProfile, "foobar");
+			final SlotRequest pendingSlotRequest = new SlotRequest(jobId, new AllocationID(), pendingSlotFulfillableProfile, "foobar");
+			final SlotRequest unfulfillableRequest = new SlotRequest(jobId, new AllocationID(), unfulfillableProfile, "foobar");
+			assertTrue(slotManager.registerSlotRequest(registeredSlotRequest));
+			assertTrue(slotManager.registerSlotRequest(pendingSlotRequest));
+			assertTrue(slotManager.registerSlotRequest(unfulfillableRequest));
+			assertTrue(failedAllocations.isEmpty());
+
+			// switching the flag on must report exactly the pending unfulfillable request as failed
+			slotManager.setFailUnfulfillableRequest(true);
+			assertEquals(1, failedAllocations.size());
+			assertEquals(unfulfillableRequest.getAllocationId(), failedAllocations.get(0));
+
+			// with the flag set, the fulfillable requests are still accepted while the unfulfillable one is rejected
+			final SlotRequest registeredSlotRetry = new SlotRequest(jobId, new AllocationID(), registeredSlotFulfillableProfile, "foobar");
+			final SlotRequest pendingSlotRetry = new SlotRequest(jobId, new AllocationID(), pendingSlotFulfillableProfile, "foobar");
+			final SlotRequest unfulfillableRetry = new SlotRequest(jobId, new AllocationID(), unfulfillableProfile, "foobar");
+			assertTrue(slotManager.registerSlotRequest(registeredSlotRetry));
+			assertTrue(slotManager.registerSlotRequest(pendingSlotRetry));
+			boolean rejected = false;
+			try {
+				slotManager.registerSlotRequest(unfulfillableRetry);
+			} catch (Exception expected) {
+				rejected = true;
+			}
+			assertTrue(rejected);
+			assertEquals(1, failedAllocations.size());
+		}
+	}
+
 	private static FunctionWithException<ResourceProfile, Collection<ResourceProfile>, ResourceManagerException> convert(FunctionWithException<ResourceProfile, Integer, ResourceManagerException> function) {
 		return (ResourceProfile resourceProfile) -> {
 			final int slots = function.apply(resourceProfile);