You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2021/02/11 18:01:40 UTC

[flink] 02/02: [FLINK-21274] Block main thread when running the TaskManagerRunner

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a7f3b369229328ddc717776fca767ae4428df53a
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Tue Feb 9 16:22:06 2021 +0100

    [FLINK-21274] Block main thread when running the TaskManagerRunner
    
    To ensure that the TaskManager shuts down properly, we need to let the main
    thread block on the execution of the TaskManager. This guarantees that there
    is always a non-daemon thread alive for as long as the TaskManager runs.
    
    This closes #14914.
---
 .../taskmanager/KubernetesTaskExecutorRunner.java  |   2 +-
 .../mesos/entrypoint/MesosTaskExecutorRunner.java  |   2 +-
 .../runtime/taskexecutor/TaskManagerRunner.java    | 126 +++++++++++++++------
 .../taskexecutor/TaskManagerRunnerTest.java        |  35 +++---
 ...tractTaskManagerProcessFailureRecoveryTest.java |   2 +-
 .../apache/flink/yarn/YarnTaskExecutorRunner.java  |  16 ++-
 6 files changed, 113 insertions(+), 70 deletions(-)

diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/taskmanager/KubernetesTaskExecutorRunner.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/taskmanager/KubernetesTaskExecutorRunner.java
index cd4f276..cf95c70 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/taskmanager/KubernetesTaskExecutorRunner.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/taskmanager/KubernetesTaskExecutorRunner.java
@@ -36,6 +36,6 @@ public class KubernetesTaskExecutorRunner {
         SignalHandler.register(LOG);
         JvmShutdownSafeguard.installAsShutdownHook(LOG);
 
-        TaskManagerRunner.runTaskManagerSecurely(args);
+        TaskManagerRunner.runTaskManagerProcessSecurely(args);
     }
 }
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java
index 575be61..3683f45 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java
@@ -68,6 +68,6 @@ public class MesosTaskExecutorRunner {
             return;
         }
 
-        TaskManagerRunner.runTaskManagerSecurely(configuration);
+        TaskManagerRunner.runTaskManagerProcessSecurely(configuration);
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index bd8eb77..5981766 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -80,6 +80,7 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -92,15 +93,14 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * constructs the related components (network, I/O manager, memory manager, RPC service, HA service)
  * and starts them.
  */
-public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync {
+public class TaskManagerRunner implements FatalErrorHandler {
 
     private static final Logger LOG = LoggerFactory.getLogger(TaskManagerRunner.class);
 
     private static final long FATAL_ERROR_SHUTDOWN_TIMEOUT_MS = 10000L;
 
-    private static final int STARTUP_FAILURE_RETURN_CODE = 1;
-
-    @VisibleForTesting static final int RUNTIME_FAILURE_RETURN_CODE = 2;
+    private static final int SUCCESS_EXIT_CODE = 0;
+    @VisibleForTesting static final int FAILURE_EXIT_CODE = 1;
 
     private final Object lock = new Object();
 
@@ -123,7 +123,7 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync
 
     private final TaskExecutorService taskExecutorService;
 
-    private final CompletableFuture<Void> terminationFuture;
+    private final CompletableFuture<Result> terminationFuture;
 
     private boolean shutdown;
 
@@ -193,7 +193,8 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync
         this.shutdown = false;
         handleUnexpectedTaskExecutorServiceTermination();
 
-        MemoryLogger.startIfConfigured(LOG, configuration, terminationFuture);
+        MemoryLogger.startIfConfigured(
+                LOG, configuration, terminationFuture.thenAccept(ignored -> {}));
     }
 
     private void handleUnexpectedTaskExecutorServiceTermination() {
@@ -220,8 +221,19 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync
         taskExecutorService.start();
     }
 
-    @Override
-    public CompletableFuture<Void> closeAsync() {
+    public void close() throws Exception {
+        try {
+            closeAsync().get();
+        } catch (ExecutionException e) {
+            ExceptionUtils.rethrowException(ExceptionUtils.stripExecutionException(e));
+        }
+    }
+
+    public CompletableFuture<Result> closeAsync() {
+        return closeAsync(Result.SUCCESS);
+    }
+
+    private CompletableFuture<Result> closeAsync(Result terminationResult) {
         synchronized (lock) {
             if (!shutdown) {
                 shutdown = true;
@@ -238,7 +250,7 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync
                             if (throwable != null) {
                                 terminationFuture.completeExceptionally(throwable);
                             } else {
-                                terminationFuture.complete(null);
+                                terminationFuture.complete(terminationResult);
                             }
                         });
             }
@@ -291,7 +303,7 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync
     }
 
     // export the termination future for caller to know it is terminated
-    public CompletableFuture<Void> getTerminationFuture() {
+    public CompletableFuture<Result> getTerminationFuture() {
         return terminationFuture;
     }
 
@@ -314,17 +326,15 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync
                 && !ExceptionUtils.isMetaspaceOutOfMemoryError(exception)) {
             terminateJVM();
         } else {
-            closeAsync();
+            closeAsync(Result.FAILURE);
 
             FutureUtils.orTimeout(
                     terminationFuture, FATAL_ERROR_SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS);
-
-            terminationFuture.whenComplete((Void ignored, Throwable throwable) -> terminateJVM());
         }
     }
 
     private void terminateJVM() {
-        System.exit(RUNTIME_FAILURE_RETURN_CODE);
+        System.exit(FAILURE_EXIT_CODE);
     }
 
     // --------------------------------------------------------------------------------------------
@@ -345,7 +355,7 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync
             LOG.info("Cannot determine the maximum number of open file descriptors");
         }
 
-        runTaskManagerSecurely(args);
+        runTaskManagerProcessSecurely(args);
     }
 
     public static Configuration loadConfiguration(String[] args) throws FlinkParseException {
@@ -353,41 +363,70 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync
                 args, TaskManagerRunner.class.getSimpleName());
     }
 
-    public static void runTaskManager(Configuration configuration, PluginManager pluginManager)
+    public static int runTaskManager(Configuration configuration, PluginManager pluginManager)
             throws Exception {
-        final TaskManagerRunner taskManagerRunner =
-                new TaskManagerRunner(
-                        configuration, pluginManager, TaskManagerRunner::createTaskExecutorService);
+        final TaskManagerRunner taskManagerRunner;
 
-        taskManagerRunner.start();
-    }
+        try {
+            taskManagerRunner =
+                    new TaskManagerRunner(
+                            configuration,
+                            pluginManager,
+                            TaskManagerRunner::createTaskExecutorService);
+            taskManagerRunner.start();
+        } catch (Exception exception) {
+            throw new FlinkException("Failed to start the TaskManagerRunner.", exception);
+        }
 
-    public static void runTaskManagerSecurely(String[] args) {
         try {
-            Configuration configuration = loadConfiguration(args);
-            runTaskManagerSecurely(configuration);
+            return taskManagerRunner.getTerminationFuture().get().getExitCode();
         } catch (Throwable t) {
-            final Throwable strippedThrowable =
-                    ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
-            LOG.error("TaskManager initialization failed.", strippedThrowable);
-            System.exit(STARTUP_FAILURE_RETURN_CODE);
+            throw new FlinkException(
+                    "Unexpected failure during runtime of TaskManagerRunner.",
+                    ExceptionUtils.stripExecutionException(t));
+        }
+    }
+
+    public static void runTaskManagerProcessSecurely(String[] args) {
+        Configuration configuration = null;
+
+        try {
+            configuration = loadConfiguration(args);
+        } catch (FlinkParseException fpe) {
+            LOG.error("Could not load the configuration.", fpe);
+            System.exit(FAILURE_EXIT_CODE);
         }
+
+        runTaskManagerProcessSecurely(checkNotNull(configuration));
     }
 
-    public static void runTaskManagerSecurely(Configuration configuration) throws Exception {
+    public static void runTaskManagerProcessSecurely(Configuration configuration) {
         replaceGracefulExitWithHaltIfConfigured(configuration);
         final PluginManager pluginManager =
                 PluginUtils.createPluginManagerFromRootFolder(configuration);
         FileSystem.initialize(configuration, pluginManager);
 
-        SecurityUtils.install(new SecurityConfiguration(configuration));
+        int exitCode;
+        Throwable throwable = null;
 
-        SecurityUtils.getInstalledContext()
-                .runSecured(
-                        () -> {
-                            runTaskManager(configuration, pluginManager);
-                            return null;
-                        });
+        try {
+            SecurityUtils.install(new SecurityConfiguration(configuration));
+
+            exitCode =
+                    SecurityUtils.getInstalledContext()
+                            .runSecured(() -> runTaskManager(configuration, pluginManager));
+        } catch (Throwable t) {
+            throwable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
+            exitCode = FAILURE_EXIT_CODE;
+        }
+
+        if (throwable != null) {
+            LOG.error("Terminating TaskManagerRunner with exit code {}.", exitCode, throwable);
+        } else {
+            LOG.info("Terminating TaskManagerRunner with exit code {}.", exitCode);
+        }
+
+        System.exit(exitCode);
     }
 
     // --------------------------------------------------------------------------------------------
@@ -612,4 +651,19 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync
 
         CompletableFuture<Void> getTerminationFuture();
     }
+
+    public enum Result {
+        SUCCESS(SUCCESS_EXIT_CODE),
+        FAILURE(FAILURE_EXIT_CODE);
+
+        private final int exitCode;
+
+        Result(int exitCode) {
+            this.exitCode = exitCode;
+        }
+
+        public int getExitCode() {
+            return exitCode;
+        }
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java
index 4cb7adc..00e6b0f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java
@@ -25,13 +25,10 @@ import org.apache.flink.configuration.TaskManagerOptionsInternal;
 import org.apache.flink.core.plugin.PluginManager;
 import org.apache.flink.core.plugin.PluginUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.testutils.SystemExitTrackingSecurityManager;
-import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.TimeUtils;
 
 import org.junit.After;
-import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.Timeout;
@@ -39,10 +36,8 @@ import org.junit.rules.Timeout;
 import javax.annotation.Nonnull;
 
 import java.net.InetAddress;
-import java.time.Duration;
 import java.util.concurrent.CompletableFuture;
 
-import static org.apache.flink.core.testutils.FlinkMatchers.willNotComplete;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
@@ -54,15 +49,8 @@ public class TaskManagerRunnerTest extends TestLogger {
 
     @Rule public final Timeout timeout = Timeout.seconds(30);
 
-    private SystemExitTrackingSecurityManager systemExitTrackingSecurityManager;
     private TaskManagerRunner taskManagerRunner;
 
-    @Before
-    public void before() {
-        systemExitTrackingSecurityManager = new SystemExitTrackingSecurityManager();
-        System.setSecurityManager(systemExitTrackingSecurityManager);
-    }
-
     @After
     public void after() throws Exception {
         System.setSecurityManager(null);
@@ -80,8 +68,9 @@ public class TaskManagerRunnerTest extends TestLogger {
 
         taskManagerRunner.onFatalError(new RuntimeException());
 
-        Integer statusCode = systemExitTrackingSecurityManager.getSystemExitFuture().get();
-        assertThat(statusCode, is(equalTo(TaskManagerRunner.RUNTIME_FAILURE_RETURN_CODE)));
+        assertThat(
+                taskManagerRunner.getTerminationFuture().join(),
+                is(equalTo(TaskManagerRunner.Result.FAILURE)));
     }
 
     @Test
@@ -91,8 +80,9 @@ public class TaskManagerRunnerTest extends TestLogger {
                 TaskManagerOptions.REGISTRATION_TIMEOUT, TimeUtils.parseDuration("10 ms"));
         taskManagerRunner = createTaskManagerRunner(configuration);
 
-        Integer statusCode = systemExitTrackingSecurityManager.getSystemExitFuture().get();
-        assertThat(statusCode, is(equalTo(TaskManagerRunner.RUNTIME_FAILURE_RETURN_CODE)));
+        assertThat(
+                taskManagerRunner.getTerminationFuture().join(),
+                is(equalTo(TaskManagerRunner.Result.FAILURE)));
     }
 
     @Test
@@ -169,10 +159,11 @@ public class TaskManagerRunnerTest extends TestLogger {
                         createConfiguration(),
                         createTaskExecutorServiceFactory(taskExecutorService));
 
-        terminationFuture.completeExceptionally(new FlinkException("Test exception."));
+        terminationFuture.complete(null);
 
-        Integer statusCode = systemExitTrackingSecurityManager.getSystemExitFuture().get();
-        assertThat(statusCode, is(equalTo(TaskManagerRunner.RUNTIME_FAILURE_RETURN_CODE)));
+        assertThat(
+                taskManagerRunner.getTerminationFuture().join(),
+                is(equalTo(TaskManagerRunner.Result.FAILURE)));
     }
 
     @Test
@@ -191,11 +182,11 @@ public class TaskManagerRunnerTest extends TestLogger {
 
         taskManagerRunner.closeAsync();
 
-        terminationFuture.completeExceptionally(new FlinkException("Test exception."));
+        terminationFuture.complete(null);
 
         assertThat(
-                systemExitTrackingSecurityManager.getSystemExitFuture(),
-                willNotComplete(Duration.ofMillis(10L)));
+                taskManagerRunner.getTerminationFuture().join(),
+                is(equalTo(TaskManagerRunner.Result.SUCCESS)));
     }
 
     @Nonnull
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
index abe5e13..ed7f217 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -336,7 +336,7 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
 
                 TaskManagerRunner.runTaskManager(cfg, pluginManager);
             } catch (Throwable t) {
-                LOG.error("Failed to start TaskManager process", t);
+                LOG.error("Failed to run the TaskManager process", t);
                 System.exit(1);
             }
         }
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
index 42f1652..6b0a629 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
@@ -29,7 +29,7 @@ import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
-import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
 
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
@@ -37,7 +37,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.lang.reflect.UndeclaredThrowableException;
 import java.util.Map;
 
 /** This class is the executable entry point for running a TaskExecutor in a YARN container. */
@@ -76,23 +75,22 @@ public class YarnTaskExecutorRunner {
      * @param args The command line arguments.
      */
     private static void runTaskManagerSecurely(String[] args) {
+        Configuration configuration = null;
+
         try {
             LOG.debug("All environment variables: {}", ENV);
 
             final String currDir = ENV.get(Environment.PWD.key());
             LOG.info("Current working Directory: {}", currDir);
 
-            final Configuration configuration = TaskManagerRunner.loadConfiguration(args);
+            configuration = TaskManagerRunner.loadConfiguration(args);
             setupAndModifyConfiguration(configuration, currDir, ENV);
-
-            TaskManagerRunner.runTaskManagerSecurely(configuration);
         } catch (Throwable t) {
-            final Throwable strippedThrowable =
-                    ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
-            // make sure that everything whatever ends up in the log
-            LOG.error("YARN TaskManager initialization failed.", strippedThrowable);
+            LOG.error("YARN TaskManager initialization failed.", t);
             System.exit(INIT_ERROR_EXIT_CODE);
         }
+
+        TaskManagerRunner.runTaskManagerProcessSecurely(Preconditions.checkNotNull(configuration));
     }
 
     @VisibleForTesting