Posted to commits@flink.apache.org by kk...@apache.org on 2019/07/11 12:01:06 UTC
[flink] 03/05: [FLINK-13123] [cli] rename "-s" parameter of stop command
This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 150bccac338265953f5684210d47b01037341007
Author: Konstantin Knauf <kn...@gmail.com>
AuthorDate: Wed Jul 10 10:59:57 2019 +0200
[FLINK-13123] [cli] rename "-s" parameter of stop command
---
docs/ops/cli.md | 37 +++++++++++-----------
docs/ops/cli.zh.md | 37 +++++++++++-----------
.../apache/flink/client/cli/CliFrontendParser.java | 12 +++----
.../org/apache/flink/client/cli/StopOptions.java | 6 ++--
.../cli/CliFrontendStopWithSavepointTest.java | 10 +++---
5 files changed, 52 insertions(+), 50 deletions(-)
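Note on the renamed option: the updated tests in this commit pass `-p` both with and without a directory (`{ "-p", "test-target-dir", <jobID> }` and `{ "-p", "-d", <jobID> }`), and `StopOptions` reads `savepointFlag`, `targetDirectory` and `advanceToEndOfEventTime` from the parsed line. The following is a minimal, self-contained sketch of that optional-argument behaviour. It is not Flink's code and does not use Commons CLI; the class `StopArgsSketch`, its `Parsed` holder, and the rule "consume the next token only if it does not start with `-`" are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the stop command's option handling; not Flink code. */
public class StopArgsSketch {

    /** Holder whose fields loosely mirror what StopOptions reads in this commit. */
    public static final class Parsed {
        public boolean savepointFlag;    // "-p" / "--savepointPath" was present
        public String targetDirectory;   // value given to "-p"; null means "use the configured default"
        public boolean drain;            // "-d" / "--drain" was present
        public final List<String> args = new ArrayList<>(); // remaining positional arguments (the job ID)
    }

    /** Parses argv under the assumed rule that "-p" takes an optional argument. */
    public static Parsed parse(String[] argv) {
        Parsed parsed = new Parsed();
        for (int i = 0; i < argv.length; i++) {
            String token = argv[i];
            if (token.equals("-p") || token.equals("--savepointPath")) {
                parsed.savepointFlag = true;
                // Assumption: the next token is the path only if it is not another option.
                if (i + 1 < argv.length && !argv[i + 1].startsWith("-")) {
                    parsed.targetDirectory = argv[++i];
                }
            } else if (token.equals("-d") || token.equals("--drain")) {
                parsed.drain = true;
            } else {
                parsed.args.add(token);
            }
        }
        return parsed;
    }

    public static void main(String[] args) {
        // Parameter shapes taken from the updated test cases; "jobId" is a placeholder.
        String[][] cases = {
            {"jobId"},
            {"-p", "test-target-dir", "jobId"},
            {"-p", "-d", "jobId"},
            {"-d", "-p", "test-target-dir", "jobId"},
        };
        for (String[] argv : cases) {
            Parsed p = parse(argv);
            System.out.println(String.join(" ", argv)
                + " -> savepointFlag=" + p.savepointFlag
                + ", targetDirectory=" + p.targetDirectory
                + ", drain=" + p.drain
                + ", args=" + p.args);
        }
    }
}
```

Under this sketch's rule, a bare `-p` placed directly before the job ID would swallow the job ID as the directory, so the default-directory case is written here with the job ID alone, as in the updated `testStopWithDefaultSavepointDir`. Whether the real parser behaves the same way is not something this sketch establishes.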
diff --git a/docs/ops/cli.md b/docs/ops/cli.md
index 5eeed31..8dabcbf 100644
--- a/docs/ops/cli.md
+++ b/docs/ops/cli.md
@@ -176,23 +176,7 @@ These examples about how to manage a job in CLI.
- Gracefully stop a job with a savepoint (streaming jobs only):
- ./bin/flink stop -s [targetDirectory] -d <jobID>
-
-
-**NOTE**: The difference between cancelling and stopping a (streaming) job is the following:
-
-On a cancel call, the operators in a job immediately receive a `cancel()` method call to cancel them as
-soon as possible.
-If operators are not not stopping after the cancel call, Flink will start interrupting the thread periodically
-until it stops.
-
-A "stop" call is a more graceful way of stopping a running streaming job, as the "stop" signal flows from
-source to sink. When the user requests to stop a job, all sources will be requested to send the last checkpoint barrier
-that will trigger a savepoint, and after the successful completion of that savepoint, they will finish by calling their
-`cancel()` method. If the `-d` flag is specified, then a `MAX_WATERMARK` will be emitted before the last checkpoint
-barrier. This will result all registered event-time timers to fire, thus flushing out any state that is waiting for
-a specific watermark, e.g. windows. The job will keep running until all sources properly shut down. This allows the
- job to finish processing all in-flight data.
+ ./bin/flink stop [-p targetDirectory] [-d] <jobID>
### Savepoints
@@ -221,6 +205,23 @@ This will trigger a savepoint for the job with ID `jobId` and YARN application I
Everything else is the same as described in the above **Trigger a Savepoint** section.
+#### Stop
+
+Use the `stop` to gracefully stop a running streaming job with a savepoint.
+
+{% highlight bash %}
+./bin/flink stop [-p targetDirectory] [-d] <jobID>
+{% endhighlight %}
+
+A "stop" call is a more graceful way of stopping a running streaming job, as the "stop" signal flows from
+source to sink. When the user requests to stop a job, all sources will be requested to send the last checkpoint barrier
+that will trigger a savepoint, and after the successful completion of that savepoint, they will finish by calling their
+`cancel()` method. If the `-d` flag is specified, then a `MAX_WATERMARK` will be emitted before the last checkpoint
+barrier. This will result all registered event-time timers to fire, thus flushing out any state that is waiting for
+a specific watermark, e.g. windows. The job will keep running until all sources properly shut down. This allows the
+ job to finish processing all in-flight data.
+
+
#### Cancel with a savepoint (deprecated)
You can atomically trigger a savepoint and cancel a job.
@@ -431,7 +432,7 @@ Action "stop" stops a running program with a savepoint (streaming jobs only).
"stop" action options:
-d,--drain Send MAX_WATERMARK before taking the
savepoint and stopping the pipelne.
- -s,--withSavepoint <withSavepoint> Path to the savepoint (for example
+ -p,--savepointPath <savepointPath> Path to the savepoint (for example
hdfs:///flink/savepoint-1537). If no
directory is specified, the configured
default will be used
diff --git a/docs/ops/cli.zh.md b/docs/ops/cli.zh.md
index ce6049f..c4ecf9d 100644
--- a/docs/ops/cli.zh.md
+++ b/docs/ops/cli.zh.md
@@ -175,23 +175,7 @@ available.
- Gracefully stop a job with a savepoint (streaming jobs only):
- ./bin/flink stop -s [targetDirectory] -d <jobID>
-
-
-**NOTE**: The difference between cancelling and stopping a (streaming) job is the following:
-
-On a cancel call, the operators in a job immediately receive a `cancel()` method call to cancel them as
-soon as possible.
-If operators are not not stopping after the cancel call, Flink will start interrupting the thread periodically
-until it stops.
-
-A "stop" call is a more graceful way of stopping a running streaming job, as the "stop" signal flows from
-source to sink. When the user requests to stop a job, all sources will be requested to send the last checkpoint barrier
-that will trigger a savepoint, and after the successful completion of that savepoint, they will finish by calling their
-`cancel()` method. If the `-d` flag is specified, then a `MAX_WATERMARK` will be emitted before the last checkpoint
-barrier. This will result all registered event-time timers to fire, thus flushing out any state that is waiting for
-a specific watermark, e.g. windows. The job will keep running until all sources properly shut down. This allows the
- job to finish processing all in-flight data.
+ ./bin/flink stop [-p targetDirectory] [-d] <jobID>
### Savepoints
@@ -220,6 +204,23 @@ This will trigger a savepoint for the job with ID `jobId` and YARN application I
Everything else is the same as described in the above **Trigger a Savepoint** section.
+#### Stop
+
+Use the `stop` to gracefully stop a running streaming job with a savepoint.
+
+{% highlight bash %}
+./bin/flink stop [-p targetDirectory] [-d] <jobID>
+{% endhighlight %}
+
+A "stop" call is a more graceful way of stopping a running streaming job, as the "stop" signal flows from
+source to sink. When the user requests to stop a job, all sources will be requested to send the last checkpoint barrier
+that will trigger a savepoint, and after the successful completion of that savepoint, they will finish by calling their
+`cancel()` method. If the `-d` flag is specified, then a `MAX_WATERMARK` will be emitted before the last checkpoint
+barrier. This will result all registered event-time timers to fire, thus flushing out any state that is waiting for
+a specific watermark, e.g. windows. The job will keep running until all sources properly shut down. This allows the
+ job to finish processing all in-flight data.
+
+
#### Cancel with a savepoint (deprecated)
You can atomically trigger a savepoint and cancel a job.
@@ -426,7 +427,7 @@ Action "stop" stops a running program with a savepoint (streaming jobs only).
"stop" action options:
-d,--drain Send MAX_WATERMARK before taking the
savepoint and stopping the pipelne.
- -s,--withSavepoint <withSavepoint> Path to the savepoint (for example
+ -p,--savepointPath <savepointPath> Path to the savepoint (for example
hdfs:///flink/savepoint-1537). If no
directory is specified, the configured
default will be used
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
index 3219e40..5639cb5 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
@@ -113,7 +113,7 @@ public class CliFrontendParser {
"specified, the configured default directory (" +
CheckpointingOptions.SAVEPOINT_DIRECTORY.key() + ") is used.");
- public static final Option STOP_WITH_SAVEPOINT = new Option("s", "withSavepoint", true,
+ public static final Option STOP_WITH_SAVEPOINT_PATH = new Option("p", "savepointPath", true,
"Path to the savepoint (for example hdfs:///flink/savepoint-1537). " +
"If no directory is specified, the configured default will be used (\"" + CheckpointingOptions.SAVEPOINT_DIRECTORY.key() + "\").");
@@ -176,9 +176,9 @@ public class CliFrontendParser {
CANCEL_WITH_SAVEPOINT_OPTION.setArgName("targetDirectory");
CANCEL_WITH_SAVEPOINT_OPTION.setOptionalArg(true);
- STOP_WITH_SAVEPOINT.setRequired(false);
- STOP_WITH_SAVEPOINT.setArgName("withSavepoint");
- STOP_WITH_SAVEPOINT.setOptionalArg(true);
+ STOP_WITH_SAVEPOINT_PATH.setRequired(false);
+ STOP_WITH_SAVEPOINT_PATH.setArgName("savepointPath");
+ STOP_WITH_SAVEPOINT_PATH.setOptionalArg(true);
STOP_AND_DRAIN.setRequired(false);
@@ -256,7 +256,7 @@ public class CliFrontendParser {
static Options getStopCommandOptions() {
return buildGeneralOptions(new Options())
- .addOption(STOP_WITH_SAVEPOINT)
+ .addOption(STOP_WITH_SAVEPOINT_PATH)
.addOption(STOP_AND_DRAIN);
}
@@ -293,7 +293,7 @@ public class CliFrontendParser {
private static Options getStopOptionsWithoutDeprecatedOptions(Options options) {
return options
- .addOption(STOP_WITH_SAVEPOINT)
+ .addOption(STOP_WITH_SAVEPOINT_PATH)
.addOption(STOP_AND_DRAIN);
}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/StopOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/StopOptions.java
index 1ade31c..c5693b7 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/StopOptions.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/StopOptions.java
@@ -21,7 +21,7 @@ package org.apache.flink.client.cli;
import org.apache.commons.cli.CommandLine;
import static org.apache.flink.client.cli.CliFrontendParser.STOP_AND_DRAIN;
-import static org.apache.flink.client.cli.CliFrontendParser.STOP_WITH_SAVEPOINT;
+import static org.apache.flink.client.cli.CliFrontendParser.STOP_WITH_SAVEPOINT_PATH;
/**
* Command line options for the STOP command.
@@ -41,8 +41,8 @@ class StopOptions extends CommandLineOptions {
super(line);
this.args = line.getArgs();
- this.savepointFlag = line.hasOption(STOP_WITH_SAVEPOINT.getOpt());
- this.targetDirectory = line.getOptionValue(STOP_WITH_SAVEPOINT.getOpt());
+ this.savepointFlag = line.hasOption(STOP_WITH_SAVEPOINT_PATH.getOpt());
+ this.targetDirectory = line.getOptionValue(STOP_WITH_SAVEPOINT_PATH.getOpt());
this.advanceToEndOfEventTime = line.hasOption(STOP_AND_DRAIN.getOpt());
}
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendStopWithSavepointTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendStopWithSavepointTest.java
index 2c829cc..19872aa 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendStopWithSavepointTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendStopWithSavepointTest.java
@@ -80,7 +80,7 @@ public class CliFrontendStopWithSavepointTest extends CliFrontendTestBase {
public void testStopWithDefaultSavepointDir() throws Exception {
JobID jid = new JobID();
- String[] parameters = { "-s", jid.toString() };
+ String[] parameters = {jid.toString() };
final ClusterClient<String> clusterClient = createClusterClient(null);
MockedCliFrontend testFrontend = new MockedCliFrontend(clusterClient);
testFrontend.stop(parameters);
@@ -93,7 +93,7 @@ public class CliFrontendStopWithSavepointTest extends CliFrontendTestBase {
public void testStopWithExplicitSavepointDir() throws Exception {
JobID jid = new JobID();
- String[] parameters = { "-s", "test-target-dir", jid.toString() };
+ String[] parameters = { "-p", "test-target-dir", jid.toString() };
final ClusterClient<String> clusterClient = createClusterClient(null);
MockedCliFrontend testFrontend = new MockedCliFrontend(clusterClient);
testFrontend.stop(parameters);
@@ -119,7 +119,7 @@ public class CliFrontendStopWithSavepointTest extends CliFrontendTestBase {
public void testStopWithMaxWMAndDefaultSavepointDir() throws Exception {
JobID jid = new JobID();
- String[] parameters = { "-s", "-d", jid.toString() };
+ String[] parameters = { "-p", "-d", jid.toString() };
final ClusterClient<String> clusterClient = createClusterClient(null);
MockedCliFrontend testFrontend = new MockedCliFrontend(clusterClient);
testFrontend.stop(parameters);
@@ -132,7 +132,7 @@ public class CliFrontendStopWithSavepointTest extends CliFrontendTestBase {
public void testStopWithMaxWMAndExplicitSavepointDir() throws Exception {
JobID jid = new JobID();
- String[] parameters = { "-d", "-s", "test-target-dir", jid.toString() };
+ String[] parameters = { "-d", "-p", "test-target-dir", jid.toString() };
final ClusterClient<String> clusterClient = createClusterClient(null);
MockedCliFrontend testFrontend = new MockedCliFrontend(clusterClient);
testFrontend.stop(parameters);
@@ -181,7 +181,7 @@ public class CliFrontendStopWithSavepointTest extends CliFrontendTestBase {
// test unknown job Id
JobID jid = new JobID();
- String[] parameters = { "-s", "test-target-dir", jid.toString() };
+ String[] parameters = { "-p", "test-target-dir", jid.toString() };
String expectedMessage = "Test exception";
FlinkException testException = new FlinkException(expectedMessage);
final ClusterClient<String> clusterClient = createClusterClient(testException);