You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/11/06 22:55:39 UTC
[flink] branch release-1.9 updated: [FLINK-14589] Redundant slot
requests with the same AllocationID leads to inconsistent slot table
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push:
new 6904fc6 [FLINK-14589] Redundant slot requests with the same AllocationID leads to inconsistent slot table
6904fc6 is described below
commit 6904fc6dc55d43ead2bf95f0cf495b3e98a53337
Author: Hwanju Kim <hw...@amazon.com>
AuthorDate: Mon Nov 4 23:13:03 2019 -0800
[FLINK-14589] Redundant slot requests with the same AllocationID leads to inconsistent slot table
When a slot request is redundantly made with the same AllocationID for a
slot index other than the one already allocated, the slot table becomes
inconsistent: two slot indices are marked as allocated, but the AllocationID
is assigned only to the latest slot index. This can lead to slot leakage.
This patch prevents such a redundant slot request from leaving the slot
allocation state inconsistent by rejecting the request.
This closes #10099.
---
.../runtime/taskexecutor/slot/TaskSlotTable.java | 7 +++++-
.../taskexecutor/slot/TaskSlotTableTest.java | 28 ++++++++++++++++++++++
2 files changed, 34 insertions(+), 1 deletion(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
index 41036b3..2c361a1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
@@ -193,7 +193,12 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
public boolean allocateSlot(int index, JobID jobId, AllocationID allocationId, Time slotTimeout) {
checkInit();
- TaskSlot taskSlot = taskSlots.get(index);
+ TaskSlot taskSlot = allocationIDTaskSlotMap.get(allocationId);
+ if (taskSlot != null) {
+ LOG.info("Allocation ID {} is already allocated in {}.", allocationId, taskSlot);
+ return false;
+ }
+ taskSlot = taskSlots.get(index);
boolean result = taskSlot.allocate(jobId, allocationId);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableTest.java
index ef3fc06..46ba55c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableTest.java
@@ -36,6 +36,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
+import java.util.Iterator;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
@@ -85,6 +86,33 @@ public class TaskSlotTableTest extends TestLogger {
}
}
+ /**
+ * Tests that redundant slot allocation with the same AllocationID to a different slot is rejected.
+ */
+ @Test
+ public void testRedundantSlotAllocation() {
+ final TaskSlotTable taskSlotTable = createTaskSlotTable(Collections.nCopies(2, ResourceProfile.UNKNOWN));
+
+ try {
+ taskSlotTable.start(new TestingSlotActionsBuilder().build());
+
+ final JobID jobId = new JobID();
+ final AllocationID allocationId = new AllocationID();
+
+ assertThat(taskSlotTable.allocateSlot(0, jobId, allocationId, SLOT_TIMEOUT), is(true));
+ assertThat(taskSlotTable.allocateSlot(1, jobId, allocationId, SLOT_TIMEOUT), is(false));
+
+ assertThat(taskSlotTable.isAllocated(0, jobId, allocationId), is(true));
+ assertThat(taskSlotTable.isSlotFree(1), is(true));
+
+ Iterator<TaskSlot> allocatedSlots = taskSlotTable.getAllocatedSlots(jobId);
+ assertThat(allocatedSlots.next().getIndex(), is(0));
+ assertThat(allocatedSlots.hasNext(), is(false));
+ } finally {
+ taskSlotTable.stop();
+ }
+ }
+
@Nonnull
private TaskSlotTable createTaskSlotTable(final Collection<ResourceProfile> resourceProfiles) {
return new TaskSlotTable(