You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by jq...@apache.org on 2020/12/12 11:08:14 UTC
[flink] 11/32: [FLINK-20266][runtime] Replace dedicated thread pool
in ComponentClosingUtils with use of 'FutureUtils.orTimeout()'
This is an automated email from the ASF dual-hosted git repository.
jqin pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit a6206cdd88ad24731c470cdd779d310b1c2cbffc
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Sun Nov 22 15:28:29 2020 +0100
[FLINK-20266][runtime] Replace dedicated thread pool in ComponentClosingUtils with use of 'FutureUtils.orTimeout()'
This removes extra (non-daemon) threads, which were previously keeping the JVM alive.
---
.../coordination}/ComponentClosingUtils.java | 51 ++++++++++------------
.../RecreateOnResetOperatorCoordinator.java | 2 +-
2 files changed, 23 insertions(+), 30 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/util/ComponentClosingUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtils.java
similarity index 71%
rename from flink-core/src/main/java/org/apache/flink/util/ComponentClosingUtils.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtils.java
index 65c83ae..581204a 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ComponentClosingUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtils.java
@@ -16,17 +16,12 @@
limitations under the License.
*/
-package org.apache.flink.util;
+package org.apache.flink.runtime.operators.coordination;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.util.function.ThrowingRunnable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -34,18 +29,8 @@ import java.util.concurrent.TimeoutException;
* A util class to help with a clean component shutdown.
*/
public class ComponentClosingUtils {
- private static final Logger LOG = LoggerFactory.getLogger(ComponentClosingUtils.class);
- // A shared watchdog executor to handle the timeout closing.
- private static final ScheduledExecutorService watchDog =
- Executors.newSingleThreadScheduledExecutor((ThreadFactory) r -> {
- Thread t = new Thread(r, "ComponentClosingUtil");
- t.setUncaughtExceptionHandler((thread, exception) -> {
- LOG.error("FATAL: The component closing util thread caught exception ", exception);
- System.exit(-17);
- });
- return t;
- });
+ /** Utility class, not meant to be instantiated. */
private ComponentClosingUtils() {}
/**
@@ -85,26 +70,34 @@ public class ComponentClosingUtils {
long closeTimeoutMs) {
final CompletableFuture<Void> future = new CompletableFuture<>();
// Start a dedicate thread to close the component.
- Thread t = new Thread(() -> {
+ final Thread t = new Thread(() -> {
closingSequence.run();
future.complete(null);
});
// Use uncaught exception handler to handle exceptions during closing.
t.setUncaughtExceptionHandler((thread, error) -> future.completeExceptionally(error));
t.start();
- // Schedule a watch dog job to the watching executor to detect timeout when
- // closing the component.
- watchDog.schedule(() -> {
- if (t.isAlive()) {
- t.interrupt();
- future.completeExceptionally(new TimeoutException(
- String.format("Failed to close the %s before timeout of %d milliseconds",
- componentName, closeTimeoutMs)));
- }
- }, closeTimeoutMs, TimeUnit.MILLISECONDS);
+
+ // if the future fails due to a timeout, we interrupt the thread
+ future.exceptionally((error) -> {
+ if (error instanceof TimeoutException && t.isAlive()) {
+ abortThread(t);
+ }
+ return null;
+ });
+
+ FutureUtils.orTimeout(
+ future,
+ closeTimeoutMs, TimeUnit.MILLISECONDS);
+
return future;
}
+ static void abortThread(Thread t) {
+ // the abortion strategy is pretty simple here...
+ t.interrupt();
+ }
+
// ---------------------------
private static class ClosingException extends RuntimeException {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
index af19225..c2e9cf5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
@@ -33,7 +33,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
-import static org.apache.flink.util.ComponentClosingUtils.closeAsyncWithTimeout;
+import static org.apache.flink.runtime.operators.coordination.ComponentClosingUtils.closeAsyncWithTimeout;
/**
* A class that will recreate a new {@link OperatorCoordinator} instance when