You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/08/22 13:41:44 UTC

[flink] branch release-1.6 updated (b6d8fb9 -> 5434ca5)

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from b6d8fb9  [FLINK-10172] [table] Fix Table.orderBy(String) by adding asc & desc to ExpressionParser.
     new f75ac57  [FLINK-10164] Add support for resuming from a savepoint to StandaloneJobClusterEntrypoint
     new 5b611ae  [hotfix] Update standalone-job.sh usage string
     new 8776f70  [hotfix] Improve StandaloneJobClusterEntrypoint command line help
     new 90f76ca  [hotfix] Add support for savepoint options to docker-compose template
     new 5434ca5  [hotfix][docs] Document how to resume from a savepoint with a job cluster on K8s

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/ops/deployment/kubernetes.md                  |  5 ++---
 .../apache/flink/client/cli/CliFrontendParser.java | 15 +++++++++++--
 .../apache/flink/client/cli/ProgramOptions.java    | 10 +--------
 flink-container/docker/README.md                   |  6 +++++
 flink-container/docker/docker-compose.yml          |  3 ++-
 flink-container/kubernetes/README.md               |  7 ++++++
 .../StandaloneJobClusterConfiguration.java         | 13 ++++++++++-
 ...daloneJobClusterConfigurationParserFactory.java |  8 ++++++-
 .../entrypoint/StandaloneJobClusterEntryPoint.java | 18 ++++++++++++---
 ...neJobClusterConfigurationParserFactoryTest.java | 26 +++++++++++++++++-----
 .../StandaloneJobClusterEntryPointTest.java        | 20 ++++++++++++++++-
 .../src/main/flink-bin/bin/standalone-job.sh       |  8 +++----
 .../StandaloneSessionClusterEntrypoint.java        |  2 +-
 .../entrypoint/parser/CommandLineParser.java       |  6 +++--
 .../runtime/taskexecutor/TaskManagerRunner.java    |  2 +-
 15 files changed, 115 insertions(+), 34 deletions(-)


[flink] 02/05: [hotfix] Update standalone-job.sh usage string

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5b611ae7d6e3ec1c3771e9f46b6f243c6e7b5b05
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu Aug 16 18:34:46 2018 +0200

    [hotfix] Update standalone-job.sh usage string
---
 flink-dist/src/main/flink-bin/bin/standalone-job.sh | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/flink-dist/src/main/flink-bin/bin/standalone-job.sh b/flink-dist/src/main/flink-bin/bin/standalone-job.sh
index 889dab8..8a98a19 100644
--- a/flink-dist/src/main/flink-bin/bin/standalone-job.sh
+++ b/flink-dist/src/main/flink-bin/bin/standalone-job.sh
@@ -18,12 +18,12 @@
 ################################################################################
 
 # Start/stop a Flink JobManager.
-USAGE="Usage: standalone-job.sh ((start|start-foreground))|stop"
+USAGE="Usage: standalone-job.sh ((start|start-foreground))|stop [args]"
 
 STARTSTOP=$1
 ENTRY_POINT_NAME="standalonejob"
 
-if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] || [[ -z JOB_CLASSNAME ]]; then
+if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]]; then
   echo $USAGE
   exit 1
 fi
@@ -58,7 +58,7 @@ if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
 fi
 
 if [[ $STARTSTOP == "start-foreground" ]]; then
-    exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRY_POINT_NAME "${ARGS[@]}"
+    exec "${FLINK_BIN_DIR}"/flink-console.sh ${ENTRY_POINT_NAME} "${ARGS[@]}"
 else
-    "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRY_POINT_NAME "${ARGS[@]}"
+    "${FLINK_BIN_DIR}"/flink-daemon.sh ${STARTSTOP} ${ENTRY_POINT_NAME} "${ARGS[@]}"
 fi


[flink] 01/05: [FLINK-10164] Add support for resuming from a savepoint to StandaloneJobClusterEntrypoint

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f75ac572c7fdc8725ae6a5a876ef750d741846fc
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu Aug 16 18:12:12 2018 +0200

    [FLINK-10164] Add support for resuming from a savepoint to StandaloneJobClusterEntrypoint
    
    The StandaloneJobClusterEntrypoint now accepts CLI options to specify a savepoint path and
    whether to allow non-restored state. If the entrypoint is started with a savepoint
    path, then the job will try to resume from this savepoint.
    
    This closes #6573.
---
 .../apache/flink/client/cli/CliFrontendParser.java | 15 +++++++++++--
 .../apache/flink/client/cli/ProgramOptions.java    | 10 +--------
 .../StandaloneJobClusterConfiguration.java         | 13 ++++++++++-
 ...daloneJobClusterConfigurationParserFactory.java |  8 ++++++-
 .../entrypoint/StandaloneJobClusterEntryPoint.java | 16 +++++++++++--
 ...neJobClusterConfigurationParserFactoryTest.java | 26 +++++++++++++++++-----
 .../StandaloneJobClusterEntryPointTest.java        | 20 ++++++++++++++++-
 7 files changed, 87 insertions(+), 21 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
index 0adb8cf..357a87e 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
@@ -19,6 +19,7 @@
 package org.apache.flink.client.cli;
 
 import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.DefaultParser;
@@ -76,10 +77,10 @@ public class CliFrontendParser {
 			"Address of the JobManager (master) to which to connect. " +
 			"Use this flag to connect to a different JobManager than the one specified in the configuration.");
 
-	static final Option SAVEPOINT_PATH_OPTION = new Option("s", "fromSavepoint", true,
+	public static final Option SAVEPOINT_PATH_OPTION = new Option("s", "fromSavepoint", true,
 			"Path to a savepoint to restore the job from (for example hdfs:///flink/savepoint-1537).");
 
-	static final Option SAVEPOINT_ALLOW_NON_RESTORED_OPTION = new Option("n", "allowNonRestoredState", false,
+	public static final Option SAVEPOINT_ALLOW_NON_RESTORED_OPTION = new Option("n", "allowNonRestoredState", false,
 			"Allow to skip savepoint state that cannot be restored. " +
 					"You need to allow this if you removed an operator from your " +
 					"program that was part of the program when the savepoint was triggered.");
@@ -401,6 +402,16 @@ public class CliFrontendParser {
 		}
 	}
 
+	public static SavepointRestoreSettings createSavepointRestoreSettings(CommandLine commandLine) {
+		if (commandLine.hasOption(SAVEPOINT_PATH_OPTION.getOpt())) {
+			String savepointPath = commandLine.getOptionValue(SAVEPOINT_PATH_OPTION.getOpt());
+			boolean allowNonRestoredState = commandLine.hasOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION.getOpt());
+			return SavepointRestoreSettings.forPath(savepointPath, allowNonRestoredState);
+		} else {
+			return SavepointRestoreSettings.none();
+		}
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Line Parsing
 	// --------------------------------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
index 1acda1b..ccaa491 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
@@ -36,8 +36,6 @@ import static org.apache.flink.client.cli.CliFrontendParser.DETACHED_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.JAR_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.LOGGING_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.PARALLELISM_OPTION;
-import static org.apache.flink.client.cli.CliFrontendParser.SAVEPOINT_ALLOW_NON_RESTORED_OPTION;
-import static org.apache.flink.client.cli.CliFrontendParser.SAVEPOINT_PATH_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.YARN_DETACHED_OPTION;
 
 /**
@@ -116,13 +114,7 @@ public abstract class ProgramOptions extends CommandLineOptions {
 		detachedMode = line.hasOption(DETACHED_OPTION.getOpt()) || line.hasOption(
 			YARN_DETACHED_OPTION.getOpt());
 
-		if (line.hasOption(SAVEPOINT_PATH_OPTION.getOpt())) {
-			String savepointPath = line.getOptionValue(SAVEPOINT_PATH_OPTION.getOpt());
-			boolean allowNonRestoredState = line.hasOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION.getOpt());
-			this.savepointSettings = SavepointRestoreSettings.forPath(savepointPath, allowNonRestoredState);
-		} else {
-			this.savepointSettings = SavepointRestoreSettings.none();
-		}
+		this.savepointSettings = CliFrontendParser.createSavepointRestoreSettings(line);
 	}
 
 	public String getJarFilePath() {
diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java
index e68e74b..326e924 100644
--- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java
+++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java
@@ -19,6 +19,7 @@
 package org.apache.flink.container.entrypoint;
 
 import org.apache.flink.runtime.entrypoint.EntrypointClusterConfiguration;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 
 import javax.annotation.Nonnull;
 
@@ -28,16 +29,26 @@ import java.util.Properties;
  * Configuration for the {@link StandaloneJobClusterEntryPoint}.
  */
 final class StandaloneJobClusterConfiguration extends EntrypointClusterConfiguration {
+
 	@Nonnull
 	private final String jobClassName;
 
-	public StandaloneJobClusterConfiguration(@Nonnull String configDir, @Nonnull Properties dynamicProperties, @Nonnull String[] args, int restPort, @Nonnull String jobClassName) {
+	@Nonnull
+	private final SavepointRestoreSettings savepointRestoreSettings;
+
+	public StandaloneJobClusterConfiguration(@Nonnull String configDir, @Nonnull Properties dynamicProperties, @Nonnull String[] args, int restPort, @Nonnull String jobClassName, @Nonnull SavepointRestoreSettings savepointRestoreSettings) {
 		super(configDir, dynamicProperties, args, restPort);
 		this.jobClassName = jobClassName;
+		this.savepointRestoreSettings = savepointRestoreSettings;
 	}
 
 	@Nonnull
 	String getJobClassName() {
 		return jobClassName;
 	}
+
+	@Nonnull
+	public SavepointRestoreSettings getSavepointRestoreSettings() {
+		return savepointRestoreSettings;
+	}
 }
diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java
index c0cb473..3c65ba8 100644
--- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java
+++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java
@@ -18,7 +18,9 @@
 
 package org.apache.flink.container.entrypoint;
 
+import org.apache.flink.client.cli.CliFrontendParser;
 import org.apache.flink.runtime.entrypoint.parser.ParserResultFactory;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
@@ -53,6 +55,8 @@ public class StandaloneJobClusterConfigurationParserFactory implements ParserRes
 		options.addOption(REST_PORT_OPTION);
 		options.addOption(JOB_CLASS_NAME_OPTION);
 		options.addOption(DYNAMIC_PROPERTY_OPTION);
+		options.addOption(CliFrontendParser.SAVEPOINT_PATH_OPTION);
+		options.addOption(CliFrontendParser.SAVEPOINT_ALLOW_NON_RESTORED_OPTION);
 
 		return options;
 	}
@@ -64,12 +68,14 @@ public class StandaloneJobClusterConfigurationParserFactory implements ParserRes
 		final String restPortString = commandLine.getOptionValue(REST_PORT_OPTION.getOpt(), "-1");
 		final int restPort = Integer.parseInt(restPortString);
 		final String jobClassName = commandLine.getOptionValue(JOB_CLASS_NAME_OPTION.getOpt());
+		final SavepointRestoreSettings savepointRestoreSettings = CliFrontendParser.createSavepointRestoreSettings(commandLine);
 
 		return new StandaloneJobClusterConfiguration(
 			configDir,
 			dynamicProperties,
 			commandLine.getArgs(),
 			restPort,
-			jobClassName);
+			jobClassName,
+			savepointRestoreSettings);
 	}
 }
diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
index 57f7ca2..0c2dcf3 100644
--- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
+++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
@@ -64,10 +65,18 @@ public final class StandaloneJobClusterEntryPoint extends JobClusterEntrypoint {
 	@Nonnull
 	private final String jobClassName;
 
-	StandaloneJobClusterEntryPoint(Configuration configuration, @Nonnull String jobClassName, @Nonnull String[] programArguments) {
+	@Nonnull
+	private final SavepointRestoreSettings savepointRestoreSettings;
+
+	StandaloneJobClusterEntryPoint(
+			Configuration configuration,
+			@Nonnull String jobClassName,
+			@Nonnull SavepointRestoreSettings savepointRestoreSettings,
+			@Nonnull String[] programArguments) {
 		super(configuration);
 		this.programArguments = checkNotNull(programArguments);
 		this.jobClassName = checkNotNull(jobClassName);
+		this.savepointRestoreSettings = savepointRestoreSettings;
 	}
 
 	@Override
@@ -77,6 +86,7 @@ public final class StandaloneJobClusterEntryPoint extends JobClusterEntrypoint {
 		try {
 			final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(packagedProgram, configuration, defaultParallelism);
 			jobGraph.setAllowQueuedScheduling(true);
+			jobGraph.setSavepointRestoreSettings(savepointRestoreSettings);
 
 			return jobGraph;
 		} catch (Exception e) {
@@ -151,8 +161,10 @@ public final class StandaloneJobClusterEntryPoint extends JobClusterEntrypoint {
 
 		configuration.setString(ClusterEntrypoint.EXECUTION_MODE, ExecutionMode.DETACHED.toString());
 
-		StandaloneJobClusterEntryPoint entrypoint = new StandaloneJobClusterEntryPoint(configuration,
+		StandaloneJobClusterEntryPoint entrypoint = new StandaloneJobClusterEntryPoint(
+			configuration,
 			clusterConfiguration.getJobClassName(),
+			clusterConfiguration.getSavepointRestoreSettings(),
 			clusterConfiguration.getArgs());
 
 		entrypoint.startCluster();
diff --git a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactoryTest.java b/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactoryTest.java
index 1f39a06..4d36e49 100644
--- a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactoryTest.java
+++ b/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactoryTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.container.entrypoint;
 
 import org.apache.flink.runtime.entrypoint.FlinkParseException;
 import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
@@ -38,28 +39,30 @@ import static org.junit.Assert.assertThat;
 public class StandaloneJobClusterConfigurationParserFactoryTest extends TestLogger {
 
 	private static final CommandLineParser<StandaloneJobClusterConfiguration> commandLineParser = new CommandLineParser<>(new StandaloneJobClusterConfigurationParserFactory());
+	private static final String JOB_CLASS_NAME = "foobar";
+	private static final String CONFIG_DIR = "/foo/bar";
 
 	@Test
 	public void testEntrypointClusterConfigurationParsing() throws FlinkParseException {
-		final String configDir = "/foo/bar";
 		final String key = "key";
 		final String value = "value";
 		final int restPort = 1234;
-		final String jobClassName = "foobar";
 		final String arg1 = "arg1";
 		final String arg2 = "arg2";
-		final String[] args = {"--configDir", configDir, "--webui-port", String.valueOf(restPort), "--job-classname", jobClassName, String.format("-D%s=%s", key, value), arg1, arg2};
+		final String[] args = {"--configDir", CONFIG_DIR, "--webui-port", String.valueOf(restPort), "--job-classname", JOB_CLASS_NAME, String.format("-D%s=%s", key, value), arg1, arg2};
 
 		final StandaloneJobClusterConfiguration clusterConfiguration = commandLineParser.parse(args);
 
-		assertThat(clusterConfiguration.getConfigDir(), is(equalTo(configDir)));
-		assertThat(clusterConfiguration.getJobClassName(), is(equalTo(jobClassName)));
+		assertThat(clusterConfiguration.getConfigDir(), is(equalTo(CONFIG_DIR)));
+		assertThat(clusterConfiguration.getJobClassName(), is(equalTo(JOB_CLASS_NAME)));
 		assertThat(clusterConfiguration.getRestPort(), is(equalTo(restPort)));
 		final Properties dynamicProperties = clusterConfiguration.getDynamicProperties();
 
 		assertThat(dynamicProperties, hasEntry(key, value));
 
 		assertThat(clusterConfiguration.getArgs(), arrayContaining(arg1, arg2));
+
+		assertThat(clusterConfiguration.getSavepointRestoreSettings(), is(equalTo(SavepointRestoreSettings.none())));
 	}
 
 	@Test
@@ -81,4 +84,17 @@ public class StandaloneJobClusterConfigurationParserFactoryTest extends TestLogg
 
 		commandLineParser.parse(args);
 	}
+
+	@Test
+	public void testSavepointRestoreSettingsParsing() throws FlinkParseException {
+		final String restorePath = "foobar";
+		final String[] args = {"-c", CONFIG_DIR, "-j", JOB_CLASS_NAME, "-s", restorePath, "-n"};
+		final StandaloneJobClusterConfiguration standaloneJobClusterConfiguration = commandLineParser.parse(args);
+
+		final SavepointRestoreSettings savepointRestoreSettings = standaloneJobClusterConfiguration.getSavepointRestoreSettings();
+
+		assertThat(savepointRestoreSettings.restoreSavepoint(), is(true));
+		assertThat(savepointRestoreSettings.getRestorePath(), is(equalTo(restorePath)));
+		assertThat(savepointRestoreSettings.allowNonRestoredState(), is(true));
+	}
 }
diff --git a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java b/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java
index d97d2b7..2faec4c 100644
--- a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java
+++ b/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.container.entrypoint;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 
@@ -35,6 +36,8 @@ import static org.junit.Assert.assertThat;
  */
 public class StandaloneJobClusterEntryPointTest extends TestLogger {
 
+	public static final String[] PROGRAM_ARGUMENTS = {"--arg", "suffix"};
+
 	@Test
 	public void testJobGraphRetrieval() throws FlinkException {
 		final Configuration configuration = new Configuration();
@@ -43,7 +46,8 @@ public class StandaloneJobClusterEntryPointTest extends TestLogger {
 		final StandaloneJobClusterEntryPoint standaloneJobClusterEntryPoint = new StandaloneJobClusterEntryPoint(
 			configuration,
 			TestJob.class.getCanonicalName(),
-			new String[] {"--arg", "suffix"});
+			SavepointRestoreSettings.none(),
+			PROGRAM_ARGUMENTS);
 
 		final JobGraph jobGraph = standaloneJobClusterEntryPoint.retrieveJobGraph(configuration);
 
@@ -51,4 +55,18 @@ public class StandaloneJobClusterEntryPointTest extends TestLogger {
 		assertThat(jobGraph.getMaximumParallelism(), is(parallelism));
 	}
 
+	@Test
+	public void testSavepointRestoreSettings() throws FlinkException {
+		final Configuration configuration = new Configuration();
+		final SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.forPath("foobar", true);
+		final StandaloneJobClusterEntryPoint jobClusterEntryPoint = new StandaloneJobClusterEntryPoint(
+			configuration,
+			TestJob.class.getCanonicalName(),
+			savepointRestoreSettings,
+			PROGRAM_ARGUMENTS);
+
+		final JobGraph jobGraph = jobClusterEntryPoint.retrieveJobGraph(configuration);
+
+		assertThat(jobGraph.getSavepointRestoreSettings(), is(equalTo(savepointRestoreSettings)));
+	}
 }


[flink] 04/05: [hotfix] Add support for savepoint options to docker-compose template

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 90f76ca70f8197f991ca70c4d509c4c959287c4e
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu Aug 16 19:49:03 2018 +0200

    [hotfix] Add support for savepoint options to docker-compose template
---
 flink-container/docker/README.md          | 6 ++++++
 flink-container/docker/docker-compose.yml | 3 ++-
 2 files changed, 8 insertions(+), 1 deletion(-)

diff --git a/flink-container/docker/README.md b/flink-container/docker/README.md
index 7d3c030..3ff70c6 100644
--- a/flink-container/docker/README.md
+++ b/flink-container/docker/README.md
@@ -39,6 +39,7 @@ The `docker-compose.yml` contains the following parameters:
 * `FLINK_JOB` - Name of the Flink job to execute (default: none)
 * `DEFAULT_PARALLELISM` - Default parallelism with which to start the job (default: 1)
 * `FLINK_JOB_ARGUMENTS` - Additional arguments which will be passed to the job cluster (default: none)
+* `SAVEPOINT_OPTIONS` - Savepoint options to start the cluster with (default: none)
 
 The parameters can be set by exporting the corresponding environment variable.
 
@@ -55,6 +56,11 @@ This will automatically start `DEFAULT_PARALLELISM` TaskManagers:
         
         FLINK_DOCKER_IMAGE_NAME=<IMAGE_NAME> FLINK_JOB=<JOB_NAME> DEFAULT_PARALLELISM=<DEFAULT_PARALLELISM> docker-compose up
         
+In order to resume the job from a savepoint set `SAVEPOINT_OPTIONS`.
+Supported options are `--fromSavepoint <SAVEPOINT_PATH>` and `--allowNonRestoredState` where `<SAVEPOINT_PATH>` is accessible from all containers.
+
+        FLINK_DOCKER_IMAGE_NAME=<IMAGE_NAME> FLINK_JOB=<JOB_NAME> SAVEPOINT_OPTIONS="--fromSavepoint <SAVEPOINT_PATH> --allowNonRestoredState" docker-compose up 
+        
 One can also provide additional job arguments via `FLINK_JOB_ARGUMENTS` which are passed to the job:
         
         FLINK_DOCKER_IMAGE_NAME=<IMAGE_NAME> FLINK_JOB=<JOB_NAME> FLINK_JOB_ARGUMENTS=<JOB_ARGUMENTS> docker-compose up
diff --git a/flink-container/docker/docker-compose.yml b/flink-container/docker/docker-compose.yml
index 28b5368..a5e9b49 100644
--- a/flink-container/docker/docker-compose.yml
+++ b/flink-container/docker/docker-compose.yml
@@ -23,6 +23,7 @@
 # * FLINK_JOB - Name of the Flink job to execute (default: none)
 # * DEFAULT_PARALLELISM - Default parallelism with which to start the job (default: 1)
 # * FLINK_JOB_ARGUMENTS - Additional arguments which will be passed to the job cluster (default: none)
+# * SAVEPOINT_OPTIONS - Savepoint options to start the cluster with (default: none)
 
 version: "2.2"
 services:
@@ -30,7 +31,7 @@ services:
     image: ${FLINK_DOCKER_IMAGE_NAME:-flink-job}
     ports:
       - "8081:8081"
-    command: job-cluster --job-classname ${FLINK_JOB} -Djobmanager.rpc.address=job-cluster -Dparallelism.default=${DEFAULT_PARALLELISM:-1} ${FLINK_JOB_ARGUMENTS}
+    command: job-cluster --job-classname ${FLINK_JOB} -Djobmanager.rpc.address=job-cluster -Dparallelism.default=${DEFAULT_PARALLELISM:-1} ${SAVEPOINT_OPTIONS} ${FLINK_JOB_ARGUMENTS}
 
   taskmanager:
     image: ${FLINK_DOCKER_IMAGE_NAME:-flink-job}


[flink] 05/05: [hotfix][docs] Document how to resume from a savepoint with a job cluster on K8s

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5434ca56a75e4686efe21d5f545b5d9fb85f828c
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu Aug 16 19:56:47 2018 +0200

    [hotfix][docs] Document how to resume from a savepoint with a job cluster on K8s
---
 docs/ops/deployment/kubernetes.md    | 5 ++---
 flink-container/kubernetes/README.md | 7 +++++++
 2 files changed, 9 insertions(+), 3 deletions(-)

diff --git a/docs/ops/deployment/kubernetes.md b/docs/ops/deployment/kubernetes.md
index 298e473..5244f5e 100644
--- a/docs/ops/deployment/kubernetes.md
+++ b/docs/ops/deployment/kubernetes.md
@@ -34,9 +34,8 @@ Please follow [Kubernetes' setup guide](https://kubernetes.io/docs/setup/) in or
 If you want to run Kubernetes locally, we recommend using [MiniKube](https://kubernetes.io/docs/setup/minikube/).
 
 <div class="alert alert-info" markdown="span">
-  <strong>Note:</strong> If using MiniKube please make sure to execute `minikube ssh 'sudo ip link set docker0 
-  promisc on'` before deploying a Flink cluster. Otherwise Flink components are not able to self reference 
-  themselves through a Kubernetes service. 
+  <strong>Note:</strong> If using MiniKube please make sure to execute `minikube ssh 'sudo ip link set docker0 promisc on'` before deploying a Flink cluster. 
+  Otherwise Flink components are not able to reference themselves through a Kubernetes service. 
 </div>
 
 ## Flink session cluster on Kubernetes
diff --git a/flink-container/kubernetes/README.md b/flink-container/kubernetes/README.md
index efba823..8a663f6 100644
--- a/flink-container/kubernetes/README.md
+++ b/flink-container/kubernetes/README.md
@@ -38,6 +38,13 @@ At last, you should start the task manager deployment:
 
 `FLINK_IMAGE_NAME=<IMAGE_NAME> FLINK_JOB_PARALLELISM=<PARALLELISM> envsubst < task-manager-deployment.yaml.template | kubectl create -f -`
 
+## Resuming from a savepoint
+
+In order to resume from a savepoint, one needs to pass the savepoint path to the cluster entrypoint.
+This can be achieved by adding `"--fromSavepoint", "<SAVEPOINT_PATH>"` to the `args` field in the [job-cluster-job.yaml.template](job-cluster-job.yaml.template).
+Note that `<SAVEPOINT_PATH>` needs to be accessible from the `job-cluster-job` pod (e.g. adding it to the image or storing it on a DFS).
+Additionally, one can specify `"--allowNonRestoredState"` to allow skipping savepoint state that cannot be restored.
+
 ## Interact with Flink job cluster
 
 After starting the job cluster service, the web UI will be available under `<NODE_IP>:30081`.


[flink] 03/05: [hotfix] Improve StandaloneJobClusterEntrypoint command line help

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8776f707614859dd9744a3e758a3ab8b0390524e
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu Aug 16 19:33:21 2018 +0200

    [hotfix] Improve StandaloneJobClusterEntrypoint command line help
    
    Properly print the CLI help if the command line options could not be parsed.
---
 .../flink/container/entrypoint/StandaloneJobClusterEntryPoint.java  | 2 +-
 .../runtime/entrypoint/StandaloneSessionClusterEntrypoint.java      | 2 +-
 .../apache/flink/runtime/entrypoint/parser/CommandLineParser.java   | 6 ++++--
 .../org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java    | 2 +-
 4 files changed, 7 insertions(+), 5 deletions(-)

diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
index 0c2dcf3..163e9ac 100644
--- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
+++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
@@ -153,7 +153,7 @@ public final class StandaloneJobClusterEntryPoint extends JobClusterEntrypoint {
 			clusterConfiguration = commandLineParser.parse(args);
 		} catch (FlinkParseException e) {
 			LOG.error("Could not parse command line arguments {}.", args, e);
-			commandLineParser.printHelp();
+			commandLineParser.printHelp(StandaloneJobClusterEntryPoint.class.getSimpleName());
 			System.exit(1);
 		}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
index d56725c..23b87bc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
@@ -92,7 +92,7 @@ public class StandaloneSessionClusterEntrypoint extends SessionClusterEntrypoint
 			entrypointClusterConfiguration = commandLineParser.parse(args);
 		} catch (FlinkParseException e) {
 			LOG.error("Could not parse command line arguments {}.", args, e);
-			commandLineParser.printHelp();
+			commandLineParser.printHelp(StandaloneSessionClusterEntrypoint.class.getSimpleName());
 			System.exit(1);
 		}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/parser/CommandLineParser.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/parser/CommandLineParser.java
index f9e199c..1c07fb7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/parser/CommandLineParser.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/parser/CommandLineParser.java
@@ -55,8 +55,10 @@ public class CommandLineParser<T> {
 		return parserResultFactory.createResult(commandLine);
 	}
 
-	public void printHelp() {
+	public void printHelp(@Nonnull String cmdLineSyntax) {
 		final HelpFormatter helpFormatter = new HelpFormatter();
-		helpFormatter.printHelp("", parserResultFactory.getOptions());
+		helpFormatter.setLeftPadding(5);
+		helpFormatter.setWidth(80);
+		helpFormatter.printHelp(cmdLineSyntax, parserResultFactory.getOptions(), true);
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index c1f0cc8..c9b6b4b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -315,7 +315,7 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync
 			clusterConfiguration = commandLineParser.parse(args);
 		} catch (FlinkParseException e) {
 			LOG.error("Could not parse the command line options.", e);
-			commandLineParser.printHelp();
+			commandLineParser.printHelp(TaskManagerRunner.class.getSimpleName());
 			throw e;
 		}