You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2018/09/26 12:02:36 UTC
[flink] 02/02: [FLINK-9891] Make shutdown of started cluster in
attached mode optional, add cli option 'shutdownOnAttachedExit'
This is an automated email from the ASF dual-hosted git repository.
gary pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 941557026b291217a73ca290ad70b367955d0168
Author: Andrey Zagrebin <az...@gmail.com>
AuthorDate: Wed Sep 19 11:16:00 2018 +0200
[FLINK-9891] Make shutdown of started cluster in attached mode optional, add cli option 'shutdownOnAttachedExit'
This closes #6718.
---
docs/ops/cli.md | 10 ++++++++++
.../org/apache/flink/client/cli/CliFrontend.java | 20 ++++++++++----------
.../apache/flink/client/cli/CliFrontendParser.java | 8 ++++++++
.../org/apache/flink/client/cli/ProgramOptions.java | 8 ++++++++
.../apache/flink/yarn/cli/FlinkYarnSessionCli.java | 2 ++
5 files changed, 38 insertions(+), 10 deletions(-)
diff --git a/docs/ops/cli.md b/docs/ops/cli.md
index 439cd84..b6e8654 100644
--- a/docs/ops/cli.md
+++ b/docs/ops/cli.md
@@ -253,6 +253,11 @@ Action "run" compiles and runs a program.
-s,--fromSavepoint <savepointPath> Path to a savepoint to restore the job
from (for example
hdfs:///flink/savepoint-1537).
+ -sae,--shutdownOnAttachedExit If the job is submitted in attached
+ mode, perform a best-effort cluster
+ shutdown when the CLI is terminated
+ abruptly, e.g., in response to a user
+ interrupt, such as typing Ctrl + C.
Options for yarn-cluster mode:
-d,--detached If present, runs the job in detached
mode
@@ -261,6 +266,11 @@ Action "run" compiles and runs a program.
connect to a different JobManager than
the one specified in the
configuration.
+ -sae,--shutdownOnAttachedExit If the job is submitted in attached
+ mode, perform a best-effort cluster
+ shutdown when the CLI is terminated
+ abruptly, e.g., in response to a user
+ interrupt, such as typing Ctrl + C.
-yD <property=value> use value for given property
-yd,--yarndetached If present, runs the job in detached
mode (deprecated; use non-YARN
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 780f814..94074f8 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -261,7 +261,7 @@ public class CliFrontend {
client = clusterDescriptor.deploySessionCluster(clusterSpecification);
// if not running in detached mode, add a shutdown hook to shut down cluster if client exits
// there's a race-condition here if cli is killed before shutdown hook is installed
- if (!runOptions.getDetachedMode()) {
+ if (!runOptions.getDetachedMode() && runOptions.isShutdownOnAttachedExit()) {
shutdownHook = ShutdownHookUtil.addShutdownHook(client::shutDownCluster, client.getClass().getSimpleName(), LOG);
} else {
shutdownHook = null;
@@ -288,15 +288,18 @@ public class CliFrontend {
executeProgram(program, client, userParallelism);
} finally {
- if (shutdownHook != null) {
+ if (clusterId == null && !client.isDetached()) {
// terminate the cluster only if we have started it before and if it's not detached
try {
- shutdownHook.run();
- } finally {
+ client.shutDownCluster();
+ } catch (final Exception e) {
+ LOG.info("Could not properly terminate the Flink cluster.", e);
+ }
+ if (shutdownHook != null) {
+ // we do not need the hook anymore as we have just tried to shutdown the cluster.
ShutdownHookUtil.removeShutdownHook(shutdownHook, client.getClass().getSimpleName(), LOG);
}
}
-
try {
client.shutdown();
} catch (Exception e) {
@@ -818,11 +821,8 @@ public class CliFrontend {
* Creates a Packaged program from the given command line options.
*
* @return A PackagedProgram (upon success)
- * @throws java.io.FileNotFoundException
- * @throws org.apache.flink.client.program.ProgramInvocationException
*/
- protected PackagedProgram buildProgram(ProgramOptions options)
- throws FileNotFoundException, ProgramInvocationException {
+ PackagedProgram buildProgram(ProgramOptions options) throws FileNotFoundException, ProgramInvocationException {
String[] programArgs = options.getProgramArgs();
String jarFilePath = options.getJarFilePath();
List<URL> classpaths = options.getClasspaths();
@@ -1154,7 +1154,7 @@ public class CliFrontend {
* @param address Address to write to the configuration
* @param config The configuration to write to
*/
- public static void setJobManagerAddressInConfig(Configuration config, InetSocketAddress address) {
+ static void setJobManagerAddressInConfig(Configuration config, InetSocketAddress address) {
config.setString(JobManagerOptions.ADDRESS, address.getHostString());
config.setInteger(JobManagerOptions.PORT, address.getPort());
config.setString(RestOptions.ADDRESS, address.getHostString());
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
index 1588aac..8eac249 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
@@ -62,6 +62,11 @@ public class CliFrontendParser {
public static final Option DETACHED_OPTION = new Option("d", "detached", false, "If present, runs " +
"the job in detached mode");
+ public static final Option SHUTDOWN_IF_ATTACHED_OPTION = new Option(
+ "sae", "shutdownOnAttachedExit", false,
+ "If the job is submitted in attached mode, perform a best-effort cluster shutdown " +
+ "when the CLI is terminated abruptly, e.g., in response to a user interrupt, such as typing Ctrl + C.");
+
/**
* @deprecated use non-prefixed variant {@link #DETACHED_OPTION} for both YARN and non-YARN deployments
*/
@@ -124,6 +129,7 @@ public class CliFrontendParser {
LOGGING_OPTION.setRequired(false);
DETACHED_OPTION.setRequired(false);
+ SHUTDOWN_IF_ATTACHED_OPTION.setRequired(false);
YARN_DETACHED_OPTION.setRequired(false);
ARGS_OPTION.setRequired(false);
@@ -166,6 +172,7 @@ public class CliFrontendParser {
options.addOption(ARGS_OPTION);
options.addOption(LOGGING_OPTION);
options.addOption(DETACHED_OPTION);
+ options.addOption(SHUTDOWN_IF_ATTACHED_OPTION);
options.addOption(YARN_DETACHED_OPTION);
return options;
}
@@ -176,6 +183,7 @@ public class CliFrontendParser {
options.addOption(PARALLELISM_OPTION);
options.addOption(LOGGING_OPTION);
options.addOption(DETACHED_OPTION);
+ options.addOption(SHUTDOWN_IF_ATTACHED_OPTION);
return options;
}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
index 1acda1b..832370d 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
@@ -38,6 +38,7 @@ import static org.apache.flink.client.cli.CliFrontendParser.LOGGING_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.PARALLELISM_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.SAVEPOINT_ALLOW_NON_RESTORED_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.SAVEPOINT_PATH_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.SHUTDOWN_IF_ATTACHED_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.YARN_DETACHED_OPTION;
/**
@@ -59,6 +60,8 @@ public abstract class ProgramOptions extends CommandLineOptions {
private final boolean detachedMode;
+ private final boolean shutdownOnAttachedExit;
+
private final SavepointRestoreSettings savepointSettings;
protected ProgramOptions(CommandLine line) throws CliArgsException {
@@ -115,6 +118,7 @@ public abstract class ProgramOptions extends CommandLineOptions {
stdoutLogging = !line.hasOption(LOGGING_OPTION.getOpt());
detachedMode = line.hasOption(DETACHED_OPTION.getOpt()) || line.hasOption(
YARN_DETACHED_OPTION.getOpt());
+ shutdownOnAttachedExit = line.hasOption(SHUTDOWN_IF_ATTACHED_OPTION.getOpt());
if (line.hasOption(SAVEPOINT_PATH_OPTION.getOpt())) {
String savepointPath = line.getOptionValue(SAVEPOINT_PATH_OPTION.getOpt());
@@ -153,6 +157,10 @@ public abstract class ProgramOptions extends CommandLineOptions {
return detachedMode;
}
+ public boolean isShutdownOnAttachedExit() {
+ return shutdownOnAttachedExit;
+ }
+
public SavepointRestoreSettings getSavepointRestoreSettings() {
return savepointSettings;
}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 698d48d..1119b2c 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -83,6 +83,7 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import static org.apache.flink.client.cli.CliFrontendParser.DETACHED_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.SHUTDOWN_IF_ATTACHED_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.YARN_DETACHED_OPTION;
import static org.apache.flink.configuration.HighAvailabilityOptions.HA_CLUSTER_ID;
@@ -214,6 +215,7 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
allOptions.addOption(slots);
allOptions.addOption(dynamicproperties);
allOptions.addOption(DETACHED_OPTION);
+ allOptions.addOption(SHUTDOWN_IF_ATTACHED_OPTION);
allOptions.addOption(YARN_DETACHED_OPTION);
allOptions.addOption(streaming);
allOptions.addOption(name);