You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/07/09 12:18:47 UTC

[flink] 01/02: [FLINK-12736][coordination] Release TaskExecutor in SlotManager only if there were no slot allocations after the partition check

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5c762cec46be9328466fc8bdc87337c221eaa82e
Author: Andrey Zagrebin <az...@gmail.com>
AuthorDate: Thu Jul 4 16:12:32 2019 +0200

    [FLINK-12736][coordination] Release TaskExecutor in SlotManager only if there were no slot allocations after the partition check
    
    The ResourceManager looks out for TaskManagers that have not had any slots allocated on them for a while, as these could be released to save resources.
    If such a TM is found, the RM checks via an RPC call whether the TM still holds any partitions. If no partition is held then the TM is released.
    However, in the RPC callback no check is made whether the TM is actually still idle. In the meantime a slot could have been allocated on the TM.
    Even if the slot has been freed, there can be newly allocated partitions not included in the check result.
    
    To make sure there was no resource allocation in between, we can mark the taskManagerRegistration.getIdleSince() time before starting the async 'no partition' check.
    The TM can be released only if the idle time after the check matches the previously marked one. Otherwise we discard the release and start over after the next timeout.
    
    This closes #8988.
---
 .../resourcemanager/slotmanager/SlotManager.java   | 28 +++++++++++++-------
 .../slotmanager/SlotManagerTest.java               | 30 +++++++++++++++++-----
 .../taskexecutor/TestingTaskExecutorGateway.java   |  6 ++---
 .../TestingTaskExecutorGatewayBuilder.java         |  4 +--
 4 files changed, 47 insertions(+), 21 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index 571b5bc..d85aec5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -1023,22 +1023,32 @@ public class SlotManager implements AutoCloseable {
 
 			// second we trigger the release resource callback which can decide upon the resource release
 			for (TaskManagerRegistration taskManagerRegistration : timedOutTaskManagers) {
-				InstanceID timedOutTaskManagerId = taskManagerRegistration.getInstanceId();
 				if (waitResultConsumedBeforeRelease) {
-					// checking whether TaskManagers can be safely removed
-					taskManagerRegistration.getTaskManagerConnection().getTaskExecutorGateway().canBeReleased()
-						.thenAcceptAsync(canBeReleased -> {
-							if (canBeReleased) {
-								releaseTaskExecutor(timedOutTaskManagerId);
-							}},
-							mainThreadExecutor);
+					releaseTaskExecutorIfPossible(taskManagerRegistration);
 				} else {
-					releaseTaskExecutor(timedOutTaskManagerId);
+					releaseTaskExecutor(taskManagerRegistration.getInstanceId());
 				}
 			}
 		}
 	}
 
+	private void releaseTaskExecutorIfPossible(TaskManagerRegistration taskManagerRegistration) {
+		long idleSince = taskManagerRegistration.getIdleSince();
+		taskManagerRegistration
+			.getTaskManagerConnection()
+			.getTaskExecutorGateway()
+			.canBeReleased()
+			.thenAcceptAsync(
+				canBeReleased -> {
+					InstanceID timedOutTaskManagerId = taskManagerRegistration.getInstanceId();
+					boolean stillIdle = idleSince == taskManagerRegistration.getIdleSince();
+					if (stillIdle && canBeReleased) {
+						releaseTaskExecutor(timedOutTaskManagerId);
+					}
+				},
+				mainThreadExecutor);
+	}
+
 	private void releaseTaskExecutor(InstanceID timedOutTaskManagerId) {
 		final FlinkException cause = new FlinkException("TaskExecutor exceeded the idle timeout.");
 		LOG.debug("Release TaskExecutor {} because it exceeded the idle timeout.", timedOutTaskManagerId);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index 1e1d214..8760f10 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -71,7 +71,6 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
@@ -719,7 +718,7 @@ public class SlotManagerTest extends TestLogger {
 		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
 		final ResourceID resourceID = ResourceID.generate();
 
-		final AtomicBoolean canBeReleased = new AtomicBoolean(false);
+		final AtomicReference<CompletableFuture<Boolean>> canBeReleased = new AtomicReference<>();
 		final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
 			.setCanBeReleasedSupplier(canBeReleased::get)
 			.createTestingTaskExecutorGateway();
@@ -742,14 +741,31 @@ public class SlotManagerTest extends TestLogger {
 			mainThreadExecutor.execute(() -> slotManager.registerTaskManager(taskManagerConnection, slotReport));
 
 			// now it can not be released yet
-			canBeReleased.set(false);
-			mainThreadExecutor.execute(slotManager::checkTaskManagerTimeouts);
+			canBeReleased.set(new CompletableFuture<>());
+			mainThreadExecutor.execute(slotManager::checkTaskManagerTimeouts); // trigger TM.canBeReleased request
 			mainThreadExecutor.triggerAll();
-			assertFalse(releaseFuture.isDone());
+			canBeReleased.get().complete(false);
+			mainThreadExecutor.triggerAll();
+			assertThat(releaseFuture.isDone(), is(false));
+
+			// Allocate and free slot between triggering TM.canBeReleased request and receiving response.
+			// There can be potentially newly unreleased partitions, therefore TM can not be released yet.
+			canBeReleased.set(new CompletableFuture<>());
+			mainThreadExecutor.execute(slotManager::checkTaskManagerTimeouts); // trigger TM.canBeReleased request
+			mainThreadExecutor.triggerAll();
+			AllocationID allocationID = new AllocationID();
+			slotManager.registerSlotRequest(new SlotRequest(new JobID(), allocationID, resourceProfile, "foobar"));
+			mainThreadExecutor.triggerAll();
+			slotManager.freeSlot(slotId, allocationID);
+			canBeReleased.get().complete(true);
+			mainThreadExecutor.triggerAll();
+			assertThat(releaseFuture.isDone(), is(false));
 
 			// now it can and should be released
-			canBeReleased.set(true);
-			mainThreadExecutor.execute(slotManager::checkTaskManagerTimeouts);
+			canBeReleased.set(new CompletableFuture<>());
+			mainThreadExecutor.execute(slotManager::checkTaskManagerTimeouts); // trigger TM.canBeReleased request
+			mainThreadExecutor.triggerAll();
+			canBeReleased.get().complete(true);
 			mainThreadExecutor.triggerAll();
 			assertThat(releaseFuture.get(), is(equalTo(taskManagerConnection.getInstanceID())));
 		}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
index 8c20e49..3aa00e6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
@@ -72,7 +72,7 @@ public class TestingTaskExecutorGateway implements TaskExecutorGateway {
 
 	private final Function<ExecutionAttemptID, CompletableFuture<Acknowledge>> cancelTaskFunction;
 
-	private final Supplier<Boolean> canBeReleasedSupplier;
+	private final Supplier<CompletableFuture<Boolean>> canBeReleasedSupplier;
 
 	private final BiConsumer<JobID, Collection<ResultPartitionID>> releasePartitionsConsumer;
 
@@ -87,7 +87,7 @@ public class TestingTaskExecutorGateway implements TaskExecutorGateway {
 			Consumer<ResourceID> heartbeatResourceManagerConsumer,
 			Consumer<Exception> disconnectResourceManagerConsumer,
 			Function<ExecutionAttemptID, CompletableFuture<Acknowledge>> cancelTaskFunction,
-			Supplier<Boolean> canBeReleasedSupplier,
+			Supplier<CompletableFuture<Boolean>> canBeReleasedSupplier,
 			BiConsumer<JobID, Collection<ResultPartitionID>> releasePartitionsConsumer) {
 		this.address = Preconditions.checkNotNull(address);
 		this.hostname = Preconditions.checkNotNull(hostname);
@@ -186,7 +186,7 @@ public class TestingTaskExecutorGateway implements TaskExecutorGateway {
 
 	@Override
 	public CompletableFuture<Boolean> canBeReleased() {
-		return CompletableFuture.completedFuture(canBeReleasedSupplier.get());
+		return canBeReleasedSupplier.get();
 	}
 
 	@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java
index 770d28e..c176ff8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java
@@ -64,7 +64,7 @@ public class TestingTaskExecutorGatewayBuilder {
 	private Consumer<ResourceID> heartbeatResourceManagerConsumer = NOOP_HEARTBEAT_RESOURCE_MANAGER_CONSUMER;
 	private Consumer<Exception> disconnectResourceManagerConsumer = NOOP_DISCONNECT_RESOURCE_MANAGER_CONSUMER;
 	private Function<ExecutionAttemptID, CompletableFuture<Acknowledge>> cancelTaskFunction = NOOP_CANCEL_TASK_FUNCTION;
-	private Supplier<Boolean> canBeReleasedSupplier = () -> true;
+	private Supplier<CompletableFuture<Boolean>> canBeReleasedSupplier = () -> CompletableFuture.completedFuture(true);
 	private BiConsumer<JobID, Collection<ResultPartitionID>> releasePartitionsConsumer = NOOP_RELEASE_PARTITIONS_CONSUMER;
 
 	public TestingTaskExecutorGatewayBuilder setAddress(String address) {
@@ -117,7 +117,7 @@ public class TestingTaskExecutorGatewayBuilder {
 		return this;
 	}
 
-	public TestingTaskExecutorGatewayBuilder setCanBeReleasedSupplier(Supplier<Boolean> canBeReleasedSupplier) {
+	public TestingTaskExecutorGatewayBuilder setCanBeReleasedSupplier(Supplier<CompletableFuture<Boolean>> canBeReleasedSupplier) {
 		this.canBeReleasedSupplier = canBeReleasedSupplier;
 		return this;
 	}