You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/18 14:32:54 UTC
[flink] 01/02: [FLINK-9884] [runtime] fix slot request possibly not being
removed when it has already been assigned in the slot manager
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 48e724fed60d10a11e3cc39e9c6e964002a926b3
Author: shuai-xu <sh...@foxmail.com>
AuthorDate: Wed Jul 18 15:54:55 2018 +0800
[FLINK-9884] [runtime] fix slot request possibly not being removed when it has already been assigned in the slot manager
This closes #6360.
---
.../resourcemanager/slotmanager/SlotManager.java | 3 +
.../slotmanager/SlotManagerTest.java | 66 ++++++++++++++++++++++
2 files changed, 69 insertions(+)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index f9c8c0e..d54d143 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -594,6 +594,9 @@ public class SlotManager implements AutoCloseable {
// set the allocation id such that the slot won't be considered for the pending slot request
slot.updateAllocation(allocationId, jobId);
+ // remove the pending request if any as it has been assigned
+ pendingSlotRequests.remove(allocationId);
+
// this will try to find a new slot for the request
rejectPendingSlotRequest(
pendingSlotRequest,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index fb82aa5..cfca017 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -43,6 +43,7 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
+import org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
@@ -1217,6 +1218,71 @@ public class SlotManagerTest extends TestLogger {
}
/**
+ * Tests that a pending slot request is removed once the task executor reports (via a {@link SlotOccupiedException}) that a slot is already allocated to that request's allocation id.
+ */
+ @Test
+ public void testSlotRequestRemovedIfTMReportAllocation() throws Exception {
+ try (final SlotManager slotManager = createSlotManager(ResourceManagerId.generate(),
+ new TestingResourceActionsBuilder().createTestingResourceActions())) {
+
+ final JobID jobID = new JobID();
+ final SlotRequest slotRequest1 = new SlotRequest(jobID, new AllocationID(), ResourceProfile.UNKNOWN, "foobar");
+ slotManager.registerSlotRequest(slotRequest1);
+
+ final BlockingQueue<Tuple5<SlotID, JobID, AllocationID, String, ResourceManagerId>> requestSlotQueue = new ArrayBlockingQueue<>(1); // records each requestSlot call received by the test gateway
+ final BlockingQueue<CompletableFuture<Acknowledge>> responseQueue = new ArrayBlockingQueue<>(1); // futures returned, in order, as the gateway's requestSlot responses
+
+ final TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
+ .setRequestSlotFunction(slotIDJobIDAllocationIDStringResourceManagerIdTuple5 -> {
+ requestSlotQueue.offer(slotIDJobIDAllocationIDStringResourceManagerIdTuple5);
+ try {
+ return responseQueue.take();
+ } catch (InterruptedException ignored) { // NOTE(review): interrupt status is not restored before returning the failed future
+ return FutureUtils.completedExceptionally(new FlinkException("Response queue was interrupted."));
+ }
+ })
+ .createTestingTaskExecutorGateway();
+
+ final ResourceID taskExecutorResourceId = ResourceID.generate();
+ final TaskExecutorConnection taskExecutionConnection = new TaskExecutorConnection(taskExecutorResourceId, testingTaskExecutorGateway);
+ final SlotReport slotReport = new SlotReport(new SlotStatus(new SlotID(taskExecutorResourceId, 0), ResourceProfile.UNKNOWN));
+
+ final CompletableFuture<Acknowledge> firstManualSlotRequestResponse = new CompletableFuture<>();
+ responseQueue.offer(firstManualSlotRequestResponse);
+
+ slotManager.registerTaskManager(taskExecutionConnection, slotReport);
+
+ final Tuple5<SlotID, JobID, AllocationID, String, ResourceManagerId> firstRequest = requestSlotQueue.take(); // NOTE(review): take() has no timeout; the test hangs if no slot request is sent
+
+ final CompletableFuture<Acknowledge> secondManualSlotRequestResponse = new CompletableFuture<>();
+ responseQueue.offer(secondManualSlotRequestResponse);
+
+ final SlotRequest slotRequest2 = new SlotRequest(jobID, new AllocationID(), ResourceProfile.UNKNOWN, "foobar");
+ slotManager.registerSlotRequest(slotRequest2);
+
+ // fail the first request with a TimeoutException; the assertions below expect the slot to be requested next for slotRequest2
+ firstManualSlotRequestResponse.completeExceptionally(new TimeoutException("Test exception to fail first allocation"));
+
+ final Tuple5<SlotID, JobID, AllocationID, String, ResourceManagerId> secondRequest = requestSlotQueue.take();
+
+ // fail the second request: the task executor reports the slot as already occupied by slotRequest1's allocation id
+ secondManualSlotRequestResponse.completeExceptionally(new SlotOccupiedException("Test exception", slotRequest1.getAllocationId(), jobID));
+
+ assertThat(firstRequest.f2, equalTo(slotRequest1.getAllocationId()));
+ assertThat(secondRequest.f2, equalTo(slotRequest2.getAllocationId()));
+ assertThat(secondRequest.f0, equalTo(firstRequest.f0));
+
+ secondManualSlotRequestResponse.complete(Acknowledge.get()); // NOTE(review): no-op - the future was already completed exceptionally above, so complete() returns false
+
+ final TaskManagerSlot slot = slotManager.getSlot(secondRequest.f0);
+ assertThat(slot.getState(), equalTo(TaskManagerSlot.State.ALLOCATED));
+ assertThat(slot.getAllocationId(), equalTo(firstRequest.f2)); // the slot must now be held by slotRequest1's allocation
+
+ assertThat(slotManager.getNumberRegisteredSlots(), is(1)); // NOTE(review): no direct assertion that slotRequest1's pending request was removed
+ }
+ }
+
+ /**
* Tests notify the job manager of the allocations when the task manager is failed/killed.
*/
@Test