You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/04/25 15:35:55 UTC
[incubator-seatunnel] branch dev updated: [Feature] [seatunnel-core-flink] flink job support application mode (#1742)
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 4a7de002 [Feature] [seatunnel-core-flink] flink job support application mode (#1742)
4a7de002 is described below
commit 4a7de0021f52812d854ce5728bc66e6b5e141524
Author: legendtkl <ta...@gmail.com>
AuthorDate: Mon Apr 25 23:35:48 2022 +0800
[Feature] [seatunnel-core-flink] flink job support application mode (#1742)
* flink job support application mode
* update related doc
---
docs/en/command/usage.mdx | 3 +++
.../org/apache/seatunnel/command/FlinkCommandArgs.java | 13 +++++++++++++
.../main/java/org/apache/seatunnel/FlinkStarter.java | 15 ++++++++++++++-
.../java/org/apache/seatunnel/FlinkStarterTest.java | 17 +++++++++++++++++
4 files changed, 47 insertions(+), 1 deletion(-)
diff --git a/docs/en/command/usage.mdx b/docs/en/command/usage.mdx
index 5533fc50..364ecd43 100644
--- a/docs/en/command/usage.mdx
+++ b/docs/en/command/usage.mdx
@@ -59,9 +59,12 @@ bin/start-seatunnel-spark.sh \
bin/start-seatunnel-flink.sh \
-c config-path \
-i key=value \
+ -r run-application \
[other params]
```
+- Use `-r` or `--run-mode` to specify the Flink job run mode: either `run` (the default) or `run-application`.
+
</TabItem>
</Tabs>
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/FlinkCommandArgs.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/FlinkCommandArgs.java
index 1044af9c..de02a524 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/FlinkCommandArgs.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/FlinkCommandArgs.java
@@ -20,8 +20,14 @@ package org.apache.seatunnel.command;
import org.apache.seatunnel.common.config.DeployMode;
import org.apache.seatunnel.config.EngineType;
+import com.beust.jcommander.Parameter;
+
public class FlinkCommandArgs extends AbstractCommandArgs {
+ @Parameter(names = {"-r", "--run-mode"},
+ description = "job run mode, run or run-application")
+ private String runMode = "run";
+
@Override
public EngineType getEngineType() {
return EngineType.FLINK;
@@ -32,4 +38,11 @@ public class FlinkCommandArgs extends AbstractCommandArgs {
return DeployMode.CLIENT;
}
+ public String getRunMode() {
+ return runMode;
+ }
+
+ public void setRunMode(String runMode) {
+ this.runMode = runMode;
+ }
}
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/FlinkStarter.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/FlinkStarter.java
index d62984bb..59355506 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/FlinkStarter.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/FlinkStarter.java
@@ -34,6 +34,8 @@ public class FlinkStarter implements Starter {
private static final String APP_NAME = SeatunnelFlink.class.getName();
private static final int USAGE_EXIT_CODE = 234;
private static final String APP_JAR_NAME = "seatunnel-core-flink.jar";
+ private static final String RUN_MODE_RUN = "run";
+ private static final String RUN_MODE_APPLICATION = "run-application";
/**
* Flink parameters, used by flink job itself. e.g. `-m yarn-cluster`
@@ -49,9 +51,20 @@ public class FlinkStarter implements Starter {
* SeaTunnel flink job jar.
*/
private final String appJar;
+ private final String runMode;
FlinkStarter(String[] args) {
this.flinkCommandArgs = parseArgs(args);
+
+ String mode = flinkCommandArgs.getRunMode();
+ switch (mode) {
+ case RUN_MODE_RUN:
+ case RUN_MODE_APPLICATION:
+ this.runMode = mode;
+ break;
+ default:
+ throw new IllegalArgumentException("Run mode " + mode + " not supported");
+ }
// set the deployment mode, used to get the job jar path.
Common.setDeployMode(flinkCommandArgs.getDeployMode().getName());
this.appJar = Common.appLibDir().resolve(APP_JAR_NAME).toString();
@@ -92,7 +105,7 @@ public class FlinkStarter implements Starter {
public List<String> buildCommands() {
List<String> command = new ArrayList<>();
command.add("${FLINK_HOME}/bin/flink");
- command.add("run");
+ command.add(runMode);
command.addAll(flinkParams);
command.add("-c");
command.add(APP_NAME);
diff --git a/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/FlinkStarterTest.java b/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/FlinkStarterTest.java
index 12d909c9..4ba5322f 100644
--- a/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/FlinkStarterTest.java
+++ b/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/FlinkStarterTest.java
@@ -31,6 +31,23 @@ public class FlinkStarterTest {
Assert.assertTrue(flinkExecuteCommand.contains("--config test.conf"));
Assert.assertTrue(flinkExecuteCommand.contains("-m yarn-cluster"));
Assert.assertTrue(flinkExecuteCommand.contains("-Dkey1=value1"));
+ Assert.assertTrue(flinkExecuteCommand.contains("${FLINK_HOME}/bin/flink run"));
+
+ String[] args1 = {"--config", "test.conf", "-m", "yarn-cluster", "-i", "key1=value1", "-i", "key2=value2", "--run-mode", "run-application"};
+ flinkExecuteCommand = String.join(" ", new FlinkStarter(args1).buildCommands());
+ Assert.assertTrue(flinkExecuteCommand.contains("${FLINK_HOME}/bin/flink run-application"));
+
+ String[] args2 = {"--config", "test.conf", "-m", "yarn-cluster", "-i", "key1=value1", "-i", "key2=value2", "--run-mode", "run"};
+ flinkExecuteCommand = String.join(" ", new FlinkStarter(args2).buildCommands());
+ Assert.assertTrue(flinkExecuteCommand.contains("${FLINK_HOME}/bin/flink run"));
+
+ try {
+ String[] args3 = {"--config", "test.conf", "-m", "yarn-cluster", "-i", "key1=value1", "-i", "key2=value2", "--run-mode", "run123"};
+ new FlinkStarter(args3);
+ } catch (Exception e) {
+ Assert.assertTrue(e instanceof IllegalArgumentException);
+ Assert.assertEquals("Run mode run123 not supported", e.getMessage());
+ }
}
@Test