You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2018/09/26 12:02:35 UTC

[flink] 01/02: [FLINK-9891] Added hook to shutdown cluster if a session was created in per-job mode.

This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 70b48d63e5aed6411ccbdb443af0222515331867
Author: Sebastian Klemke <pa...@nerdheim.de>
AuthorDate: Sat Aug 11 22:43:48 2018 +0200

    [FLINK-9891] Added hook to shutdown cluster if a session was created in per-job mode.
    
    This closes #6540.
---
 .../java/org/apache/flink/client/cli/CliFrontend.java  | 18 ++++++++++++++----
 1 file changed, 14 insertions(+), 4 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 2e78e4a..780f814 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -58,6 +58,7 @@ import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ShutdownHookUtil;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Options;
@@ -249,13 +250,22 @@ public class CliFrontend {
 					LOG.info("Could not properly shut down the client.", e);
 				}
 			} else {
+				final Thread shutdownHook;
 				if (clusterId != null) {
 					client = clusterDescriptor.retrieve(clusterId);
+					shutdownHook = null;
 				} else {
 					// also in job mode we have to deploy a session cluster because the job
 					// might consist of multiple parts (e.g. when using collect)
 					final ClusterSpecification clusterSpecification = customCommandLine.getClusterSpecification(commandLine);
 					client = clusterDescriptor.deploySessionCluster(clusterSpecification);
+					// If not running in detached mode, add a shutdown hook that shuts down the cluster when the client exits.
+					// Note: there is a race condition here if the CLI is killed before the shutdown hook is installed.
+					if (!runOptions.getDetachedMode()) {
+						shutdownHook = ShutdownHookUtil.addShutdownHook(client::shutDownCluster, client.getClass().getSimpleName(), LOG);
+					} else {
+						shutdownHook = null;
+					}
 				}
 
 				try {
@@ -278,12 +288,12 @@ public class CliFrontend {
 
 					executeProgram(program, client, userParallelism);
 				} finally {
-					if (clusterId == null && !client.isDetached()) {
+					if (shutdownHook != null) {
 						// terminate the cluster only if we have started it before and if it's not detached
 						try {
-							client.shutDownCluster();
-						} catch (final Exception e) {
-							LOG.info("Could not properly terminate the Flink cluster.", e);
+							shutdownHook.run();
+						} finally {
+							ShutdownHookUtil.removeShutdownHook(shutdownHook, client.getClass().getSimpleName(), LOG);
 						}
 					}