You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/07/09 12:18:48 UTC

[flink] 02/02: [hotfix][tests][coordination] Move idle task manager release tests into a separate suite

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fd75bcd43f6fafee779caf24ddfd56f0e2cf07b0
Author: Andrey Zagrebin <az...@gmail.com>
AuthorDate: Mon Jul 8 10:51:19 2019 +0200

    [hotfix][tests][coordination] Move idle task manager release tests into a separate suite
---
 .../slotmanager/SlotManagerTest.java               | 104 -------------
 .../TaskManagerReleaseInSlotManagerTest.java       | 172 +++++++++++++++++++++
 2 files changed, 172 insertions(+), 104 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index 8760f10..c358866 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -30,7 +30,6 @@ import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.clusterframework.types.TaskManagerSlot;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.messages.Acknowledge;
@@ -669,109 +668,6 @@ public class SlotManagerTest extends TestLogger {
 	}
 
 	/**
-	 * Tests that idle task managers time out after the configured timeout. A timed out task manager
-	 * will be removed from the slot manager and the resource manager will be notified about the
-	 * timeout, if it can be released.
-	 */
-	@Test
-	public void testTaskManagerTimeout() throws Exception {
-		final long tmTimeout = 10L;
-
-		final CompletableFuture<InstanceID> releaseFuture = new CompletableFuture<>();
-		final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder()
-			.setReleaseResourceConsumer((instanceID, e) -> releaseFuture.complete(instanceID))
-			.build();
-		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
-		final ResourceID resourceID = ResourceID.generate();
-
-		final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
-		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway);
-
-		final SlotID slotId = new SlotID(resourceID, 0);
-		final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
-		final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile);
-		final SlotReport slotReport = new SlotReport(slotStatus);
-
-		final Executor mainThreadExecutor = TestingUtils.defaultExecutor();
-
-		try (SlotManager slotManager = SlotManagerBuilder.newBuilder()
-			.setTaskManagerTimeout(Time.milliseconds(tmTimeout))
-			.build()) {
-
-			slotManager.start(resourceManagerId, mainThreadExecutor, resourceManagerActions);
-
-			mainThreadExecutor.execute(() -> slotManager.registerTaskManager(taskManagerConnection, slotReport));
-
-			assertThat(releaseFuture.get(), is(equalTo(taskManagerConnection.getInstanceID())));
-		}
-	}
-
-	/**
-	 * Tests that idle but not releasable task managers will not be released even if timed out before it can be.
-	 */
-	@Test
-	public void testTaskManagerNotReleasedBeforeItCanBe() throws Exception {
-		final CompletableFuture<InstanceID> releaseFuture = new CompletableFuture<>();
-		final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder()
-			.setReleaseResourceConsumer((instanceID, e) -> releaseFuture.complete(instanceID))
-			.build();
-		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
-		final ResourceID resourceID = ResourceID.generate();
-
-		final AtomicReference<CompletableFuture<Boolean>> canBeReleased = new AtomicReference<>();
-		final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
-			.setCanBeReleasedSupplier(canBeReleased::get)
-			.createTestingTaskExecutorGateway();
-		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway);
-
-		final SlotID slotId = new SlotID(resourceID, 0);
-		final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
-		final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile);
-		final SlotReport slotReport = new SlotReport(slotStatus);
-
-		final ManuallyTriggeredScheduledExecutor mainThreadExecutor = new ManuallyTriggeredScheduledExecutor();
-
-		try (SlotManager slotManager = SlotManagerBuilder.newBuilder()
-			.setScheduledExecutor(mainThreadExecutor)
-			.setTaskManagerTimeout(Time.milliseconds(0L))
-			.build()) {
-
-			slotManager.start(resourceManagerId, mainThreadExecutor, resourceManagerActions);
-
-			mainThreadExecutor.execute(() -> slotManager.registerTaskManager(taskManagerConnection, slotReport));
-
-			// now it can not be released yet
-			canBeReleased.set(new CompletableFuture<>());
-			mainThreadExecutor.execute(slotManager::checkTaskManagerTimeouts); // trigger TM.canBeReleased request
-			mainThreadExecutor.triggerAll();
-			canBeReleased.get().complete(false);
-			mainThreadExecutor.triggerAll();
-			assertThat(releaseFuture.isDone(), is(false));
-
-			// Allocate and free slot between triggering TM.canBeReleased request and receiving response.
-			// There can be potentially newly unreleased partitions, therefore TM can not be released yet.
-			canBeReleased.set(new CompletableFuture<>());
-			mainThreadExecutor.execute(slotManager::checkTaskManagerTimeouts); // trigger TM.canBeReleased request
-			mainThreadExecutor.triggerAll();
-			AllocationID allocationID = new AllocationID();
-			slotManager.registerSlotRequest(new SlotRequest(new JobID(), allocationID, resourceProfile, "foobar"));
-			mainThreadExecutor.triggerAll();
-			slotManager.freeSlot(slotId, allocationID);
-			canBeReleased.get().complete(true);
-			mainThreadExecutor.triggerAll();
-			assertThat(releaseFuture.isDone(), is(false));
-
-			// now it can and should be released
-			canBeReleased.set(new CompletableFuture<>());
-			mainThreadExecutor.execute(slotManager::checkTaskManagerTimeouts); // trigger TM.canBeReleased request
-			mainThreadExecutor.triggerAll();
-			canBeReleased.get().complete(true);
-			mainThreadExecutor.triggerAll();
-			assertThat(releaseFuture.get(), is(equalTo(taskManagerConnection.getInstanceID())));
-		}
-	}
-
-	/**
 	 * Tests that slot requests time out after the specified request timeout. If a slot request
 	 * times out, then the request is cancelled, removed from the slot manager and the resource
 	 * manager is notified about the failed allocation.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerReleaseInSlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerReleaseInSlotManagerTest.java
new file mode 100644
index 0000000..25f7ccb
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerReleaseInSlotManagerTest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.RunnableWithException;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Test suite for the release of idle task managers by the slot manager.
+ */
+public class TaskManagerReleaseInSlotManagerTest extends TestLogger {
+	private static final ResourceID resourceID = ResourceID.generate();
+	private static final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
+	private static final SlotID slotId = new SlotID(resourceID, 0);
+	private static final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
+	private static final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile);
+	private static final SlotReport slotReport = new SlotReport(slotStatus);
+
+	private final AtomicReference<CompletableFuture<Boolean>> canBeReleasedFuture = new AtomicReference<>();
+	private final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
+		.setCanBeReleasedSupplier(canBeReleasedFuture::get)
+		.createTestingTaskExecutorGateway();
+	private final TaskExecutorConnection taskManagerConnection =
+		new TaskExecutorConnection(resourceID, taskExecutorGateway);
+
+	private CompletableFuture<InstanceID> releaseFuture;
+	private ResourceActions resourceManagerActions;
+	private ManuallyTriggeredScheduledExecutor mainThreadExecutor;
+
+	@Before
+	public void setup() {
+		canBeReleasedFuture.set(new CompletableFuture<>());
+		releaseFuture = new CompletableFuture<>();
+		resourceManagerActions = new TestingResourceActionsBuilder()
+			.setReleaseResourceConsumer((instanceID, e) -> releaseFuture.complete(instanceID))
+			.build();
+		mainThreadExecutor = new ManuallyTriggeredScheduledExecutor();
+	}
+
+	/**
+	 * Tests that idle task managers time out after the configured timeout. A timed out task manager
+	 * will be removed from the slot manager and the resource manager will be notified about the
+	 * timeout, if it can be released.
+	 */
+	@Test
+	public void testTaskManagerTimeout() throws Exception {
+		Executor executor = TestingUtils.defaultExecutor();
+		canBeReleasedFuture.set(CompletableFuture.completedFuture(true)); // gateway supplies an already completed "true" answer
+		try (SlotManager slotManager = SlotManagerBuilder
+			.newBuilder()
+			.setTaskManagerTimeout(Time.milliseconds(10L))
+			.build()) {
+
+			slotManager.start(resourceManagerId, executor, resourceManagerActions);
+			executor.execute(() -> slotManager.registerTaskManager(taskManagerConnection, slotReport));
+			assertThat(releaseFuture.get(), is(equalTo(taskManagerConnection.getInstanceID())));
+		}
+	}
+
+	/**
+	 * Tests that an idle, timed out task manager is not released until it reports that it can be released.
+	 */
+	@Test
+	public void testTaskManagerIsNotReleasedBeforeItCanBe() throws Exception {
+		try (SlotManager slotManager = createAndStartSlotManagerWithTM()) {
+			checkTaskManagerTimeoutWithCustomCanBeReleasedResponse(slotManager, false);
+			verifyTmReleased(false);
+
+			checkTaskManagerTimeoutWithCustomCanBeReleasedResponse(slotManager, true);
+			verifyTmReleased(true);
+		}
+	}
+
+	/**
+	 * Tests that an idle task manager is not released if a slot was allocated while the "can be released" check was pending.
+	 */
+	@Test
+	public void testTaskManagerIsNotReleasedInCaseOfConcurrentAllocation() throws Exception {
+		try (SlotManager slotManager = createAndStartSlotManagerWithTM()) {
+			checkTaskManagerTimeoutWithCustomCanBeReleasedResponse(slotManager, true, () -> {
+				// Allocate and free a slot between triggering the TM.canBeReleased request and receiving its response.
+				// The TM may now hold new, not yet released partitions, therefore it can not be released yet.
+				AllocationID allocationID = new AllocationID();
+				slotManager.registerSlotRequest(new SlotRequest(new JobID(), allocationID, resourceProfile, "foobar"));
+				mainThreadExecutor.triggerAll();
+				slotManager.freeSlot(slotId, allocationID);
+			});
+			verifyTmReleased(false);
+
+			checkTaskManagerTimeoutWithCustomCanBeReleasedResponse(slotManager, true);
+			verifyTmReleased(true);
+		}
+	}
+
+	private SlotManager createAndStartSlotManagerWithTM() {
+		SlotManager slotManager = SlotManagerBuilder
+			.newBuilder()
+			.setScheduledExecutor(mainThreadExecutor)
+			.setTaskManagerTimeout(Time.milliseconds(0L)) // NOTE(review): presumably makes an idle TM count as timed out immediately - confirm
+			.build();
+		slotManager.start(resourceManagerId, mainThreadExecutor, resourceManagerActions);
+		mainThreadExecutor.execute(() -> slotManager.registerTaskManager(taskManagerConnection, slotReport));
+		return slotManager;
+	}
+
+	private void checkTaskManagerTimeoutWithCustomCanBeReleasedResponse(
+			SlotManager slotManager,
+			boolean canBeReleased) throws Exception {
+		checkTaskManagerTimeoutWithCustomCanBeReleasedResponse(slotManager, canBeReleased, () -> {});
+	}
+
+	private void checkTaskManagerTimeoutWithCustomCanBeReleasedResponse(
+			SlotManager slotManager,
+			boolean canBeReleased,
+			RunnableWithException doAfterCheckTriggerBeforeCanBeReleasedResponse) throws Exception {
+		canBeReleasedFuture.set(new CompletableFuture<>());
+		mainThreadExecutor.execute(slotManager::checkTaskManagerTimeouts); // trigger TM.canBeReleased request
+		mainThreadExecutor.triggerAll();
+		doAfterCheckTriggerBeforeCanBeReleasedResponse.run();
+		canBeReleasedFuture.get().complete(canBeReleased); // finish TM.canBeReleased request
+		mainThreadExecutor.triggerAll();
+	}
+
+	private void verifyTmReleased(boolean isTmReleased) {
+		assertThat(releaseFuture.isDone(), is(isTmReleased));
+		if (isTmReleased) {
+			assertThat(releaseFuture.join(), is(equalTo(taskManagerConnection.getInstanceID())));
+		}
+	}
+}