You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by jq...@apache.org on 2022/02/21 07:28:41 UTC
[flink] 02/03: [FLINK-24607] Add util methods to shutdown executor services.
This is an automated email from the ASF dual-hosted git repository.
jqin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit a17655ebe7e3b2870b7616f1c2b640fcb3154187
Author: Jiangjie (Becket) Qin <ji...@alibaba-inc.com>
AuthorDate: Fri Feb 11 16:13:50 2022 +0800
[FLINK-24607] Add util methods to shutdown executor services.
---
.../coordination/ComponentClosingUtils.java | 95 ++++++++++-
.../coordination/ComponentClosingUtilsTest.java | 173 +++++++++++++++++++++
.../ManuallyTriggeredScheduledExecutorService.java | 2 +-
3 files changed, 266 insertions(+), 4 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtils.java
index deed49e..4bfe302 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtils.java
@@ -18,16 +18,22 @@ limitations under the License.
package org.apache.flink.runtime.operators.coordination;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.util.clock.Clock;
+import org.apache.flink.util.clock.SystemClock;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.ThrowingRunnable;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/** A util class to help with a clean component shutdown. */
public class ComponentClosingUtils {
+ private static Clock clock = SystemClock.getInstance();
/** Utility class, not meant to be instantiated. */
private ComponentClosingUtils() {}
@@ -95,8 +101,91 @@ public class ComponentClosingUtils {
return future;
}
- static void abortThread(Thread t) {
- // the abortion strategy is pretty simple here...
- t.interrupt();
+ /**
+ * A util method that tries to shut down an {@link ExecutorService} elegantly within the given
+ * timeout. If the executor has not been shut down before it hits timeout or the thread is
+ * interrupted when waiting for the termination, a forceful shutdown will be attempted on the
+ * executor.
+ *
+ * @param executor the {@link ExecutorService} to shut down.
+ * @param timeout the timeout duration.
+ * @return true if the given executor has been successfully closed, false otherwise.
+ */
+ @SuppressWarnings("ResultOfMethodCallIgnored")
+ public static boolean tryShutdownExecutorElegantly(ExecutorService executor, Duration timeout) {
+ try {
+ executor.shutdown();
+ executor.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS);
+ } catch (InterruptedException ie) {
+ // Let it go.
+ }
+ if (!executor.isTerminated()) {
+ shutdownExecutorForcefully(executor, Duration.ZERO, false);
+ }
+ return executor.isTerminated();
+ }
+
+    /**
+     * Forcefully shuts down the given executor, waiting at most the given timeout for it to
+     * terminate. This is the interruptable variant: an interruption of the calling thread makes
+     * the method stop waiting and return.
+     *
+     * @param executor the executor to shut down.
+     * @param timeout the maximum time to wait for the executor to terminate.
+     * @return true if the given executor is terminated, false otherwise.
+     */
+    public static boolean shutdownExecutorForcefully(ExecutorService executor, Duration timeout) {
+        final boolean interruptable = true;
+        return shutdownExecutorForcefully(executor, timeout, interruptable);
+    }
+
+    /**
+     * Shuts down the given executor forcefully within the given timeout.
+     *
+     * <p>The method repeatedly calls {@link ExecutorService#shutdownNow()} and waits for the
+     * executor to terminate, until the executor is terminated, the timeout has elapsed, or (only
+     * if {@code interruptable} is true) the calling thread has been interrupted.
+     *
+     * <p>If the calling thread was interrupted while waiting, its interrupt status is restored
+     * before the method returns, regardless of the value of {@code interruptable}.
+     *
+     * @param executor the executor to shut down.
+     * @param timeout the timeout duration.
+     * @param interruptable when set to true, an interruption of the calling thread makes the
+     *     method return. When set to false, each interruption of the calling thread results in
+     *     another {@code ExecutorService.shutdownNow()} call to the shutting down executor, as
+     *     long as there is time left.
+     * @return true if the given executor is terminated, false otherwise.
+     */
+    public static boolean shutdownExecutorForcefully(
+            ExecutorService executor, Duration timeout, boolean interruptable) {
+        Deadline deadline = Deadline.fromNowWithClock(timeout, clock);
+        boolean interrupted = false;
+        try {
+            do {
+                executor.shutdownNow();
+                try {
+                    executor.awaitTermination(
+                            deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+                } catch (InterruptedException e) {
+                    // Only remember the interruption here. Re-setting the flag inside the loop
+                    // would make every following awaitTermination() call throw immediately.
+                    interrupted = true;
+                }
+            } while (!(interruptable && interrupted)
+                    && deadline.hasTimeLeft()
+                    && !executor.isTerminated());
+            return executor.isTerminated();
+        } finally {
+            if (interrupted) {
+                // Hand the interruption back to the caller instead of silently dropping it.
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Makes a best effort to abort the given thread by interrupting it repeatedly.
+     *
+     * <p>The thread is interrupted up to 10 times with 10 ms pauses in between, for as long as
+     * it is still alive. This helps handle the case where the shutdown sequence consists of a
+     * bunch of closeQuietly() calls that will swallow the InterruptedException, so the thread to
+     * be aborted may block multiple times. If the thread is still alive after all the attempts,
+     * it is left alone. The caller of closeAsyncWithTimeout() should have received a
+     * TimeoutException in this case.
+     *
+     * <p>If the calling thread itself is interrupted while pausing, the attempts continue and the
+     * interrupt status of the calling thread is restored before the method returns.
+     *
+     * @param t the thread to abort.
+     */
+    private static void abortThread(Thread t) {
+        boolean interrupted = false;
+        try {
+            int attempts = 0;
+            while (t.isAlive() && attempts < 10) {
+                t.interrupt();
+                attempts++;
+                try {
+                    Thread.sleep(10);
+                } catch (InterruptedException e) {
+                    // Keep trying to abort the target thread. The flag is restored after the
+                    // loop, because restoring it here would turn the pauses into no-ops.
+                    interrupted = true;
+                }
+            }
+        } finally {
+            if (interrupted) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+ // ========= Method visible for testing ========
+
+    /**
+     * Replaces the clock that is used to compute the shutdown deadlines. Intended for tests only.
+     *
+     * @param clock the clock to use from now on.
+     */
+    @VisibleForTesting
+    static void setClock(Clock clock) {
+        ComponentClosingUtils.clock = clock;
     }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtilsTest.java
new file mode 100644
index 0000000..d43089d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtilsTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
+import org.apache.flink.util.clock.ManualClock;
+import org.apache.flink.util.clock.SystemClock;
+
+import org.jetbrains.annotations.NotNull;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/** The unit test class for {@link ComponentClosingUtils}. */
+public class ComponentClosingUtilsTest {
+ private ManualClock clock;
+
+ @Before
+ public void setup() {
+ clock = new ManualClock();
+ ComponentClosingUtils.setClock(clock);
+ }
+
+ @Test
+ public void testTryShutdownExecutorElegantlyWithoutForcefulShutdown() {
+ MockExecutorService executor = new MockExecutorService(0);
+ assertTrue(
+ ComponentClosingUtils.tryShutdownExecutorElegantly(executor, Duration.ofDays(1)));
+ assertEquals(0, executor.forcefullyShutdownCount);
+ }
+
+ @Test
+ public void testTryShutdownExecutorElegantlyWithForcefulShutdown() {
+ MockExecutorService executor = new MockExecutorService(5);
+ assertFalse(
+ ComponentClosingUtils.tryShutdownExecutorElegantly(executor, Duration.ofDays(1)));
+ assertEquals(1, executor.forcefullyShutdownCount);
+ }
+
+ @Test
+ public void testTryShutdownExecutorElegantlyTimeoutWithForcefulShutdown() {
+ MockExecutorService executor = new MockExecutorService(5);
+ executor.timeoutAfterNumForcefulShutdown(clock, 0);
+ assertFalse(
+ ComponentClosingUtils.tryShutdownExecutorElegantly(executor, Duration.ofDays(1)));
+ assertEquals(1, executor.forcefullyShutdownCount);
+ }
+
+ @Test
+ public void testTryShutdownExecutorElegantlyInterruptedWithForcefulShutdown() {
+ MockExecutorService executor = new MockExecutorService(5);
+ executor.interruptAfterNumForcefulShutdown(0);
+ assertFalse(
+ ComponentClosingUtils.tryShutdownExecutorElegantly(executor, Duration.ofDays(1)));
+ assertEquals(1, executor.forcefullyShutdownCount);
+ }
+
+ @Test
+ public void testShutdownExecutorForcefully() {
+ MockExecutorService executor = new MockExecutorService(5);
+ assertTrue(
+ ComponentClosingUtils.shutdownExecutorForcefully(
+ executor, Duration.ofDays(1), false));
+ assertEquals(5, executor.forcefullyShutdownCount);
+ }
+
+ @Test
+ public void testShutdownExecutorForcefullyReachesTimeout() {
+ MockExecutorService executor = new MockExecutorService(5);
+ executor.timeoutAfterNumForcefulShutdown(clock, 1);
+ assertFalse(
+ ComponentClosingUtils.shutdownExecutorForcefully(
+ executor, Duration.ofDays(1), false));
+ assertEquals(1, executor.forcefullyShutdownCount);
+ }
+
+ @Test
+ public void testShutdownExecutorForcefullyNotInterruptable() {
+ MockExecutorService executor = new MockExecutorService(5);
+ executor.interruptAfterNumForcefulShutdown(1);
+ assertTrue(
+ ComponentClosingUtils.shutdownExecutorForcefully(
+ executor, Duration.ofDays(1), false));
+ assertEquals(5, executor.forcefullyShutdownCount);
+ }
+
+ @Test
+ public void testShutdownExecutorForcefullyInterruptable() {
+ MockExecutorService executor = new MockExecutorService(5);
+ executor.interruptAfterNumForcefulShutdown(1);
+ assertFalse(
+ ComponentClosingUtils.shutdownExecutorForcefully(
+ executor, Duration.ofDays(1), true));
+ assertEquals(1, executor.forcefullyShutdownCount);
+ }
+
+ // ============== private class for testing ===============
+
+ /** An executor class that behaves in an orchestrated way. */
+ private static final class MockExecutorService
+ extends ManuallyTriggeredScheduledExecutorService {
+ private final int numRequiredForcefullyShutdown;
+ private ManualClock clock;
+ private int forcefullyShutdownCount;
+ private int interruptAfterNumForcefulShutdown = Integer.MAX_VALUE;
+ private int timeoutAfterNumForcefulShutdown = Integer.MAX_VALUE;
+
+ private MockExecutorService(int numRequiredForcefullyShutdown) {
+ this.numRequiredForcefullyShutdown = numRequiredForcefullyShutdown;
+ forcefullyShutdownCount = 0;
+ }
+
+ @Override
+ public @NotNull List<Runnable> shutdownNow() {
+ forcefullyShutdownCount++;
+ return super.shutdownNow();
+ }
+
+ @Override
+ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+ if (forcefullyShutdownCount < numRequiredForcefullyShutdown) {
+ if (forcefullyShutdownCount >= timeoutAfterNumForcefulShutdown) {
+ clock.advanceTime(Duration.ofDays(100));
+ }
+ if (forcefullyShutdownCount >= interruptAfterNumForcefulShutdown) {
+ throw new InterruptedException();
+ }
+ }
+ return super.awaitTermination(timeout, unit) && reachedForcefulShutdownCount();
+ }
+
+ @Override
+ public boolean isTerminated() {
+ return super.isTerminated() && reachedForcefulShutdownCount();
+ }
+
+ public void interruptAfterNumForcefulShutdown(int interruptAfterNumForcefulShutdown) {
+ this.interruptAfterNumForcefulShutdown = interruptAfterNumForcefulShutdown;
+ }
+
+ public void timeoutAfterNumForcefulShutdown(
+ ManualClock clock, int timeoutAfterNumForcefulShutdown) {
+ this.clock = clock;
+ this.timeoutAfterNumForcefulShutdown = timeoutAfterNumForcefulShutdown;
+ }
+
+ private boolean reachedForcefulShutdownCount() {
+ return forcefullyShutdownCount >= numRequiredForcefullyShutdown;
+ }
+ }
+}
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ManuallyTriggeredScheduledExecutorService.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ManuallyTriggeredScheduledExecutorService.java
index 15c1f7b..0ae1d28 100644
--- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ManuallyTriggeredScheduledExecutorService.java
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ManuallyTriggeredScheduledExecutorService.java
@@ -113,7 +113,7 @@ public class ManuallyTriggeredScheduledExecutorService implements ScheduledExecu
}
@Override
- public boolean awaitTermination(long timeout, TimeUnit unit) {
+ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return true;
}