You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2020/06/02 14:51:54 UTC

[flink] branch release-1.9 updated (e6f2a25 -> aa44ffc)

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from e6f2a25  [FLINK-16694][ci] Limit number of dumped log lines
     add d61467a  [hotfix] Add TaskSlotTableTest.testAllocatedSlotTimeout
     new aa44ffc  [FLINK-18012] Deactivate slot timeout when calling TaskSlotTable.tryMarkSlotActive

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../runtime/taskexecutor/slot/TaskSlotTable.java   | 24 ++++----
 .../taskexecutor/slot/TaskSlotTableTest.java       | 68 ++++++++++++++++++++++
 2 files changed, 82 insertions(+), 10 deletions(-)


[flink] 01/01: [FLINK-18012] Deactivate slot timeout when calling TaskSlotTable.tryMarkSlotActive

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit aa44ffc72a9a3839a1c39d8717c9d0e085c1296c
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu May 28 16:54:29 2020 +0200

    [FLINK-18012] Deactivate slot timeout when calling TaskSlotTable.tryMarkSlotActive
    
    In order to avoid timing out activated slots, we also need to deactivate the slot timeout
    when TaskSlotTable.tryMarkSlotActive is called. This call can happen if the response
    to JobMasterGateway.offerSlots arrived too late and the request timed out.
    
    This closes #12391.
---
 .../runtime/taskexecutor/slot/TaskSlotTable.java   | 24 ++++++-----
 .../taskexecutor/slot/TaskSlotTableTest.java       | 48 ++++++++++++++++++++++
 2 files changed, 62 insertions(+), 10 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
index 2c361a1..ce418e0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
@@ -237,18 +237,22 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
 		TaskSlot taskSlot = getTaskSlot(allocationId);
 
 		if (taskSlot != null) {
-			if (taskSlot.markActive()) {
-				// unregister a potential timeout
-				LOG.info("Activate slot {}.", allocationId);
+			return markExistingSlotActive(taskSlot);
+		} else {
+			throw new SlotNotFoundException(allocationId);
+		}
+	}
 
-				timerService.unregisterTimeout(allocationId);
+	private boolean markExistingSlotActive(TaskSlot taskSlot) {
+		if (taskSlot.markActive()) {
+			// unregister a potential timeout
+			LOG.info("Activate slot {}.", taskSlot.getAllocationId());
 
-				return true;
-			} else {
-				return false;
-			}
+			timerService.unregisterTimeout(taskSlot.getAllocationId());
+
+			return true;
 		} else {
-			throw new SlotNotFoundException(allocationId);
+			return false;
 		}
 	}
 
@@ -394,7 +398,7 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
 		TaskSlot taskSlot = getTaskSlot(allocationId);
 
 		if (taskSlot != null && taskSlot.isAllocated(jobId, allocationId)) {
-			return taskSlot.markActive();
+			return markExistingSlotActive(taskSlot);
 		} else {
 			return false;
 		}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableTest.java
index b6235b8..a8eb1ee 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableTest.java
@@ -23,7 +23,9 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.TriFunction;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Sets;
 
@@ -38,10 +40,13 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
 
 /**
  * Tests for the {@link TaskSlotTable}.
@@ -133,6 +138,49 @@ public class TaskSlotTableTest extends TestLogger {
 		}
 	}
 
+	@Test
+	public void testMarkSlotActiveDeactivatesSlotTimeout() throws Exception {
+		runDeactivateSlotTimeoutTest((taskSlotTable, jobId, allocationId) -> {
+			try {
+				return taskSlotTable.markSlotActive(allocationId);
+			} catch (SlotNotFoundException e) {
+				ExceptionUtils.rethrow(e);
+				return false;
+			}
+		});
+	}
+
+	@Test
+	public void testTryMarkSlotActiveDeactivatesSlotTimeout() throws Exception {
+		runDeactivateSlotTimeoutTest(TaskSlotTable::tryMarkSlotActive);
+	}
+
+	private void runDeactivateSlotTimeoutTest(TriFunction<TaskSlotTable, JobID, AllocationID, Boolean> taskSlotTableAction) throws Exception {
+		final CompletableFuture<AllocationID> timeoutFuture = new CompletableFuture<>();
+		final TestingSlotActions testingSlotActions = new TestingSlotActionsBuilder()
+			.setTimeoutSlotConsumer((allocationID, uuid) -> timeoutFuture.complete(allocationID))
+			.build();
+
+		final TaskSlotTable taskSlotTable = createTaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN));
+
+		try {
+			taskSlotTable.start(testingSlotActions);
+
+			final AllocationID allocationId = new AllocationID();
+			final long timeout = 50L;
+			final JobID jobId = new JobID();
+			assertThat(taskSlotTable.allocateSlot(0, jobId, allocationId, Time.milliseconds(timeout)), is(true));
+			assertThat(taskSlotTableAction.apply(taskSlotTable, jobId, allocationId), is(true));
+
+			try {
+				timeoutFuture.get(timeout, TimeUnit.MILLISECONDS);
+				fail("The slot timeout should have been deactivated.");
+			} catch (TimeoutException expected) {}
+		} finally {
+			taskSlotTable.stop();
+		}
+	}
+
 	@Nonnull
 	private TaskSlotTable createTaskSlotTable(final Collection<ResourceProfile> resourceProfiles) {
 		return new TaskSlotTable(