You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2018/09/26 12:02:34 UTC

[flink] branch release-1.5 updated (c4db76e -> 9415570)

This is an automated email from the ASF dual-hosted git repository.

gary pushed a change to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from c4db76e  [hotfix] Add release notes for changed jobmanager.sh syntax
     new 70b48d6  [FLINK-9891] Added hook to shutdown cluster if a session was created in per-job mode.
     new 9415570  [FLINK-9891] Make shutdown of started cluster in attached mode optional, add cli option 'schutdownOnAttachedExist'

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/ops/cli.md                                    | 10 ++++++++++
 .../org/apache/flink/client/cli/CliFrontend.java   | 22 ++++++++++++++++------
 .../apache/flink/client/cli/CliFrontendParser.java |  8 ++++++++
 .../apache/flink/client/cli/ProgramOptions.java    |  8 ++++++++
 .../apache/flink/yarn/cli/FlinkYarnSessionCli.java |  2 ++
 5 files changed, 44 insertions(+), 6 deletions(-)


[flink] 02/02: [FLINK-9891] Make shutdown of started cluster in attached mode optional, add cli option 'schutdownOnAttachedExist'

Posted by ga...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 941557026b291217a73ca290ad70b367955d0168
Author: Andrey Zagrebin <az...@gmail.com>
AuthorDate: Wed Sep 19 11:16:00 2018 +0200

    [FLINK-9891] Make shutdown of started cluster in attached mode optional, add cli option 'schutdownOnAttachedExist'
    
    This closes #6718.
---
 docs/ops/cli.md                                      | 10 ++++++++++
 .../org/apache/flink/client/cli/CliFrontend.java     | 20 ++++++++++----------
 .../apache/flink/client/cli/CliFrontendParser.java   |  8 ++++++++
 .../org/apache/flink/client/cli/ProgramOptions.java  |  8 ++++++++
 .../apache/flink/yarn/cli/FlinkYarnSessionCli.java   |  2 ++
 5 files changed, 38 insertions(+), 10 deletions(-)

diff --git a/docs/ops/cli.md b/docs/ops/cli.md
index 439cd84..b6e8654 100644
--- a/docs/ops/cli.md
+++ b/docs/ops/cli.md
@@ -253,6 +253,11 @@ Action "run" compiles and runs a program.
      -s,--fromSavepoint <savepointPath>   Path to a savepoint to restore the job
                                           from (for example
                                           hdfs:///flink/savepoint-1537).
+     -sae,--shutdownOnAttachedExit        If the job is submitted in attached
+                                          mode, perform a best-effort cluster
+                                          shutdown when the CLI is terminated
+                                          abruptly, e.g., in response to a user
+                                          interrupt, such as typing Ctrl + C.
   Options for yarn-cluster mode:
      -d,--detached                        If present, runs the job in detached
                                           mode
@@ -261,6 +266,11 @@ Action "run" compiles and runs a program.
                                           connect to a different JobManager than
                                           the one specified in the
                                           configuration.
+     -sae,--shutdownOnAttachedExit        If the job is submitted in attached
+                                          mode, perform a best-effort cluster
+                                          shutdown when the CLI is terminated
+                                          abruptly, e.g., in response to a user
+                                          interrupt, such as typing Ctrl + C.
      -yD <property=value>                 use value for given property
      -yd,--yarndetached                   If present, runs the job in detached
                                           mode (deprecated; use non-YARN
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 780f814..94074f8 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -261,7 +261,7 @@ public class CliFrontend {
 					client = clusterDescriptor.deploySessionCluster(clusterSpecification);
 					// if not running in detached mode, add a shutdown hook to shut down cluster if client exits
 					// there's a race-condition here if cli is killed before shutdown hook is installed
-					if (!runOptions.getDetachedMode()) {
+					if (!runOptions.getDetachedMode() && runOptions.isShutdownOnAttachedExit()) {
 						shutdownHook = ShutdownHookUtil.addShutdownHook(client::shutDownCluster, client.getClass().getSimpleName(), LOG);
 					} else {
 						shutdownHook = null;
@@ -288,15 +288,18 @@ public class CliFrontend {
 
 					executeProgram(program, client, userParallelism);
 				} finally {
-					if (shutdownHook != null) {
+					if (clusterId == null && !client.isDetached()) {
 						// terminate the cluster only if we have started it before and if it's not detached
 						try {
-							shutdownHook.run();
-						} finally {
+							client.shutDownCluster();
+						} catch (final Exception e) {
+							LOG.info("Could not properly terminate the Flink cluster.", e);
+						}
+						if (shutdownHook != null) {
+							// we do not need the hook anymore as we have just tried to shutdown the cluster.
 							ShutdownHookUtil.removeShutdownHook(shutdownHook, client.getClass().getSimpleName(), LOG);
 						}
 					}
-
 					try {
 						client.shutdown();
 					} catch (Exception e) {
@@ -818,11 +821,8 @@ public class CliFrontend {
 	 * Creates a Packaged program from the given command line options.
 	 *
 	 * @return A PackagedProgram (upon success)
-	 * @throws java.io.FileNotFoundException
-	 * @throws org.apache.flink.client.program.ProgramInvocationException
 	 */
-	protected PackagedProgram buildProgram(ProgramOptions options)
-			throws FileNotFoundException, ProgramInvocationException {
+	PackagedProgram buildProgram(ProgramOptions options) throws FileNotFoundException, ProgramInvocationException {
 		String[] programArgs = options.getProgramArgs();
 		String jarFilePath = options.getJarFilePath();
 		List<URL> classpaths = options.getClasspaths();
@@ -1154,7 +1154,7 @@ public class CliFrontend {
 	 * @param address Address to write to the configuration
 	 * @param config The configuration to write to
 	 */
-	public static void setJobManagerAddressInConfig(Configuration config, InetSocketAddress address) {
+	static void setJobManagerAddressInConfig(Configuration config, InetSocketAddress address) {
 		config.setString(JobManagerOptions.ADDRESS, address.getHostString());
 		config.setInteger(JobManagerOptions.PORT, address.getPort());
 		config.setString(RestOptions.ADDRESS, address.getHostString());
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
index 1588aac..8eac249 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
@@ -62,6 +62,11 @@ public class CliFrontendParser {
 	public static final Option DETACHED_OPTION = new Option("d", "detached", false, "If present, runs " +
 			"the job in detached mode");
 
+	public static final Option SHUTDOWN_IF_ATTACHED_OPTION = new Option(
+		"sae", "shutdownOnAttachedExit", false,
+		"If the job is submitted in attached mode, perform a best-effort cluster shutdown " +
+			"when the CLI is terminated abruptly, e.g., in response to a user interrupt, such as typing Ctrl + C.");
+
 	/**
 	 * @deprecated use non-prefixed variant {@link #DETACHED_OPTION} for both YARN and non-YARN deployments
 	 */
@@ -124,6 +129,7 @@ public class CliFrontendParser {
 
 		LOGGING_OPTION.setRequired(false);
 		DETACHED_OPTION.setRequired(false);
+		SHUTDOWN_IF_ATTACHED_OPTION.setRequired(false);
 		YARN_DETACHED_OPTION.setRequired(false);
 
 		ARGS_OPTION.setRequired(false);
@@ -166,6 +172,7 @@ public class CliFrontendParser {
 		options.addOption(ARGS_OPTION);
 		options.addOption(LOGGING_OPTION);
 		options.addOption(DETACHED_OPTION);
+		options.addOption(SHUTDOWN_IF_ATTACHED_OPTION);
 		options.addOption(YARN_DETACHED_OPTION);
 		return options;
 	}
@@ -176,6 +183,7 @@ public class CliFrontendParser {
 		options.addOption(PARALLELISM_OPTION);
 		options.addOption(LOGGING_OPTION);
 		options.addOption(DETACHED_OPTION);
+		options.addOption(SHUTDOWN_IF_ATTACHED_OPTION);
 		return options;
 	}
 
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
index 1acda1b..832370d 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
@@ -38,6 +38,7 @@ import static org.apache.flink.client.cli.CliFrontendParser.LOGGING_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.PARALLELISM_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.SAVEPOINT_ALLOW_NON_RESTORED_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.SAVEPOINT_PATH_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.SHUTDOWN_IF_ATTACHED_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.YARN_DETACHED_OPTION;
 
 /**
@@ -59,6 +60,8 @@ public abstract class ProgramOptions extends CommandLineOptions {
 
 	private final boolean detachedMode;
 
+	private final boolean shutdownOnAttachedExit;
+
 	private final SavepointRestoreSettings savepointSettings;
 
 	protected ProgramOptions(CommandLine line) throws CliArgsException {
@@ -115,6 +118,7 @@ public abstract class ProgramOptions extends CommandLineOptions {
 		stdoutLogging = !line.hasOption(LOGGING_OPTION.getOpt());
 		detachedMode = line.hasOption(DETACHED_OPTION.getOpt()) || line.hasOption(
 			YARN_DETACHED_OPTION.getOpt());
+		shutdownOnAttachedExit = line.hasOption(SHUTDOWN_IF_ATTACHED_OPTION.getOpt());
 
 		if (line.hasOption(SAVEPOINT_PATH_OPTION.getOpt())) {
 			String savepointPath = line.getOptionValue(SAVEPOINT_PATH_OPTION.getOpt());
@@ -153,6 +157,10 @@ public abstract class ProgramOptions extends CommandLineOptions {
 		return detachedMode;
 	}
 
+	public boolean isShutdownOnAttachedExit() {
+		return shutdownOnAttachedExit;
+	}
+
 	public SavepointRestoreSettings getSavepointRestoreSettings() {
 		return savepointSettings;
 	}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 698d48d..1119b2c 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -83,6 +83,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
 
 import static org.apache.flink.client.cli.CliFrontendParser.DETACHED_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.SHUTDOWN_IF_ATTACHED_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.YARN_DETACHED_OPTION;
 import static org.apache.flink.configuration.HighAvailabilityOptions.HA_CLUSTER_ID;
 
@@ -214,6 +215,7 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
 		allOptions.addOption(slots);
 		allOptions.addOption(dynamicproperties);
 		allOptions.addOption(DETACHED_OPTION);
+		allOptions.addOption(SHUTDOWN_IF_ATTACHED_OPTION);
 		allOptions.addOption(YARN_DETACHED_OPTION);
 		allOptions.addOption(streaming);
 		allOptions.addOption(name);


[flink] 01/02: [FLINK-9891] Added hook to shutdown cluster if a session was created in per-job mode.

Posted by ga...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 70b48d63e5aed6411ccbdb443af0222515331867
Author: Sebastian Klemke <pa...@nerdheim.de>
AuthorDate: Sat Aug 11 22:43:48 2018 +0200

    [FLINK-9891] Added hook to shutdown cluster if a session was created in per-job mode.
    
    This closes #6540.
---
 .../java/org/apache/flink/client/cli/CliFrontend.java  | 18 ++++++++++++++----
 1 file changed, 14 insertions(+), 4 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 2e78e4a..780f814 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -58,6 +58,7 @@ import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ShutdownHookUtil;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Options;
@@ -249,13 +250,22 @@ public class CliFrontend {
 					LOG.info("Could not properly shut down the client.", e);
 				}
 			} else {
+				final Thread shutdownHook;
 				if (clusterId != null) {
 					client = clusterDescriptor.retrieve(clusterId);
+					shutdownHook = null;
 				} else {
 					// also in job mode we have to deploy a session cluster because the job
 					// might consist of multiple parts (e.g. when using collect)
 					final ClusterSpecification clusterSpecification = customCommandLine.getClusterSpecification(commandLine);
 					client = clusterDescriptor.deploySessionCluster(clusterSpecification);
+					// if not running in detached mode, add a shutdown hook to shut down cluster if client exits
+					// there's a race-condition here if cli is killed before shutdown hook is installed
+					if (!runOptions.getDetachedMode()) {
+						shutdownHook = ShutdownHookUtil.addShutdownHook(client::shutDownCluster, client.getClass().getSimpleName(), LOG);
+					} else {
+						shutdownHook = null;
+					}
 				}
 
 				try {
@@ -278,12 +288,12 @@ public class CliFrontend {
 
 					executeProgram(program, client, userParallelism);
 				} finally {
-					if (clusterId == null && !client.isDetached()) {
+					if (shutdownHook != null) {
 						// terminate the cluster only if we have started it before and if it's not detached
 						try {
-							client.shutDownCluster();
-						} catch (final Exception e) {
-							LOG.info("Could not properly terminate the Flink cluster.", e);
+							shutdownHook.run();
+						} finally {
+							ShutdownHookUtil.removeShutdownHook(shutdownHook, client.getClass().getSimpleName(), LOG);
 						}
 					}