You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by jq...@apache.org on 2020/12/12 11:08:14 UTC

[flink] 11/32: [FLINK-20266][runtime] Replace dedicated thread pool in ComponentClosingUtils with use of 'FutureUtils.orTimeout()'

This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a6206cdd88ad24731c470cdd779d310b1c2cbffc
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Sun Nov 22 15:28:29 2020 +0100

    [FLINK-20266][runtime] Replace dedicated thread pool in ComponentClosingUtils with use of 'FutureUtils.orTimeout()'
    
    This removes the extra (non-daemon) threads, which were previously keeping the JVM alive.
---
 .../coordination}/ComponentClosingUtils.java       | 51 ++++++++++------------
 .../RecreateOnResetOperatorCoordinator.java        |  2 +-
 2 files changed, 23 insertions(+), 30 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/util/ComponentClosingUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtils.java
similarity index 71%
rename from flink-core/src/main/java/org/apache/flink/util/ComponentClosingUtils.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtils.java
index 65c83ae..581204a 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ComponentClosingUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtils.java
@@ -16,17 +16,12 @@
  limitations under the License.
  */
 
-package org.apache.flink.util;
+package org.apache.flink.runtime.operators.coordination;
 
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.util.function.ThrowingRunnable;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -34,18 +29,8 @@ import java.util.concurrent.TimeoutException;
  * A util class to help with a clean component shutdown.
  */
 public class ComponentClosingUtils {
-	private static final Logger LOG = LoggerFactory.getLogger(ComponentClosingUtils.class);
-	// A shared watchdog executor to handle the timeout closing.
-	private static final ScheduledExecutorService watchDog =
-			Executors.newSingleThreadScheduledExecutor((ThreadFactory) r -> {
-				Thread t = new Thread(r, "ComponentClosingUtil");
-				t.setUncaughtExceptionHandler((thread, exception) -> {
-					LOG.error("FATAL: The component closing util thread caught exception ", exception);
-					System.exit(-17);
-				});
-				return t;
-			});
 
+	/** Utility class, not meant to be instantiated. */
 	private ComponentClosingUtils() {}
 
 	/**
@@ -85,26 +70,34 @@ public class ComponentClosingUtils {
 			long closeTimeoutMs) {
 		final CompletableFuture<Void> future = new CompletableFuture<>();
 		// Start a dedicate thread to close the component.
-		Thread t = new Thread(() -> {
+		final Thread t = new Thread(() -> {
 			closingSequence.run();
 			future.complete(null);
 		});
 		// Use uncaught exception handler to handle exceptions during closing.
 		t.setUncaughtExceptionHandler((thread, error) -> future.completeExceptionally(error));
 		t.start();
-		// Schedule a watch dog job to the watching executor to detect timeout when
-		// closing the component.
-		watchDog.schedule(() -> {
-				if (t.isAlive()) {
-					t.interrupt();
-					future.completeExceptionally(new TimeoutException(
-							String.format("Failed to close the %s before timeout of %d milliseconds",
-									componentName, closeTimeoutMs)));
-				}
-			}, closeTimeoutMs, TimeUnit.MILLISECONDS);
+
+		// if the future fails due to a timeout, we interrupt the thread
+		future.exceptionally((error) -> {
+			if (error instanceof TimeoutException && t.isAlive()) {
+				abortThread(t);
+			}
+			return null;
+		});
+
+		FutureUtils.orTimeout(
+				future,
+				closeTimeoutMs, TimeUnit.MILLISECONDS);
+
 		return future;
 	}
 
+	static void abortThread(Thread t) {
+		// the abortion strategy is pretty simple here...
+		t.interrupt();
+	}
+
 	// ---------------------------
 
 	private static class ClosingException extends RuntimeException {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
index af19225..c2e9cf5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
@@ -33,7 +33,7 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.LinkedBlockingQueue;
 
-import static org.apache.flink.util.ComponentClosingUtils.closeAsyncWithTimeout;
+import static org.apache.flink.runtime.operators.coordination.ComponentClosingUtils.closeAsyncWithTimeout;
 
 /**
  * A class that will recreate a new {@link OperatorCoordinator} instance when