You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/11/06 22:55:39 UTC

[flink] branch release-1.9 updated: [FLINK-14589] Redundant slot requests with the same AllocationID leads to inconsistent slot table

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 6904fc6  [FLINK-14589] Redundant slot requests with the same AllocationID leads to inconsistent slot table
6904fc6 is described below

commit 6904fc6dc55d43ead2bf95f0cf495b3e98a53337
Author: Hwanju Kim <hw...@amazon.com>
AuthorDate: Mon Nov 4 23:13:03 2019 -0800

    [FLINK-14589] Redundant slot requests with the same AllocationID leads to inconsistent slot table
    
    When a slot request is redundantly made with the same AllocationID to a
    slot index other than the already allocated one, the slot table becomes
    inconsistent: two slot indices are allocated, but the AllocationID is
    assigned only to the latest slot index. This can lead to slot leakage.
    This patch prevents such a redundant slot request from leaving the slot
    allocation state inconsistent by rejecting the request.
    
    This closes #10099.
---
 .../runtime/taskexecutor/slot/TaskSlotTable.java   |  7 +++++-
 .../taskexecutor/slot/TaskSlotTableTest.java       | 28 ++++++++++++++++++++++
 2 files changed, 34 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
index 41036b3..2c361a1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
@@ -193,7 +193,12 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
 	public boolean allocateSlot(int index, JobID jobId, AllocationID allocationId, Time slotTimeout) {
 		checkInit();
 
-		TaskSlot taskSlot = taskSlots.get(index);
+		TaskSlot taskSlot = allocationIDTaskSlotMap.get(allocationId);
+		if (taskSlot != null) {
+			LOG.info("Allocation ID {} is already allocated in {}.", allocationId, taskSlot);
+			return false;
+		}
+		taskSlot = taskSlots.get(index);
 
 		boolean result = taskSlot.allocate(jobId, allocationId);
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableTest.java
index ef3fc06..46ba55c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableTest.java
@@ -36,6 +36,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.Iterator;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
@@ -85,6 +86,33 @@ public class TaskSlotTableTest extends TestLogger {
 		}
 	}
 
+	/**
+	 * Tests that a second allocation request reusing an already-allocated AllocationID for a different slot index is rejected.
+	 */
+	@Test
+	public void testRedundantSlotAllocation() {
+		final TaskSlotTable taskSlotTable = createTaskSlotTable(Collections.nCopies(2, ResourceProfile.UNKNOWN));
+		// Two slots, so the redundant request can target a different index than the first one.
+		try {
+			taskSlotTable.start(new TestingSlotActionsBuilder().build());
+			// The same JobID/AllocationID pair is used for both allocation attempts.
+			final JobID jobId = new JobID();
+			final AllocationID allocationId = new AllocationID();
+			// Allocate index 0, then request index 1 with the same AllocationID: only the first may succeed.
+			assertThat(taskSlotTable.allocateSlot(0, jobId, allocationId, SLOT_TIMEOUT), is(true));
+			assertThat(taskSlotTable.allocateSlot(1, jobId, allocationId, SLOT_TIMEOUT), is(false));
+			// The rejected request must leave slot 0 allocated to allocationId and slot 1 free.
+			assertThat(taskSlotTable.isAllocated(0, jobId, allocationId), is(true));
+			assertThat(taskSlotTable.isSlotFree(1), is(true));
+			// Exactly one slot (index 0) is reported as allocated for the job.
+			Iterator<TaskSlot> allocatedSlots = taskSlotTable.getAllocatedSlots(jobId);
+			assertThat(allocatedSlots.next().getIndex(), is(0));
+			assertThat(allocatedSlots.hasNext(), is(false));
+		} finally {
+			taskSlotTable.stop();
+		}
+	}
+
 	@Nonnull
 	private TaskSlotTable createTaskSlotTable(final Collection<ResourceProfile> resourceProfiles) {
 		return new TaskSlotTable(