You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this rendering.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/12/14 15:33:39 UTC

[5/6] flink git commit: [FLINK-8089] Also check for other pending slot requests in offerSlot

[FLINK-8089] Also check for other pending slot requests in offerSlot

Besides checking for a slot request with the matching allocation id, offerSlot now
also checks whether an unclaimed offered slot can fulfill another pending slot
request before adding the slot to the list of available slots.

This closes #5090.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/401d0065
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/401d0065
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/401d0065

Branch: refs/heads/master
Commit: 401d006516caa2a9d8289e760ccd3a9c564bc795
Parents: bc1c375
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Nov 13 15:42:07 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Dec 14 14:32:25 2017 +0100

----------------------------------------------------------------------
 .../flink/runtime/instance/AllocatedSlot.java   | 18 ++---
 .../apache/flink/runtime/instance/SlotPool.java | 41 +++++------
 .../flink/runtime/instance/SlotPoolTest.java    | 73 ++++++++++++++++++++
 3 files changed, 103 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/401d0065/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
index a3f98f1..97be592 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
@@ -108,15 +108,6 @@ public class AllocatedSlot {
 	}
 
 	/**
-	 * Gets the number of the slot.
-	 *
-	 * @return The number of the slot on the TaskManager.
-	 */
-	public int getPhysicalSlotNumber() {
-		return physicalSlotNumber;
-	}
-
-	/**
 	 * Gets the resource profile of the slot.
 	 *
 	 * @return The resource profile of the slot.
@@ -146,6 +137,15 @@ public class AllocatedSlot {
 	}
 
 	/**
+	 * Returns true if this slot is being used (i.e. a logical slot is allocated from this slot).
+	 *
+	 * @return true if a logical slot is allocated from this slot, otherwise false
+	 */
+	public boolean isUsed() {
+		return logicalSlotReference.get() != null;
+	}
+
+	/**
 	 * Triggers the release of the logical slot.
 	 */
 	public void triggerLogicalSlotRelease() {

http://git-wip-us.apache.org/repos/asf/flink/blob/401d0065/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index a72f57b..68f5be6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -281,7 +281,11 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 		final AllocatedSlot allocatedSlot = allocatedSlots.remove(slotRequestId);
 
 		if (allocatedSlot != null) {
-			internalReturnAllocatedSlot(allocatedSlot);
+			if (allocatedSlot.releaseLogicalSlot()) {
+				tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);
+			} else {
+				throw new RuntimeException("Could not release allocated slot " + allocatedSlot + '.');
+			}
 		} else {
 			log.debug("There is no allocated slot with request id {}. Ignoring this request.", slotRequestId);
 		}
@@ -342,9 +346,8 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 				try {
 					return allocatedSlot.allocateSimpleSlot(requestId, Locality.UNKNOWN);
 				} catch (SlotException e) {
-					internalReturnAllocatedSlot(allocatedSlot);
-
-					throw new CompletionException("Could not allocate a logical simple slot.", e);
+					throw new CompletionException("Could not allocate a logical simple slot from allocate slot " +
+						allocatedSlot + '.', e);
 				}
 			});
 	}
@@ -464,6 +467,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 		Preconditions.checkNotNull(e);
 
 		if (!pendingRequest.getAllocatedSlotFuture().isDone()) {
+			LOG.info("Failing pending request {}.", pendingRequest.getSlotRequestId());
 			pendingRequest.getAllocatedSlotFuture().completeExceptionally(e);
 		}
 	}
@@ -497,28 +501,25 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 	// ------------------------------------------------------------------------
 
 	/**
-	 * Return the slot back to this pool without releasing it. It's mainly called by failed / cancelled tasks, and the
-	 * slot can be reused by other pending requests if the resource profile matches.n
+	 * Tries to fulfill a pending slot request with the given allocated slot, or adds the
+	 * allocated slot to the set of available slots if no matching request is available.
 	 *
 	 * @param allocatedSlot which shall be returned
 	 */
-	private void internalReturnAllocatedSlot(AllocatedSlot allocatedSlot) {
-		if (allocatedSlot.releaseLogicalSlot()) {
+	private void tryFulfillSlotRequestOrMakeAvailable(AllocatedSlot allocatedSlot) {
+		Preconditions.checkState(!allocatedSlot.isUsed(), "Provided slot is still in use.");
 
-			final PendingRequest pendingRequest = pollMatchingPendingRequest(allocatedSlot);
+		final PendingRequest pendingRequest = pollMatchingPendingRequest(allocatedSlot);
 
-			if (pendingRequest != null) {
-				LOG.debug("Fulfilling pending request [{}] early with returned slot [{}]",
-					pendingRequest.getSlotRequestId(), allocatedSlot.getAllocationId());
+		if (pendingRequest != null) {
+			LOG.debug("Fulfilling pending request [{}] early with returned slot [{}]",
+				pendingRequest.getSlotRequestId(), allocatedSlot.getAllocationId());
 
-				allocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot);
-				pendingRequest.getAllocatedSlotFuture().complete(allocatedSlot);
-			} else {
-				LOG.debug("Adding returned slot [{}] to available slots", allocatedSlot.getAllocationId());
-				availableSlots.add(allocatedSlot, clock.relativeTimeMillis());
-			}
+			allocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot);
+			pendingRequest.getAllocatedSlotFuture().complete(allocatedSlot);
 		} else {
-			LOG.debug("Failed to mark the logical slot of {} as released.", allocatedSlot);
+			LOG.debug("Adding returned slot [{}] to available slots", allocatedSlot.getAllocationId());
+			availableSlots.add(allocatedSlot, clock.relativeTimeMillis());
 		}
 	}
 
@@ -643,7 +644,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 			// we were actually not waiting for this:
 			//   - could be that this request had been fulfilled
 			//   - we are receiving the slots from TaskManagers after becoming leaders
-			availableSlots.add(allocatedSlot, clock.relativeTimeMillis());
+			tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);
 		}
 
 		// we accepted the request in any case. slot will be released after it idled for

http://git-wip-us.apache.org/repos/asf/flink/blob/401d0065/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
index ec20f6b..1af9cce 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 
@@ -50,7 +51,9 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.runtime.instance.AvailableSlotsTest.DEFAULT_TESTING_PROFILE;
@@ -383,6 +386,76 @@ public class SlotPoolTest extends TestLogger {
 		}
 	}
 
+	/**
+	 * Tests that unused offered slots are directly used to fulfill pending slot
+	 * requests.
+	 *
+	 * <p>See FLINK-8089
+	 */
+	@Test
+	public void testFulfillingSlotRequestsWithUnusedOfferedSlots() throws Exception {
+		final SlotPool slotPool = new SlotPool(rpcService, jobId);
+
+		final JobMasterId jobMasterId = JobMasterId.generate();
+		final String jobMasterAddress = "foobar";
+		final CompletableFuture<AllocationID> allocationIdFuture = new CompletableFuture<>();
+		final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
+
+		resourceManagerGateway.setRequestSlotConsumer(
+			(SlotRequest slotRequest) -> allocationIdFuture.complete(slotRequest.getAllocationId()));
+
+		final SlotRequestID slotRequestId1 = new SlotRequestID();
+		final SlotRequestID slotRequestId2 = new SlotRequestID();
+
+		try {
+			slotPool.start(jobMasterId, jobMasterAddress);
+
+			final SlotPoolGateway slotPoolGateway = slotPool.getSelfGateway(SlotPoolGateway.class);
+
+			final ScheduledUnit scheduledUnit = new ScheduledUnit(mock(Execution.class));
+
+			slotPoolGateway.connectToResourceManager(resourceManagerGateway);
+
+			CompletableFuture<LogicalSlot> slotFuture1 = slotPoolGateway.allocateSlot(
+				slotRequestId1,
+				scheduledUnit,
+				ResourceProfile.UNKNOWN,
+				Collections.emptyList(),
+				timeout);
+
+			// wait for the first slot request
+			final AllocationID allocationId = allocationIdFuture.get();
+
+			CompletableFuture<LogicalSlot> slotFuture2 = slotPoolGateway.allocateSlot(
+				slotRequestId2,
+				scheduledUnit,
+				ResourceProfile.UNKNOWN,
+				Collections.emptyList(),
+				timeout);
+
+			slotPoolGateway.cancelSlotRequest(slotRequestId1);
+
+			try {
+				// this should fail with a CancellationException
+				slotFuture1.get();
+				fail("The first slot future should have failed because it was cancelled.");
+			} catch (ExecutionException ee) {
+				assertTrue(ExceptionUtils.stripExecutionException(ee) instanceof CancellationException);
+			}
+
+			final SlotOffer slotOffer = new SlotOffer(allocationId, 0, ResourceProfile.UNKNOWN);
+
+			slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()).get();
+
+			assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());
+
+			// the slot offer should fulfill the second slot request
+			assertEquals(allocationId, slotFuture2.get().getAllocationId());
+		} finally {
+			RpcUtils.terminateRpcEndpoint(slotPool, timeout);
+		}
+	}
+
 	private static ResourceManagerGateway createResourceManagerGatewayMock() {
 		ResourceManagerGateway resourceManagerGateway = mock(ResourceManagerGateway.class);
 		when(resourceManagerGateway