You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2018/09/26 12:02:36 UTC

[flink] 02/02: [FLINK-9891] Make shutdown of started cluster in attached mode optional, add cli option 'shutdownOnAttachedExit'

This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 941557026b291217a73ca290ad70b367955d0168
Author: Andrey Zagrebin <az...@gmail.com>
AuthorDate: Wed Sep 19 11:16:00 2018 +0200

    [FLINK-9891] Make shutdown of started cluster in attached mode optional, add cli option 'shutdownOnAttachedExit'
    
    This closes #6718.
---
 docs/ops/cli.md                                      | 10 ++++++++++
 .../org/apache/flink/client/cli/CliFrontend.java     | 20 ++++++++++----------
 .../apache/flink/client/cli/CliFrontendParser.java   |  8 ++++++++
 .../org/apache/flink/client/cli/ProgramOptions.java  |  8 ++++++++
 .../apache/flink/yarn/cli/FlinkYarnSessionCli.java   |  2 ++
 5 files changed, 38 insertions(+), 10 deletions(-)

diff --git a/docs/ops/cli.md b/docs/ops/cli.md
index 439cd84..b6e8654 100644
--- a/docs/ops/cli.md
+++ b/docs/ops/cli.md
@@ -253,6 +253,11 @@ Action "run" compiles and runs a program.
      -s,--fromSavepoint <savepointPath>   Path to a savepoint to restore the job
                                           from (for example
                                           hdfs:///flink/savepoint-1537).
+     -sae,--shutdownOnAttachedExit        If the job is submitted in attached
+                                          mode, perform a best-effort cluster
+                                          shutdown when the CLI is terminated
+                                          abruptly, e.g., in response to a user
+                                          interrupt, such as typing Ctrl + C.
   Options for yarn-cluster mode:
      -d,--detached                        If present, runs the job in detached
                                           mode
@@ -261,6 +266,11 @@ Action "run" compiles and runs a program.
                                           connect to a different JobManager than
                                           the one specified in the
                                           configuration.
+     -sae,--shutdownOnAttachedExit        If the job is submitted in attached
+                                          mode, perform a best-effort cluster
+                                          shutdown when the CLI is terminated
+                                          abruptly, e.g., in response to a user
+                                          interrupt, such as typing Ctrl + C.
      -yD <property=value>                 use value for given property
      -yd,--yarndetached                   If present, runs the job in detached
                                           mode (deprecated; use non-YARN
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 780f814..94074f8 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -261,7 +261,7 @@ public class CliFrontend {
 					client = clusterDescriptor.deploySessionCluster(clusterSpecification);
 					// if not running in detached mode, add a shutdown hook to shut down cluster if client exits
 					// there's a race-condition here if cli is killed before shutdown hook is installed
-					if (!runOptions.getDetachedMode()) {
+					if (!runOptions.getDetachedMode() && runOptions.isShutdownOnAttachedExit()) {
 						shutdownHook = ShutdownHookUtil.addShutdownHook(client::shutDownCluster, client.getClass().getSimpleName(), LOG);
 					} else {
 						shutdownHook = null;
@@ -288,15 +288,18 @@ public class CliFrontend {
 
 					executeProgram(program, client, userParallelism);
 				} finally {
-					if (shutdownHook != null) {
+					if (clusterId == null && !client.isDetached()) {
 						// terminate the cluster only if we have started it before and if it's not detached
 						try {
-							shutdownHook.run();
-						} finally {
+							client.shutDownCluster();
+						} catch (final Exception e) {
+							LOG.info("Could not properly terminate the Flink cluster.", e);
+						}
+						if (shutdownHook != null) {
+							// we do not need the hook anymore as we have just tried to shutdown the cluster.
 							ShutdownHookUtil.removeShutdownHook(shutdownHook, client.getClass().getSimpleName(), LOG);
 						}
 					}
-
 					try {
 						client.shutdown();
 					} catch (Exception e) {
@@ -818,11 +821,8 @@ public class CliFrontend {
 	 * Creates a Packaged program from the given command line options.
 	 *
 	 * @return A PackagedProgram (upon success)
-	 * @throws java.io.FileNotFoundException
-	 * @throws org.apache.flink.client.program.ProgramInvocationException
 	 */
-	protected PackagedProgram buildProgram(ProgramOptions options)
-			throws FileNotFoundException, ProgramInvocationException {
+	PackagedProgram buildProgram(ProgramOptions options) throws FileNotFoundException, ProgramInvocationException {
 		String[] programArgs = options.getProgramArgs();
 		String jarFilePath = options.getJarFilePath();
 		List<URL> classpaths = options.getClasspaths();
@@ -1154,7 +1154,7 @@ public class CliFrontend {
 	 * @param address Address to write to the configuration
 	 * @param config The configuration to write to
 	 */
-	public static void setJobManagerAddressInConfig(Configuration config, InetSocketAddress address) {
+	static void setJobManagerAddressInConfig(Configuration config, InetSocketAddress address) {
 		config.setString(JobManagerOptions.ADDRESS, address.getHostString());
 		config.setInteger(JobManagerOptions.PORT, address.getPort());
 		config.setString(RestOptions.ADDRESS, address.getHostString());
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
index 1588aac..8eac249 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
@@ -62,6 +62,11 @@ public class CliFrontendParser {
 	public static final Option DETACHED_OPTION = new Option("d", "detached", false, "If present, runs " +
 			"the job in detached mode");
 
+	public static final Option SHUTDOWN_IF_ATTACHED_OPTION = new Option(
+		"sae", "shutdownOnAttachedExit", false,
+		"If the job is submitted in attached mode, perform a best-effort cluster shutdown " +
+			"when the CLI is terminated abruptly, e.g., in response to a user interrupt, such as typing Ctrl + C.");
+
 	/**
 	 * @deprecated use non-prefixed variant {@link #DETACHED_OPTION} for both YARN and non-YARN deployments
 	 */
@@ -124,6 +129,7 @@ public class CliFrontendParser {
 
 		LOGGING_OPTION.setRequired(false);
 		DETACHED_OPTION.setRequired(false);
+		SHUTDOWN_IF_ATTACHED_OPTION.setRequired(false);
 		YARN_DETACHED_OPTION.setRequired(false);
 
 		ARGS_OPTION.setRequired(false);
@@ -166,6 +172,7 @@ public class CliFrontendParser {
 		options.addOption(ARGS_OPTION);
 		options.addOption(LOGGING_OPTION);
 		options.addOption(DETACHED_OPTION);
+		options.addOption(SHUTDOWN_IF_ATTACHED_OPTION);
 		options.addOption(YARN_DETACHED_OPTION);
 		return options;
 	}
@@ -176,6 +183,7 @@ public class CliFrontendParser {
 		options.addOption(PARALLELISM_OPTION);
 		options.addOption(LOGGING_OPTION);
 		options.addOption(DETACHED_OPTION);
+		options.addOption(SHUTDOWN_IF_ATTACHED_OPTION);
 		return options;
 	}
 
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
index 1acda1b..832370d 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
@@ -38,6 +38,7 @@ import static org.apache.flink.client.cli.CliFrontendParser.LOGGING_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.PARALLELISM_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.SAVEPOINT_ALLOW_NON_RESTORED_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.SAVEPOINT_PATH_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.SHUTDOWN_IF_ATTACHED_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.YARN_DETACHED_OPTION;
 
 /**
@@ -59,6 +60,8 @@ public abstract class ProgramOptions extends CommandLineOptions {
 
 	private final boolean detachedMode;
 
+	private final boolean shutdownOnAttachedExit;
+
 	private final SavepointRestoreSettings savepointSettings;
 
 	protected ProgramOptions(CommandLine line) throws CliArgsException {
@@ -115,6 +118,7 @@ public abstract class ProgramOptions extends CommandLineOptions {
 		stdoutLogging = !line.hasOption(LOGGING_OPTION.getOpt());
 		detachedMode = line.hasOption(DETACHED_OPTION.getOpt()) || line.hasOption(
 			YARN_DETACHED_OPTION.getOpt());
+		shutdownOnAttachedExit = line.hasOption(SHUTDOWN_IF_ATTACHED_OPTION.getOpt());
 
 		if (line.hasOption(SAVEPOINT_PATH_OPTION.getOpt())) {
 			String savepointPath = line.getOptionValue(SAVEPOINT_PATH_OPTION.getOpt());
@@ -153,6 +157,10 @@ public abstract class ProgramOptions extends CommandLineOptions {
 		return detachedMode;
 	}
 
+	public boolean isShutdownOnAttachedExit() {
+		return shutdownOnAttachedExit;
+	}
+
 	public SavepointRestoreSettings getSavepointRestoreSettings() {
 		return savepointSettings;
 	}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 698d48d..1119b2c 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -83,6 +83,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
 
 import static org.apache.flink.client.cli.CliFrontendParser.DETACHED_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.SHUTDOWN_IF_ATTACHED_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.YARN_DETACHED_OPTION;
 import static org.apache.flink.configuration.HighAvailabilityOptions.HA_CLUSTER_ID;
 
@@ -214,6 +215,7 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
 		allOptions.addOption(slots);
 		allOptions.addOption(dynamicproperties);
 		allOptions.addOption(DETACHED_OPTION);
+		allOptions.addOption(SHUTDOWN_IF_ATTACHED_OPTION);
 		allOptions.addOption(YARN_DETACHED_OPTION);
 		allOptions.addOption(streaming);
 		allOptions.addOption(name);