You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/07/10 20:51:39 UTC

[flink] branch release-1.8 updated (755ab6f -> 54c44eb)

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 755ab6f  [FLINK-12319][Library/CEP]Change the logic of releasing node from recursive to non-recursive
     new 48f5c78  [FLINK-12736][coordination] Release TaskExecutor in SlotManager only if there were no slot allocations after the partition check
     new 54c44eb  [hotfix][tests][coordination] Move idle task manager release tests into a separate suite

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../resourcemanager/slotmanager/SlotManager.java   |  28 ++--
 .../slotmanager/SlotManagerTest.java               |  88 -----------
 .../TaskManagerReleaseInSlotManagerTest.java       | 172 +++++++++++++++++++++
 .../taskexecutor/TestingTaskExecutorGateway.java   |   6 +-
 .../TestingTaskExecutorGatewayBuilder.java         |   4 +-
 5 files changed, 196 insertions(+), 102 deletions(-)
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerReleaseInSlotManagerTest.java


[flink] 01/02: [FLINK-12736][coordination] Release TaskExecutor in SlotManager only if there were no slot allocations after the partition check

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 48f5c78177abb322ba6a7d60f375973f22831146
Author: Andrey Zagrebin <az...@gmail.com>
AuthorDate: Thu Jul 4 16:12:32 2019 +0200

    [FLINK-12736][coordination] Release TaskExecutor in SlotManager only if there were no slot allocations after the partition check
    
    The ResourceManager looks out for TaskManagers that have not had any slots allocated on them for a while, as these could be released to save resources.
    If such a TM is found, the RM checks via an RPC call whether the TM still holds any partitions. If no partition is held then the TM is released.
    However, the RPC callback does not check whether the TM is actually still idle; in the meantime, a slot could have been allocated on the TM.
    Even if that slot has since been freed, new partitions may have been created on the TM that are not included in the check result.
    
    To make sure no resource was allocated in between, we record the taskManagerRegistration.getIdleSince() timestamp before starting the async 'no partition' check.
    The TM can be released only if its idle-since timestamp after the check still matches the recorded one. Otherwise, we discard the release and start over after the next timeout.
    
    This closes #9041.
---
 .../resourcemanager/slotmanager/SlotManager.java   | 28 +++++++++++++-------
 .../slotmanager/SlotManagerTest.java               | 30 +++++++++++++++++-----
 .../taskexecutor/TestingTaskExecutorGateway.java   |  6 ++---
 .../TestingTaskExecutorGatewayBuilder.java         |  4 +--
 4 files changed, 47 insertions(+), 21 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index 571b5bc..d85aec5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -1023,22 +1023,32 @@ public class SlotManager implements AutoCloseable {
 
 			// second we trigger the release resource callback which can decide upon the resource release
 			for (TaskManagerRegistration taskManagerRegistration : timedOutTaskManagers) {
-				InstanceID timedOutTaskManagerId = taskManagerRegistration.getInstanceId();
 				if (waitResultConsumedBeforeRelease) {
-					// checking whether TaskManagers can be safely removed
-					taskManagerRegistration.getTaskManagerConnection().getTaskExecutorGateway().canBeReleased()
-						.thenAcceptAsync(canBeReleased -> {
-							if (canBeReleased) {
-								releaseTaskExecutor(timedOutTaskManagerId);
-							}},
-							mainThreadExecutor);
+					releaseTaskExecutorIfPossible(taskManagerRegistration);
 				} else {
-					releaseTaskExecutor(timedOutTaskManagerId);
+					releaseTaskExecutor(taskManagerRegistration.getInstanceId());
 				}
 			}
 		}
 	}
 
+	private void releaseTaskExecutorIfPossible(TaskManagerRegistration taskManagerRegistration) {
+		long idleSince = taskManagerRegistration.getIdleSince();
+		taskManagerRegistration
+			.getTaskManagerConnection()
+			.getTaskExecutorGateway()
+			.canBeReleased()
+			.thenAcceptAsync(
+				canBeReleased -> {
+					InstanceID timedOutTaskManagerId = taskManagerRegistration.getInstanceId();
+					boolean stillIdle = idleSince == taskManagerRegistration.getIdleSince();
+					if (stillIdle && canBeReleased) {
+						releaseTaskExecutor(timedOutTaskManagerId);
+					}
+				},
+				mainThreadExecutor);
+	}
+
 	private void releaseTaskExecutor(InstanceID timedOutTaskManagerId) {
 		final FlinkException cause = new FlinkException("TaskExecutor exceeded the idle timeout.");
 		LOG.debug("Release TaskExecutor {} because it exceeded the idle timeout.", timedOutTaskManagerId);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index 1e1d214..8760f10 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -71,7 +71,6 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
@@ -719,7 +718,7 @@ public class SlotManagerTest extends TestLogger {
 		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
 		final ResourceID resourceID = ResourceID.generate();
 
-		final AtomicBoolean canBeReleased = new AtomicBoolean(false);
+		final AtomicReference<CompletableFuture<Boolean>> canBeReleased = new AtomicReference<>();
 		final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
 			.setCanBeReleasedSupplier(canBeReleased::get)
 			.createTestingTaskExecutorGateway();
@@ -742,14 +741,31 @@ public class SlotManagerTest extends TestLogger {
 			mainThreadExecutor.execute(() -> slotManager.registerTaskManager(taskManagerConnection, slotReport));
 
 			// now it can not be released yet
-			canBeReleased.set(false);
-			mainThreadExecutor.execute(slotManager::checkTaskManagerTimeouts);
+			canBeReleased.set(new CompletableFuture<>());
+			mainThreadExecutor.execute(slotManager::checkTaskManagerTimeouts); // trigger TM.canBeReleased request
 			mainThreadExecutor.triggerAll();
-			assertFalse(releaseFuture.isDone());
+			canBeReleased.get().complete(false);
+			mainThreadExecutor.triggerAll();
+			assertThat(releaseFuture.isDone(), is(false));
+
+			// Allocate and free slot between triggering TM.canBeReleased request and receiving response.
+			// There can be potentially newly unreleased partitions, therefore TM can not be released yet.
+			canBeReleased.set(new CompletableFuture<>());
+			mainThreadExecutor.execute(slotManager::checkTaskManagerTimeouts); // trigger TM.canBeReleased request
+			mainThreadExecutor.triggerAll();
+			AllocationID allocationID = new AllocationID();
+			slotManager.registerSlotRequest(new SlotRequest(new JobID(), allocationID, resourceProfile, "foobar"));
+			mainThreadExecutor.triggerAll();
+			slotManager.freeSlot(slotId, allocationID);
+			canBeReleased.get().complete(true);
+			mainThreadExecutor.triggerAll();
+			assertThat(releaseFuture.isDone(), is(false));
 
 			// now it can and should be released
-			canBeReleased.set(true);
-			mainThreadExecutor.execute(slotManager::checkTaskManagerTimeouts);
+			canBeReleased.set(new CompletableFuture<>());
+			mainThreadExecutor.execute(slotManager::checkTaskManagerTimeouts); // trigger TM.canBeReleased request
+			mainThreadExecutor.triggerAll();
+			canBeReleased.get().complete(true);
 			mainThreadExecutor.triggerAll();
 			assertThat(releaseFuture.get(), is(equalTo(taskManagerConnection.getInstanceID())));
 		}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
index 5f325da..c554951 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
@@ -70,7 +70,7 @@ public class TestingTaskExecutorGateway implements TaskExecutorGateway {
 
 	private final Function<ExecutionAttemptID, CompletableFuture<Acknowledge>> cancelTaskFunction;
 
-	private final Supplier<Boolean> canBeReleasedSupplier;
+	private final Supplier<CompletableFuture<Boolean>> canBeReleasedSupplier;
 
 	TestingTaskExecutorGateway(
 			String address,
@@ -83,7 +83,7 @@ public class TestingTaskExecutorGateway implements TaskExecutorGateway {
 			Consumer<ResourceID> heartbeatResourceManagerConsumer,
 			Consumer<Exception> disconnectResourceManagerConsumer,
 			Function<ExecutionAttemptID, CompletableFuture<Acknowledge>> cancelTaskFunction,
-			Supplier<Boolean> canBeReleasedSupplier) {
+			Supplier<CompletableFuture<Boolean>> canBeReleasedSupplier) {
 		this.address = Preconditions.checkNotNull(address);
 		this.hostname = Preconditions.checkNotNull(hostname);
 		this.heartbeatJobManagerConsumer = Preconditions.checkNotNull(heartbeatJobManagerConsumer);
@@ -185,7 +185,7 @@ public class TestingTaskExecutorGateway implements TaskExecutorGateway {
 
 	@Override
 	public CompletableFuture<Boolean> canBeReleased() {
-		return CompletableFuture.completedFuture(canBeReleasedSupplier.get());
+		return canBeReleasedSupplier.get();
 	}
 
 	@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java
index 483f285..de4a830 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java
@@ -61,7 +61,7 @@ public class TestingTaskExecutorGatewayBuilder {
 	private Consumer<ResourceID> heartbeatResourceManagerConsumer = NOOP_HEARTBEAT_RESOURCE_MANAGER_CONSUMER;
 	private Consumer<Exception> disconnectResourceManagerConsumer = NOOP_DISCONNECT_RESOURCE_MANAGER_CONSUMER;
 	private Function<ExecutionAttemptID, CompletableFuture<Acknowledge>> cancelTaskFunction = NOOP_CANCEL_TASK_FUNCTION;
-	private Supplier<Boolean> canBeReleasedSupplier = () -> true;
+	private Supplier<CompletableFuture<Boolean>> canBeReleasedSupplier = () -> CompletableFuture.completedFuture(true);
 
 	public TestingTaskExecutorGatewayBuilder setAddress(String address) {
 		this.address = address;
@@ -113,7 +113,7 @@ public class TestingTaskExecutorGatewayBuilder {
 		return this;
 	}
 
-	public TestingTaskExecutorGatewayBuilder setCanBeReleasedSupplier(Supplier<Boolean> canBeReleasedSupplier) {
+	public TestingTaskExecutorGatewayBuilder setCanBeReleasedSupplier(Supplier<CompletableFuture<Boolean>> canBeReleasedSupplier) {
 		this.canBeReleasedSupplier = canBeReleasedSupplier;
 		return this;
 	}


[flink] 02/02: [hotfix][tests][coordination] Move idle task manager release tests into a separate suite

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 54c44eb53aaec7630b78f329854bc0f835b09449
Author: Andrey Zagrebin <az...@gmail.com>
AuthorDate: Mon Jul 8 10:51:19 2019 +0200

    [hotfix][tests][coordination] Move idle task manager release tests into a separate suite
---
 .../slotmanager/SlotManagerTest.java               | 104 -------------
 .../TaskManagerReleaseInSlotManagerTest.java       | 172 +++++++++++++++++++++
 2 files changed, 172 insertions(+), 104 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index 8760f10..c358866 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -30,7 +30,6 @@ import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.clusterframework.types.TaskManagerSlot;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.messages.Acknowledge;
@@ -669,109 +668,6 @@ public class SlotManagerTest extends TestLogger {
 	}
 
 	/**
-	 * Tests that idle task managers time out after the configured timeout. A timed out task manager
-	 * will be removed from the slot manager and the resource manager will be notified about the
-	 * timeout, if it can be released.
-	 */
-	@Test
-	public void testTaskManagerTimeout() throws Exception {
-		final long tmTimeout = 10L;
-
-		final CompletableFuture<InstanceID> releaseFuture = new CompletableFuture<>();
-		final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder()
-			.setReleaseResourceConsumer((instanceID, e) -> releaseFuture.complete(instanceID))
-			.build();
-		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
-		final ResourceID resourceID = ResourceID.generate();
-
-		final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
-		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway);
-
-		final SlotID slotId = new SlotID(resourceID, 0);
-		final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
-		final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile);
-		final SlotReport slotReport = new SlotReport(slotStatus);
-
-		final Executor mainThreadExecutor = TestingUtils.defaultExecutor();
-
-		try (SlotManager slotManager = SlotManagerBuilder.newBuilder()
-			.setTaskManagerTimeout(Time.milliseconds(tmTimeout))
-			.build()) {
-
-			slotManager.start(resourceManagerId, mainThreadExecutor, resourceManagerActions);
-
-			mainThreadExecutor.execute(() -> slotManager.registerTaskManager(taskManagerConnection, slotReport));
-
-			assertThat(releaseFuture.get(), is(equalTo(taskManagerConnection.getInstanceID())));
-		}
-	}
-
-	/**
-	 * Tests that idle but not releasable task managers will not be released even if timed out before it can be.
-	 */
-	@Test
-	public void testTaskManagerNotReleasedBeforeItCanBe() throws Exception {
-		final CompletableFuture<InstanceID> releaseFuture = new CompletableFuture<>();
-		final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder()
-			.setReleaseResourceConsumer((instanceID, e) -> releaseFuture.complete(instanceID))
-			.build();
-		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
-		final ResourceID resourceID = ResourceID.generate();
-
-		final AtomicReference<CompletableFuture<Boolean>> canBeReleased = new AtomicReference<>();
-		final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
-			.setCanBeReleasedSupplier(canBeReleased::get)
-			.createTestingTaskExecutorGateway();
-		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway);
-
-		final SlotID slotId = new SlotID(resourceID, 0);
-		final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
-		final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile);
-		final SlotReport slotReport = new SlotReport(slotStatus);
-
-		final ManuallyTriggeredScheduledExecutor mainThreadExecutor = new ManuallyTriggeredScheduledExecutor();
-
-		try (SlotManager slotManager = SlotManagerBuilder.newBuilder()
-			.setScheduledExecutor(mainThreadExecutor)
-			.setTaskManagerTimeout(Time.milliseconds(0L))
-			.build()) {
-
-			slotManager.start(resourceManagerId, mainThreadExecutor, resourceManagerActions);
-
-			mainThreadExecutor.execute(() -> slotManager.registerTaskManager(taskManagerConnection, slotReport));
-
-			// now it can not be released yet
-			canBeReleased.set(new CompletableFuture<>());
-			mainThreadExecutor.execute(slotManager::checkTaskManagerTimeouts); // trigger TM.canBeReleased request
-			mainThreadExecutor.triggerAll();
-			canBeReleased.get().complete(false);
-			mainThreadExecutor.triggerAll();
-			assertThat(releaseFuture.isDone(), is(false));
-
-			// Allocate and free slot between triggering TM.canBeReleased request and receiving response.
-			// There can be potentially newly unreleased partitions, therefore TM can not be released yet.
-			canBeReleased.set(new CompletableFuture<>());
-			mainThreadExecutor.execute(slotManager::checkTaskManagerTimeouts); // trigger TM.canBeReleased request
-			mainThreadExecutor.triggerAll();
-			AllocationID allocationID = new AllocationID();
-			slotManager.registerSlotRequest(new SlotRequest(new JobID(), allocationID, resourceProfile, "foobar"));
-			mainThreadExecutor.triggerAll();
-			slotManager.freeSlot(slotId, allocationID);
-			canBeReleased.get().complete(true);
-			mainThreadExecutor.triggerAll();
-			assertThat(releaseFuture.isDone(), is(false));
-
-			// now it can and should be released
-			canBeReleased.set(new CompletableFuture<>());
-			mainThreadExecutor.execute(slotManager::checkTaskManagerTimeouts); // trigger TM.canBeReleased request
-			mainThreadExecutor.triggerAll();
-			canBeReleased.get().complete(true);
-			mainThreadExecutor.triggerAll();
-			assertThat(releaseFuture.get(), is(equalTo(taskManagerConnection.getInstanceID())));
-		}
-	}
-
-	/**
 	 * Tests that slot requests time out after the specified request timeout. If a slot request
 	 * times out, then the request is cancelled, removed from the slot manager and the resource
 	 * manager is notified about the failed allocation.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerReleaseInSlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerReleaseInSlotManagerTest.java
new file mode 100644
index 0000000..25f7ccb
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerReleaseInSlotManagerTest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.RunnableWithException;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Test suite for idle task managers release in slot manager.
+ */
+public class TaskManagerReleaseInSlotManagerTest extends TestLogger {
+	private static final ResourceID resourceID = ResourceID.generate();
+	private static final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
+	private static final SlotID slotId = new SlotID(resourceID, 0);
+	private static final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
+	private static final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile);
+	private static final SlotReport slotReport = new SlotReport(slotStatus);
+
+	private final AtomicReference<CompletableFuture<Boolean>> canBeReleasedFuture = new AtomicReference<>();
+	private final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
+		.setCanBeReleasedSupplier(canBeReleasedFuture::get)
+		.createTestingTaskExecutorGateway();
+	private final TaskExecutorConnection taskManagerConnection =
+		new TaskExecutorConnection(resourceID, taskExecutorGateway);
+
+	private CompletableFuture<InstanceID> releaseFuture;
+	private ResourceActions resourceManagerActions;
+	private ManuallyTriggeredScheduledExecutor mainThreadExecutor;
+
+	@Before
+	public void setup() {
+		canBeReleasedFuture.set(new CompletableFuture<>());
+		releaseFuture = new CompletableFuture<>();
+		resourceManagerActions = new TestingResourceActionsBuilder()
+			.setReleaseResourceConsumer((instanceID, e) -> releaseFuture.complete(instanceID))
+			.build();
+		mainThreadExecutor = new ManuallyTriggeredScheduledExecutor();
+	}
+
+	/**
+	 * Tests that idle task managers time out after the configured timeout. A timed out task manager
+	 * will be removed from the slot manager and the resource manager will be notified about the
+	 * timeout, if it can be released.
+	 */
+	@Test
+	public void testTaskManagerTimeout() throws Exception {
+		Executor executor = TestingUtils.defaultExecutor();
+		canBeReleasedFuture.set(CompletableFuture.completedFuture(true));
+		try (SlotManager slotManager = SlotManagerBuilder
+			.newBuilder()
+			.setTaskManagerTimeout(Time.milliseconds(10L))
+			.build()) {
+
+			slotManager.start(resourceManagerId, executor, resourceManagerActions);
+			executor.execute(() -> slotManager.registerTaskManager(taskManagerConnection, slotReport));
+			assertThat(releaseFuture.get(), is(equalTo(taskManagerConnection.getInstanceID())));
+		}
+	}
+
+	/**
+	 * Tests that idle but not releasable task managers will not be released even if timed out before it can be.
+	 */
+	@Test
+	public void testTaskManagerIsNotReleasedBeforeItCanBe() throws Exception {
+		try (SlotManager slotManager = createAndStartSlotManagerWithTM()) {
+			checkTaskManagerTimeoutWithCustomCanBeReleasedResponse(slotManager, false);
+			verifyTmReleased(false);
+
+			checkTaskManagerTimeoutWithCustomCanBeReleasedResponse(slotManager, true);
+			verifyTmReleased(true);
+		}
+	}
+
+	/**
+	 * Tests that idle task managers will not be released after "can be" check in case of concurrent resource allocations.
+	 */
+	@Test
+	public void testTaskManagerIsNotReleasedInCaseOfConcurrentAllocation() throws Exception {
+		try (SlotManager slotManager = createAndStartSlotManagerWithTM()) {
+			checkTaskManagerTimeoutWithCustomCanBeReleasedResponse(slotManager, true, () -> {
+				// Allocate and free slot between triggering TM.canBeReleased request and receiving response.
+				// There can be potentially newly unreleased partitions, therefore TM can not be released yet.
+				AllocationID allocationID = new AllocationID();
+				slotManager.registerSlotRequest(new SlotRequest(new JobID(), allocationID, resourceProfile, "foobar"));
+				mainThreadExecutor.triggerAll();
+				slotManager.freeSlot(slotId, allocationID);
+			});
+			verifyTmReleased(false);
+
+			checkTaskManagerTimeoutWithCustomCanBeReleasedResponse(slotManager, true);
+			verifyTmReleased(true);
+		}
+	}
+
+	private SlotManager createAndStartSlotManagerWithTM() {
+		SlotManager slotManager = SlotManagerBuilder
+			.newBuilder()
+			.setScheduledExecutor(mainThreadExecutor)
+			.setTaskManagerTimeout(Time.milliseconds(0L))
+			.build();
+		slotManager.start(resourceManagerId, mainThreadExecutor, resourceManagerActions);
+		mainThreadExecutor.execute(() -> slotManager.registerTaskManager(taskManagerConnection, slotReport));
+		return slotManager;
+	}
+
+	private void checkTaskManagerTimeoutWithCustomCanBeReleasedResponse(
+			SlotManager slotManager,
+			boolean canBeReleased) throws Exception {
+		checkTaskManagerTimeoutWithCustomCanBeReleasedResponse(slotManager, canBeReleased, () -> {});
+	}
+
+	private void checkTaskManagerTimeoutWithCustomCanBeReleasedResponse(
+			SlotManager slotManager,
+			boolean canBeReleased,
+			RunnableWithException doAfterCheckTriggerBeforeCanBeReleasedResponse) throws Exception {
+		canBeReleasedFuture.set(new CompletableFuture<>());
+		mainThreadExecutor.execute(slotManager::checkTaskManagerTimeouts); // trigger TM.canBeReleased request
+		mainThreadExecutor.triggerAll();
+		doAfterCheckTriggerBeforeCanBeReleasedResponse.run();
+		canBeReleasedFuture.get().complete(canBeReleased); // finish TM.canBeReleased request
+		mainThreadExecutor.triggerAll();
+	}
+
+	private void verifyTmReleased(boolean isTmReleased) {
+		assertThat(releaseFuture.isDone(), is(isTmReleased));
+		if (isTmReleased) {
+			assertThat(releaseFuture.join(), is(equalTo(taskManagerConnection.getInstanceID())));
+		}
+	}
+}