You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ya...@apache.org on 2022/04/04 02:16:18 UTC
[incubator-seatunnel] branch dev updated: [Feature][Flink] Rewrite start-seatunnel-flink script with starter (#1636)
This is an automated email from the ASF dual-hosted git repository.
yangxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 4bdbbfdb [Feature][Flink] Rewrite start-seatunnel-flink script with starter (#1636)
4bdbbfdb is described below
commit 4bdbbfdb0dd36e8918500509c8eb32f39d51189e
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Mon Apr 4 10:16:13 2022 +0800
[Feature][Flink] Rewrite start-seatunnel-flink script with starter (#1636)
* Add flink starter
* [Feature][Flink] Rewrite start-seatunnel-flink script with starter
* Add negative test case
* fix code style
* Add trim for variables
* Change FlinkStarter constructor to default
* split run
---
.../apache/seatunnel/command/FlinkCommandArgs.java | 16 ++-
seatunnel-core/seatunnel-core-flink/pom.xml | 5 +
.../src/main/bin/start-seatunnel-flink.sh | 95 +++--------------
.../java/org/apache/seatunnel/FlinkStarter.java | 114 +++++++++++++++++++++
.../org/apache/seatunnel/FlinkStarterTest.java | 48 +++++++++
.../src/main/bin/start-seatunnel-spark.sh | 2 +
.../java/org/apache/seatunnel/SparkStarter.java | 2 +-
7 files changed, 201 insertions(+), 81 deletions(-)
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/FlinkCommandArgs.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/FlinkCommandArgs.java
index ecc886b4..d0c70d1d 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/FlinkCommandArgs.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/FlinkCommandArgs.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.config.EngineType;
import com.beust.jcommander.Parameter;
+import java.util.Collections;
import java.util.List;
public class FlinkCommandArgs implements CommandArgs {
@@ -33,12 +34,17 @@ public class FlinkCommandArgs implements CommandArgs {
@Parameter(names = {"-i", "--variable"},
description = "variable substitution, such as -i city=beijing, or -i date=20190318")
- private List<String> variables = null;
+ private List<String> variables = Collections.emptyList();
@Parameter(names = {"-t", "--check"},
description = "check config")
private boolean checkConfig = false;
+ @Parameter(names = {"-h", "--help"},
+ help = true,
+ description = "Show the usage message")
+ private boolean help = false;
+
public String getConfigFile() {
return configFile;
}
@@ -72,4 +78,12 @@ public class FlinkCommandArgs implements CommandArgs {
public void setCheckConfig(boolean checkConfig) {
this.checkConfig = checkConfig;
}
+
+ public boolean isHelp() {
+ return help;
+ }
+
+ public void setHelp(boolean help) {
+ this.help = help;
+ }
}
diff --git a/seatunnel-core/seatunnel-core-flink/pom.xml b/seatunnel-core/seatunnel-core-flink/pom.xml
index bbb819a7..4e14d9f5 100644
--- a/seatunnel-core/seatunnel-core-flink/pom.xml
+++ b/seatunnel-core/seatunnel-core-flink/pom.xml
@@ -120,6 +120,12 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
<build>
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/bin/start-seatunnel-flink.sh b/seatunnel-core/seatunnel-core-flink/src/main/bin/start-seatunnel-flink.sh
index b0ab9d0f..95160a6b 100755
--- a/seatunnel-core/seatunnel-core-flink/src/main/bin/start-seatunnel-flink.sh
+++ b/seatunnel-core/seatunnel-core-flink/src/main/bin/start-seatunnel-flink.sh
@@ -16,87 +16,24 @@
# limitations under the License.
#
-# copy command line arguments
-
-function usage() {
- echo "Usage: start-seatunnel-flink.sh [options]"
- echo " options:"
- echo " --config, -c FILE_PATH Config file"
- echo " --variable, -i PROP=VALUE Variable substitution, such as -i city=beijing, or -i date=20190318"
- echo " --check, -t Check config"
- echo " --help, -h Show this help message"
-}
-
-if [[ "$@" = *--help ]] || [[ "$@" = *-h ]] || [[ $# -le 1 ]]; then
- usage
- exit 0
-fi
-
-is_exist() {
- if [ -z $1 ]; then
- usage
- exit 1
- fi
-}
-
-PARAMS=""
-while (( "$#" )); do
- case "$1" in
- -c|--config)
- CONFIG_FILE=$2
- is_exist ${CONFIG_FILE}
- shift 2
- ;;
-
- -i|--variable)
- variable=$2
- is_exist ${variable}
- java_property_value="-D${variable}"
- variables_substitution="${java_property_value} ${variables_substitution}"
- shift 2
- ;;
-
- *) # preserve positional arguments
- PARAMS="$PARAMS $1"
- shift
- ;;
-
- esac
-done
-
-if [ -z ${CONFIG_FILE} ]; then
- echo "Error: The following option is required: [-c | --config]"
- usage
- exit -1
-elif [ ! -f ${CONFIG_FILE} ];then
- echo "Error: Config file ${CONFIG_FILE} does not exists! Please check it."
- exit -1
-fi
-
-# set positional arguments in their proper place
-eval set -- "$PARAMS"
-
-BIN_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
-APP_DIR=$(dirname ${BIN_DIR})
+set -eu
+APP_DIR=$(cd "$(dirname "${0}")/../" && pwd)
CONF_DIR=${APP_DIR}/config
-PLUGINS_DIR=${APP_DIR}/lib
-DEFAULT_CONFIG=${CONF_DIR}/application.conf
-CONFIG_FILE=${CONFIG_FILE:-$DEFAULT_CONFIG}
-
-assemblyJarName=$(find ${PLUGINS_DIR} -name seatunnel-core-flink*.jar)
+APP_JAR="${APP_DIR}/lib/seatunnel-core-flink.jar"
if [ -f "${CONF_DIR}/seatunnel-env.sh" ]; then
- source ${CONF_DIR}/seatunnel-env.sh
+ . "${CONF_DIR}/seatunnel-env.sh"
fi
-string_trim() {
- echo $1 | awk '{$1=$1;print}'
-}
-
-JVM_ARGS=$(string_trim "${variables_substitution}")
-export JVM_ARGS
-
-exec ${FLINK_HOME}/bin/flink run \
- ${PARAMS} \
- -c org.apache.seatunnel.SeatunnelFlink \
- ${assemblyJarName} --config ${CONFIG_FILE}
+CMD=$(java -cp "${APP_JAR}" org.apache.seatunnel.FlinkStarter "$@") && EXIT_CODE=$? || EXIT_CODE=$?
+if [ ${EXIT_CODE} -eq 234 ]; then
+ # print usage; quoted so the multi-line usage text keeps its line breaks
+ echo "${CMD}"
+ exit 0
+elif [ ${EXIT_CODE} -eq 0 ]; then
+ echo "Execute SeaTunnel Flink Job: ${CMD}"
+ eval "${CMD}"
+else
+ echo "${CMD}"
+ exit ${EXIT_CODE}
+fi
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/FlinkStarter.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/FlinkStarter.java
new file mode 100644
index 00000000..d62984bb
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/FlinkStarter.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel;
+
+import org.apache.seatunnel.command.FlinkCommandArgs;
+import org.apache.seatunnel.common.config.Common;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.ParameterException;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * The SeaTunnel flink starter. This class is responsible for generating the final flink job execute command.
+ *
+ * <p>The command is printed to stdout and {@code eval}-ed by {@code start-seatunnel-flink.sh}, so every
+ * token except the flink binary is shell-quoted (when needed) before it is printed.
+ */
+public class FlinkStarter implements Starter {
+
+    private static final String APP_NAME = SeatunnelFlink.class.getName();
+    private static final int USAGE_EXIT_CODE = 234;
+    private static final String APP_JAR_NAME = "seatunnel-core-flink.jar";
+
+    /**
+     * The flink client binary. Left unquoted on purpose so that the calling shell expands {@code FLINK_HOME}.
+     */
+    private static final String FLINK_BIN = "${FLINK_HOME}/bin/flink";
+
+    /**
+     * Tokens made only of these characters are printed without shell quoting.
+     */
+    private static final Pattern SHELL_SAFE_TOKEN = Pattern.compile("[A-Za-z0-9_@%+=:,./-]+");
+
+    /**
+     * Flink parameters, used by flink job itself. e.g. `-m yarn-cluster`
+     */
+    private final List<String> flinkParams = new ArrayList<>();
+
+    /**
+     * SeaTunnel parameters, used by SeaTunnel application. e.g. `-c config.conf`
+     */
+    private final FlinkCommandArgs flinkCommandArgs;
+
+    /**
+     * SeaTunnel flink job jar.
+     */
+    private final String appJar;
+
+    FlinkStarter(String[] args) {
+        this.flinkCommandArgs = parseArgs(args);
+        // set the deployment mode, used to get the job jar path.
+        Common.setDeployMode(flinkCommandArgs.getDeployMode().getName());
+        this.appJar = Common.appLibDir().resolve(APP_JAR_NAME).toString();
+    }
+
+    /**
+     * Prints the flink command built from {@code args} to stdout so that the start script can {@code eval} it.
+     * Invalid arguments are reported on stderr without a stack trace and exit the JVM with status 1.
+     */
+    @SuppressWarnings("checkstyle:RegexpSingleline")
+    public static void main(String[] args) {
+        try {
+            FlinkStarter flinkStarter = new FlinkStarter(args);
+            System.out.println(toShellCommand(flinkStarter.buildCommands()));
+        } catch (ParameterException e) {
+            System.err.println("Error: " + e.getMessage());
+            System.exit(1);
+        }
+    }
+
+    /**
+     * Joins command tokens into one shell command line. Every token except {@link #FLINK_BIN} is quoted
+     * when needed, so whitespace or shell metacharacters in user input survive the script's {@code eval}.
+     *
+     * @param command command tokens, e.g. as returned by {@link #buildCommands()}
+     * @return the command line to print
+     */
+    static String toShellCommand(List<String> command) {
+        return command.stream()
+            .map(token -> FLINK_BIN.equals(token) ? token : shellQuote(token))
+            .collect(Collectors.joining(" "));
+    }
+
+    /**
+     * Quotes {@code token} for a POSIX shell: tokens made only of safe characters are returned unchanged,
+     * anything else (including the empty string) is wrapped in single quotes, with embedded single quotes
+     * written as {@code '\''}.
+     */
+    private static String shellQuote(String token) {
+        if (SHELL_SAFE_TOKEN.matcher(token).matches()) {
+            return token;
+        }
+        return "'" + token.replace("'", "'\\''") + "'";
+    }
+
+    /**
+     * Parse seatunnel args. Arguments SeaTunnel does not recognise are collected into {@link #flinkParams};
+     * {@code -h/--help} prints the usage and exits the JVM with {@link #USAGE_EXIT_CODE}.
+     *
+     * @param args args
+     * @return FlinkCommandArgs
+     * @throws ParameterException if the arguments are invalid, e.g. a required option is missing
+     */
+    private FlinkCommandArgs parseArgs(String[] args) {
+        FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs();
+        JCommander jCommander = JCommander.newBuilder()
+            .programName("start-seatunnel-flink.sh")
+            .addObject(flinkCommandArgs)
+            .acceptUnknownOptions(true)
+            .args(args)
+            .build();
+        // The args that do not belong to seatunnel are passed through to flink.
+        flinkParams.addAll(jCommander.getUnknownOptions());
+        if (flinkCommandArgs.isHelp()) {
+            jCommander.usage();
+            System.exit(USAGE_EXIT_CODE);
+        }
+        return flinkCommandArgs;
+    }
+
+    /**
+     * Builds the {@code flink run} command as separate, unquoted tokens.
+     */
+    @Override
+    public List<String> buildCommands() {
+        List<String> command = new ArrayList<>();
+        command.add(FLINK_BIN);
+        command.add("run");
+        command.addAll(flinkParams);
+        command.add("-c");
+        command.add(APP_NAME);
+        command.add(appJar);
+        command.add("--config");
+        command.add(flinkCommandArgs.getConfigFile());
+        if (flinkCommandArgs.isCheckConfig()) {
+            command.add("--check");
+        }
+
+        // set System properties
+        flinkCommandArgs.getVariables().stream()
+            .filter(Objects::nonNull)
+            .map(String::trim)
+            .forEach(variable -> command.add("-D" + variable));
+        return command;
+    }
+
+}
diff --git a/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/FlinkStarterTest.java b/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/FlinkStarterTest.java
new file mode 100644
index 00000000..12d909c9
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/FlinkStarterTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel;
+
+import com.beust.jcommander.ParameterException;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class FlinkStarterTest {
+
+    @Test
+    public void buildCommands() {
+        String[] args = {"--config", "test.conf", "-m", "yarn-cluster", "-i", "key1=value1", "-i", "key2=value2"};
+        FlinkStarter flinkStarter = new FlinkStarter(args);
+        String flinkExecuteCommand = String.join(" ", flinkStarter.buildCommands());
+        // The jar path depends on the environment, so only check the parts derived from the args.
+        Assert.assertTrue(flinkExecuteCommand.contains("--config test.conf"));
+        Assert.assertTrue(flinkExecuteCommand.contains("-m yarn-cluster"));
+        Assert.assertTrue(flinkExecuteCommand.contains("-Dkey1=value1"));
+        Assert.assertTrue(flinkExecuteCommand.contains("-Dkey2=value2"));
+    }
+
+    @Test
+    public void buildCommandsMissingConfig() {
+        String[] args = {"-m", "yarn-cluster", "-i", "key1=value1", "-i", "key2=value2"};
+        try {
+            new FlinkStarter(args);
+            Assert.fail("Expected a ParameterException because --config is missing");
+        } catch (ParameterException e) {
+            Assert.assertEquals("The following option is required: [-c | --config]", e.getMessage());
+        }
+    }
+}
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/bin/start-seatunnel-spark.sh b/seatunnel-core/seatunnel-core-spark/src/main/bin/start-seatunnel-spark.sh
index ca69dda1..b7a1a09d 100755
--- a/seatunnel-core/seatunnel-core-spark/src/main/bin/start-seatunnel-spark.sh
+++ b/seatunnel-core/seatunnel-core-spark/src/main/bin/start-seatunnel-spark.sh
@@ -27,9 +27,11 @@ fi
CMD=$(java -cp ${APP_JAR} org.apache.seatunnel.SparkStarter ${@}) && EXIT_CODE=$? || EXIT_CODE=$?
if [ ${EXIT_CODE} -eq 234 ]; then
# print usage
+ echo "${CMD}"
exit 0
elif [ ${EXIT_CODE} -eq 0 ]; then
eval ${CMD}
else
+ echo "${CMD}"
exit ${EXIT_CODE}
fi
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/SparkStarter.java b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/SparkStarter.java
index 2b51a2dd..e235e170 100644
--- a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/SparkStarter.java
+++ b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/SparkStarter.java
@@ -125,8 +125,8 @@ public class SparkStarter implements Starter {
JCommander commander = JCommander.newBuilder()
.programName("start-seatunnel-spark.sh")
.addObject(commandArgs)
+ .args(args)
.build();
- commander.parse(args);
if (commandArgs.isHelp()) {
commander.usage();
System.exit(USAGE_EXIT_CODE);