You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2018/09/26 12:04:40 UTC

[flink] branch master updated (1fb0636 -> a36d199)

This is an automated email from the ASF dual-hosted git repository.

gary pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 1fb0636  [FLINK-10010] Mark BaseAlignedWindowAssigner as deprecated
     new 01fd885  [FLINK-9891] Added hook to shutdown cluster if a session was created in per-job mode.
     new a36d199  [FLINK-9891] Make shutdown of started cluster in attached mode optional, add cli option 'shutdownOnAttachedExit'

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/ops/cli.md                                    | 10 +++++++
 .../org/apache/flink/client/cli/CliFrontend.java   | 31 ++++++++++++++--------
 .../apache/flink/client/cli/CliFrontendParser.java |  8 ++++++
 .../apache/flink/client/cli/ProgramOptions.java    |  8 ++++++
 .../apache/flink/yarn/cli/FlinkYarnSessionCli.java |  2 ++
 5 files changed, 48 insertions(+), 11 deletions(-)


[flink] 01/02: [FLINK-9891] Added hook to shutdown cluster if a session was created in per-job mode.

Posted by ga...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 01fd88502a00bf47975e40f8d8433ebc7c23042b
Author: Sebastian Klemke <pa...@nerdheim.de>
AuthorDate: Sat Aug 11 22:43:48 2018 +0200

    [FLINK-9891] Added hook to shutdown cluster if a session was created in per-job mode.
    
    This closes #6540.
---
 .../java/org/apache/flink/client/cli/CliFrontend.java  | 18 ++++++++++++++----
 1 file changed, 14 insertions(+), 4 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index f8258b1..79dd79c 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -58,6 +58,7 @@ import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ShutdownHookUtil;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Options;
@@ -251,13 +252,22 @@ public class CliFrontend {
 					LOG.info("Could not properly shut down the client.", e);
 				}
 			} else {
+				final Thread shutdownHook;
 				if (clusterId != null) {
 					client = clusterDescriptor.retrieve(clusterId);
+					shutdownHook = null;
 				} else {
 					// also in job mode we have to deploy a session cluster because the job
 					// might consist of multiple parts (e.g. when using collect)
 					final ClusterSpecification clusterSpecification = customCommandLine.getClusterSpecification(commandLine);
 					client = clusterDescriptor.deploySessionCluster(clusterSpecification);
+					// if not running in detached mode, add a shutdown hook to shut down cluster if client exits
+					// there's a race-condition here if cli is killed before shutdown hook is installed
+					if (!runOptions.getDetachedMode()) {
+						shutdownHook = ShutdownHookUtil.addShutdownHook(client::shutDownCluster, client.getClass().getSimpleName(), LOG);
+					} else {
+						shutdownHook = null;
+					}
 				}
 
 				try {
@@ -280,12 +290,12 @@ public class CliFrontend {
 
 					executeProgram(program, client, userParallelism);
 				} finally {
-					if (clusterId == null && !client.isDetached()) {
+					if (shutdownHook != null) {
 						// terminate the cluster only if we have started it before and if it's not detached
 						try {
-							client.shutDownCluster();
-						} catch (final Exception e) {
-							LOG.info("Could not properly terminate the Flink cluster.", e);
+							shutdownHook.run();
+						} finally {
+							ShutdownHookUtil.removeShutdownHook(shutdownHook, client.getClass().getSimpleName(), LOG);
 						}
 					}
 


[flink] 02/02: [FLINK-9891] Make shutdown of started cluster in attached mode optional, add cli option 'shutdownOnAttachedExit'

Posted by ga...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a36d1999f743d00c4fac8fdf61ad85a4b5d5f3bc
Author: Andrey Zagrebin <az...@gmail.com>
AuthorDate: Wed Sep 19 11:16:00 2018 +0200

    [FLINK-9891] Make shutdown of started cluster in attached mode optional, add cli option 'shutdownOnAttachedExit'
    
    This closes #6718.
---
 docs/ops/cli.md                                    | 10 ++++++++
 .../org/apache/flink/client/cli/CliFrontend.java   | 29 +++++++++++-----------
 .../apache/flink/client/cli/CliFrontendParser.java |  8 ++++++
 .../apache/flink/client/cli/ProgramOptions.java    |  8 ++++++
 .../apache/flink/yarn/cli/FlinkYarnSessionCli.java |  2 ++
 5 files changed, 42 insertions(+), 15 deletions(-)

diff --git a/docs/ops/cli.md b/docs/ops/cli.md
index f96ccf5..9af5c5b 100644
--- a/docs/ops/cli.md
+++ b/docs/ops/cli.md
@@ -257,6 +257,11 @@ Action "run" compiles and runs a program.
      -s,--fromSavepoint <savepointPath>   Path to a savepoint to restore the job
                                           from (for example
                                           hdfs:///flink/savepoint-1537).
+     -sae,--shutdownOnAttachedExit        If the job is submitted in attached
+                                          mode, perform a best-effort cluster
+                                          shutdown when the CLI is terminated
+                                          abruptly, e.g., in response to a user
+                                          interrupt, such as typing Ctrl + C.
   Options for yarn-cluster mode:
      -d,--detached                        If present, runs the job in detached
                                           mode
@@ -265,6 +270,11 @@ Action "run" compiles and runs a program.
                                           connect to a different JobManager than
                                           the one specified in the
                                           configuration.
+     -sae,--shutdownOnAttachedExit        If the job is submitted in attached
+                                          mode, perform a best-effort cluster
+                                          shutdown when the CLI is terminated
+                                          abruptly, e.g., in response to a user
+                                          interrupt, such as typing Ctrl + C.
      -yD <property=value>                 use value for given property
      -yd,--yarndetached                   If present, runs the job in detached
                                           mode (deprecated; use non-YARN
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 79dd79c..c7e6344 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -263,7 +263,7 @@ public class CliFrontend {
 					client = clusterDescriptor.deploySessionCluster(clusterSpecification);
 					// if not running in detached mode, add a shutdown hook to shut down cluster if client exits
 					// there's a race-condition here if cli is killed before shutdown hook is installed
-					if (!runOptions.getDetachedMode()) {
+					if (!runOptions.getDetachedMode() && runOptions.isShutdownOnAttachedExit()) {
 						shutdownHook = ShutdownHookUtil.addShutdownHook(client::shutDownCluster, client.getClass().getSimpleName(), LOG);
 					} else {
 						shutdownHook = null;
@@ -290,15 +290,18 @@ public class CliFrontend {
 
 					executeProgram(program, client, userParallelism);
 				} finally {
-					if (shutdownHook != null) {
+					if (clusterId == null && !client.isDetached()) {
 						// terminate the cluster only if we have started it before and if it's not detached
 						try {
-							shutdownHook.run();
-						} finally {
+							client.shutDownCluster();
+						} catch (final Exception e) {
+							LOG.info("Could not properly terminate the Flink cluster.", e);
+						}
+						if (shutdownHook != null) {
+							// we do not need the hook anymore as we have just tried to shutdown the cluster.
 							ShutdownHookUtil.removeShutdownHook(shutdownHook, client.getClass().getSimpleName(), LOG);
 						}
 					}
-
 					try {
 						client.shutdown();
 					} catch (Exception e) {
@@ -503,11 +506,10 @@ public class CliFrontend {
 		jobsByState.entrySet().stream()
 			.sorted(statusComparator)
 			.map(Map.Entry::getValue).flatMap(List::stream).sorted(startTimeComparator)
-			.forEachOrdered(job -> {
-			System.out.println(dateFormat.format(new Date(job.getStartTime()))
-				+ " : " + job.getJobId() + " : " + job.getJobName()
-				+ " (" + job.getJobState() + ")");
-		});
+			.forEachOrdered(job ->
+				System.out.println(dateFormat.format(new Date(job.getStartTime()))
+					+ " : " + job.getJobId() + " : " + job.getJobName()
+					+ " (" + job.getJobState() + ")"));
 	}
 
 	/**
@@ -838,11 +840,8 @@ public class CliFrontend {
 	 * Creates a Packaged program from the given command line options.
 	 *
 	 * @return A PackagedProgram (upon success)
-	 * @throws java.io.FileNotFoundException
-	 * @throws org.apache.flink.client.program.ProgramInvocationException
 	 */
-	protected PackagedProgram buildProgram(ProgramOptions options)
-			throws FileNotFoundException, ProgramInvocationException {
+	PackagedProgram buildProgram(ProgramOptions options) throws FileNotFoundException, ProgramInvocationException {
 		String[] programArgs = options.getProgramArgs();
 		String jarFilePath = options.getJarFilePath();
 		List<URL> classpaths = options.getClasspaths();
@@ -1175,7 +1174,7 @@ public class CliFrontend {
 	 * @param address Address to write to the configuration
 	 * @param config The configuration to write to
 	 */
-	public static void setJobManagerAddressInConfig(Configuration config, InetSocketAddress address) {
+	static void setJobManagerAddressInConfig(Configuration config, InetSocketAddress address) {
 		config.setString(JobManagerOptions.ADDRESS, address.getHostString());
 		config.setInteger(JobManagerOptions.PORT, address.getPort());
 		config.setString(RestOptions.ADDRESS, address.getHostString());
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
index 357a87e..8eb0dd6 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
@@ -63,6 +63,11 @@ public class CliFrontendParser {
 	public static final Option DETACHED_OPTION = new Option("d", "detached", false, "If present, runs " +
 			"the job in detached mode");
 
+	public static final Option SHUTDOWN_IF_ATTACHED_OPTION = new Option(
+		"sae", "shutdownOnAttachedExit", false,
+		"If the job is submitted in attached mode, perform a best-effort cluster shutdown " +
+			"when the CLI is terminated abruptly, e.g., in response to a user interrupt, such as typing Ctrl + C.");
+
 	/**
 	 * @deprecated use non-prefixed variant {@link #DETACHED_OPTION} for both YARN and non-YARN deployments
 	 */
@@ -128,6 +133,7 @@ public class CliFrontendParser {
 
 		LOGGING_OPTION.setRequired(false);
 		DETACHED_OPTION.setRequired(false);
+		SHUTDOWN_IF_ATTACHED_OPTION.setRequired(false);
 		YARN_DETACHED_OPTION.setRequired(false);
 
 		ARGS_OPTION.setRequired(false);
@@ -170,6 +176,7 @@ public class CliFrontendParser {
 		options.addOption(ARGS_OPTION);
 		options.addOption(LOGGING_OPTION);
 		options.addOption(DETACHED_OPTION);
+		options.addOption(SHUTDOWN_IF_ATTACHED_OPTION);
 		options.addOption(YARN_DETACHED_OPTION);
 		return options;
 	}
@@ -180,6 +187,7 @@ public class CliFrontendParser {
 		options.addOption(PARALLELISM_OPTION);
 		options.addOption(LOGGING_OPTION);
 		options.addOption(DETACHED_OPTION);
+		options.addOption(SHUTDOWN_IF_ATTACHED_OPTION);
 		return options;
 	}
 
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
index ccaa491..da03d64 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
@@ -36,6 +36,7 @@ import static org.apache.flink.client.cli.CliFrontendParser.DETACHED_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.JAR_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.LOGGING_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.PARALLELISM_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.SHUTDOWN_IF_ATTACHED_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.YARN_DETACHED_OPTION;
 
 /**
@@ -57,6 +58,8 @@ public abstract class ProgramOptions extends CommandLineOptions {
 
 	private final boolean detachedMode;
 
+	private final boolean shutdownOnAttachedExit;
+
 	private final SavepointRestoreSettings savepointSettings;
 
 	protected ProgramOptions(CommandLine line) throws CliArgsException {
@@ -113,6 +116,7 @@ public abstract class ProgramOptions extends CommandLineOptions {
 		stdoutLogging = !line.hasOption(LOGGING_OPTION.getOpt());
 		detachedMode = line.hasOption(DETACHED_OPTION.getOpt()) || line.hasOption(
 			YARN_DETACHED_OPTION.getOpt());
+		shutdownOnAttachedExit = line.hasOption(SHUTDOWN_IF_ATTACHED_OPTION.getOpt());
 
 		this.savepointSettings = CliFrontendParser.createSavepointRestoreSettings(line);
 	}
@@ -145,6 +149,10 @@ public abstract class ProgramOptions extends CommandLineOptions {
 		return detachedMode;
 	}
 
+	public boolean isShutdownOnAttachedExit() {
+		return shutdownOnAttachedExit;
+	}
+
 	public SavepointRestoreSettings getSavepointRestoreSettings() {
 		return savepointSettings;
 	}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 90aca89..65f813e 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -87,6 +87,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
 
 import static org.apache.flink.client.cli.CliFrontendParser.DETACHED_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.SHUTDOWN_IF_ATTACHED_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.YARN_DETACHED_OPTION;
 import static org.apache.flink.configuration.HighAvailabilityOptions.HA_CLUSTER_ID;
 
@@ -220,6 +221,7 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
 		allOptions.addOption(slots);
 		allOptions.addOption(dynamicproperties);
 		allOptions.addOption(DETACHED_OPTION);
+		allOptions.addOption(SHUTDOWN_IF_ATTACHED_OPTION);
 		allOptions.addOption(YARN_DETACHED_OPTION);
 		allOptions.addOption(streaming);
 		allOptions.addOption(name);