You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/18 14:32:54 UTC

[flink] 01/02: [FLINK-9884] [runtime] fix slot request may not be removed when it has already be assigned in slot manager

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 48e724fed60d10a11e3cc39e9c6e964002a926b3
Author: shuai-xu <sh...@foxmail.com>
AuthorDate: Wed Jul 18 15:54:55 2018 +0800

    [FLINK-9884] [runtime] fix slot request may not be removed when it has already be assigned in slot manager
    
    This closes #6360.
---
 .../resourcemanager/slotmanager/SlotManager.java   |  3 +
 .../slotmanager/SlotManagerTest.java               | 66 ++++++++++++++++++++++
 2 files changed, 69 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index f9c8c0e..d54d143 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -594,6 +594,9 @@ public class SlotManager implements AutoCloseable {
 						// set the allocation id such that the slot won't be considered for the pending slot request
 						slot.updateAllocation(allocationId, jobId);
 
+						// remove the pending request if any as it has been assigned
+						pendingSlotRequests.remove(allocationId);
+
 						// this will try to find a new slot for the request
 						rejectPendingSlotRequest(
 							pendingSlotRequest,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index fb82aa5..cfca017 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -43,6 +43,7 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
 import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
+import org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
@@ -1217,6 +1218,71 @@ public class SlotManagerTest extends TestLogger {
 	}
 
 	/**
+	 * Tests that pending request is removed if task executor reports a slot with its allocation id.
+	 */
+	@Test
+	public void testSlotRequestRemovedIfTMReportAllocation() throws Exception {
+		try (final SlotManager slotManager = createSlotManager(ResourceManagerId.generate(),
+				new TestingResourceActionsBuilder().createTestingResourceActions())) {
+
+			final JobID jobID = new JobID();
+			final SlotRequest slotRequest1 = new SlotRequest(jobID, new AllocationID(), ResourceProfile.UNKNOWN, "foobar");
+			slotManager.registerSlotRequest(slotRequest1);
+
+			final BlockingQueue<Tuple5<SlotID, JobID, AllocationID, String, ResourceManagerId>> requestSlotQueue = new ArrayBlockingQueue<>(1);
+			final BlockingQueue<CompletableFuture<Acknowledge>> responseQueue = new ArrayBlockingQueue<>(1);
+
+			final TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
+					.setRequestSlotFunction(slotIDJobIDAllocationIDStringResourceManagerIdTuple5 -> {
+						requestSlotQueue.offer(slotIDJobIDAllocationIDStringResourceManagerIdTuple5);
+						try {
+							return responseQueue.take();
+						} catch (InterruptedException ignored) {
+							return FutureUtils.completedExceptionally(new FlinkException("Response queue was interrupted."));
+						}
+					})
+					.createTestingTaskExecutorGateway();
+
+			final ResourceID taskExecutorResourceId = ResourceID.generate();
+			final TaskExecutorConnection taskExecutionConnection = new TaskExecutorConnection(taskExecutorResourceId, testingTaskExecutorGateway);
+			final SlotReport slotReport = new SlotReport(new SlotStatus(new SlotID(taskExecutorResourceId, 0), ResourceProfile.UNKNOWN));
+
+			final CompletableFuture<Acknowledge> firstManualSlotRequestResponse = new CompletableFuture<>();
+			responseQueue.offer(firstManualSlotRequestResponse);
+
+			slotManager.registerTaskManager(taskExecutionConnection, slotReport);
+
+			final Tuple5<SlotID, JobID, AllocationID, String, ResourceManagerId> firstRequest = requestSlotQueue.take();
+
+			final CompletableFuture<Acknowledge> secondManualSlotRequestResponse = new CompletableFuture<>();
+			responseQueue.offer(secondManualSlotRequestResponse);
+
+			final SlotRequest slotRequest2 = new SlotRequest(jobID, new AllocationID(), ResourceProfile.UNKNOWN, "foobar");
+			slotManager.registerSlotRequest(slotRequest2);
+
+			// fail first request
+			firstManualSlotRequestResponse.completeExceptionally(new TimeoutException("Test exception to fail first allocation"));
+
+			final Tuple5<SlotID, JobID, AllocationID, String, ResourceManagerId> secondRequest = requestSlotQueue.take();
+
+			// fail second request
+			secondManualSlotRequestResponse.completeExceptionally(new SlotOccupiedException("Test exception", slotRequest1.getAllocationId(), jobID));
+
+			assertThat(firstRequest.f2, equalTo(slotRequest1.getAllocationId()));
+			assertThat(secondRequest.f2, equalTo(slotRequest2.getAllocationId()));
+			assertThat(secondRequest.f0, equalTo(firstRequest.f0));
+
+			secondManualSlotRequestResponse.complete(Acknowledge.get());
+
+			final TaskManagerSlot slot = slotManager.getSlot(secondRequest.f0);
+			assertThat(slot.getState(), equalTo(TaskManagerSlot.State.ALLOCATED));
+			assertThat(slot.getAllocationId(), equalTo(firstRequest.f2));
+
+			assertThat(slotManager.getNumberRegisteredSlots(), is(1));
+		}
+	}
+
+	/**
 	 * Tests notify the job manager of the allocations when the task manager is failed/killed.
 	 */
 	@Test