You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/04/25 15:35:55 UTC

[incubator-seatunnel] branch dev updated: [Feature] [seatunnel-core-flink] flink job support application mode (#1742)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 4a7de002 [Feature] [seatunnel-core-flink] flink job support application mode (#1742)
4a7de002 is described below

commit 4a7de0021f52812d854ce5728bc66e6b5e141524
Author: legendtkl <ta...@gmail.com>
AuthorDate: Mon Apr 25 23:35:48 2022 +0800

    [Feature] [seatunnel-core-flink] flink job support application mode (#1742)
    
    * flink job support application mode
    * update related doc
---
 docs/en/command/usage.mdx                               |  3 +++
 .../org/apache/seatunnel/command/FlinkCommandArgs.java  | 13 +++++++++++++
 .../main/java/org/apache/seatunnel/FlinkStarter.java    | 15 ++++++++++++++-
 .../java/org/apache/seatunnel/FlinkStarterTest.java     | 17 +++++++++++++++++
 4 files changed, 47 insertions(+), 1 deletion(-)

diff --git a/docs/en/command/usage.mdx b/docs/en/command/usage.mdx
index 5533fc50..364ecd43 100644
--- a/docs/en/command/usage.mdx
+++ b/docs/en/command/usage.mdx
@@ -59,9 +59,12 @@ bin/start-seatunnel-spark.sh \
 bin/start-seatunnel-flink.sh \
     -c config-path \
     -i key=value \
+    -r run-application \
     [other params]
 ```
 
+- Use `-r` or `--run-mode` to specify the flink job run mode, you can use `run-application` or `run` (default value)
+
 </TabItem>
 </Tabs>
 
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/FlinkCommandArgs.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/FlinkCommandArgs.java
index 1044af9c..de02a524 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/FlinkCommandArgs.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/FlinkCommandArgs.java
@@ -20,8 +20,14 @@ package org.apache.seatunnel.command;
 import org.apache.seatunnel.common.config.DeployMode;
 import org.apache.seatunnel.config.EngineType;
 
+import com.beust.jcommander.Parameter;
+
 public class FlinkCommandArgs extends AbstractCommandArgs {
 
+    @Parameter(names = {"-r", "--run-mode"},
+        description = "job run mode, run or run-application")
+    private String runMode = "run";
+
     @Override
     public EngineType getEngineType() {
         return EngineType.FLINK;
@@ -32,4 +38,11 @@ public class FlinkCommandArgs extends AbstractCommandArgs {
         return DeployMode.CLIENT;
     }
 
+    public String getRunMode() {
+        return runMode;
+    }
+
+    public void setRunMode(String runMode) {
+        this.runMode = runMode;
+    }
 }
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/FlinkStarter.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/FlinkStarter.java
index d62984bb..59355506 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/FlinkStarter.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/FlinkStarter.java
@@ -34,6 +34,8 @@ public class FlinkStarter implements Starter {
     private static final String APP_NAME = SeatunnelFlink.class.getName();
     private static final int USAGE_EXIT_CODE = 234;
     private static final String APP_JAR_NAME = "seatunnel-core-flink.jar";
+    private static final String RUN_MODE_RUN = "run";
+    private static final String RUN_MODE_APPLICATION = "run-application";
 
     /**
      * Flink parameters, used by flink job itself. e.g. `-m yarn-cluster`
@@ -49,9 +51,20 @@ public class FlinkStarter implements Starter {
      * SeaTunnel flink job jar.
      */
     private final String appJar;
+    private final String runMode;
 
     FlinkStarter(String[] args) {
         this.flinkCommandArgs = parseArgs(args);
+
+        String mode = flinkCommandArgs.getRunMode();
+        switch (mode) {
+            case RUN_MODE_RUN:
+            case RUN_MODE_APPLICATION:
+                this.runMode = mode;
+                break;
+            default:
+                throw new IllegalArgumentException("Run mode " + mode + " not supported");
+        }
         // set the deployment mode, used to get the job jar path.
         Common.setDeployMode(flinkCommandArgs.getDeployMode().getName());
         this.appJar = Common.appLibDir().resolve(APP_JAR_NAME).toString();
@@ -92,7 +105,7 @@ public class FlinkStarter implements Starter {
     public List<String> buildCommands() {
         List<String> command = new ArrayList<>();
         command.add("${FLINK_HOME}/bin/flink");
-        command.add("run");
+        command.add(runMode);
         command.addAll(flinkParams);
         command.add("-c");
         command.add(APP_NAME);
diff --git a/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/FlinkStarterTest.java b/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/FlinkStarterTest.java
index 12d909c9..4ba5322f 100644
--- a/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/FlinkStarterTest.java
+++ b/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/FlinkStarterTest.java
@@ -31,6 +31,23 @@ public class FlinkStarterTest {
         Assert.assertTrue(flinkExecuteCommand.contains("--config test.conf"));
         Assert.assertTrue(flinkExecuteCommand.contains("-m yarn-cluster"));
         Assert.assertTrue(flinkExecuteCommand.contains("-Dkey1=value1"));
+        Assert.assertTrue(flinkExecuteCommand.contains("${FLINK_HOME}/bin/flink run"));
+
+        String[] args1 = {"--config", "test.conf", "-m", "yarn-cluster", "-i", "key1=value1", "-i", "key2=value2", "--run-mode", "run-application"};
+        flinkExecuteCommand = String.join(" ", new FlinkStarter(args1).buildCommands());
+        Assert.assertTrue(flinkExecuteCommand.contains("${FLINK_HOME}/bin/flink run-application"));
+
+        String[] args2 = {"--config", "test.conf", "-m", "yarn-cluster", "-i", "key1=value1", "-i", "key2=value2", "--run-mode", "run"};
+        flinkExecuteCommand = String.join(" ", new FlinkStarter(args2).buildCommands());
+        Assert.assertTrue(flinkExecuteCommand.contains("${FLINK_HOME}/bin/flink run"));
+
+        try {
+            String[] args3 = {"--config", "test.conf", "-m", "yarn-cluster", "-i", "key1=value1", "-i", "key2=value2", "--run-mode", "run123"};
+            new FlinkStarter(args3);
+        } catch (Exception e) {
+            Assert.assertTrue(e instanceof IllegalArgumentException);
+            Assert.assertEquals("Run mode run123 not supported", e.getMessage());
+        }
     }
 
     @Test