You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/05/04 01:44:29 UTC
[incubator-seatunnel] branch dev updated: [Refactor][seatunnel-core-flink] refactor flink-sql module (#1783)
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 2b650981 [Refactor][seatunnel-core-flink] refactor flink-sql module (#1783)
2b650981 is described below
commit 2b650981b15680e89eaff027038ff08822cb7b75
Author: legendtkl <ta...@gmail.com>
AuthorDate: Wed May 4 09:44:25 2022 +0800
[Refactor][seatunnel-core-flink] refactor flink-sql module (#1783)
* refactor flink-sql module
Co-authored-by: taokelu <ta...@bytedance.com>
---
.../src/main/bin/start-seatunnel-sql.sh | 96 ++++------------------
.../apache/seatunnel/core/sql/FlinkSqlStarter.java | 56 +++++++++++++
.../apache/seatunnel/core/sql/SeatunnelSql.java | 3 +-
.../core/sql/SqlVariableSubstitutionTest.java | 3 +-
.../apache/seatunnel/core/flink/FlinkStarter.java | 62 ++------------
.../seatunnel/core/flink/SeatunnelFlink.java | 3 +-
.../core/flink/args/FlinkCommandArgs.java | 15 ++++
.../FlinkJobType.java} | 24 +++---
.../FlinkConstant.java} | 22 +----
.../core/flink/utils/CommandLineUtils.java | 47 +++++++++++
.../core/flink/utils/CommandLineUtilsTest.java | 66 +++++++++++++++
11 files changed, 226 insertions(+), 171 deletions(-)
diff --git a/seatunnel-core/seatunnel-core-flink-sql/src/main/bin/start-seatunnel-sql.sh b/seatunnel-core/seatunnel-core-flink-sql/src/main/bin/start-seatunnel-sql.sh
index 368db2af..63b1a86d 100755
--- a/seatunnel-core/seatunnel-core-flink-sql/src/main/bin/start-seatunnel-sql.sh
+++ b/seatunnel-core/seatunnel-core-flink-sql/src/main/bin/start-seatunnel-sql.sh
@@ -16,84 +16,24 @@
# limitations under the License.
#
-# copy command line arguments
-
-function usage() {
- echo "Usage: start-seatunnel-sql.sh [options]"
- echo " options:"
- echo " --config, -c FILE_PATH Config file"
- echo " --variable, -i PROP=VALUE Variable substitution, such as -i city=beijing, or -i date=20190318"
- echo " --check, -t Check config"
- echo " --help, -h Show this help message"
-}
-
-if [[ "$@" = *--help ]] || [[ "$@" = *-h ]] || [[ $# -le 1 ]]; then
- usage
- exit 0
-fi
-
-is_exist() {
- if [ -z $1 ]; then
- usage
- exit -1
- fi
-}
-
-PARAMS=""
-while (( "$#" )); do
- case "$1" in
- -c|--config)
- CONFIG_FILE=$2
- is_exist ${CONFIG_FILE}
- shift 2
- ;;
-
- -i|--variable)
- variable=$2
- is_exist ${variable}
- java_property_value="-D${variable}"
- variables_substitution="${java_property_value} ${variables_substitution}"
- shift 2
- ;;
-
- *) # preserve positional arguments
- PARAMS="$PARAMS $1"
- shift
- ;;
-
- esac
-done
-
-if [ -z ${CONFIG_FILE} ]; then
- echo "Error: The following option is required: [-c | --config]"
- usage
- exit -1
-elif [ ! -f ${CONFIG_FILE} ];then
- echo "Error: Config file ${CONFIG_FILE} does not exists! Please check it."
- exit -1
-fi
-
-# set positional arguments in their proper place
-eval set -- "$PARAMS"
-
-BIN_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
-APP_DIR=$(dirname ${BIN_DIR})
+set -eu
+# Resolve the install root (the parent of this script's directory).
+# Quoted so paths containing spaces work; '&&' so a failed cd aborts under 'set -e' instead of silently using $PWD.
+APP_DIR=$(cd "$(dirname "${0}")/.." && pwd)
CONF_DIR=${APP_DIR}/config
-PLUGINS_DIR=${APP_DIR}/lib
-DEFAULT_CONFIG=${CONF_DIR}/application.conf
-CONFIG_FILE=${CONFIG_FILE:-$DEFAULT_CONFIG}
+APP_JAR=${APP_DIR}/lib/seatunnel-core-flink-sql.jar
-assemblyJarName=$(find ${PLUGINS_DIR} -name seatunnel-core-flink-sql*.jar)
-
-source ${CONF_DIR}/seatunnel-env.sh
-
-string_trim() {
- echo $1 | awk '{$1=$1;print}'
-}
-
-export JVM_ARGS=$(string_trim "${variables_substitution}")
+if [ -f "${CONF_DIR}/seatunnel-env.sh" ]; then
+ . "${CONF_DIR}/seatunnel-env.sh"
+fi
-exec ${FLINK_HOME}/bin/flink run \
- ${PARAMS} \
- -c org.apache.seatunnel.core.sql.SeatunnelSql \
- ${assemblyJarName} --config ${CONFIG_FILE}
+# Let the Java starter render the final flink command. The '&& ... || ...' form records the
+# exit code without tripping 'set -e'. "$@" forwards every user argument unchanged.
+CMD=$(java -cp "${APP_JAR}" org.apache.seatunnel.core.sql.FlinkSqlStarter "$@") && EXIT_CODE=$? || EXIT_CODE=$?
+if [ "${EXIT_CODE}" -eq 234 ]; then
+    # 234 is the starter's usage exit code: print the help text as-is.
+    # Quoted so line breaks survive and glob characters (e.g. '*') are not expanded.
+    echo "${CMD}"
+    exit 0
+elif [ "${EXIT_CODE}" -eq 0 ]; then
+    echo "Execute SeaTunnel Flink SQL Job: ${CMD}"
+    # eval is required because the generated command contains a literal ${FLINK_HOME} reference.
+    eval "${CMD}"
+else
+    echo "${CMD}"
+    exit "${EXIT_CODE}"
+fi
\ No newline at end of file
diff --git a/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/FlinkSqlStarter.java b/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/FlinkSqlStarter.java
new file mode 100644
index 00000000..1e5f52a4
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/FlinkSqlStarter.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.sql;
+
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.core.base.Starter;
+import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
+import org.apache.seatunnel.core.flink.config.FlinkJobType;
+import org.apache.seatunnel.core.flink.utils.CommandLineUtils;
+
+import java.util.List;
+
+/**
+ * Starter for SeaTunnel Flink SQL jobs. It parses the command line arguments and prints
+ * the flink command that runs {@link SeatunnelSql}.
+ */
+public class FlinkSqlStarter implements Starter {
+
+    private static final String APP_JAR_NAME = "seatunnel-core-flink-sql.jar";
+    private static final String CLASS_NAME = SeatunnelSql.class.getName();
+
+    /**
+     * Parsed command line arguments.
+     */
+    private final FlinkCommandArgs flinkCommandArgs;
+    /**
+     * Path of the SeaTunnel flink sql job jar.
+     */
+    private final String appJar;
+
+    FlinkSqlStarter(String[] args) {
+        FlinkCommandArgs parsedArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.SQL);
+        // Set the deploy mode first; it is used to locate the job jar.
+        Common.setDeployMode(parsedArgs.getDeployMode().getName());
+        this.flinkCommandArgs = parsedArgs;
+        this.appJar = Common.appLibDir().resolve(APP_JAR_NAME).toString();
+    }
+
+    /**
+     * Builds the flink command for this SQL job.
+     *
+     * @return the command tokens, in order
+     */
+    @Override
+    public List<String> buildCommands() {
+        return CommandLineUtils.buildFlinkCommand(flinkCommandArgs, CLASS_NAME, appJar);
+    }
+
+    /**
+     * Prints the space-joined flink command to standard output.
+     *
+     * @param args raw command line arguments
+     */
+    @SuppressWarnings("checkstyle:RegexpSingleline")
+    public static void main(String[] args) {
+        List<String> commands = new FlinkSqlStarter(args).buildCommands();
+        System.out.println(String.join(" ", commands));
+    }
+}
diff --git a/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/SeatunnelSql.java b/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/SeatunnelSql.java
index 042970c9..47a83268 100644
--- a/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/SeatunnelSql.java
+++ b/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/SeatunnelSql.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.core.sql;
import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
+import org.apache.seatunnel.core.flink.config.FlinkJobType;
import org.apache.seatunnel.core.flink.utils.CommandLineUtils;
import org.apache.seatunnel.core.sql.job.Executor;
import org.apache.seatunnel.core.sql.job.JobInfo;
@@ -36,7 +37,7 @@ public class SeatunnelSql {
}
private static JobInfo parseJob(String[] args) throws IOException {
- FlinkCommandArgs flinkArgs = CommandLineUtils.parseFlinkArgs(args);
+ FlinkCommandArgs flinkArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.SQL);
String configFilePath = flinkArgs.getConfigFile();
String jobContent = FileUtils.readFileToString(new File(configFilePath), StandardCharsets.UTF_8);
JobInfo jobInfo = new JobInfo(jobContent);
diff --git a/seatunnel-core/seatunnel-core-flink-sql/src/test/java/org/apache/seatunnel/core/sql/SqlVariableSubstitutionTest.java b/seatunnel-core/seatunnel-core-flink-sql/src/test/java/org/apache/seatunnel/core/sql/SqlVariableSubstitutionTest.java
index 6a00ac5b..03891ab4 100644
--- a/seatunnel-core/seatunnel-core-flink-sql/src/test/java/org/apache/seatunnel/core/sql/SqlVariableSubstitutionTest.java
+++ b/seatunnel-core/seatunnel-core-flink-sql/src/test/java/org/apache/seatunnel/core/sql/SqlVariableSubstitutionTest.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.core.sql;
import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
+import org.apache.seatunnel.core.flink.config.FlinkJobType;
import org.apache.seatunnel.core.flink.utils.CommandLineUtils;
import org.apache.seatunnel.core.sql.job.JobInfo;
@@ -36,7 +37,7 @@ public class SqlVariableSubstitutionTest {
String[] args = {"-c", System.getProperty("user.dir") + TEST_RESOURCE_DIR + "flink.sql.conf.template",
"-t", "-i", "table_name=events", "-i", "table_name2=print_table"};
- FlinkCommandArgs flinkArgs = CommandLineUtils.parseFlinkArgs(args);
+ FlinkCommandArgs flinkArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.JAR);
String configFilePath = flinkArgs.getConfigFile();
String jobContent = FileUtils.readFileToString(new File(configFilePath), StandardCharsets.UTF_8);
JobInfo jobInfo = new JobInfo(jobContent);
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/FlinkStarter.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/FlinkStarter.java
index 91549617..6fe18a49 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/FlinkStarter.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/FlinkStarter.java
@@ -20,12 +20,10 @@ package org.apache.seatunnel.core.flink;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.core.base.Starter;
import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
+import org.apache.seatunnel.core.flink.config.FlinkJobType;
+import org.apache.seatunnel.core.flink.utils.CommandLineUtils;
-import com.beust.jcommander.JCommander;
-
-import java.util.ArrayList;
import java.util.List;
-import java.util.Objects;
/**
 * The SeaTunnel flink starter. This class is responsible for generating the final flink job execution command.
@@ -33,15 +31,7 @@ import java.util.Objects;
public class FlinkStarter implements Starter {
private static final String APP_NAME = SeatunnelFlink.class.getName();
- private static final int USAGE_EXIT_CODE = 234;
private static final String APP_JAR_NAME = "seatunnel-core-flink.jar";
- private static final String RUN_MODE_RUN = "run";
- private static final String RUN_MODE_APPLICATION = "run-application";
-
- /**
- * Flink parameters, used by flink job itself. e.g. `-m yarn-cluster`
- */
- private final List<String> flinkParams = new ArrayList<>();
/**
* SeaTunnel parameters, used by SeaTunnel application. e.g. `-c config.conf`
@@ -54,7 +44,7 @@ public class FlinkStarter implements Starter {
private final String appJar;
FlinkStarter(String[] args) {
- this.flinkCommandArgs = parseArgs(args);
+ this.flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.JAR);
// set the deployment mode, used to get the job jar path.
Common.setDeployMode(flinkCommandArgs.getDeployMode().getName());
this.appJar = Common.appLibDir().resolve(APP_JAR_NAME).toString();
@@ -63,54 +53,12 @@ public class FlinkStarter implements Starter {
@SuppressWarnings("checkstyle:RegexpSingleline")
public static void main(String[] args) {
FlinkStarter flinkStarter = new FlinkStarter(args);
- List<String> command = flinkStarter.buildCommands();
- String finalFLinkCommand = String.join(" ", command);
- System.out.println(finalFLinkCommand);
- }
-
- /**
- * Parse seatunnel args.
- *
- * @param args args
- * @return FlinkCommandArgs
- */
- private FlinkCommandArgs parseArgs(String[] args) {
- FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs();
- JCommander jCommander = JCommander.newBuilder()
- .programName("start-seatunnel-flink.sh")
- .addObject(flinkCommandArgs)
- .acceptUnknownOptions(true)
- .args(args)
- .build();
- // The args is not belongs to seatunnel, add into flink params
- flinkParams.addAll(jCommander.getUnknownOptions());
- if (flinkCommandArgs.isHelp()) {
- jCommander.usage();
- System.exit(USAGE_EXIT_CODE);
- }
- return flinkCommandArgs;
+ System.out.println(String.join(" ", flinkStarter.buildCommands()));
}
@Override
public List<String> buildCommands() {
- List<String> command = new ArrayList<>();
- command.add("${FLINK_HOME}/bin/flink");
- command.add(flinkCommandArgs.getRunMode().getMode());
- command.addAll(flinkParams);
- command.add("-c");
- command.add(APP_NAME);
- command.add(appJar);
- command.add("--config");
- command.add(flinkCommandArgs.getConfigFile());
- if (flinkCommandArgs.isCheckConfig()) {
- command.add("--check");
- }
- // set System properties
- flinkCommandArgs.getVariables().stream()
- .filter(Objects::nonNull)
- .map(String::trim)
- .forEach(variable -> command.add("-D" + variable));
- return command;
+ return CommandLineUtils.buildFlinkCommand(flinkCommandArgs, APP_NAME, appJar);
}
}
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/SeatunnelFlink.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/SeatunnelFlink.java
index 020ff1bd..45792d00 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/SeatunnelFlink.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/SeatunnelFlink.java
@@ -21,12 +21,13 @@ import org.apache.seatunnel.core.base.Seatunnel;
import org.apache.seatunnel.core.base.command.Command;
import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
import org.apache.seatunnel.core.flink.command.FlinkCommandBuilder;
+import org.apache.seatunnel.core.flink.config.FlinkJobType;
import org.apache.seatunnel.core.flink.utils.CommandLineUtils;
public class SeatunnelFlink {
public static void main(String[] args) {
- FlinkCommandArgs flinkCommandArgs = CommandLineUtils.parseFlinkArgs(args);
+ FlinkCommandArgs flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.JAR);
Command<FlinkCommandArgs> flinkCommand = new FlinkCommandBuilder()
.buildCommand(flinkCommandArgs);
Seatunnel.run(flinkCommand);
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/args/FlinkCommandArgs.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/args/FlinkCommandArgs.java
index 82783dce..fb892f9c 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/args/FlinkCommandArgs.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/args/FlinkCommandArgs.java
@@ -25,6 +25,8 @@ import org.apache.seatunnel.core.flink.config.FlinkRunMode;
import com.beust.jcommander.IStringConverter;
import com.beust.jcommander.Parameter;
+import java.util.List;
+
public class FlinkCommandArgs extends AbstractCommandArgs {
@Parameter(names = {"-r", "--run-mode"},
@@ -32,6 +34,11 @@ public class FlinkCommandArgs extends AbstractCommandArgs {
description = "job run mode, run or run-application")
private FlinkRunMode runMode = FlinkRunMode.RUN;
+ /**
+ * Parameters that SeaTunnel does not define are stored here and passed on as flink command parameters.
+ */
+ private List<String> flinkParams;
+
@Override
public EngineType getEngineType() {
return EngineType.FLINK;
@@ -50,6 +57,14 @@ public class FlinkCommandArgs extends AbstractCommandArgs {
this.runMode = runMode;
}
+ public List<String> getFlinkParams() {
+ return flinkParams;
+ }
+
+ public void setFlinkParams(List<String> flinkParams) {
+ this.flinkParams = flinkParams;
+ }
+
/**
* Used to convert the run mode string to the enum value.
*/
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/utils/CommandLineUtils.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkJobType.java
similarity index 57%
copy from seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/utils/CommandLineUtils.java
copy to seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkJobType.java
index d5cf42a1..6abe5f75 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/utils/CommandLineUtils.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkJobType.java
@@ -15,24 +15,20 @@
* limitations under the License.
*/
-package org.apache.seatunnel.core.flink.utils;
+package org.apache.seatunnel.core.flink.config;
-import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
+public enum FlinkJobType {
+ JAR("start-seatunnel-flink.sh"),
+ SQL("start-seatunnel-sql.sh"),
+ ;
-import com.beust.jcommander.JCommander;
+ private final String type;
-public class CommandLineUtils {
-
- private CommandLineUtils() {
- throw new UnsupportedOperationException("CommandLineUtils is a utility class and cannot be instantiated");
+ FlinkJobType(String type) {
+ this.type = type;
}
- public static FlinkCommandArgs parseFlinkArgs(String[] args) {
- FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs();
- JCommander.newBuilder()
- .addObject(flinkCommandArgs)
- .build()
- .parse(args);
- return flinkCommandArgs;
+ public String getType() {
+ return this.type;
}
}
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/utils/CommandLineUtils.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/constant/FlinkConstant.java
similarity index 55%
copy from seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/utils/CommandLineUtils.java
copy to seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/constant/FlinkConstant.java
index d5cf42a1..b57c6293 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/utils/CommandLineUtils.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/constant/FlinkConstant.java
@@ -15,24 +15,8 @@
* limitations under the License.
*/
-package org.apache.seatunnel.core.flink.utils;
+package org.apache.seatunnel.core.flink.constant;
-import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
-
-import com.beust.jcommander.JCommander;
-
-public class CommandLineUtils {
-
- private CommandLineUtils() {
- throw new UnsupportedOperationException("CommandLineUtils is a utility class and cannot be instantiated");
- }
-
- public static FlinkCommandArgs parseFlinkArgs(String[] args) {
- FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs();
- JCommander.newBuilder()
- .addObject(flinkCommandArgs)
- .build()
- .parse(args);
- return flinkCommandArgs;
- }
+/**
+ * Constants shared by the SeaTunnel flink starters.
+ */
+public final class FlinkConstant {
+
+    /**
+     * Process exit code used after the usage text has been printed.
+     */
+    public static final int USAGE_EXIT_CODE = 234;
+
+    private FlinkConstant() {
+        throw new UnsupportedOperationException("FlinkConstant is a constants class and cannot be instantiated");
+    }
}
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/utils/CommandLineUtils.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/utils/CommandLineUtils.java
index d5cf42a1..028464ef 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/utils/CommandLineUtils.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/utils/CommandLineUtils.java
@@ -17,10 +17,17 @@
package org.apache.seatunnel.core.flink.utils;
+import static org.apache.seatunnel.core.flink.constant.FlinkConstant.USAGE_EXIT_CODE;
+
import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
+import org.apache.seatunnel.core.flink.config.FlinkJobType;
import com.beust.jcommander.JCommander;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
public class CommandLineUtils {
private CommandLineUtils() {
@@ -35,4 +42,44 @@ public class CommandLineUtils {
.parse(args);
return flinkCommandArgs;
}
+
+    /**
+     * Parses the command line arguments into a {@link FlinkCommandArgs}.
+     * Options that are not declared on {@link FlinkCommandArgs} are kept as flink params.
+     * When help is requested, the usage text is printed and the JVM exits with
+     * {@code USAGE_EXIT_CODE}.
+     *
+     * @param args    raw command line arguments
+     * @param jobType job type; its script name is set as the JCommander program name
+     * @return the parsed arguments
+     */
+    public static FlinkCommandArgs parseCommandArgs(String[] args, FlinkJobType jobType) {
+        FlinkCommandArgs parsedArgs = new FlinkCommandArgs();
+        JCommander commander = JCommander.newBuilder()
+            .programName(jobType.getType())
+            .addObject(parsedArgs)
+            .acceptUnknownOptions(true)
+            .args(args)
+            .build();
+        // Arguments that do not belong to SeaTunnel are handed over to flink.
+        parsedArgs.setFlinkParams(commander.getUnknownOptions());
+        if (!parsedArgs.isHelp()) {
+            return parsedArgs;
+        }
+        commander.usage();
+        System.exit(USAGE_EXIT_CODE);
+        return parsedArgs;
+    }
+
+    /**
+     * Builds the flink command line for a SeaTunnel job.
+     * The tokens are: the flink executable, the run mode, the pass-through flink params,
+     * {@code -c className jarPath}, {@code --config <file>}, an optional {@code --check},
+     * and one {@code -D<variable>} per non-null variable.
+     *
+     * @param flinkCommandArgs parsed SeaTunnel arguments
+     * @param className        main class to run
+     * @param jarPath          path of the job jar
+     * @return the command tokens, in order
+     */
+    public static List<String> buildFlinkCommand(FlinkCommandArgs flinkCommandArgs, String className, String jarPath) {
+        List<String> command = new ArrayList<>();
+        // Left unexpanded on purpose: the calling shell script resolves ${FLINK_HOME}.
+        command.add("${FLINK_HOME}/bin/flink");
+        command.add(flinkCommandArgs.getRunMode().getMode());
+        // flinkParams is only populated by parseCommandArgs; tolerate args that were built another way.
+        List<String> flinkParams = flinkCommandArgs.getFlinkParams();
+        if (flinkParams != null) {
+            command.addAll(flinkParams);
+        }
+        command.add("-c");
+        command.add(className);
+        command.add(jarPath);
+        command.add("--config");
+        command.add(flinkCommandArgs.getConfigFile());
+        if (flinkCommandArgs.isCheckConfig()) {
+            command.add("--check");
+        }
+        // set System properties
+        flinkCommandArgs.getVariables().stream()
+            .filter(Objects::nonNull)
+            .map(String::trim)
+            .forEach(variable -> command.add("-D" + variable));
+        return command;
+    }
}
diff --git a/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/utils/CommandLineUtilsTest.java b/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/utils/CommandLineUtilsTest.java
new file mode 100644
index 00000000..66f1ec69
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/utils/CommandLineUtilsTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.flink.utils;
+
+import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
+import org.apache.seatunnel.core.flink.config.FlinkJobType;
+import org.apache.seatunnel.core.flink.config.FlinkRunMode;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Tests for {@link CommandLineUtils}. Both job types are expected to parse the same
+ * arguments identically and to produce the same flink command.
+ */
+public class CommandLineUtilsTest {
+
+    private static final List<String> EXPECTED_FLINK_PARAMS =
+        Arrays.asList("--detached", "--unkown", "unkown-command");
+    private static final List<String> EXPECTED_VARIABLES =
+        Arrays.asList("city=shenyang", "date=20200202");
+    private static final List<String> EXPECTED_COMMAND =
+        Arrays.asList("${FLINK_HOME}/bin/flink", "run-application", "--detached", "--unkown", "unkown-command", "-c",
+            "CLASS_NAME", "/path/to/jar", "--config", "app.conf", "--check", "-Dcity=shenyang", "-Ddate=20200202");
+
+    /**
+     * Returns a fresh copy of the sample arguments so no test shares a mutable array.
+     */
+    private static String[] sampleArgs() {
+        return new String[] {"--detached", "-c", "app.conf", "-t", "-i", "city=shenyang", "-i", "date=20200202",
+            "-r", "run-application", "--unkown", "unkown-command"};
+    }
+
+    @Test
+    public void testParseCommandArgs() {
+        for (FlinkJobType jobType : new FlinkJobType[] {FlinkJobType.JAR, FlinkJobType.SQL}) {
+            FlinkCommandArgs flinkCommandArgs = CommandLineUtils.parseCommandArgs(sampleArgs(), jobType);
+            // JUnit's contract is assertEquals(expected, actual).
+            Assert.assertEquals(EXPECTED_FLINK_PARAMS, flinkCommandArgs.getFlinkParams());
+            Assert.assertEquals(FlinkRunMode.APPLICATION_RUN, flinkCommandArgs.getRunMode());
+            Assert.assertEquals(EXPECTED_VARIABLES, flinkCommandArgs.getVariables());
+        }
+    }
+
+    @Test
+    public void testBuildFlinkCommand() {
+        for (FlinkJobType jobType : new FlinkJobType[] {FlinkJobType.JAR, FlinkJobType.SQL}) {
+            FlinkCommandArgs flinkCommandArgs = CommandLineUtils.parseCommandArgs(sampleArgs(), jobType);
+            List<String> commands = CommandLineUtils.buildFlinkCommand(flinkCommandArgs, "CLASS_NAME", "/path/to/jar");
+            Assert.assertEquals(EXPECTED_COMMAND, commands);
+        }
+    }
+}