You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/08/22 13:41:42 UTC

[GitHub] tillrohrmann closed pull request #6573: [Backport 1.6][FLINK-10164] Add fromSavepoint and allowNonRestoredState CLI options to StandaloneJobClusterEntrypoint

tillrohrmann closed pull request #6573: [Backport 1.6][FLINK-10164] Add fromSavepoint and allowNonRestoredState CLI options to StandaloneJobClusterEntrypoint
URL: https://github.com/apache/flink/pull/6573
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/ops/deployment/kubernetes.md b/docs/ops/deployment/kubernetes.md
index 298e473d5f2..5244f5ed544 100644
--- a/docs/ops/deployment/kubernetes.md
+++ b/docs/ops/deployment/kubernetes.md
@@ -34,9 +34,8 @@ Please follow [Kubernetes' setup guide](https://kubernetes.io/docs/setup/) in or
 If you want to run Kubernetes locally, we recommend using [MiniKube](https://kubernetes.io/docs/setup/minikube/).
 
 <div class="alert alert-info" markdown="span">
-  <strong>Note:</strong> If using MiniKube please make sure to execute `minikube ssh 'sudo ip link set docker0 
-  promisc on'` before deploying a Flink cluster. Otherwise Flink components are not able to self reference 
-  themselves through a Kubernetes service. 
+  <strong>Note:</strong> If using MiniKube please make sure to execute `minikube ssh 'sudo ip link set docker0 promisc on'` before deploying a Flink cluster. 
+  Otherwise Flink components are not able to self reference themselves through a Kubernetes service. 
 </div>
 
 ## Flink session cluster on Kubernetes
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
index 0adb8cf75b2..357a87e4fbc 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
@@ -19,6 +19,7 @@
 package org.apache.flink.client.cli;
 
 import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.DefaultParser;
@@ -76,10 +77,10 @@
 			"Address of the JobManager (master) to which to connect. " +
 			"Use this flag to connect to a different JobManager than the one specified in the configuration.");
 
-	static final Option SAVEPOINT_PATH_OPTION = new Option("s", "fromSavepoint", true,
+	public static final Option SAVEPOINT_PATH_OPTION = new Option("s", "fromSavepoint", true,
 			"Path to a savepoint to restore the job from (for example hdfs:///flink/savepoint-1537).");
 
-	static final Option SAVEPOINT_ALLOW_NON_RESTORED_OPTION = new Option("n", "allowNonRestoredState", false,
+	public static final Option SAVEPOINT_ALLOW_NON_RESTORED_OPTION = new Option("n", "allowNonRestoredState", false,
 			"Allow to skip savepoint state that cannot be restored. " +
 					"You need to allow this if you removed an operator from your " +
 					"program that was part of the program when the savepoint was triggered.");
@@ -401,6 +402,16 @@ private static void printCustomCliOptions(
 		}
 	}
 
+	public static SavepointRestoreSettings createSavepointRestoreSettings(CommandLine commandLine) {
+		if (commandLine.hasOption(SAVEPOINT_PATH_OPTION.getOpt())) {
+			String savepointPath = commandLine.getOptionValue(SAVEPOINT_PATH_OPTION.getOpt());
+			boolean allowNonRestoredState = commandLine.hasOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION.getOpt());
+			return SavepointRestoreSettings.forPath(savepointPath, allowNonRestoredState);
+		} else {
+			return SavepointRestoreSettings.none();
+		}
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Line Parsing
 	// --------------------------------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
index 1acda1b5265..ccaa4916f9c 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
@@ -36,8 +36,6 @@
 import static org.apache.flink.client.cli.CliFrontendParser.JAR_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.LOGGING_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.PARALLELISM_OPTION;
-import static org.apache.flink.client.cli.CliFrontendParser.SAVEPOINT_ALLOW_NON_RESTORED_OPTION;
-import static org.apache.flink.client.cli.CliFrontendParser.SAVEPOINT_PATH_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.YARN_DETACHED_OPTION;
 
 /**
@@ -116,13 +114,7 @@ else if (args.length > 0) {
 		detachedMode = line.hasOption(DETACHED_OPTION.getOpt()) || line.hasOption(
 			YARN_DETACHED_OPTION.getOpt());
 
-		if (line.hasOption(SAVEPOINT_PATH_OPTION.getOpt())) {
-			String savepointPath = line.getOptionValue(SAVEPOINT_PATH_OPTION.getOpt());
-			boolean allowNonRestoredState = line.hasOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION.getOpt());
-			this.savepointSettings = SavepointRestoreSettings.forPath(savepointPath, allowNonRestoredState);
-		} else {
-			this.savepointSettings = SavepointRestoreSettings.none();
-		}
+		this.savepointSettings = CliFrontendParser.createSavepointRestoreSettings(line);
 	}
 
 	public String getJarFilePath() {
diff --git a/flink-container/docker/README.md b/flink-container/docker/README.md
index 7d3c030554f..3ff70c6b3bd 100644
--- a/flink-container/docker/README.md
+++ b/flink-container/docker/README.md
@@ -39,6 +39,7 @@ The `docker-compose.yml` contains the following parameters:
 * `FLINK_JOB` - Name of the Flink job to execute (default: none)
 * `DEFAULT_PARALLELISM` - Default parallelism with which to start the job (default: 1)
 * `FLINK_JOB_ARGUMENTS` - Additional arguments which will be passed to the job cluster (default: none)
+* `SAVEPOINT_OPTIONS` - Savepoint options to start the cluster with (default: none)
 
 The parameters can be set by exporting the corresponding environment variable.
 
@@ -55,6 +56,11 @@ This will automatically start `DEFAULT_PARALLELISM` TaskManagers:
         
         FLINK_DOCKER_IMAGE_NAME=<IMAGE_NAME> FLINK_JOB=<JOB_NAME> DEFAULT_PARALLELISM=<DEFAULT_PARALLELISM> docker-compose up
         
+In order to resume the job from a savepoint set `SAVEPOINT_OPTIONS`.
+Supported options are `--fromSavepoint <SAVEPOINT_PATH>` and `--allowNonRestoredState` where `<SAVEPOINT_PATH>` is accessible from all containers.
+
+        FLINK_DOCKER_IMAGE_NAME=<IMAGE_NAME> FLINK_JOB=<JOB_NAME> SAVEPOINT_OPTIONS="--fromSavepoint <SAVEPOINT_PATH> --allowNonRestoredState" docker-compose up 
+        
 One can also provide additional job arguments via `FLINK_JOB_ARGUMENTS` which are passed to the job:
         
         FLINK_DOCKER_IMAGE_NAME=<IMAGE_NAME> FLINK_JOB=<JOB_NAME> FLINK_JOB_ARGUMENTS=<JOB_ARGUMENTS> docker-compose up
diff --git a/flink-container/docker/docker-compose.yml b/flink-container/docker/docker-compose.yml
index 28b53684883..a5e9b49f60c 100644
--- a/flink-container/docker/docker-compose.yml
+++ b/flink-container/docker/docker-compose.yml
@@ -23,6 +23,7 @@
 # * FLINK_JOB - Name of the Flink job to execute (default: none)
 # * DEFAULT_PARALLELISM - Default parallelism with which to start the job (default: 1)
 # * FLINK_JOB_ARGUMENTS - Additional arguments which will be passed to the job cluster (default: none)
+# * SAVEPOINT_OPTIONS - Savepoint options to start the cluster with (default: none)
 
 version: "2.2"
 services:
@@ -30,7 +31,7 @@ services:
     image: ${FLINK_DOCKER_IMAGE_NAME:-flink-job}
     ports:
       - "8081:8081"
-    command: job-cluster --job-classname ${FLINK_JOB} -Djobmanager.rpc.address=job-cluster -Dparallelism.default=${DEFAULT_PARALLELISM:-1} ${FLINK_JOB_ARGUMENTS}
+    command: job-cluster --job-classname ${FLINK_JOB} -Djobmanager.rpc.address=job-cluster -Dparallelism.default=${DEFAULT_PARALLELISM:-1} ${SAVEPOINT_OPTIONS} ${FLINK_JOB_ARGUMENTS}
 
   taskmanager:
     image: ${FLINK_DOCKER_IMAGE_NAME:-flink-job}
diff --git a/flink-container/kubernetes/README.md b/flink-container/kubernetes/README.md
index efba823052f..8a663f691f9 100644
--- a/flink-container/kubernetes/README.md
+++ b/flink-container/kubernetes/README.md
@@ -38,6 +38,13 @@ At last, you should start the task manager deployment:
 
 `FLINK_IMAGE_NAME=<IMAGE_NAME> FLINK_JOB_PARALLELISM=<PARALLELISM> envsubst < task-manager-deployment.yaml.template | kubectl create -f -`
 
+## Resuming from a savepoint
+
+In order to resume from a savepoint, one needs to pass the savepoint path to the cluster entrypoint.
+This can be achieved by adding `"--fromSavepoint", "<SAVEPOINT_PATH>"` to the `args` field in the [job-cluster-job.yaml.template](job-cluster-job.yaml.template).
+Note that `<SAVEPOINT_PATH>` needs to be accessible from the `job-cluster-job` pod (e.g. adding it to the image or storing it on a DFS).
+Additionally one can specify `"--allowNonRestoredState"` to allow that savepoint state is skipped which cannot be restored.
+
 ## Interact with Flink job cluster
 
 After starting the job cluster service, the web UI will be available under `<NODE_IP>:30081`.
diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java
index e68e74b80a4..326e924b448 100644
--- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java
+++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java
@@ -19,6 +19,7 @@
 package org.apache.flink.container.entrypoint;
 
 import org.apache.flink.runtime.entrypoint.EntrypointClusterConfiguration;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 
 import javax.annotation.Nonnull;
 
@@ -28,16 +29,26 @@
  * Configuration for the {@link StandaloneJobClusterEntryPoint}.
  */
 final class StandaloneJobClusterConfiguration extends EntrypointClusterConfiguration {
+
 	@Nonnull
 	private final String jobClassName;
 
-	public StandaloneJobClusterConfiguration(@Nonnull String configDir, @Nonnull Properties dynamicProperties, @Nonnull String[] args, int restPort, @Nonnull String jobClassName) {
+	@Nonnull
+	private final SavepointRestoreSettings savepointRestoreSettings;
+
+	public StandaloneJobClusterConfiguration(@Nonnull String configDir, @Nonnull Properties dynamicProperties, @Nonnull String[] args, int restPort, @Nonnull String jobClassName, @Nonnull SavepointRestoreSettings savepointRestoreSettings) {
 		super(configDir, dynamicProperties, args, restPort);
 		this.jobClassName = jobClassName;
+		this.savepointRestoreSettings = savepointRestoreSettings;
 	}
 
 	@Nonnull
 	String getJobClassName() {
 		return jobClassName;
 	}
+
+	@Nonnull
+	public SavepointRestoreSettings getSavepointRestoreSettings() {
+		return savepointRestoreSettings;
+	}
 }
diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java
index c0cb4739725..3c65ba864ed 100644
--- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java
+++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java
@@ -18,7 +18,9 @@
 
 package org.apache.flink.container.entrypoint;
 
+import org.apache.flink.client.cli.CliFrontendParser;
 import org.apache.flink.runtime.entrypoint.parser.ParserResultFactory;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
@@ -53,6 +55,8 @@ public Options getOptions() {
 		options.addOption(REST_PORT_OPTION);
 		options.addOption(JOB_CLASS_NAME_OPTION);
 		options.addOption(DYNAMIC_PROPERTY_OPTION);
+		options.addOption(CliFrontendParser.SAVEPOINT_PATH_OPTION);
+		options.addOption(CliFrontendParser.SAVEPOINT_ALLOW_NON_RESTORED_OPTION);
 
 		return options;
 	}
@@ -64,12 +68,14 @@ public StandaloneJobClusterConfiguration createResult(@Nonnull CommandLine comma
 		final String restPortString = commandLine.getOptionValue(REST_PORT_OPTION.getOpt(), "-1");
 		final int restPort = Integer.parseInt(restPortString);
 		final String jobClassName = commandLine.getOptionValue(JOB_CLASS_NAME_OPTION.getOpt());
+		final SavepointRestoreSettings savepointRestoreSettings = CliFrontendParser.createSavepointRestoreSettings(commandLine);
 
 		return new StandaloneJobClusterConfiguration(
 			configDir,
 			dynamicProperties,
 			commandLine.getArgs(),
 			restPort,
-			jobClassName);
+			jobClassName,
+			savepointRestoreSettings);
 	}
 }
diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
index 57f7ca239b7..163e9ac734a 100644
--- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
+++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
@@ -33,6 +33,7 @@
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
@@ -64,10 +65,18 @@
 	@Nonnull
 	private final String jobClassName;
 
-	StandaloneJobClusterEntryPoint(Configuration configuration, @Nonnull String jobClassName, @Nonnull String[] programArguments) {
+	@Nonnull
+	private final SavepointRestoreSettings savepointRestoreSettings;
+
+	StandaloneJobClusterEntryPoint(
+			Configuration configuration,
+			@Nonnull String jobClassName,
+			@Nonnull SavepointRestoreSettings savepointRestoreSettings,
+			@Nonnull String[] programArguments) {
 		super(configuration);
 		this.programArguments = checkNotNull(programArguments);
 		this.jobClassName = checkNotNull(jobClassName);
+		this.savepointRestoreSettings = savepointRestoreSettings;
 	}
 
 	@Override
@@ -77,6 +86,7 @@ protected JobGraph retrieveJobGraph(Configuration configuration) throws FlinkExc
 		try {
 			final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(packagedProgram, configuration, defaultParallelism);
 			jobGraph.setAllowQueuedScheduling(true);
+			jobGraph.setSavepointRestoreSettings(savepointRestoreSettings);
 
 			return jobGraph;
 		} catch (Exception e) {
@@ -143,7 +153,7 @@ public static void main(String[] args) {
 			clusterConfiguration = commandLineParser.parse(args);
 		} catch (FlinkParseException e) {
 			LOG.error("Could not parse command line arguments {}.", args, e);
-			commandLineParser.printHelp();
+			commandLineParser.printHelp(StandaloneJobClusterEntryPoint.class.getSimpleName());
 			System.exit(1);
 		}
 
@@ -151,8 +161,10 @@ public static void main(String[] args) {
 
 		configuration.setString(ClusterEntrypoint.EXECUTION_MODE, ExecutionMode.DETACHED.toString());
 
-		StandaloneJobClusterEntryPoint entrypoint = new StandaloneJobClusterEntryPoint(configuration,
+		StandaloneJobClusterEntryPoint entrypoint = new StandaloneJobClusterEntryPoint(
+			configuration,
 			clusterConfiguration.getJobClassName(),
+			clusterConfiguration.getSavepointRestoreSettings(),
 			clusterConfiguration.getArgs());
 
 		entrypoint.startCluster();
diff --git a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactoryTest.java b/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactoryTest.java
index 1f39a0609e7..4d36e497bfa 100644
--- a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactoryTest.java
+++ b/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactoryTest.java
@@ -20,6 +20,7 @@
 
 import org.apache.flink.runtime.entrypoint.FlinkParseException;
 import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
@@ -38,28 +39,30 @@
 public class StandaloneJobClusterConfigurationParserFactoryTest extends TestLogger {
 
 	private static final CommandLineParser<StandaloneJobClusterConfiguration> commandLineParser = new CommandLineParser<>(new StandaloneJobClusterConfigurationParserFactory());
+	private static final String JOB_CLASS_NAME = "foobar";
+	private static final String CONFIG_DIR = "/foo/bar";
 
 	@Test
 	public void testEntrypointClusterConfigurationParsing() throws FlinkParseException {
-		final String configDir = "/foo/bar";
 		final String key = "key";
 		final String value = "value";
 		final int restPort = 1234;
-		final String jobClassName = "foobar";
 		final String arg1 = "arg1";
 		final String arg2 = "arg2";
-		final String[] args = {"--configDir", configDir, "--webui-port", String.valueOf(restPort), "--job-classname", jobClassName, String.format("-D%s=%s", key, value), arg1, arg2};
+		final String[] args = {"--configDir", CONFIG_DIR, "--webui-port", String.valueOf(restPort), "--job-classname", JOB_CLASS_NAME, String.format("-D%s=%s", key, value), arg1, arg2};
 
 		final StandaloneJobClusterConfiguration clusterConfiguration = commandLineParser.parse(args);
 
-		assertThat(clusterConfiguration.getConfigDir(), is(equalTo(configDir)));
-		assertThat(clusterConfiguration.getJobClassName(), is(equalTo(jobClassName)));
+		assertThat(clusterConfiguration.getConfigDir(), is(equalTo(CONFIG_DIR)));
+		assertThat(clusterConfiguration.getJobClassName(), is(equalTo(JOB_CLASS_NAME)));
 		assertThat(clusterConfiguration.getRestPort(), is(equalTo(restPort)));
 		final Properties dynamicProperties = clusterConfiguration.getDynamicProperties();
 
 		assertThat(dynamicProperties, hasEntry(key, value));
 
 		assertThat(clusterConfiguration.getArgs(), arrayContaining(arg1, arg2));
+
+		assertThat(clusterConfiguration.getSavepointRestoreSettings(), is(equalTo(SavepointRestoreSettings.none())));
 	}
 
 	@Test
@@ -81,4 +84,17 @@ public void testMissingRequiredArgument() throws FlinkParseException {
 
 		commandLineParser.parse(args);
 	}
+
+	@Test
+	public void testSavepointRestoreSettingsParsing() throws FlinkParseException {
+		final String restorePath = "foobar";
+		final String[] args = {"-c", CONFIG_DIR, "-j", JOB_CLASS_NAME, "-s", restorePath, "-n"};
+		final StandaloneJobClusterConfiguration standaloneJobClusterConfiguration = commandLineParser.parse(args);
+
+		final SavepointRestoreSettings savepointRestoreSettings = standaloneJobClusterConfiguration.getSavepointRestoreSettings();
+
+		assertThat(savepointRestoreSettings.restoreSavepoint(), is(true));
+		assertThat(savepointRestoreSettings.getRestorePath(), is(equalTo(restorePath)));
+		assertThat(savepointRestoreSettings.allowNonRestoredState(), is(true));
+	}
 }
diff --git a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java b/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java
index d97d2b714d4..2faec4c6627 100644
--- a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java
+++ b/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java
@@ -21,6 +21,7 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 
@@ -35,6 +36,8 @@
  */
 public class StandaloneJobClusterEntryPointTest extends TestLogger {
 
+	public static final String[] PROGRAM_ARGUMENTS = {"--arg", "suffix"};
+
 	@Test
 	public void testJobGraphRetrieval() throws FlinkException {
 		final Configuration configuration = new Configuration();
@@ -43,7 +46,8 @@ public void testJobGraphRetrieval() throws FlinkException {
 		final StandaloneJobClusterEntryPoint standaloneJobClusterEntryPoint = new StandaloneJobClusterEntryPoint(
 			configuration,
 			TestJob.class.getCanonicalName(),
-			new String[] {"--arg", "suffix"});
+			SavepointRestoreSettings.none(),
+			PROGRAM_ARGUMENTS);
 
 		final JobGraph jobGraph = standaloneJobClusterEntryPoint.retrieveJobGraph(configuration);
 
@@ -51,4 +55,18 @@ public void testJobGraphRetrieval() throws FlinkException {
 		assertThat(jobGraph.getMaximumParallelism(), is(parallelism));
 	}
 
+	@Test
+	public void testSavepointRestoreSettings() throws FlinkException {
+		final Configuration configuration = new Configuration();
+		final SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.forPath("foobar", true);
+		final StandaloneJobClusterEntryPoint jobClusterEntryPoint = new StandaloneJobClusterEntryPoint(
+			configuration,
+			TestJob.class.getCanonicalName(),
+			savepointRestoreSettings,
+			PROGRAM_ARGUMENTS);
+
+		final JobGraph jobGraph = jobClusterEntryPoint.retrieveJobGraph(configuration);
+
+		assertThat(jobGraph.getSavepointRestoreSettings(), is(equalTo(savepointRestoreSettings)));
+	}
 }
diff --git a/flink-dist/src/main/flink-bin/bin/standalone-job.sh b/flink-dist/src/main/flink-bin/bin/standalone-job.sh
index 889dab89836..8a98a198785 100644
--- a/flink-dist/src/main/flink-bin/bin/standalone-job.sh
+++ b/flink-dist/src/main/flink-bin/bin/standalone-job.sh
@@ -18,12 +18,12 @@
 ################################################################################
 
 # Start/stop a Flink JobManager.
-USAGE="Usage: standalone-job.sh ((start|start-foreground))|stop"
+USAGE="Usage: standalone-job.sh ((start|start-foreground))|stop [args]"
 
 STARTSTOP=$1
 ENTRY_POINT_NAME="standalonejob"
 
-if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] || [[ -z JOB_CLASSNAME ]]; then
+if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]]; then
   echo $USAGE
   exit 1
 fi
@@ -58,7 +58,7 @@ if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
 fi
 
 if [[ $STARTSTOP == "start-foreground" ]]; then
-    exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRY_POINT_NAME "${ARGS[@]}"
+    exec "${FLINK_BIN_DIR}"/flink-console.sh ${ENTRY_POINT_NAME} "${ARGS[@]}"
 else
-    "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRY_POINT_NAME "${ARGS[@]}"
+    "${FLINK_BIN_DIR}"/flink-daemon.sh ${STARTSTOP} ${ENTRY_POINT_NAME} "${ARGS[@]}"
 fi
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
index d56725c5bf0..23b87bc8507 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
@@ -92,7 +92,7 @@ public static void main(String[] args) {
 			entrypointClusterConfiguration = commandLineParser.parse(args);
 		} catch (FlinkParseException e) {
 			LOG.error("Could not parse command line arguments {}.", args, e);
-			commandLineParser.printHelp();
+			commandLineParser.printHelp(StandaloneSessionClusterEntrypoint.class.getSimpleName());
 			System.exit(1);
 		}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/parser/CommandLineParser.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/parser/CommandLineParser.java
index f9e199c8baf..1c07fb764aa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/parser/CommandLineParser.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/parser/CommandLineParser.java
@@ -55,8 +55,10 @@ public T parse(@Nonnull String[] args) throws FlinkParseException {
 		return parserResultFactory.createResult(commandLine);
 	}
 
-	public void printHelp() {
+	public void printHelp(@Nonnull String cmdLineSyntax) {
 		final HelpFormatter helpFormatter = new HelpFormatter();
-		helpFormatter.printHelp("", parserResultFactory.getOptions());
+		helpFormatter.setLeftPadding(5);
+		helpFormatter.setWidth(80);
+		helpFormatter.printHelp(cmdLineSyntax, parserResultFactory.getOptions(), true);
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index c1f0cc84a39..c9b6b4be43c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -315,7 +315,7 @@ private static Configuration loadConfiguration(String[] args) throws FlinkParseE
 			clusterConfiguration = commandLineParser.parse(args);
 		} catch (FlinkParseException e) {
 			LOG.error("Could not parse the command line options.", e);
-			commandLineParser.printHelp();
+			commandLineParser.printHelp(TaskManagerRunner.class.getSimpleName());
 			throw e;
 		}
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services