You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2018/09/26 12:03:49 UTC

[flink] branch release-1.6 updated (e3e4473 -> 8d92e4e)

This is an automated email from the ASF dual-hosted git repository.

gary pushed a change to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from e3e4473  [hotfix] Add release notes for changed jobmanager.sh syntax
     new d753db2  [FLINK-9891] Added hook to shutdown cluster if a session was created in per-job mode.
     new 8d92e4e  [FLINK-9891] Make shutdown of started cluster in attached mode optional, add cli option 'shutdownOnAttachedExit'

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/ops/cli.md                                    | 10 +++++++
 .../org/apache/flink/client/cli/CliFrontend.java   | 31 ++++++++++++++--------
 .../apache/flink/client/cli/CliFrontendParser.java |  8 ++++++
 .../apache/flink/client/cli/ProgramOptions.java    |  8 ++++++
 .../apache/flink/yarn/cli/FlinkYarnSessionCli.java |  2 ++
 5 files changed, 48 insertions(+), 11 deletions(-)


[flink] 02/02: [FLINK-9891] Make shutdown of started cluster in attached mode optional, add cli option 'shutdownOnAttachedExit'

Posted by ga...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8d92e4ea86270e51b143334db5407b4526e8694e
Author: Andrey Zagrebin <az...@gmail.com>
AuthorDate: Wed Sep 19 11:16:00 2018 +0200

    [FLINK-9891] Make shutdown of started cluster in attached mode optional, add cli option 'shutdownOnAttachedExit'
    
    This closes #6718.
---
 docs/ops/cli.md                                    | 10 ++++++++
 .../org/apache/flink/client/cli/CliFrontend.java   | 29 +++++++++++-----------
 .../apache/flink/client/cli/CliFrontendParser.java |  8 ++++++
 .../apache/flink/client/cli/ProgramOptions.java    |  8 ++++++
 .../apache/flink/yarn/cli/FlinkYarnSessionCli.java |  2 ++
 5 files changed, 42 insertions(+), 15 deletions(-)

diff --git a/docs/ops/cli.md b/docs/ops/cli.md
index f96ccf5..9af5c5b 100644
--- a/docs/ops/cli.md
+++ b/docs/ops/cli.md
@@ -257,6 +257,11 @@ Action "run" compiles and runs a program.
      -s,--fromSavepoint <savepointPath>   Path to a savepoint to restore the job
                                           from (for example
                                           hdfs:///flink/savepoint-1537).
+     -sae,--shutdownOnAttachedExit        If the job is submitted in attached
+                                          mode, perform a best-effort cluster
+                                          shutdown when the CLI is terminated
+                                          abruptly, e.g., in response to a user
+                                          interrupt, such as typing Ctrl + C.
   Options for yarn-cluster mode:
      -d,--detached                        If present, runs the job in detached
                                           mode
@@ -265,6 +270,11 @@ Action "run" compiles and runs a program.
                                           connect to a different JobManager than
                                           the one specified in the
                                           configuration.
+     -sae,--shutdownOnAttachedExit        If the job is submitted in attached
+                                          mode, perform a best-effort cluster
+                                          shutdown when the CLI is terminated
+                                          abruptly, e.g., in response to a user
+                                          interrupt, such as typing Ctrl + C.
      -yD <property=value>                 use value for given property
      -yd,--yarndetached                   If present, runs the job in detached
                                           mode (deprecated; use non-YARN
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 98df8df..ae0052c 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -262,7 +262,7 @@ public class CliFrontend {
 					client = clusterDescriptor.deploySessionCluster(clusterSpecification);
 					// if not running in detached mode, add a shutdown hook to shut down cluster if client exits
 					// there's a race-condition here if cli is killed before shutdown hook is installed
-					if (!runOptions.getDetachedMode()) {
+					if (!runOptions.getDetachedMode() && runOptions.isShutdownOnAttachedExit()) {
 						shutdownHook = ShutdownHookUtil.addShutdownHook(client::shutDownCluster, client.getClass().getSimpleName(), LOG);
 					} else {
 						shutdownHook = null;
@@ -289,15 +289,18 @@ public class CliFrontend {
 
 					executeProgram(program, client, userParallelism);
 				} finally {
-					if (shutdownHook != null) {
+					if (clusterId == null && !client.isDetached()) {
 						// terminate the cluster only if we have started it before and if it's not detached
 						try {
-							shutdownHook.run();
-						} finally {
+							client.shutDownCluster();
+						} catch (final Exception e) {
+							LOG.info("Could not properly terminate the Flink cluster.", e);
+						}
+						if (shutdownHook != null) {
+							// we do not need the hook anymore as we have just tried to shutdown the cluster.
 							ShutdownHookUtil.removeShutdownHook(shutdownHook, client.getClass().getSimpleName(), LOG);
 						}
 					}
-
 					try {
 						client.shutdown();
 					} catch (Exception e) {
@@ -502,11 +505,10 @@ public class CliFrontend {
 		jobsByState.entrySet().stream()
 			.sorted(statusComparator)
 			.map(Map.Entry::getValue).flatMap(List::stream).sorted(startTimeComparator)
-			.forEachOrdered(job -> {
-			System.out.println(dateFormat.format(new Date(job.getStartTime()))
-				+ " : " + job.getJobId() + " : " + job.getJobName()
-				+ " (" + job.getJobState() + ")");
-		});
+			.forEachOrdered(job ->
+				System.out.println(dateFormat.format(new Date(job.getStartTime()))
+					+ " : " + job.getJobId() + " : " + job.getJobName()
+					+ " (" + job.getJobState() + ")"));
 	}
 
 	/**
@@ -837,11 +839,8 @@ public class CliFrontend {
 	 * Creates a Packaged program from the given command line options.
 	 *
 	 * @return A PackagedProgram (upon success)
-	 * @throws java.io.FileNotFoundException
-	 * @throws org.apache.flink.client.program.ProgramInvocationException
 	 */
-	protected PackagedProgram buildProgram(ProgramOptions options)
-			throws FileNotFoundException, ProgramInvocationException {
+	PackagedProgram buildProgram(ProgramOptions options) throws FileNotFoundException, ProgramInvocationException {
 		String[] programArgs = options.getProgramArgs();
 		String jarFilePath = options.getJarFilePath();
 		List<URL> classpaths = options.getClasspaths();
@@ -1173,7 +1172,7 @@ public class CliFrontend {
 	 * @param address Address to write to the configuration
 	 * @param config The configuration to write to
 	 */
-	public static void setJobManagerAddressInConfig(Configuration config, InetSocketAddress address) {
+	static void setJobManagerAddressInConfig(Configuration config, InetSocketAddress address) {
 		config.setString(JobManagerOptions.ADDRESS, address.getHostString());
 		config.setInteger(JobManagerOptions.PORT, address.getPort());
 		config.setString(RestOptions.ADDRESS, address.getHostString());
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
index 357a87e..8eb0dd6 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
@@ -63,6 +63,11 @@ public class CliFrontendParser {
 	public static final Option DETACHED_OPTION = new Option("d", "detached", false, "If present, runs " +
 			"the job in detached mode");
 
+	public static final Option SHUTDOWN_IF_ATTACHED_OPTION = new Option(
+		"sae", "shutdownOnAttachedExit", false,
+		"If the job is submitted in attached mode, perform a best-effort cluster shutdown " +
+			"when the CLI is terminated abruptly, e.g., in response to a user interrupt, such as typing Ctrl + C.");
+
 	/**
 	 * @deprecated use non-prefixed variant {@link #DETACHED_OPTION} for both YARN and non-YARN deployments
 	 */
@@ -128,6 +133,7 @@ public class CliFrontendParser {
 
 		LOGGING_OPTION.setRequired(false);
 		DETACHED_OPTION.setRequired(false);
+		SHUTDOWN_IF_ATTACHED_OPTION.setRequired(false);
 		YARN_DETACHED_OPTION.setRequired(false);
 
 		ARGS_OPTION.setRequired(false);
@@ -170,6 +176,7 @@ public class CliFrontendParser {
 		options.addOption(ARGS_OPTION);
 		options.addOption(LOGGING_OPTION);
 		options.addOption(DETACHED_OPTION);
+		options.addOption(SHUTDOWN_IF_ATTACHED_OPTION);
 		options.addOption(YARN_DETACHED_OPTION);
 		return options;
 	}
@@ -180,6 +187,7 @@ public class CliFrontendParser {
 		options.addOption(PARALLELISM_OPTION);
 		options.addOption(LOGGING_OPTION);
 		options.addOption(DETACHED_OPTION);
+		options.addOption(SHUTDOWN_IF_ATTACHED_OPTION);
 		return options;
 	}
 
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
index ccaa491..da03d64 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
@@ -36,6 +36,7 @@ import static org.apache.flink.client.cli.CliFrontendParser.DETACHED_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.JAR_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.LOGGING_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.PARALLELISM_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.SHUTDOWN_IF_ATTACHED_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.YARN_DETACHED_OPTION;
 
 /**
@@ -57,6 +58,8 @@ public abstract class ProgramOptions extends CommandLineOptions {
 
 	private final boolean detachedMode;
 
+	private final boolean shutdownOnAttachedExit;
+
 	private final SavepointRestoreSettings savepointSettings;
 
 	protected ProgramOptions(CommandLine line) throws CliArgsException {
@@ -113,6 +116,7 @@ public abstract class ProgramOptions extends CommandLineOptions {
 		stdoutLogging = !line.hasOption(LOGGING_OPTION.getOpt());
 		detachedMode = line.hasOption(DETACHED_OPTION.getOpt()) || line.hasOption(
 			YARN_DETACHED_OPTION.getOpt());
+		shutdownOnAttachedExit = line.hasOption(SHUTDOWN_IF_ATTACHED_OPTION.getOpt());
 
 		this.savepointSettings = CliFrontendParser.createSavepointRestoreSettings(line);
 	}
@@ -145,6 +149,10 @@ public abstract class ProgramOptions extends CommandLineOptions {
 		return detachedMode;
 	}
 
+	public boolean isShutdownOnAttachedExit() {
+		return shutdownOnAttachedExit;
+	}
+
 	public SavepointRestoreSettings getSavepointRestoreSettings() {
 		return savepointSettings;
 	}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index c0180a8..e0c0f94 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -85,6 +85,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
 
 import static org.apache.flink.client.cli.CliFrontendParser.DETACHED_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.SHUTDOWN_IF_ATTACHED_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.YARN_DETACHED_OPTION;
 import static org.apache.flink.configuration.HighAvailabilityOptions.HA_CLUSTER_ID;
 
@@ -218,6 +219,7 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
 		allOptions.addOption(slots);
 		allOptions.addOption(dynamicproperties);
 		allOptions.addOption(DETACHED_OPTION);
+		allOptions.addOption(SHUTDOWN_IF_ATTACHED_OPTION);
 		allOptions.addOption(YARN_DETACHED_OPTION);
 		allOptions.addOption(streaming);
 		allOptions.addOption(name);


[flink] 01/02: [FLINK-9891] Added hook to shutdown cluster if a session was created in per-job mode.

Posted by ga...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d753db27757376def72eda1d3c312a3d128a4bae
Author: Sebastian Klemke <pa...@nerdheim.de>
AuthorDate: Sat Aug 11 22:43:48 2018 +0200

    [FLINK-9891] Added hook to shutdown cluster if a session was created in per-job mode.
    
    This closes #6540.
---
 .../java/org/apache/flink/client/cli/CliFrontend.java  | 18 ++++++++++++++----
 1 file changed, 14 insertions(+), 4 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index e2a260c..98df8df 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -58,6 +58,7 @@ import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ShutdownHookUtil;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Options;
@@ -250,13 +251,22 @@ public class CliFrontend {
 					LOG.info("Could not properly shut down the client.", e);
 				}
 			} else {
+				final Thread shutdownHook;
 				if (clusterId != null) {
 					client = clusterDescriptor.retrieve(clusterId);
+					shutdownHook = null;
 				} else {
 					// also in job mode we have to deploy a session cluster because the job
 					// might consist of multiple parts (e.g. when using collect)
 					final ClusterSpecification clusterSpecification = customCommandLine.getClusterSpecification(commandLine);
 					client = clusterDescriptor.deploySessionCluster(clusterSpecification);
+					// if not running in detached mode, add a shutdown hook to shut down cluster if client exits
+					// there's a race-condition here if cli is killed before shutdown hook is installed
+					if (!runOptions.getDetachedMode()) {
+						shutdownHook = ShutdownHookUtil.addShutdownHook(client::shutDownCluster, client.getClass().getSimpleName(), LOG);
+					} else {
+						shutdownHook = null;
+					}
 				}
 
 				try {
@@ -279,12 +289,12 @@ public class CliFrontend {
 
 					executeProgram(program, client, userParallelism);
 				} finally {
-					if (clusterId == null && !client.isDetached()) {
+					if (shutdownHook != null) {
 						// terminate the cluster only if we have started it before and if it's not detached
 						try {
-							client.shutDownCluster();
-						} catch (final Exception e) {
-							LOG.info("Could not properly terminate the Flink cluster.", e);
+							shutdownHook.run();
+						} finally {
+							ShutdownHookUtil.removeShutdownHook(shutdownHook, client.getClass().getSimpleName(), LOG);
 						}
 					}