You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by jq...@apache.org on 2022/02/21 07:28:41 UTC

[flink] 02/03: [FLINK-24607] Add util methods to shutdown executor services.

This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a17655ebe7e3b2870b7616f1c2b640fcb3154187
Author: Jiangjie (Becket) Qin <ji...@alibaba-inc.com>
AuthorDate: Fri Feb 11 16:13:50 2022 +0800

    [FLINK-24607] Add util methods to shutdown executor services.
---
 .../coordination/ComponentClosingUtils.java        |  95 ++++++++++-
 .../coordination/ComponentClosingUtilsTest.java    | 173 +++++++++++++++++++++
 .../ManuallyTriggeredScheduledExecutorService.java |   2 +-
 3 files changed, 266 insertions(+), 4 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtils.java
index deed49e..4bfe302 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtils.java
@@ -18,16 +18,22 @@ limitations under the License.
 
 package org.apache.flink.runtime.operators.coordination;
 
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.util.clock.Clock;
+import org.apache.flink.util.clock.SystemClock;
 import org.apache.flink.util.concurrent.FutureUtils;
 import org.apache.flink.util.function.ThrowingRunnable;
 
 import java.time.Duration;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 /** A util class to help with a clean component shutdown. */
 public class ComponentClosingUtils {
+    private static Clock clock = SystemClock.getInstance();
 
     /** Utility class, not meant to be instantiated. */
     private ComponentClosingUtils() {}
@@ -95,8 +101,91 @@ public class ComponentClosingUtils {
         return future;
     }
 
-    static void abortThread(Thread t) {
-        // the abortion strategy is pretty simple here...
-        t.interrupt();
+    /**
+     * Tries a graceful ("elegant") shutdown first: {@code shutdown()} is called and the calling
+     * thread waits up to {@code timeout} for termination. If the executor is still not terminated
+     * afterwards (timeout or interrupted wait), a forceful shutdown with zero timeout follows.
+     *
+     * @param executor the {@link ExecutorService} to shut down.
+     * @param timeout the maximum time to wait for the graceful shutdown.
+     * @return true if the executor is terminated when this method returns, false otherwise.
+     */
+    @SuppressWarnings("ResultOfMethodCallIgnored")
+    public static boolean tryShutdownExecutorElegantly(ExecutorService executor, Duration timeout) {
+        executor.shutdown();
+        try {
+            executor.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS);
+        } catch (InterruptedException ignored) {
+            // Deliberately swallowed: an interrupted wait falls through to the check below.
+        }
+        final boolean needsForcefulShutdown = !executor.isTerminated();
+        if (needsForcefulShutdown) {
+            shutdownExecutorForcefully(executor, Duration.ZERO, false);
+        }
+        return executor.isTerminated();
+    }
+
+    /**
+     * Forcefully shuts down the executor within the timeout, returning early when interrupted.
+     *
+     * @param executor the executor to shut down.
+     * @param timeout the timeout duration.
+     * @return true if the given executor is terminated, false otherwise.
+     */
+    public static boolean shutdownExecutorForcefully(ExecutorService executor, Duration timeout) {
+        final boolean interruptable = true;
+        return shutdownExecutorForcefully(executor, timeout, interruptable);
+    }
+
+    /**
+     * Calls {@code shutdownNow()} on the executor repeatedly until it terminates or time runs out.
+     *
+     * @param executor the executor to shut down.
+     * @param timeout the total time budget; the deadline is computed once when the method starts.
+     * @param interruptable if true, an interruption of the calling thread ends the attempts; if
+     *     false, the interruption is swallowed and the attempts go on while time is left.
+     * @return true if the given executor is terminated, false otherwise.
+     */
+    public static boolean shutdownExecutorForcefully(
+            ExecutorService executor, Duration timeout, boolean interruptable) {
+        final Deadline deadline = Deadline.fromNowWithClock(timeout, clock);
+        boolean stop = false;
+        while (!stop) {
+            executor.shutdownNow();
+            try {
+                executor.awaitTermination(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+                stop = interruptable;
+            }
+            stop = stop || !deadline.hasTimeLeft() || executor.isTerminated();
+        }
+        return executor.isTerminated();
+    }
+
+    private static void abortThread(Thread t) {
+        // Try our best here to ensure the thread is aborted. Keep interrupting the
+        // thread for 10 times with 10 ms intervals. This helps handle the case
+        // where the shutdown sequence consists of a bunch of closeQuietly() calls
+        // that will swallow the InterruptedException so the thread to be aborted
+        // may block multiple times. If the thread is still alive after all the
+        // attempts, just let it go. The caller of closeAsyncWithTimeout() should
+        // have received a TimeoutException in this case.
+        int i = 0;
+        while (t.isAlive() && i < 10) {
+            t.interrupt();
+            i++;
+            try {
+                Thread.sleep(10);
+            } catch (InterruptedException e) {
+                // Let it go.
+            }
+        }
+    }
+
+    // ========= Method visible for testing ========
+    /** Replaces the clock used for the forceful shutdown deadline; meant for tests only. */
+    @VisibleForTesting
+    static void setClock(Clock clock) {
+        ComponentClosingUtils.clock = clock;
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtilsTest.java
new file mode 100644
index 0000000..d43089d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtilsTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
+import org.apache.flink.util.clock.ManualClock;
+import org.apache.flink.util.clock.SystemClock;
+
+import org.jetbrains.annotations.NotNull;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * The unit test class for {@link ComponentClosingUtils}.
+ *
+ * <p>Each test installs a {@link ManualClock} into {@link ComponentClosingUtils} so that timeouts
+ * can be triggered deterministically, and the system clock is restored after each test.
+ */
+public class ComponentClosingUtilsTest {
+    private ManualClock clock;
+
+    @Before
+    public void setup() {
+        clock = new ManualClock();
+        ComponentClosingUtils.setClock(clock);
+    }
+
+    @After
+    public void tearDown() {
+        // The clock lives in a static field of ComponentClosingUtils. Put the system clock back
+        // so that the manual clock, which only moves when a test advances it, does not leak into
+        // other tests running in the same JVM.
+        ComponentClosingUtils.setClock(SystemClock.getInstance());
+    }
+
+    @Test
+    public void testTryShutdownExecutorElegantlyWithoutForcefulShutdown() {
+        // The mock needs no shutdownNow() call, so the graceful path alone must succeed.
+        MockExecutorService executor = new MockExecutorService(0);
+        assertTrue(
+                ComponentClosingUtils.tryShutdownExecutorElegantly(executor, Duration.ofDays(1)));
+        assertEquals(0, executor.forcefullyShutdownCount);
+    }
+
+    @Test
+    public void testTryShutdownExecutorElegantlyWithForcefulShutdown() {
+        // The mock needs 5 shutdownNow() calls; the zero-timeout fallback is expected to try once.
+        MockExecutorService executor = new MockExecutorService(5);
+        assertFalse(
+                ComponentClosingUtils.tryShutdownExecutorElegantly(executor, Duration.ofDays(1)));
+        assertEquals(1, executor.forcefullyShutdownCount);
+    }
+
+    @Test
+    public void testTryShutdownExecutorElegantlyTimeoutWithForcefulShutdown() {
+        MockExecutorService executor = new MockExecutorService(5);
+        // Threshold 0: the mock advances the clock already during the graceful wait.
+        executor.timeoutAfterNumForcefulShutdown(clock, 0);
+        assertFalse(
+                ComponentClosingUtils.tryShutdownExecutorElegantly(executor, Duration.ofDays(1)));
+        assertEquals(1, executor.forcefullyShutdownCount);
+    }
+
+    @Test
+    public void testTryShutdownExecutorElegantlyInterruptedWithForcefulShutdown() {
+        MockExecutorService executor = new MockExecutorService(5);
+        // Threshold 0: the mock throws InterruptedException already during the graceful wait.
+        executor.interruptAfterNumForcefulShutdown(0);
+        assertFalse(
+                ComponentClosingUtils.tryShutdownExecutorElegantly(executor, Duration.ofDays(1)));
+        assertEquals(1, executor.forcefullyShutdownCount);
+    }
+
+    @Test
+    public void testShutdownExecutorForcefully() {
+        // With enough time left, shutdownNow() is repeated until the mock reports termination.
+        MockExecutorService executor = new MockExecutorService(5);
+        assertTrue(
+                ComponentClosingUtils.shutdownExecutorForcefully(
+                        executor, Duration.ofDays(1), false));
+        assertEquals(5, executor.forcefullyShutdownCount);
+    }
+
+    @Test
+    public void testShutdownExecutorForcefullyReachesTimeout() {
+        MockExecutorService executor = new MockExecutorService(5);
+        // After the first shutdownNow() the mock moves the clock 100 days ahead, past the
+        // 1 day timeout, so no further attempt is expected.
+        executor.timeoutAfterNumForcefulShutdown(clock, 1);
+        assertFalse(
+                ComponentClosingUtils.shutdownExecutorForcefully(
+                        executor, Duration.ofDays(1), false));
+        assertEquals(1, executor.forcefullyShutdownCount);
+    }
+
+    @Test
+    public void testShutdownExecutorForcefullyNotInterruptable() {
+        MockExecutorService executor = new MockExecutorService(5);
+        // Interruptions start after the first shutdownNow() but must not stop the attempts.
+        executor.interruptAfterNumForcefulShutdown(1);
+        assertTrue(
+                ComponentClosingUtils.shutdownExecutorForcefully(
+                        executor, Duration.ofDays(1), false));
+        assertEquals(5, executor.forcefullyShutdownCount);
+    }
+
+    @Test
+    public void testShutdownExecutorForcefullyInterruptable() {
+        MockExecutorService executor = new MockExecutorService(5);
+        // The first interruption, raised after one shutdownNow(), must end the attempts.
+        executor.interruptAfterNumForcefulShutdown(1);
+        assertFalse(
+                ComponentClosingUtils.shutdownExecutorForcefully(
+                        executor, Duration.ofDays(1), true));
+        assertEquals(1, executor.forcefullyShutdownCount);
+    }
+
+    // ============== private class for testing ===============
+
+    /**
+     * An executor class that behaves in an orchestrated way: it only reports termination once
+     * {@code shutdownNow()} has been called a configured number of times, and it can be told to
+     * advance a manual clock or to throw an {@link InterruptedException} while being awaited.
+     */
+    private static final class MockExecutorService
+            extends ManuallyTriggeredScheduledExecutorService {
+        // Number of shutdownNow() calls required before the mock reports itself as terminated.
+        private final int numRequiredForcefullyShutdown;
+        // Only set (and only used) when a timeout threshold has been configured.
+        private ManualClock clock;
+        // Number of shutdownNow() calls seen so far.
+        private int forcefullyShutdownCount;
+        // Thresholds on forcefullyShutdownCount; Integer.MAX_VALUE means "never".
+        private int interruptAfterNumForcefulShutdown = Integer.MAX_VALUE;
+        private int timeoutAfterNumForcefulShutdown = Integer.MAX_VALUE;
+
+        private MockExecutorService(int numRequiredForcefullyShutdown) {
+            this.numRequiredForcefullyShutdown = numRequiredForcefullyShutdown;
+            forcefullyShutdownCount = 0;
+        }
+
+        @Override
+        public @NotNull List<Runnable> shutdownNow() {
+            forcefullyShutdownCount++;
+            return super.shutdownNow();
+        }
+
+        @Override
+        public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+            // The orchestrated failures only apply while more shutdownNow() calls are required.
+            if (forcefullyShutdownCount < numRequiredForcefullyShutdown) {
+                if (forcefullyShutdownCount >= timeoutAfterNumForcefulShutdown) {
+                    // Simulate a timed-out wait by jumping far past any deadline used in tests.
+                    clock.advanceTime(Duration.ofDays(100));
+                }
+                if (forcefullyShutdownCount >= interruptAfterNumForcefulShutdown) {
+                    throw new InterruptedException();
+                }
+            }
+            return super.awaitTermination(timeout, unit) && reachedForcefulShutdownCount();
+        }
+
+        @Override
+        public boolean isTerminated() {
+            return super.isTerminated() && reachedForcefulShutdownCount();
+        }
+
+        public void interruptAfterNumForcefulShutdown(int interruptAfterNumForcefulShutdown) {
+            this.interruptAfterNumForcefulShutdown = interruptAfterNumForcefulShutdown;
+        }
+
+        public void timeoutAfterNumForcefulShutdown(
+                ManualClock clock, int timeoutAfterNumForcefulShutdown) {
+            this.clock = clock;
+            this.timeoutAfterNumForcefulShutdown = timeoutAfterNumForcefulShutdown;
+        }
+
+        private boolean reachedForcefulShutdownCount() {
+            return forcefullyShutdownCount >= numRequiredForcefullyShutdown;
+        }
+    }
+}
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ManuallyTriggeredScheduledExecutorService.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ManuallyTriggeredScheduledExecutorService.java
index 15c1f7b..0ae1d28 100644
--- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ManuallyTriggeredScheduledExecutorService.java
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ManuallyTriggeredScheduledExecutorService.java
@@ -113,7 +113,7 @@ public class ManuallyTriggeredScheduledExecutorService implements ScheduledExecu
     }
 
     @Override
-    public boolean awaitTermination(long timeout, TimeUnit unit) {
+    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
         return true;
     }