You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by jq...@apache.org on 2022/02/22 08:44:53 UTC

[flink] branch release-1.14 updated (1ff23ac -> 0a76d63)

This is an automated email from the ASF dual-hosted git repository.

jqin pushed a change to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 1ff23ac  [FLINK-25851][cassandra][tests] Inject dynamic table name into Pojos
     new 89046bc  [FLINK-24607] Let Deadline handle duration overflow.
     new 6936ce6  [FLINK-24607] Add util methods to shutdown executor services.
     new 0a76d63  [FLINK-24607] Make OperatorCoordinator closure more robust.

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/flink/api/common/time/Deadline.java |  27 +++-
 .../coordination/ComponentClosingUtils.java        |  95 +++++++++++-
 .../RecreateOnResetOperatorCoordinator.java        |  12 +-
 .../source/coordinator/ExecutorNotifier.java       |  21 +--
 .../source/coordinator/SourceCoordinator.java      |  23 +--
 .../coordinator/SourceCoordinatorContext.java      |  20 +--
 .../coordinator/SourceCoordinatorProvider.java     |  13 +-
 .../coordination/ComponentClosingUtilsTest.java    | 172 +++++++++++++++++++++
 .../source/coordinator/ExecutorNotifierTest.java   |  14 +-
 .../source/coordinator/SourceCoordinatorTest.java  |  62 +++++++-
 .../coordinator/SourceCoordinatorTestBase.java     |   9 +-
 .../ManuallyTriggeredScheduledExecutorService.java |   2 +-
 12 files changed, 382 insertions(+), 88 deletions(-)
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtilsTest.java

[flink] 01/03: [FLINK-24607] Let Deadline handle duration overflow.

Posted by jq...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 89046bc690d502e42212e01cfe28c737c0b2d3c9
Author: Jiangjie (Becket) Qin <ji...@alibaba-inc.com>
AuthorDate: Thu Feb 17 20:48:58 2022 +0800

    [FLINK-24607] Let Deadline handle duration overflow.
---
 .../org/apache/flink/api/common/time/Deadline.java | 27 ++++++++++++++++++----
 1 file changed, 22 insertions(+), 5 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/time/Deadline.java b/flink-core/src/main/java/org/apache/flink/api/common/time/Deadline.java
index 641a46b..bf7dba2 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/time/Deadline.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/time/Deadline.java
@@ -42,7 +42,7 @@ public class Deadline {
     }
 
     public Deadline plus(Duration other) {
-        return new Deadline(Math.addExact(timeNanos, other.toNanos()), this.clock);
+        return new Deadline(addHandlingOverflow(timeNanos, other.toNanos()), this.clock);
     }
 
     /**
@@ -72,9 +72,12 @@ public class Deadline {
         return !isOverdue();
     }
 
-    /** Determines whether the deadline is in the past, i.e. whether the time left is negative. */
+    /**
+     * Determines whether the deadline is in the past, i.e. whether the time left is zero or
+     * negative.
+     */
     public boolean isOverdue() {
-        return timeNanos < clock.relativeTimeNanos();
+        return timeNanos <= clock.relativeTimeNanos();
     }
 
     // ------------------------------------------------------------------------
@@ -92,7 +95,8 @@ public class Deadline {
     /** Constructs a Deadline that is a given duration after now. */
     public static Deadline fromNow(Duration duration) {
         return new Deadline(
-                Math.addExact(System.nanoTime(), duration.toNanos()), SystemClock.getInstance());
+                addHandlingOverflow(System.nanoTime(), duration.toNanos()),
+                SystemClock.getInstance());
     }
 
     /**
@@ -103,11 +107,24 @@ public class Deadline {
      * @param clock Time provider for this deadline.
      */
     public static Deadline fromNowWithClock(Duration duration, Clock clock) {
-        return new Deadline(Math.addExact(clock.relativeTimeNanos(), duration.toNanos()), clock);
+        return new Deadline(
+                addHandlingOverflow(clock.relativeTimeNanos(), duration.toNanos()), clock);
     }
 
     @Override
     public String toString() {
         return LocalDateTime.now().plus(timeLeft()).toString();
     }
+
+    // -------------------- private helper methods ----------------
+
+    private static long addHandlingOverflow(long x, long y) {
+        // Math.addExact()'s overflow check; overflow in either direction yields Long.MAX_VALUE.
+        long r = x + y;
+        if (((x ^ r) & (y ^ r)) < 0) {
+            return Long.MAX_VALUE;
+        } else {
+            return x + y;
+        }
+    }
 }

[flink] 02/03: [FLINK-24607] Add util methods to shutdown executor services.

Posted by jq...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6936ce6848e0544cb3275aa841ce40cac308540d
Author: Jiangjie (Becket) Qin <ji...@alibaba-inc.com>
AuthorDate: Fri Feb 11 16:13:50 2022 +0800

    [FLINK-24607] Add util methods to shutdown executor services.
---
 .../coordination/ComponentClosingUtils.java        |  95 ++++++++++-
 .../coordination/ComponentClosingUtilsTest.java    | 173 +++++++++++++++++++++
 .../ManuallyTriggeredScheduledExecutorService.java |   2 +-
 3 files changed, 266 insertions(+), 4 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtils.java
index deed49e..4bfe302 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtils.java
@@ -18,16 +18,22 @@ limitations under the License.
 
 package org.apache.flink.runtime.operators.coordination;
 
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.util.clock.Clock;
+import org.apache.flink.util.clock.SystemClock;
 import org.apache.flink.util.concurrent.FutureUtils;
 import org.apache.flink.util.function.ThrowingRunnable;
 
 import java.time.Duration;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 /** A util class to help with a clean component shutdown. */
 public class ComponentClosingUtils {
+    private static Clock clock = SystemClock.getInstance();
 
     /** Utility class, not meant to be instantiated. */
     private ComponentClosingUtils() {}
@@ -95,8 +101,91 @@ public class ComponentClosingUtils {
         return future;
     }
 
-    static void abortThread(Thread t) {
-        // the abortion strategy is pretty simple here...
-        t.interrupt();
+    /**
+     * Tries to shut down an {@link ExecutorService} gracefully within the given timeout. If the
+     * executor has not terminated once the wait ends, because the timeout elapsed or the waiting
+     * thread was interrupted, a non-interruptable forceful shutdown with a zero timeout is
+     * attempted. An {@code InterruptedException} raised while waiting is swallowed.
+     *
+     * @param executor the {@link ExecutorService} to shut down.
+     * @param timeout the time to wait for the graceful shutdown, evaluated in milliseconds.
+     * @return true if the given executor is terminated when this method returns, false otherwise.
+     */
+    @SuppressWarnings("ResultOfMethodCallIgnored")
+    public static boolean tryShutdownExecutorElegantly(ExecutorService executor, Duration timeout) {
+        try {
+            executor.shutdown();
+            executor.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS);
+        } catch (InterruptedException ie) {
+            // Interrupted while waiting: fall through to the forceful shutdown below.
+        }
+        if (!executor.isTerminated()) {
+            shutdownExecutorForcefully(executor, Duration.ZERO, false);
+        }
+        return executor.isTerminated();
+    }
+
+    /**
+     * Shuts down the given executor forcefully within the given timeout. This overload is
+     * interruptable: it returns as soon as the wait for termination is interrupted.
+     *
+     * @param executor the executor to shut down.
+     * @param timeout the timeout duration.
+     * @return true if the given executor is terminated, false otherwise.
+     */
+    public static boolean shutdownExecutorForcefully(ExecutorService executor, Duration timeout) {
+        return shutdownExecutorForcefully(executor, timeout, true);
+    }
+
+    /**
+     * Shuts down the given executor forcefully within the given timeout.
+     *
+     * @param executor the executor to shut down.
+     * @param timeout the timeout duration.
+     * @param interruptable when true, the method returns once the wait for termination is
+     *     interrupted. When false, the interruption is swallowed and {@code shutdownNow()} is
+     *     called again as long as time is left and the executor has not terminated.
+     * @return true if the given executor is terminated, false otherwise.
+     */
+    public static boolean shutdownExecutorForcefully(
+            ExecutorService executor, Duration timeout, boolean interruptable) {
+        Deadline deadline = Deadline.fromNowWithClock(timeout, clock);
+        boolean isInterrupted = false;
+        do {
+            executor.shutdownNow();
+            try {
+                executor.awaitTermination(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+                isInterrupted = interruptable;
+            }
+        } while (!isInterrupted && deadline.hasTimeLeft() && !executor.isTerminated());
+        return executor.isTerminated();
+    }
+
+    private static void abortThread(Thread t) {
+        // Best effort to abort the thread: interrupt it up to 10 times, sleeping 10 ms
+        // after each interrupt, for as long as it is still alive. Repeating the interrupt
+        // helps when the shutdown sequence is a series of closeQuietly() calls that
+        // swallow the InterruptedException, in which case the thread to be aborted
+        // may block several times. If the thread is still alive after all the
+        // attempts, just let it go. The caller of closeAsyncWithTimeout() should
+        // have received a TimeoutException in this case.
+        int i = 0;
+        while (t.isAlive() && i < 10) {
+            t.interrupt();
+            i++;
+            try {
+                Thread.sleep(10);
+            } catch (InterruptedException e) {
+                // Ignored on purpose: keep going until the attempts are used up.
+            }
+        }
+    }
+
+    // ========= Method visible for testing ========
+
+    @VisibleForTesting
+    static void setClock(Clock clock) {
+        ComponentClosingUtils.clock = clock;
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtilsTest.java
new file mode 100644
index 0000000..d43089d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtilsTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
+import org.apache.flink.util.clock.ManualClock;
+
+import org.jetbrains.annotations.NotNull;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/** The unit test class for {@link ComponentClosingUtils}. */
+public class ComponentClosingUtilsTest {
+    private ManualClock clock;
+
+    @Before
+    public void setup() {
+        clock = new ManualClock();
+        ComponentClosingUtils.setClock(clock);
+    }
+
+    @Test
+    public void testTryShutdownExecutorElegantlyWithoutForcefulShutdown() {
+        MockExecutorService executor = new MockExecutorService(0);
+        assertTrue(
+                ComponentClosingUtils.tryShutdownExecutorElegantly(executor, Duration.ofDays(1)));
+        assertEquals(0, executor.forcefullyShutdownCount);
+    }
+
+    @Test
+    public void testTryShutdownExecutorElegantlyWithForcefulShutdown() {
+        MockExecutorService executor = new MockExecutorService(5);
+        assertFalse(
+                ComponentClosingUtils.tryShutdownExecutorElegantly(executor, Duration.ofDays(1)));
+        assertEquals(1, executor.forcefullyShutdownCount);
+    }
+
+    @Test
+    public void testTryShutdownExecutorElegantlyTimeoutWithForcefulShutdown() {
+        MockExecutorService executor = new MockExecutorService(5);
+        executor.timeoutAfterNumForcefulShutdown(clock, 0);
+        assertFalse(
+                ComponentClosingUtils.tryShutdownExecutorElegantly(executor, Duration.ofDays(1)));
+        assertEquals(1, executor.forcefullyShutdownCount);
+    }
+
+    @Test
+    public void testTryShutdownExecutorElegantlyInterruptedWithForcefulShutdown() {
+        MockExecutorService executor = new MockExecutorService(5);
+        executor.interruptAfterNumForcefulShutdown(0);
+        assertFalse(
+                ComponentClosingUtils.tryShutdownExecutorElegantly(executor, Duration.ofDays(1)));
+        assertEquals(1, executor.forcefullyShutdownCount);
+    }
+
+    @Test
+    public void testShutdownExecutorForcefully() {
+        MockExecutorService executor = new MockExecutorService(5);
+        assertTrue(
+                ComponentClosingUtils.shutdownExecutorForcefully(
+                        executor, Duration.ofDays(1), false));
+        assertEquals(5, executor.forcefullyShutdownCount);
+    }
+
+    @Test
+    public void testShutdownExecutorForcefullyReachesTimeout() {
+        MockExecutorService executor = new MockExecutorService(5);
+        executor.timeoutAfterNumForcefulShutdown(clock, 1);
+        assertFalse(
+                ComponentClosingUtils.shutdownExecutorForcefully(
+                        executor, Duration.ofDays(1), false));
+        assertEquals(1, executor.forcefullyShutdownCount);
+    }
+
+    @Test
+    public void testShutdownExecutorForcefullyNotInterruptable() {
+        MockExecutorService executor = new MockExecutorService(5);
+        executor.interruptAfterNumForcefulShutdown(1);
+        assertTrue(
+                ComponentClosingUtils.shutdownExecutorForcefully(
+                        executor, Duration.ofDays(1), false));
+        assertEquals(5, executor.forcefullyShutdownCount);
+    }
+
+    @Test
+    public void testShutdownExecutorForcefullyInterruptable() {
+        MockExecutorService executor = new MockExecutorService(5);
+        executor.interruptAfterNumForcefulShutdown(1);
+        assertFalse(
+                ComponentClosingUtils.shutdownExecutorForcefully(
+                        executor, Duration.ofDays(1), true));
+        assertEquals(1, executor.forcefullyShutdownCount);
+    }
+
+    // ============== private class for testing ===============
+
+    /** A mock executor that reports termination only after enough shutdownNow() calls. */
+    private static final class MockExecutorService
+            extends ManuallyTriggeredScheduledExecutorService {
+        private final int numRequiredForcefullyShutdown;
+        private ManualClock clock;
+        private int forcefullyShutdownCount;
+        private int interruptAfterNumForcefulShutdown = Integer.MAX_VALUE;
+        private int timeoutAfterNumForcefulShutdown = Integer.MAX_VALUE;
+
+        private MockExecutorService(int numRequiredForcefullyShutdown) {
+            this.numRequiredForcefullyShutdown = numRequiredForcefullyShutdown;
+            forcefullyShutdownCount = 0;
+        }
+
+        @Override
+        public @NotNull List<Runnable> shutdownNow() {
+            forcefullyShutdownCount++;
+            return super.shutdownNow();
+        }
+
+        @Override
+        public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+            if (forcefullyShutdownCount < numRequiredForcefullyShutdown) {
+                if (forcefullyShutdownCount >= timeoutAfterNumForcefulShutdown) {
+                    clock.advanceTime(Duration.ofDays(100));
+                }
+                if (forcefullyShutdownCount >= interruptAfterNumForcefulShutdown) {
+                    throw new InterruptedException();
+                }
+            }
+            return super.awaitTermination(timeout, unit) && reachedForcefulShutdownCount();
+        }
+
+        @Override
+        public boolean isTerminated() {
+            return super.isTerminated() && reachedForcefulShutdownCount();
+        }
+
+        public void interruptAfterNumForcefulShutdown(int interruptAfterNumForcefulShutdown) {
+            this.interruptAfterNumForcefulShutdown = interruptAfterNumForcefulShutdown;
+        }
+
+        public void timeoutAfterNumForcefulShutdown(
+                ManualClock clock, int timeoutAfterNumForcefulShutdown) {
+            this.clock = clock;
+            this.timeoutAfterNumForcefulShutdown = timeoutAfterNumForcefulShutdown;
+        }
+
+        private boolean reachedForcefulShutdownCount() {
+            return forcefullyShutdownCount >= numRequiredForcefullyShutdown;
+        }
+    }
+}
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ManuallyTriggeredScheduledExecutorService.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ManuallyTriggeredScheduledExecutorService.java
index 15c1f7b..0ae1d28 100644
--- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ManuallyTriggeredScheduledExecutorService.java
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ManuallyTriggeredScheduledExecutorService.java
@@ -113,7 +113,7 @@ public class ManuallyTriggeredScheduledExecutorService implements ScheduledExecu
     }
 
     @Override
-    public boolean awaitTermination(long timeout, TimeUnit unit) {
+    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
         return true;
     }
 

[flink] 03/03: [FLINK-24607] Make OperatorCoordinator closure more robust.

Posted by jq...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0a76d632f33d9a69df87457a63043bd7f609ed40
Author: Jiangjie (Becket) Qin <ji...@alibaba-inc.com>
AuthorDate: Mon Feb 21 17:43:19 2022 +0800

    [FLINK-24607] Make OperatorCoordinator closure more robust.
---
 .../RecreateOnResetOperatorCoordinator.java        | 12 ++++-
 .../source/coordinator/ExecutorNotifier.java       | 21 +-------
 .../source/coordinator/SourceCoordinator.java      | 23 ++------
 .../coordinator/SourceCoordinatorContext.java      | 20 +++----
 .../coordinator/SourceCoordinatorProvider.java     | 13 +----
 .../coordination/ComponentClosingUtilsTest.java    |  3 +-
 .../source/coordinator/ExecutorNotifierTest.java   | 14 ++---
 .../source/coordinator/SourceCoordinatorTest.java  | 62 ++++++++++++++++++++--
 .../coordinator/SourceCoordinatorTestBase.java     |  9 ++--
 9 files changed, 96 insertions(+), 81 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
index 0d3d5f8..7d3d3ff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
@@ -128,8 +128,16 @@ public class RecreateOnResetOperatorCoordinator implements OperatorCoordinator {
         // capture the status whether the coordinator was started when this method was called
         final boolean wasStarted = this.started;
 
-        closingFuture.thenRun(
-                () -> {
+        closingFuture.whenComplete(
+                (ignored, e) -> {
+                    if (e != null) {
+                        LOG.warn(
+                                String.format(
+                                        "Received exception when closing "
+                                                + "operator coordinator for %s.",
+                                        oldCoordinator.operatorId),
+                                e);
+                    }
                     if (!closed) {
                         // The previous coordinator has closed. Create a new one.
                         newCoordinator.createNewInternalCoordinator(context, provider);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifier.java
index e52f6cd..fe4cf8a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifier.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifier.java
@@ -25,23 +25,20 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BiConsumer;
 
 /**
  * This class is used to coordinate between two components, where one component has an executor
  * following the mailbox model and the other component notifies it when needed.
  */
-public class ExecutorNotifier implements AutoCloseable {
+public class ExecutorNotifier {
     private static final Logger LOG = LoggerFactory.getLogger(ExecutorNotifier.class);
     private final ScheduledExecutorService workerExecutor;
     private final Executor executorToNotify;
-    private final AtomicBoolean closed;
 
     public ExecutorNotifier(ScheduledExecutorService workerExecutor, Executor executorToNotify) {
         this.executorToNotify = executorToNotify;
         this.workerExecutor = workerExecutor;
-        this.closed = new AtomicBoolean(false);
     }
 
     /**
@@ -140,20 +137,4 @@ public class ExecutorNotifier implements AutoCloseable {
                 periodMs,
                 TimeUnit.MILLISECONDS);
     }
-
-    /**
-     * Close the executor notifier. This is a blocking call which waits for all the async calls to
-     * finish before it returns.
-     *
-     * @throws InterruptedException when interrupted during closure.
-     */
-    public void close() throws InterruptedException {
-        if (!closed.compareAndSet(false, true)) {
-            LOG.debug("The executor notifier has been closed.");
-            return;
-        }
-        // Shutdown the worker executor, so no more worker tasks can run.
-        workerExecutor.shutdownNow();
-        workerExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
-    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
index 85a767e..484c9ca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
@@ -51,12 +51,11 @@ import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.runtime.source.coordinator.SourceCoordinatorSerdeUtils.readAndVerifyCoordinatorSerdeVersion;
 import static org.apache.flink.runtime.source.coordinator.SourceCoordinatorSerdeUtils.readBytes;
 import static org.apache.flink.runtime.source.coordinator.SourceCoordinatorSerdeUtils.writeCoordinatorSerdeVersion;
+import static org.apache.flink.util.IOUtils.closeAll;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /**
@@ -80,8 +79,6 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT>
 
     /** The name of the operator this SourceCoordinator is associated with. */
     private final String operatorName;
-    /** A single-thread executor to handle all the changes to the coordinator. */
-    private final ExecutorService coordinatorExecutor;
     /** The Source that is associated with this SourceCoordinator. */
     private final Source<?, SplitT, EnumChkT> source;
     /** The serializer that handles the serde of the SplitEnumerator checkpoints. */
@@ -98,11 +95,9 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT>
 
     public SourceCoordinator(
             String operatorName,
-            ExecutorService coordinatorExecutor,
             Source<?, SplitT, EnumChkT> source,
             SourceCoordinatorContext<SplitT> context) {
         this.operatorName = operatorName;
-        this.coordinatorExecutor = coordinatorExecutor;
         this.source = source;
         this.enumCheckpointSerializer = source.getEnumeratorCheckpointSerializer();
         this.context = context;
@@ -144,18 +139,8 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT>
     @Override
     public void close() throws Exception {
         LOG.info("Closing SourceCoordinator for source {}.", operatorName);
-        try {
-            if (started) {
-                context.close();
-                if (enumerator != null) {
-                    enumerator.close();
-                }
-            }
-        } finally {
-            coordinatorExecutor.shutdownNow();
-            // We do not expect this to actually block for long. At this point, there should
-            // be very few task running in the executor, if any.
-            coordinatorExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+        if (started) {
+            closeAll(context, enumerator);
         }
         LOG.info("Source coordinator for source {} closed.", operatorName);
     }
@@ -338,7 +323,7 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT>
             return;
         }
 
-        coordinatorExecutor.execute(
+        context.runInCoordinatorThread(
                 () -> {
                     try {
                         action.run();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
index 0f0def9..5d276ca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
@@ -42,6 +42,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -50,12 +51,12 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
 
+import static org.apache.flink.runtime.operators.coordination.ComponentClosingUtils.shutdownExecutorForcefully;
+
 /**
  * A context class for the {@link OperatorCoordinator}. Compared with {@link SplitEnumeratorContext}
  * this class allows interaction with state and sending {@link OperatorEvent} to the SourceOperator
@@ -82,7 +83,8 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit>
 
     private static final Logger LOG = LoggerFactory.getLogger(SourceCoordinatorContext.class);
 
-    private final ExecutorService coordinatorExecutor;
+    private final ScheduledExecutorService workerExecutor;
+    private final ScheduledExecutorService coordinatorExecutor;
     private final ExecutorNotifier notifier;
     private final OperatorCoordinator.Context operatorCoordinatorContext;
     private final SimpleVersionedSerializer<SplitT> splitSerializer;
@@ -95,13 +97,12 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit>
     private volatile boolean closed;
 
     public SourceCoordinatorContext(
-            ExecutorService coordinatorExecutor,
             SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory,
             int numWorkerThreads,
             OperatorCoordinator.Context operatorCoordinatorContext,
             SimpleVersionedSerializer<SplitT> splitSerializer) {
         this(
-                coordinatorExecutor,
+                Executors.newScheduledThreadPool(1, coordinatorThreadFactory),
                 Executors.newScheduledThreadPool(
                         numWorkerThreads,
                         new ExecutorThreadFactory(
@@ -115,12 +116,13 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit>
     // Package private method for unit test.
     @VisibleForTesting
     SourceCoordinatorContext(
-            ExecutorService coordinatorExecutor,
+            ScheduledExecutorService coordinatorExecutor,
             ScheduledExecutorService workerExecutor,
             SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory,
             OperatorCoordinator.Context operatorCoordinatorContext,
             SimpleVersionedSerializer<SplitT> splitSerializer,
             SplitAssignmentTracker<SplitT> splitAssignmentTracker) {
+        this.workerExecutor = workerExecutor;
         this.coordinatorExecutor = coordinatorExecutor;
         this.coordinatorThreadFactory = coordinatorThreadFactory;
         this.operatorCoordinatorContext = operatorCoordinatorContext;
@@ -246,9 +248,9 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit>
     @Override
     public void close() throws InterruptedException {
         closed = true;
-        notifier.close();
-        coordinatorExecutor.shutdown();
-        coordinatorExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+        // Forcefully shut down both executors so the closing sequence always runs to completion.
+        shutdownExecutorForcefully(workerExecutor, Duration.ofNanos(Long.MAX_VALUE));
+        shutdownExecutorForcefully(coordinatorExecutor, Duration.ofNanos(Long.MAX_VALUE));
     }
 
     // --------- Package private additional methods for the SourceCoordinator ------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
index 9ffb984..e0ee8b9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
@@ -30,8 +30,6 @@ import org.apache.flink.util.FatalExitExceptionHandler;
 import javax.annotation.Nullable;
 
 import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.function.BiConsumer;
 
@@ -71,19 +69,12 @@ public class SourceCoordinatorProvider<SplitT extends SourceSplit>
         CoordinatorExecutorThreadFactory coordinatorThreadFactory =
                 new CoordinatorExecutorThreadFactory(
                         coordinatorThreadName, context.getUserCodeClassloader());
-        ExecutorService coordinatorExecutor =
-                Executors.newSingleThreadExecutor(coordinatorThreadFactory);
 
         SimpleVersionedSerializer<SplitT> splitSerializer = source.getSplitSerializer();
         SourceCoordinatorContext<SplitT> sourceCoordinatorContext =
                 new SourceCoordinatorContext<>(
-                        coordinatorExecutor,
-                        coordinatorThreadFactory,
-                        numWorkerThreads,
-                        context,
-                        splitSerializer);
-        return new SourceCoordinator<>(
-                operatorName, coordinatorExecutor, source, sourceCoordinatorContext);
+                        coordinatorThreadFactory, numWorkerThreads, context, splitSerializer);
+        return new SourceCoordinator<>(operatorName, source, sourceCoordinatorContext);
     }
 
     /** A thread factory class that provides some helper methods. */
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtilsTest.java
index d43089d..7de4b07 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtilsTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.operators.coordination;
 import org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
 import org.apache.flink.util.clock.ManualClock;
 
-import org.jetbrains.annotations.NotNull;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -133,7 +132,7 @@ public class ComponentClosingUtilsTest {
         }
 
         @Override
-        public @NotNull List<Runnable> shutdownNow() {
+        public List<Runnable> shutdownNow() {
             forcefullyShutdownCount++;
             return super.shutdownNow();
         }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifierTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifierTest.java
index 8f7a806..76a1b54 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifierTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifierTest.java
@@ -22,6 +22,7 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.time.Duration;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -29,6 +30,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.apache.flink.runtime.operators.coordination.ComponentClosingUtils.shutdownExecutorForcefully;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 
@@ -62,8 +64,8 @@ public class ExecutorNotifierTest {
 
     @After
     public void tearDown() throws InterruptedException {
-        notifier.close();
-        closeExecutorToNotify();
+        shutdownExecutorForcefully(workerExecutor, Duration.ofNanos(Long.MAX_VALUE));
+        shutdownExecutorForcefully(executorToNotify, Duration.ofNanos(Long.MAX_VALUE));
     }
 
     @Test
@@ -77,7 +79,6 @@ public class ExecutorNotifierTest {
                     latch.countDown();
                 });
         latch.await();
-        closeExecutorToNotify();
         assertEquals(1234, result.get());
     }
 
@@ -110,7 +111,6 @@ public class ExecutorNotifierTest {
                     throw exception2;
                 });
         latch.await();
-        closeExecutorToNotify();
         // The uncaught exception handler may fire after the executor has shutdown.
         // We need to wait on the countdown latch here.
         exceptionInHandlerLatch.await(10000L, TimeUnit.MILLISECONDS);
@@ -128,15 +128,9 @@ public class ExecutorNotifierTest {
                     throw exception;
                 });
         latch.await();
-        closeExecutorToNotify();
         // The uncaught exception handler may fire after the executor has shutdown.
         // We need to wait on the countdown latch here.
         exceptionInHandlerLatch.await(10000L, TimeUnit.MILLISECONDS);
         assertEquals(exception, exceptionInHandler);
     }
-
-    private void closeExecutorToNotify() throws InterruptedException {
-        executorToNotify.shutdown();
-        executorToNotify.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
-    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
index f01143e..95314a6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
@@ -33,9 +33,11 @@ import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.ComponentClosingUtils;
 import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 import org.apache.flink.runtime.source.event.SourceEventWrapper;
+import org.apache.flink.util.function.ThrowingRunnable;
 
 import org.junit.Test;
 
@@ -51,6 +53,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeoutException;
 import java.util.function.Supplier;
 
 import static org.apache.flink.core.testutils.CommonTestUtils.waitUtil;
@@ -242,7 +246,6 @@ public class SourceCoordinatorTest extends SourceCoordinatorTestBase {
                 final SourceCoordinator<?, ?> coordinator =
                         new SourceCoordinator<>(
                                 OPERATOR_NAME,
-                                coordinatorExecutor,
                                 new EnumeratorCreatingSource<>(() -> splitEnumerator),
                                 context)) {
 
@@ -262,7 +265,6 @@ public class SourceCoordinatorTest extends SourceCoordinatorTestBase {
         final SourceCoordinator<?, ?> coordinator =
                 new SourceCoordinator<>(
                         OPERATOR_NAME,
-                        coordinatorExecutor,
                         new EnumeratorCreatingSource<>(
                                 () -> {
                                     throw failureReason;
@@ -290,7 +292,6 @@ public class SourceCoordinatorTest extends SourceCoordinatorTestBase {
                 final SourceCoordinator<?, ?> coordinator =
                         new SourceCoordinator<>(
                                 OPERATOR_NAME,
-                                coordinatorExecutor,
                                 new EnumeratorCreatingSource<>(() -> splitEnumerator),
                                 context)) {
 
@@ -306,6 +307,61 @@ public class SourceCoordinatorTest extends SourceCoordinatorTestBase {
     }
 
     @Test
+    public void testBlockOnClose() throws Exception {
+        // It is possible that the split enumerator submits some heavy-duty work to the
+        // coordinator executor which blocks the coordinator closure.
+        final CountDownLatch latch = new CountDownLatch(1);
+        try (final MockSplitEnumeratorContext<MockSourceSplit> enumeratorContext =
+                        new MockSplitEnumeratorContext<>(1);
+                final MockSplitEnumerator splitEnumerator =
+                        new MockSplitEnumerator(1, enumeratorContext) {
+                            @Override
+                            public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+                                context.callAsync(
+                                        () -> 1L,
+                                        (ignored, t) -> {
+                                            latch.countDown();
+                                            // Block this handler so that it never returns.
+                                            try {
+                                                Thread.sleep(Long.MAX_VALUE);
+                                            } catch (InterruptedException e) {
+                                                throw new RuntimeException(e);
+                                            }
+                                        });
+                            }
+                        };
+                final SourceCoordinator<?, ?> coordinator =
+                        new SourceCoordinator<>(
+                                OPERATOR_NAME,
+                                new EnumeratorCreatingSource<>(() -> splitEnumerator),
+                                context)) {
+
+            coordinator.start();
+            coordinator.handleEventFromOperator(1, new SourceEventWrapper(new SourceEvent() {}));
+            // Wait until the coordinator executor blocks.
+            latch.await();
+
+            CompletableFuture<?> future =
+                    ComponentClosingUtils.closeAsyncWithTimeout(
+                            "testBlockOnClose",
+                            (ThrowingRunnable<Exception>) coordinator::close,
+                            Duration.ofMillis(1));
+
+            future.exceptionally(
+                            e -> {
+                                assertTrue(e instanceof TimeoutException);
+                                return null;
+                            })
+                    .get();
+
+            waitUtil(
+                    splitEnumerator::closed,
+                    Duration.ofSeconds(5),
+                    "Split enumerator was not closed in 5 seconds.");
+        }
+    }
+
+    @Test
     public void testUserClassLoaderWhenCreatingNewEnumerator() throws Exception {
         final ClassLoader testClassLoader = new URLClassLoader(new URL[0]);
         final OperatorCoordinator.Context context =
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
index 6ddd633..32f5d24 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
@@ -37,8 +37,8 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -57,7 +57,7 @@ public abstract class SourceCoordinatorTestBase {
 
     // ---- Mocks for the Source Coordinator Context ----
     protected SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory;
-    protected ExecutorService coordinatorExecutor;
+    protected ScheduledExecutorService coordinatorExecutor;
     protected SplitAssignmentTracker<MockSourceSplit> splitSplitAssignmentTracker;
     protected SourceCoordinatorContext<MockSourceSplit> context;
 
@@ -78,7 +78,7 @@ public abstract class SourceCoordinatorTestBase {
                 new SourceCoordinatorProvider.CoordinatorExecutorThreadFactory(
                         coordinatorThreadName, getClass().getClassLoader());
 
-        coordinatorExecutor = Executors.newSingleThreadExecutor(coordinatorThreadFactory);
+        coordinatorExecutor = Executors.newSingleThreadScheduledExecutor(coordinatorThreadFactory);
         sourceCoordinator = getNewSourceCoordinator();
         context = sourceCoordinator.getContext();
     }
@@ -152,8 +152,7 @@ public abstract class SourceCoordinatorTestBase {
                         new MockSourceSplitSerializer(),
                         new MockSplitEnumeratorCheckpointSerializer());
 
-        return new SourceCoordinator<>(
-                OPERATOR_NAME, coordinatorExecutor, mockSource, getNewSourceCoordinatorContext());
+        return new SourceCoordinator<>(OPERATOR_NAME, mockSource, getNewSourceCoordinatorContext());
     }
 
     protected SourceCoordinatorContext<MockSourceSplit> getNewSourceCoordinatorContext() {