You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/07/09 12:18:48 UTC
[flink] 02/02: [hotfix][tests][coordination] Move idle task manager
release tests into a separate suite
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit fd75bcd43f6fafee779caf24ddfd56f0e2cf07b0
Author: Andrey Zagrebin <az...@gmail.com>
AuthorDate: Mon Jul 8 10:51:19 2019 +0200
[hotfix][tests][coordination] Move idle task manager release tests into a separate suite
---
.../slotmanager/SlotManagerTest.java | 104 -------------
.../TaskManagerReleaseInSlotManagerTest.java | 172 +++++++++++++++++++++
2 files changed, 172 insertions(+), 104 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index 8760f10..c358866 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -30,7 +30,6 @@ import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.clusterframework.types.TaskManagerSlot;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.messages.Acknowledge;
@@ -669,109 +668,6 @@ public class SlotManagerTest extends TestLogger {
}
/**
- * Tests that idle task managers time out after the configured timeout. A timed out task manager
- * will be removed from the slot manager and the resource manager will be notified about the
- * timeout, if it can be released.
- */
- @Test
- public void testTaskManagerTimeout() throws Exception {
- final long tmTimeout = 10L;
-
- final CompletableFuture<InstanceID> releaseFuture = new CompletableFuture<>();
- final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder()
- .setReleaseResourceConsumer((instanceID, e) -> releaseFuture.complete(instanceID))
- .build();
- final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
- final ResourceID resourceID = ResourceID.generate();
-
- final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
- final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway);
-
- final SlotID slotId = new SlotID(resourceID, 0);
- final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
- final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile);
- final SlotReport slotReport = new SlotReport(slotStatus);
-
- final Executor mainThreadExecutor = TestingUtils.defaultExecutor();
-
- try (SlotManager slotManager = SlotManagerBuilder.newBuilder()
- .setTaskManagerTimeout(Time.milliseconds(tmTimeout))
- .build()) {
-
- slotManager.start(resourceManagerId, mainThreadExecutor, resourceManagerActions);
-
- mainThreadExecutor.execute(() -> slotManager.registerTaskManager(taskManagerConnection, slotReport));
-
- assertThat(releaseFuture.get(), is(equalTo(taskManagerConnection.getInstanceID())));
- }
- }
-
- /**
- * Tests that idle but not releasable task managers will not be released even if timed out before it can be.
- */
- @Test
- public void testTaskManagerNotReleasedBeforeItCanBe() throws Exception {
- final CompletableFuture<InstanceID> releaseFuture = new CompletableFuture<>();
- final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder()
- .setReleaseResourceConsumer((instanceID, e) -> releaseFuture.complete(instanceID))
- .build();
- final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
- final ResourceID resourceID = ResourceID.generate();
-
- final AtomicReference<CompletableFuture<Boolean>> canBeReleased = new AtomicReference<>();
- final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
- .setCanBeReleasedSupplier(canBeReleased::get)
- .createTestingTaskExecutorGateway();
- final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway);
-
- final SlotID slotId = new SlotID(resourceID, 0);
- final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
- final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile);
- final SlotReport slotReport = new SlotReport(slotStatus);
-
- final ManuallyTriggeredScheduledExecutor mainThreadExecutor = new ManuallyTriggeredScheduledExecutor();
-
- try (SlotManager slotManager = SlotManagerBuilder.newBuilder()
- .setScheduledExecutor(mainThreadExecutor)
- .setTaskManagerTimeout(Time.milliseconds(0L))
- .build()) {
-
- slotManager.start(resourceManagerId, mainThreadExecutor, resourceManagerActions);
-
- mainThreadExecutor.execute(() -> slotManager.registerTaskManager(taskManagerConnection, slotReport));
-
- // now it can not be released yet
- canBeReleased.set(new CompletableFuture<>());
- mainThreadExecutor.execute(slotManager::checkTaskManagerTimeouts); // trigger TM.canBeReleased request
- mainThreadExecutor.triggerAll();
- canBeReleased.get().complete(false);
- mainThreadExecutor.triggerAll();
- assertThat(releaseFuture.isDone(), is(false));
-
- // Allocate and free slot between triggering TM.canBeReleased request and receiving response.
- // There can be potentially newly unreleased partitions, therefore TM can not be released yet.
- canBeReleased.set(new CompletableFuture<>());
- mainThreadExecutor.execute(slotManager::checkTaskManagerTimeouts); // trigger TM.canBeReleased request
- mainThreadExecutor.triggerAll();
- AllocationID allocationID = new AllocationID();
- slotManager.registerSlotRequest(new SlotRequest(new JobID(), allocationID, resourceProfile, "foobar"));
- mainThreadExecutor.triggerAll();
- slotManager.freeSlot(slotId, allocationID);
- canBeReleased.get().complete(true);
- mainThreadExecutor.triggerAll();
- assertThat(releaseFuture.isDone(), is(false));
-
- // now it can and should be released
- canBeReleased.set(new CompletableFuture<>());
- mainThreadExecutor.execute(slotManager::checkTaskManagerTimeouts); // trigger TM.canBeReleased request
- mainThreadExecutor.triggerAll();
- canBeReleased.get().complete(true);
- mainThreadExecutor.triggerAll();
- assertThat(releaseFuture.get(), is(equalTo(taskManagerConnection.getInstanceID())));
- }
- }
-
- /**
* Tests that slot requests time out after the specified request timeout. If a slot request
* times out, then the request is cancelled, removed from the slot manager and the resource
* manager is notified about the failed allocation.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerReleaseInSlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerReleaseInSlotManagerTest.java
new file mode 100644
index 0000000..25f7ccb
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerReleaseInSlotManagerTest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.RunnableWithException;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Test suite for idle task managers release in slot manager.
+ */
+public class TaskManagerReleaseInSlotManagerTest extends TestLogger {
+ private static final ResourceID resourceID = ResourceID.generate();
+ private static final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
+ private static final SlotID slotId = new SlotID(resourceID, 0);
+ private static final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
+ private static final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile);
+ private static final SlotReport slotReport = new SlotReport(slotStatus);
+
+ private final AtomicReference<CompletableFuture<Boolean>> canBeReleasedFuture = new AtomicReference<>();
+ private final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
+ .setCanBeReleasedSupplier(canBeReleasedFuture::get)
+ .createTestingTaskExecutorGateway();
+ private final TaskExecutorConnection taskManagerConnection =
+ new TaskExecutorConnection(resourceID, taskExecutorGateway);
+
+ private CompletableFuture<InstanceID> releaseFuture;
+ private ResourceActions resourceManagerActions;
+ private ManuallyTriggeredScheduledExecutor mainThreadExecutor;
+
+ @Before
+ public void setup() {
+ canBeReleasedFuture.set(new CompletableFuture<>());
+ releaseFuture = new CompletableFuture<>();
+ resourceManagerActions = new TestingResourceActionsBuilder()
+ .setReleaseResourceConsumer((instanceID, e) -> releaseFuture.complete(instanceID))
+ .build();
+ mainThreadExecutor = new ManuallyTriggeredScheduledExecutor();
+ }
+
+ /**
+ * Tests that idle task managers time out after the configured timeout. A timed out task manager
+ * will be removed from the slot manager and the resource manager will be notified about the
+ * timeout, if it can be released.
+ */
+ @Test
+ public void testTaskManagerTimeout() throws Exception {
+ Executor executor = TestingUtils.defaultExecutor();
+ canBeReleasedFuture.set(CompletableFuture.completedFuture(true));
+ try (SlotManager slotManager = SlotManagerBuilder
+ .newBuilder()
+ .setTaskManagerTimeout(Time.milliseconds(10L))
+ .build()) {
+
+ slotManager.start(resourceManagerId, executor, resourceManagerActions);
+ executor.execute(() -> slotManager.registerTaskManager(taskManagerConnection, slotReport));
+ assertThat(releaseFuture.get(), is(equalTo(taskManagerConnection.getInstanceID())));
+ }
+ }
+
+ /**
+ * Tests that an idle, timed-out task manager is not released until it reports that it can be released.
+ */
+ @Test
+ public void testTaskManagerIsNotReleasedBeforeItCanBe() throws Exception {
+ try (SlotManager slotManager = createAndStartSlotManagerWithTM()) {
+ checkTaskManagerTimeoutWithCustomCanBeReleasedResponse(slotManager, false);
+ verifyTmReleased(false);
+
+ checkTaskManagerTimeoutWithCustomCanBeReleasedResponse(slotManager, true);
+ verifyTmReleased(true);
+ }
+ }
+
+ /**
+ * Tests that an idle task manager is not released, even after a positive "can be released" check, if a slot was allocated concurrently.
+ */
+ @Test
+ public void testTaskManagerIsNotReleasedInCaseOfConcurrentAllocation() throws Exception {
+ try (SlotManager slotManager = createAndStartSlotManagerWithTM()) {
+ checkTaskManagerTimeoutWithCustomCanBeReleasedResponse(slotManager, true, () -> {
+ // Allocate and free a slot between triggering the TM.canBeReleased request and receiving the response.
+ // The TM may then hold new unreleased partitions, so it cannot be released yet.
+ AllocationID allocationID = new AllocationID();
+ slotManager.registerSlotRequest(new SlotRequest(new JobID(), allocationID, resourceProfile, "foobar"));
+ mainThreadExecutor.triggerAll();
+ slotManager.freeSlot(slotId, allocationID);
+ });
+ verifyTmReleased(false);
+
+ checkTaskManagerTimeoutWithCustomCanBeReleasedResponse(slotManager, true);
+ verifyTmReleased(true);
+ }
+ }
+
+ private SlotManager createAndStartSlotManagerWithTM() {
+ SlotManager slotManager = SlotManagerBuilder
+ .newBuilder()
+ .setScheduledExecutor(mainThreadExecutor)
+ .setTaskManagerTimeout(Time.milliseconds(0L))
+ .build();
+ slotManager.start(resourceManagerId, mainThreadExecutor, resourceManagerActions);
+ mainThreadExecutor.execute(() -> slotManager.registerTaskManager(taskManagerConnection, slotReport));
+ return slotManager;
+ }
+
+ private void checkTaskManagerTimeoutWithCustomCanBeReleasedResponse(
+ SlotManager slotManager,
+ boolean canBeReleased) throws Exception {
+ checkTaskManagerTimeoutWithCustomCanBeReleasedResponse(slotManager, canBeReleased, () -> {});
+ }
+
+ private void checkTaskManagerTimeoutWithCustomCanBeReleasedResponse(
+ SlotManager slotManager,
+ boolean canBeReleased,
+ RunnableWithException doAfterCheckTriggerBeforeCanBeReleasedResponse) throws Exception {
+ canBeReleasedFuture.set(new CompletableFuture<>());
+ mainThreadExecutor.execute(slotManager::checkTaskManagerTimeouts); // trigger TM.canBeReleased request
+ mainThreadExecutor.triggerAll();
+ doAfterCheckTriggerBeforeCanBeReleasedResponse.run();
+ canBeReleasedFuture.get().complete(canBeReleased); // finish TM.canBeReleased request
+ mainThreadExecutor.triggerAll();
+ }
+
+ private void verifyTmReleased(boolean isTmReleased) {
+ assertThat(releaseFuture.isDone(), is(isTmReleased));
+ if (isTmReleased) {
+ assertThat(releaseFuture.join(), is(equalTo(taskManagerConnection.getInstanceID())));
+ }
+ }
+}