You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/05/04 01:44:29 UTC

[incubator-seatunnel] branch dev updated: [Refactor][seatunnel-core-flink] refactor flink-sql module (#1783)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 2b650981 [Refactor][seatunnel-core-flink] refactor flink-sql module (#1783)
2b650981 is described below

commit 2b650981b15680e89eaff027038ff08822cb7b75
Author: legendtkl <ta...@gmail.com>
AuthorDate: Wed May 4 09:44:25 2022 +0800

    [Refactor][seatunnel-core-flink] refactor flink-sql module (#1783)
    
    * refactor flink-sql module
    Co-authored-by: taokelu <ta...@bytedance.com>
---
 .../src/main/bin/start-seatunnel-sql.sh            | 96 ++++------------------
 .../apache/seatunnel/core/sql/FlinkSqlStarter.java | 56 +++++++++++++
 .../apache/seatunnel/core/sql/SeatunnelSql.java    |  3 +-
 .../core/sql/SqlVariableSubstitutionTest.java      |  3 +-
 .../apache/seatunnel/core/flink/FlinkStarter.java  | 62 ++------------
 .../seatunnel/core/flink/SeatunnelFlink.java       |  3 +-
 .../core/flink/args/FlinkCommandArgs.java          | 15 ++++
 .../FlinkJobType.java}                             | 24 +++---
 .../FlinkConstant.java}                            | 22 +----
 .../core/flink/utils/CommandLineUtils.java         | 47 +++++++++++
 .../core/flink/utils/CommandLineUtilsTest.java     | 66 +++++++++++++++
 11 files changed, 226 insertions(+), 171 deletions(-)

diff --git a/seatunnel-core/seatunnel-core-flink-sql/src/main/bin/start-seatunnel-sql.sh b/seatunnel-core/seatunnel-core-flink-sql/src/main/bin/start-seatunnel-sql.sh
index 368db2af..63b1a86d 100755
--- a/seatunnel-core/seatunnel-core-flink-sql/src/main/bin/start-seatunnel-sql.sh
+++ b/seatunnel-core/seatunnel-core-flink-sql/src/main/bin/start-seatunnel-sql.sh
@@ -16,84 +16,24 @@
 # limitations under the License.
 #
 
-# copy command line arguments
-
-function usage() {
-  echo "Usage: start-seatunnel-sql.sh [options]"
-  echo "  options:"
-  echo "    --config, -c FILE_PATH        Config file"
-  echo "    --variable, -i PROP=VALUE     Variable substitution, such as -i city=beijing, or -i date=20190318"
-  echo "    --check, -t                   Check config"
-  echo "    --help, -h                    Show this help message"
-}
-
-if [[ "$@" = *--help ]] || [[ "$@" = *-h ]] || [[ $# -le 1 ]]; then
-  usage
-  exit 0
-fi
-
-is_exist() {
-    if [ -z $1 ]; then
-      usage
-      exit -1
-    fi
-}
-
-PARAMS=""
-while (( "$#" )); do
-  case "$1" in
-    -c|--config)
-      CONFIG_FILE=$2
-      is_exist ${CONFIG_FILE}
-      shift 2
-      ;;
-
-    -i|--variable)
-      variable=$2
-      is_exist ${variable}
-      java_property_value="-D${variable}"
-      variables_substitution="${java_property_value} ${variables_substitution}"
-      shift 2
-      ;;
-
-    *) # preserve positional arguments
-      PARAMS="$PARAMS $1"
-      shift
-      ;;
-
-  esac
-done
-
-if [ -z ${CONFIG_FILE} ]; then
-  echo "Error: The following option is required: [-c | --config]"
-  usage
-  exit -1
-elif [ ! -f ${CONFIG_FILE} ];then
-  echo "Error: Config file ${CONFIG_FILE} does not exists! Please check it."
-  exit -1
-fi
-
-# set positional arguments in their proper place
-eval set -- "$PARAMS"
-
-BIN_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
-APP_DIR=$(dirname ${BIN_DIR})
+set -eu
+APP_DIR=$(cd $(dirname ${0})/../;pwd)
 CONF_DIR=${APP_DIR}/config
-PLUGINS_DIR=${APP_DIR}/lib
-DEFAULT_CONFIG=${CONF_DIR}/application.conf
-CONFIG_FILE=${CONFIG_FILE:-$DEFAULT_CONFIG}
+APP_JAR=${APP_DIR}/lib/seatunnel-core-flink-sql.jar
 
-assemblyJarName=$(find ${PLUGINS_DIR} -name seatunnel-core-flink-sql*.jar)
-
-source ${CONF_DIR}/seatunnel-env.sh
-
-string_trim() {
-    echo $1 | awk '{$1=$1;print}'
-}
-
-export JVM_ARGS=$(string_trim "${variables_substitution}")
+if [ -f "${CONF_DIR}/seatunnel-env.sh" ]; then
+    . "${CONF_DIR}/seatunnel-env.sh"
+fi
 
-exec ${FLINK_HOME}/bin/flink run \
-    ${PARAMS} \
-    -c org.apache.seatunnel.core.sql.SeatunnelSql \
-    ${assemblyJarName} --config ${CONFIG_FILE}
+CMD=$(java -cp "${APP_JAR}" org.apache.seatunnel.core.sql.FlinkSqlStarter "$@") && EXIT_CODE=$? || EXIT_CODE=$?
+if [ ${EXIT_CODE} -eq 234 ]; then
+    # 234 is the starter's usage exit code; quote CMD so the multi-line help text keeps its layout
+    echo "${CMD}"
+    exit 0
+elif [ ${EXIT_CODE} -eq 0 ]; then
+    echo "Execute SeaTunnel Flink SQL Job: ${CMD}"
+    eval "${CMD}"
+else
+    echo "${CMD}"
+    exit ${EXIT_CODE}
+fi
\ No newline at end of file
diff --git a/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/FlinkSqlStarter.java b/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/FlinkSqlStarter.java
new file mode 100644
index 00000000..1e5f52a4
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/FlinkSqlStarter.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.sql;
+
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.core.base.Starter;
+import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
+import org.apache.seatunnel.core.flink.config.FlinkJobType;
+import org.apache.seatunnel.core.flink.utils.CommandLineUtils;
+
+import java.util.List;
+
+public class FlinkSqlStarter implements Starter {
+
+    private static final String APP_JAR_NAME = "seatunnel-core-flink-sql.jar";
+    private static final String CLASS_NAME = SeatunnelSql.class.getName();
+
+    private final FlinkCommandArgs flinkCommandArgs;
+    /**
+     * SeaTunnel flink sql job jar.
+     */
+    private final String appJar;
+
+    FlinkSqlStarter(String[] args) {
+        this.flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.SQL);
+        // set the deployment mode, used to get the job jar path.
+        Common.setDeployMode(flinkCommandArgs.getDeployMode().getName());
+        this.appJar = Common.appLibDir().resolve(APP_JAR_NAME).toString();
+    }
+
+    @Override
+    public List<String> buildCommands() {
+        return CommandLineUtils.buildFlinkCommand(flinkCommandArgs, CLASS_NAME, appJar);
+    }
+
+    @SuppressWarnings("checkstyle:RegexpSingleline")
+    public static void main(String[] args) {
+        FlinkSqlStarter flinkSqlStarter = new FlinkSqlStarter(args);
+        System.out.println(String.join(" ", flinkSqlStarter.buildCommands()));
+    }
+}
diff --git a/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/SeatunnelSql.java b/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/SeatunnelSql.java
index 042970c9..47a83268 100644
--- a/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/SeatunnelSql.java
+++ b/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/SeatunnelSql.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.core.sql;
 
 import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
+import org.apache.seatunnel.core.flink.config.FlinkJobType;
 import org.apache.seatunnel.core.flink.utils.CommandLineUtils;
 import org.apache.seatunnel.core.sql.job.Executor;
 import org.apache.seatunnel.core.sql.job.JobInfo;
@@ -36,7 +37,7 @@ public class SeatunnelSql {
     }
 
     private static JobInfo parseJob(String[] args) throws IOException {
-        FlinkCommandArgs flinkArgs = CommandLineUtils.parseFlinkArgs(args);
+        FlinkCommandArgs flinkArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.SQL);
         String configFilePath = flinkArgs.getConfigFile();
         String jobContent = FileUtils.readFileToString(new File(configFilePath), StandardCharsets.UTF_8);
         JobInfo jobInfo = new JobInfo(jobContent);
diff --git a/seatunnel-core/seatunnel-core-flink-sql/src/test/java/org/apache/seatunnel/core/sql/SqlVariableSubstitutionTest.java b/seatunnel-core/seatunnel-core-flink-sql/src/test/java/org/apache/seatunnel/core/sql/SqlVariableSubstitutionTest.java
index 6a00ac5b..03891ab4 100644
--- a/seatunnel-core/seatunnel-core-flink-sql/src/test/java/org/apache/seatunnel/core/sql/SqlVariableSubstitutionTest.java
+++ b/seatunnel-core/seatunnel-core-flink-sql/src/test/java/org/apache/seatunnel/core/sql/SqlVariableSubstitutionTest.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.core.sql;
 
 import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
+import org.apache.seatunnel.core.flink.config.FlinkJobType;
 import org.apache.seatunnel.core.flink.utils.CommandLineUtils;
 import org.apache.seatunnel.core.sql.job.JobInfo;
 
@@ -36,7 +37,7 @@ public class SqlVariableSubstitutionTest {
         String[] args = {"-c", System.getProperty("user.dir") + TEST_RESOURCE_DIR + "flink.sql.conf.template",
             "-t", "-i", "table_name=events", "-i", "table_name2=print_table"};
 
-        FlinkCommandArgs flinkArgs = CommandLineUtils.parseFlinkArgs(args);
+        FlinkCommandArgs flinkArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.JAR);
         String configFilePath = flinkArgs.getConfigFile();
         String jobContent = FileUtils.readFileToString(new File(configFilePath), StandardCharsets.UTF_8);
         JobInfo jobInfo = new JobInfo(jobContent);
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/FlinkStarter.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/FlinkStarter.java
index 91549617..6fe18a49 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/FlinkStarter.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/FlinkStarter.java
@@ -20,12 +20,10 @@ package org.apache.seatunnel.core.flink;
 import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.core.base.Starter;
 import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
+import org.apache.seatunnel.core.flink.config.FlinkJobType;
+import org.apache.seatunnel.core.flink.utils.CommandLineUtils;
 
-import com.beust.jcommander.JCommander;
-
-import java.util.ArrayList;
 import java.util.List;
-import java.util.Objects;
 
 /**
  * The SeaTunnel flink starter. This class is responsible for generate the final flink job execute command.
@@ -33,15 +31,7 @@ import java.util.Objects;
 public class FlinkStarter implements Starter {
 
     private static final String APP_NAME = SeatunnelFlink.class.getName();
-    private static final int USAGE_EXIT_CODE = 234;
     private static final String APP_JAR_NAME = "seatunnel-core-flink.jar";
-    private static final String RUN_MODE_RUN = "run";
-    private static final String RUN_MODE_APPLICATION = "run-application";
-
-    /**
-     * Flink parameters, used by flink job itself. e.g. `-m yarn-cluster`
-     */
-    private final List<String> flinkParams = new ArrayList<>();
 
     /**
      * SeaTunnel parameters, used by SeaTunnel application. e.g. `-c config.conf`
@@ -54,7 +44,7 @@ public class FlinkStarter implements Starter {
     private final String appJar;
 
     FlinkStarter(String[] args) {
-        this.flinkCommandArgs = parseArgs(args);
+        this.flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.JAR);
         // set the deployment mode, used to get the job jar path.
         Common.setDeployMode(flinkCommandArgs.getDeployMode().getName());
         this.appJar = Common.appLibDir().resolve(APP_JAR_NAME).toString();
@@ -63,54 +53,12 @@ public class FlinkStarter implements Starter {
     @SuppressWarnings("checkstyle:RegexpSingleline")
     public static void main(String[] args) {
         FlinkStarter flinkStarter = new FlinkStarter(args);
-        List<String> command = flinkStarter.buildCommands();
-        String finalFLinkCommand = String.join(" ", command);
-        System.out.println(finalFLinkCommand);
-    }
-
-    /**
-     * Parse seatunnel args.
-     *
-     * @param args args
-     * @return FlinkCommandArgs
-     */
-    private FlinkCommandArgs parseArgs(String[] args) {
-        FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs();
-        JCommander jCommander = JCommander.newBuilder()
-            .programName("start-seatunnel-flink.sh")
-            .addObject(flinkCommandArgs)
-            .acceptUnknownOptions(true)
-            .args(args)
-            .build();
-        // The args is not belongs to seatunnel, add into flink params
-        flinkParams.addAll(jCommander.getUnknownOptions());
-        if (flinkCommandArgs.isHelp()) {
-            jCommander.usage();
-            System.exit(USAGE_EXIT_CODE);
-        }
-        return flinkCommandArgs;
+        System.out.println(String.join(" ", flinkStarter.buildCommands()));
     }
 
     @Override
     public List<String> buildCommands() {
-        List<String> command = new ArrayList<>();
-        command.add("${FLINK_HOME}/bin/flink");
-        command.add(flinkCommandArgs.getRunMode().getMode());
-        command.addAll(flinkParams);
-        command.add("-c");
-        command.add(APP_NAME);
-        command.add(appJar);
-        command.add("--config");
-        command.add(flinkCommandArgs.getConfigFile());
-        if (flinkCommandArgs.isCheckConfig()) {
-            command.add("--check");
-        }
-        // set System properties
-        flinkCommandArgs.getVariables().stream()
-            .filter(Objects::nonNull)
-            .map(String::trim)
-            .forEach(variable -> command.add("-D" + variable));
-        return command;
+        return CommandLineUtils.buildFlinkCommand(flinkCommandArgs, APP_NAME, appJar);
     }
 
 }
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/SeatunnelFlink.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/SeatunnelFlink.java
index 020ff1bd..45792d00 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/SeatunnelFlink.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/SeatunnelFlink.java
@@ -21,12 +21,13 @@ import org.apache.seatunnel.core.base.Seatunnel;
 import org.apache.seatunnel.core.base.command.Command;
 import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
 import org.apache.seatunnel.core.flink.command.FlinkCommandBuilder;
+import org.apache.seatunnel.core.flink.config.FlinkJobType;
 import org.apache.seatunnel.core.flink.utils.CommandLineUtils;
 
 public class SeatunnelFlink {
 
     public static void main(String[] args) {
-        FlinkCommandArgs flinkCommandArgs = CommandLineUtils.parseFlinkArgs(args);
+        FlinkCommandArgs flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.JAR);
         Command<FlinkCommandArgs> flinkCommand = new FlinkCommandBuilder()
             .buildCommand(flinkCommandArgs);
         Seatunnel.run(flinkCommand);
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/args/FlinkCommandArgs.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/args/FlinkCommandArgs.java
index 82783dce..fb892f9c 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/args/FlinkCommandArgs.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/args/FlinkCommandArgs.java
@@ -25,6 +25,8 @@ import org.apache.seatunnel.core.flink.config.FlinkRunMode;
 import com.beust.jcommander.IStringConverter;
 import com.beust.jcommander.Parameter;
 
+import java.util.List;
+
 public class FlinkCommandArgs extends AbstractCommandArgs {
 
     @Parameter(names = {"-r", "--run-mode"},
@@ -32,6 +34,11 @@ public class FlinkCommandArgs extends AbstractCommandArgs {
         description = "job run mode, run or run-application")
     private FlinkRunMode runMode = FlinkRunMode.RUN;
 
+    /**
+     * Options not recognized by SeaTunnel, passed through as flink command parameters; never null.
+     */
+    private List<String> flinkParams = new java.util.ArrayList<>();
+
     @Override
     public EngineType getEngineType() {
         return EngineType.FLINK;
@@ -50,6 +57,14 @@ public class FlinkCommandArgs extends AbstractCommandArgs {
         this.runMode = runMode;
     }
 
+    public List<String> getFlinkParams() {
+        return flinkParams;
+    }
+
+    public void setFlinkParams(List<String> flinkParams) {
+        this.flinkParams = flinkParams;
+    }
+
     /**
      * Used to convert the run mode string to the enum value.
      */
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/utils/CommandLineUtils.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkJobType.java
similarity index 57%
copy from seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/utils/CommandLineUtils.java
copy to seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkJobType.java
index d5cf42a1..6abe5f75 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/utils/CommandLineUtils.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkJobType.java
@@ -15,24 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.core.flink.utils;
+package org.apache.seatunnel.core.flink.config;
 
-import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
+public enum FlinkJobType {
+    JAR("start-seatunnel-flink.sh"),
+    SQL("start-seatunnel-sql.sh"),
+    ;
 
-import com.beust.jcommander.JCommander;
+    private final String type;
 
-public class CommandLineUtils {
-
-    private CommandLineUtils() {
-        throw new UnsupportedOperationException("CommandLineUtils is a utility class and cannot be instantiated");
+    FlinkJobType(String type) {
+        this.type = type;
     }
 
-    public static FlinkCommandArgs parseFlinkArgs(String[] args) {
-        FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs();
-        JCommander.newBuilder()
-            .addObject(flinkCommandArgs)
-            .build()
-            .parse(args);
-        return flinkCommandArgs;
+    public String getType() {
+        return this.type;
     }
 }
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/utils/CommandLineUtils.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/constant/FlinkConstant.java
similarity index 55%
copy from seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/utils/CommandLineUtils.java
copy to seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/constant/FlinkConstant.java
index d5cf42a1..b57c6293 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/utils/CommandLineUtils.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/constant/FlinkConstant.java
@@ -15,24 +15,8 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.core.flink.utils;
+package org.apache.seatunnel.core.flink.constant;
 
-import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
-
-import com.beust.jcommander.JCommander;
-
-public class CommandLineUtils {
-
-    private CommandLineUtils() {
-        throw new UnsupportedOperationException("CommandLineUtils is a utility class and cannot be instantiated");
-    }
-
-    public static FlinkCommandArgs parseFlinkArgs(String[] args) {
-        FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs();
-        JCommander.newBuilder()
-            .addObject(flinkCommandArgs)
-            .build()
-            .parse(args);
-        return flinkCommandArgs;
-    }
+public class FlinkConstant {
+    public static final int USAGE_EXIT_CODE = 234;
 }
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/utils/CommandLineUtils.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/utils/CommandLineUtils.java
index d5cf42a1..028464ef 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/utils/CommandLineUtils.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/utils/CommandLineUtils.java
@@ -17,10 +17,17 @@
 
 package org.apache.seatunnel.core.flink.utils;
 
+import static org.apache.seatunnel.core.flink.constant.FlinkConstant.USAGE_EXIT_CODE;
+
 import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
+import org.apache.seatunnel.core.flink.config.FlinkJobType;
 
 import com.beust.jcommander.JCommander;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
 public class CommandLineUtils {
 
     private CommandLineUtils() {
@@ -35,4 +42,44 @@ public class CommandLineUtils {
             .parse(args);
         return flinkCommandArgs;
     }
+
+    public static FlinkCommandArgs parseCommandArgs(String[] args, FlinkJobType jobType) {
+        FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs();
+        JCommander jCommander = JCommander.newBuilder()
+            .programName(jobType.getType())
+            .addObject(flinkCommandArgs)
+            .acceptUnknownOptions(true)
+            .args(args)
+            .build();
+        // Args that do not belong to SeaTunnel are collected as flink params
+        flinkCommandArgs.setFlinkParams(jCommander.getUnknownOptions());
+        if (flinkCommandArgs.isHelp()) {
+            jCommander.usage();
+            System.exit(USAGE_EXIT_CODE);
+        }
+        return flinkCommandArgs;
+
+    }
+
+    public static List<String> buildFlinkCommand(FlinkCommandArgs flinkCommandArgs, String className, String jarPath) {
+        List<String> command = new ArrayList<>();
+        command.add("${FLINK_HOME}/bin/flink");
+        command.add(flinkCommandArgs.getRunMode().getMode());
+        command.addAll(flinkCommandArgs.getFlinkParams());
+        command.add("-c");
+        command.add(className);
+        command.add(jarPath);
+        command.add("--config");
+        command.add(flinkCommandArgs.getConfigFile());
+        if (flinkCommandArgs.isCheckConfig()) {
+            command.add("--check");
+        }
+        // set System properties
+        flinkCommandArgs.getVariables().stream()
+          .filter(Objects::nonNull)
+          .map(String::trim)
+          .forEach(variable -> command.add("-D" + variable));
+        return command;
+
+    }
 }
diff --git a/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/utils/CommandLineUtilsTest.java b/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/utils/CommandLineUtilsTest.java
new file mode 100644
index 00000000..66f1ec69
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/utils/CommandLineUtilsTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.flink.utils;
+
+import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
+import org.apache.seatunnel.core.flink.config.FlinkJobType;
+import org.apache.seatunnel.core.flink.config.FlinkRunMode;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class CommandLineUtilsTest {
+
+    @Test
+    public void testParseCommandArgs() {
+        String[] args = {"--detached", "-c", "app.conf", "-t", "-i", "city=shenyang", "-i", "date=20200202",
+            "-r", "run-application", "--unkown", "unkown-command"};
+        FlinkCommandArgs flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.JAR);
+        Assert.assertEquals(flinkCommandArgs.getFlinkParams(), Arrays.asList("--detached", "--unkown", "unkown-command"));
+        Assert.assertEquals(flinkCommandArgs.getRunMode(), FlinkRunMode.APPLICATION_RUN);
+        Assert.assertEquals(flinkCommandArgs.getVariables(), Arrays.asList("city=shenyang", "date=20200202"));
+
+        String[] args1 = {"--detached", "-c", "app.conf", "-t", "-i", "city=shenyang", "-i", "date=20200202",
+            "-r", "run-application", "--unkown", "unkown-command"};
+        flinkCommandArgs = CommandLineUtils.parseCommandArgs(args1, FlinkJobType.SQL);
+        Assert.assertEquals(flinkCommandArgs.getFlinkParams(), Arrays.asList("--detached", "--unkown", "unkown-command"));
+        Assert.assertEquals(flinkCommandArgs.getRunMode(), FlinkRunMode.APPLICATION_RUN);
+        Assert.assertEquals(flinkCommandArgs.getVariables(), Arrays.asList("city=shenyang", "date=20200202"));
+    }
+
+    @Test
+    public void testBuildFlinkCommand() {
+        String[] args = {"--detached", "-c", "app.conf", "-t", "-i", "city=shenyang", "-i", "date=20200202",
+            "-r", "run-application", "--unkown", "unkown-command"};
+        FlinkCommandArgs flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.JAR);
+        List<String> commands = CommandLineUtils.buildFlinkCommand(flinkCommandArgs, "CLASS_NAME", "/path/to/jar");
+        Assert.assertEquals(commands,
+            Arrays.asList("${FLINK_HOME}/bin/flink", "run-application", "--detached", "--unkown", "unkown-command", "-c",
+                "CLASS_NAME", "/path/to/jar", "--config", "app.conf", "--check", "-Dcity=shenyang", "-Ddate=20200202"));
+
+        flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.SQL);
+        commands = CommandLineUtils.buildFlinkCommand(flinkCommandArgs, "CLASS_NAME", "/path/to/jar");
+        Assert.assertEquals(commands,
+            Arrays.asList("${FLINK_HOME}/bin/flink", "run-application", "--detached", "--unkown", "unkown-command", "-c",
+                "CLASS_NAME", "/path/to/jar", "--config", "app.conf", "--check", "-Dcity=shenyang", "-Ddate=20200202"));
+
+    }
+}