You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/08/23 02:36:54 UTC
[incubator-seatunnel] branch dev updated: [Improvement][Starter] Use the public CommandLine util class to parse the args (#2470)
This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new db9dc1281 [Improvement][Starter] Use the public CommandLine util class to parse the args (#2470)
db9dc1281 is described below
commit db9dc128189eebcf7824b06fd687e9e87e194bd9
Author: Kerwin <37...@users.noreply.github.com>
AuthorDate: Tue Aug 23 10:36:48 2022 +0800
[Improvement][Starter] Use the public CommandLine util class to parse the args (#2470)
* parse args using the public CommandLine utility class.
* code clean
* update FlinkEnvParameterParser in the flink sql module
* Complete test class.
* Complete test class.
---
.../core/base/command/AbstractCommandArgs.java | 13 ++++
.../seatunnel/core/base/constants/Constants.java} | 4 +-
.../core/base/utils/CommandLineUtils.java | 64 ++++++++++++++++
.../core/sql/FlinkEnvParameterParser.java | 4 +-
.../apache/seatunnel/core/sql/FlinkSqlStarter.java | 25 ++++++-
.../apache/seatunnel/core/sql/SeatunnelSql.java | 4 +-
.../core/sql/SqlVariableSubstitutionTest.java | 4 +-
.../core/flink/FlinkEnvParameterParser.java | 4 +-
.../apache/seatunnel/core/flink/FlinkStarter.java | 25 ++++++-
.../seatunnel/core/flink/SeatunnelFlink.java | 4 +-
.../core/flink/args/FlinkCommandArgs.java | 15 ----
.../core/flink/utils/CommandLineUtils.java | 87 ----------------------
.../seatunnel/core/flink/FlinkStarterTest.java | 15 +---
.../core/flink/args/FlinkCommandArgsTest.java | 10 +--
.../core/flink/utils/CommandLineUtilsTest.java | 44 ++---------
.../seatunnel/core/spark/SeatunnelSpark.java | 4 +-
.../apache/seatunnel/core/spark/SparkStarter.java | 25 +------
.../core/spark/utils/CommandLineUtils.java | 38 ----------
.../core/spark/args/SparkCommandArgsTest.java | 31 +-------
.../core/spark/utils/CommandLineUtilsTest.java | 16 ++--
.../core/starter/command/AbstractCommandArgs.java | 13 ++++
.../starter/constants/CommonParamConstants.java | 24 ------
.../core/starter/constants/Constants.java} | 4 +-
.../core/starter/utils/CommandLineUtils.java | 64 ++++++++++++++++
.../seatunnel/core/starter/flink/FlinkStarter.java | 25 ++++++-
.../core/starter/flink/SeatunnelFlink.java | 4 +-
.../core/starter/flink/args/FlinkCommandArgs.java | 14 ----
.../core/starter/flink/utils/CommandLineUtils.java | 86 ---------------------
.../core/starter/flink/FlinkStarterTest.java | 13 ----
.../starter/flink/args/FlinkCommandArgsTest.java | 9 +--
.../core/starter/spark/SeatunnelSpark.java | 4 +-
.../seatunnel/core/starter/spark/SparkStarter.java | 25 +------
.../core/starter/spark/utils/CommandLineUtils.java | 38 ----------
.../starter/spark/args/SparkCommandArgsTest.java | 31 +-------
.../starter/spark/utils/CommandLineUtilsTest.java | 17 +++--
35 files changed, 281 insertions(+), 526 deletions(-)
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/AbstractCommandArgs.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/AbstractCommandArgs.java
index 9042b2c14..07f7772ef 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/AbstractCommandArgs.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/AbstractCommandArgs.java
@@ -47,6 +47,11 @@ public abstract class AbstractCommandArgs implements CommandArgs {
description = "Show the usage message")
private boolean help = false;
+ /**
+ * Parsed parameters that are not defined by SeaTunnel are stored here as the engine's original command parameters.
+ */
+ private List<String> originalParameters;
+
public String getConfigFile() {
return configFile;
}
@@ -79,6 +84,14 @@ public abstract class AbstractCommandArgs implements CommandArgs {
this.help = help;
}
+ public List<String> getOriginalParameters() {
+ return originalParameters;
+ }
+
+ public void setOriginalParameters(List<String> originalParameters) {
+ this.originalParameters = originalParameters;
+ }
+
public EngineType getEngineType() {
throw new UnsupportedOperationException("abstract class CommandArgs not support this method");
}
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/constant/FlinkConstant.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/constants/Constants.java
similarity index 91%
rename from seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/constant/FlinkConstant.java
rename to seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/constants/Constants.java
index b57c62930..4ebb0792f 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/constant/FlinkConstant.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/constants/Constants.java
@@ -15,8 +15,8 @@
* limitations under the License.
*/
-package org.apache.seatunnel.core.flink.constant;
+package org.apache.seatunnel.core.base.constants;
-public class FlinkConstant {
+public class Constants {
public static final int USAGE_EXIT_CODE = 234;
}
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/utils/CommandLineUtils.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/utils/CommandLineUtils.java
new file mode 100644
index 000000000..748933b2b
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/utils/CommandLineUtils.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.base.utils;
+
+import static org.apache.seatunnel.core.base.constants.Constants.USAGE_EXIT_CODE;
+
+import org.apache.seatunnel.core.base.command.AbstractCommandArgs;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.ParameterException;
+import com.beust.jcommander.UnixStyleUsageFormatter;
+
+public class CommandLineUtils {
+
+ private CommandLineUtils() {
+ throw new UnsupportedOperationException("CommandLineUtils is a utility class and cannot be instantiated");
+ }
+
+ public static <T extends AbstractCommandArgs> T parse(String[] args, T obj) {
+ return parse(args, obj, null, false);
+ }
+
+ public static <T extends AbstractCommandArgs> T parse(String[] args, T obj, String programName, boolean acceptUnknownOptions) {
+ JCommander jCommander = JCommander.newBuilder()
+ .programName(programName)
+ .addObject(obj)
+ .acceptUnknownOptions(acceptUnknownOptions)
+ .build();
+ try {
+ jCommander.parse(args);
+ // Args that do not belong to SeaTunnel are stored as the engine's original parameters
+ obj.setOriginalParameters(jCommander.getUnknownOptions());
+ } catch (ParameterException e) {
+ System.err.println(e.getLocalizedMessage());
+ exit(jCommander);
+ }
+
+ if (obj.isHelp()) {
+ exit(jCommander);
+ }
+ return obj;
+ }
+
+ private static void exit(JCommander jCommander) {
+ jCommander.setUsageFormatter(new UnixStyleUsageFormatter(jCommander));
+ jCommander.usage();
+ System.exit(USAGE_EXIT_CODE);
+ }
+}
diff --git a/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/FlinkEnvParameterParser.java b/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/FlinkEnvParameterParser.java
index b14838e86..9d41043f6 100644
--- a/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/FlinkEnvParameterParser.java
+++ b/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/FlinkEnvParameterParser.java
@@ -17,9 +17,9 @@
package org.apache.seatunnel.core.sql;
+import org.apache.seatunnel.core.base.utils.CommandLineUtils;
import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
import org.apache.seatunnel.core.flink.config.FlinkJobType;
-import org.apache.seatunnel.core.flink.utils.CommandLineUtils;
import org.apache.commons.lang3.StringUtils;
@@ -30,7 +30,7 @@ public class FlinkEnvParameterParser {
@SuppressWarnings("checkstyle:RegexpSingleline")
public static void main(String[] args) {
- FlinkCommandArgs flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.SQL);
+ FlinkCommandArgs flinkCommandArgs = CommandLineUtils.parse(args, new FlinkCommandArgs(), FlinkJobType.SQL.getType(), true);
List<String> envParameters = getEnvParameters(flinkCommandArgs);
System.out.println(String.join(" ", envParameters));
}
diff --git a/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/FlinkSqlStarter.java b/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/FlinkSqlStarter.java
index 141a2fd92..78ec4b32c 100644
--- a/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/FlinkSqlStarter.java
+++ b/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/FlinkSqlStarter.java
@@ -19,11 +19,13 @@ package org.apache.seatunnel.core.sql;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.core.base.Starter;
+import org.apache.seatunnel.core.base.utils.CommandLineUtils;
import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
import org.apache.seatunnel.core.flink.config.FlinkJobType;
-import org.apache.seatunnel.core.flink.utils.CommandLineUtils;
+import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
public class FlinkSqlStarter implements Starter {
@@ -37,7 +39,7 @@ public class FlinkSqlStarter implements Starter {
private final String appJar;
FlinkSqlStarter(String[] args) {
- this.flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.SQL);
+ this.flinkCommandArgs = CommandLineUtils.parse(args, new FlinkCommandArgs(), FlinkJobType.SQL.getType(), true);
// set the deployment mode, used to get the job jar path.
Common.setStarter(true);
Common.setDeployMode(flinkCommandArgs.getDeployMode());
@@ -46,7 +48,24 @@ public class FlinkSqlStarter implements Starter {
@Override
public List<String> buildCommands() {
- return CommandLineUtils.buildFlinkCommand(flinkCommandArgs, CLASS_NAME, appJar);
+ List<String> command = new ArrayList<>();
+ command.add("${FLINK_HOME}/bin/flink");
+ command.add(flinkCommandArgs.getRunMode().getMode());
+ command.addAll(flinkCommandArgs.getOriginalParameters());
+ command.add("-c");
+ command.add(CLASS_NAME);
+ command.add(appJar);
+ command.add("--config");
+ command.add(flinkCommandArgs.getConfigFile());
+ if (flinkCommandArgs.isCheckConfig()) {
+ command.add("--check");
+ }
+ // set System properties
+ flinkCommandArgs.getVariables().stream()
+ .filter(Objects::nonNull)
+ .map(String::trim)
+ .forEach(variable -> command.add("-D" + variable));
+ return command;
}
@SuppressWarnings("checkstyle:RegexpSingleline")
diff --git a/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/SeatunnelSql.java b/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/SeatunnelSql.java
index 47a83268e..a1dead1fe 100644
--- a/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/SeatunnelSql.java
+++ b/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/SeatunnelSql.java
@@ -17,9 +17,9 @@
package org.apache.seatunnel.core.sql;
+import org.apache.seatunnel.core.base.utils.CommandLineUtils;
import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
import org.apache.seatunnel.core.flink.config.FlinkJobType;
-import org.apache.seatunnel.core.flink.utils.CommandLineUtils;
import org.apache.seatunnel.core.sql.job.Executor;
import org.apache.seatunnel.core.sql.job.JobInfo;
@@ -37,7 +37,7 @@ public class SeatunnelSql {
}
private static JobInfo parseJob(String[] args) throws IOException {
- FlinkCommandArgs flinkArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.SQL);
+ FlinkCommandArgs flinkArgs = CommandLineUtils.parse(args, new FlinkCommandArgs(), FlinkJobType.SQL.getType(), true);
String configFilePath = flinkArgs.getConfigFile();
String jobContent = FileUtils.readFileToString(new File(configFilePath), StandardCharsets.UTF_8);
JobInfo jobInfo = new JobInfo(jobContent);
diff --git a/seatunnel-core/seatunnel-core-flink-sql/src/test/java/org/apache/seatunnel/core/sql/SqlVariableSubstitutionTest.java b/seatunnel-core/seatunnel-core-flink-sql/src/test/java/org/apache/seatunnel/core/sql/SqlVariableSubstitutionTest.java
index e17b6e5fb..fafeda292 100644
--- a/seatunnel-core/seatunnel-core-flink-sql/src/test/java/org/apache/seatunnel/core/sql/SqlVariableSubstitutionTest.java
+++ b/seatunnel-core/seatunnel-core-flink-sql/src/test/java/org/apache/seatunnel/core/sql/SqlVariableSubstitutionTest.java
@@ -17,9 +17,9 @@
package org.apache.seatunnel.core.sql;
+import org.apache.seatunnel.core.base.utils.CommandLineUtils;
import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
import org.apache.seatunnel.core.flink.config.FlinkJobType;
-import org.apache.seatunnel.core.flink.utils.CommandLineUtils;
import org.apache.seatunnel.core.sql.job.JobInfo;
import org.apache.commons.io.FileUtils;
@@ -37,7 +37,7 @@ public class SqlVariableSubstitutionTest {
String[] args = {"-c", System.getProperty("user.dir") + TEST_RESOURCE_DIR + "flink.sql.conf.template",
"-t", "-i", "table_name=events", "-i", "table_name2=print_table"};
- FlinkCommandArgs flinkArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.JAR);
+ FlinkCommandArgs flinkArgs = CommandLineUtils.parse(args, new FlinkCommandArgs(), FlinkJobType.SQL.getType(), true);
String configFilePath = flinkArgs.getConfigFile();
String jobContent = FileUtils.readFileToString(new File(configFilePath), StandardCharsets.UTF_8);
JobInfo jobInfo = new JobInfo(jobContent);
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/FlinkEnvParameterParser.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/FlinkEnvParameterParser.java
index 7da5437f1..bf4f0ce3c 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/FlinkEnvParameterParser.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/FlinkEnvParameterParser.java
@@ -18,9 +18,9 @@
package org.apache.seatunnel.core.flink;
import org.apache.seatunnel.core.base.config.ConfigParser;
+import org.apache.seatunnel.core.base.utils.CommandLineUtils;
import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
import org.apache.seatunnel.core.flink.config.FlinkJobType;
-import org.apache.seatunnel.core.flink.utils.CommandLineUtils;
import org.apache.commons.lang3.StringUtils;
@@ -37,7 +37,7 @@ public class FlinkEnvParameterParser {
@SuppressWarnings("checkstyle:RegexpSingleline")
public static void main(String[] args) throws FileNotFoundException {
- FlinkCommandArgs flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.JAR);
+ FlinkCommandArgs flinkCommandArgs = CommandLineUtils.parse(args, new FlinkCommandArgs(), FlinkJobType.JAR.getType(), true);
List<String> envParameters = getEnvParameters(flinkCommandArgs);
System.out.println(String.join(" ", envParameters));
}
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/FlinkStarter.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/FlinkStarter.java
index cd4a2be7a..e7d26ae38 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/FlinkStarter.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/FlinkStarter.java
@@ -19,11 +19,13 @@ package org.apache.seatunnel.core.flink;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.core.base.Starter;
+import org.apache.seatunnel.core.base.utils.CommandLineUtils;
import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
import org.apache.seatunnel.core.flink.config.FlinkJobType;
-import org.apache.seatunnel.core.flink.utils.CommandLineUtils;
+import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
/**
* The SeaTunnel flink starter. This class is responsible for generate the final flink job execute command.
@@ -44,7 +46,7 @@ public class FlinkStarter implements Starter {
private final String appJar;
FlinkStarter(String[] args) {
- this.flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.JAR);
+ this.flinkCommandArgs = CommandLineUtils.parse(args, new FlinkCommandArgs(), FlinkJobType.JAR.getType(), true);
// set the deployment mode, used to get the job jar path.
Common.setDeployMode(flinkCommandArgs.getDeployMode());
Common.setStarter(true);
@@ -59,7 +61,24 @@ public class FlinkStarter implements Starter {
@Override
public List<String> buildCommands() {
- return CommandLineUtils.buildFlinkCommand(flinkCommandArgs, APP_NAME, appJar);
+ List<String> command = new ArrayList<>();
+ command.add("${FLINK_HOME}/bin/flink");
+ command.add(flinkCommandArgs.getRunMode().getMode());
+ command.addAll(flinkCommandArgs.getOriginalParameters());
+ command.add("-c");
+ command.add(APP_NAME);
+ command.add(appJar);
+ command.add("--config");
+ command.add(flinkCommandArgs.getConfigFile());
+ if (flinkCommandArgs.isCheckConfig()) {
+ command.add("--check");
+ }
+ // set System properties
+ flinkCommandArgs.getVariables().stream()
+ .filter(Objects::nonNull)
+ .map(String::trim)
+ .forEach(variable -> command.add("-D" + variable));
+ return command;
}
}
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/SeatunnelFlink.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/SeatunnelFlink.java
index f1db4a15a..7afce7171 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/SeatunnelFlink.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/SeatunnelFlink.java
@@ -20,15 +20,15 @@ package org.apache.seatunnel.core.flink;
import org.apache.seatunnel.core.base.Seatunnel;
import org.apache.seatunnel.core.base.command.Command;
import org.apache.seatunnel.core.base.exception.CommandException;
+import org.apache.seatunnel.core.base.utils.CommandLineUtils;
import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
import org.apache.seatunnel.core.flink.command.FlinkCommandBuilder;
import org.apache.seatunnel.core.flink.config.FlinkJobType;
-import org.apache.seatunnel.core.flink.utils.CommandLineUtils;
public class SeatunnelFlink {
public static void main(String[] args) throws CommandException {
- FlinkCommandArgs flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.JAR);
+ FlinkCommandArgs flinkCommandArgs = CommandLineUtils.parse(args, new FlinkCommandArgs(), FlinkJobType.JAR.getType(), true);
Command<FlinkCommandArgs> flinkCommand = new FlinkCommandBuilder()
.buildCommand(flinkCommandArgs);
Seatunnel.run(flinkCommand);
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/args/FlinkCommandArgs.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/args/FlinkCommandArgs.java
index f6221c803..90e4ec5a9 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/args/FlinkCommandArgs.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/args/FlinkCommandArgs.java
@@ -25,8 +25,6 @@ import org.apache.seatunnel.core.flink.config.FlinkRunMode;
import com.beust.jcommander.IStringConverter;
import com.beust.jcommander.Parameter;
-import java.util.List;
-
public class FlinkCommandArgs extends AbstractCommandArgs {
@Parameter(names = {"-r", "--run-mode"},
@@ -34,11 +32,6 @@ public class FlinkCommandArgs extends AbstractCommandArgs {
description = "job run mode, run or run-application")
private FlinkRunMode runMode = FlinkRunMode.RUN;
- /**
- * Undefined parameters parsed will be stored here as flink command parameters.
- */
- private List<String> flinkParams;
-
@Override
public EngineType getEngineType() {
return EngineType.FLINK;
@@ -57,14 +50,6 @@ public class FlinkCommandArgs extends AbstractCommandArgs {
this.runMode = runMode;
}
- public List<String> getFlinkParams() {
- return flinkParams;
- }
-
- public void setFlinkParams(List<String> flinkParams) {
- this.flinkParams = flinkParams;
- }
-
/**
* Used to convert the run mode string to the enum value.
*/
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/utils/CommandLineUtils.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/utils/CommandLineUtils.java
deleted file mode 100644
index cdbf610ae..000000000
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/utils/CommandLineUtils.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.flink.utils;
-
-import static org.apache.seatunnel.core.flink.constant.FlinkConstant.USAGE_EXIT_CODE;
-
-import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
-import org.apache.seatunnel.core.flink.config.FlinkJobType;
-
-import com.beust.jcommander.JCommander;
-import com.beust.jcommander.UnixStyleUsageFormatter;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
-
-public class CommandLineUtils {
-
- private CommandLineUtils() {
- throw new UnsupportedOperationException("CommandLineUtils is a utility class and cannot be instantiated");
- }
-
- public static FlinkCommandArgs parseFlinkArgs(String[] args) {
- FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs();
- JCommander.newBuilder()
- .addObject(flinkCommandArgs)
- .build()
- .parse(args);
- return flinkCommandArgs;
- }
-
- public static FlinkCommandArgs parseCommandArgs(String[] args, FlinkJobType jobType) {
- FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs();
- JCommander jCommander = JCommander.newBuilder()
- .programName(jobType.getType())
- .addObject(flinkCommandArgs)
- .acceptUnknownOptions(true)
- .args(args)
- .build();
- // The args is not belongs to seatunnel, add into flink params
- flinkCommandArgs.setFlinkParams(jCommander.getUnknownOptions());
- if (flinkCommandArgs.isHelp()) {
- jCommander.setUsageFormatter(new UnixStyleUsageFormatter(jCommander));
- jCommander.usage();
- System.exit(USAGE_EXIT_CODE);
- }
- return flinkCommandArgs;
-
- }
-
- public static List<String> buildFlinkCommand(FlinkCommandArgs flinkCommandArgs, String className, String jarPath) {
- List<String> command = new ArrayList<>();
- command.add("${FLINK_HOME}/bin/flink");
- command.add(flinkCommandArgs.getRunMode().getMode());
- command.addAll(flinkCommandArgs.getFlinkParams());
- command.add("-c");
- command.add(className);
- command.add(jarPath);
- command.add("--config");
- command.add(flinkCommandArgs.getConfigFile());
- if (flinkCommandArgs.isCheckConfig()) {
- command.add("--check");
- }
- // set System properties
- flinkCommandArgs.getVariables().stream()
- .filter(Objects::nonNull)
- .map(String::trim)
- .forEach(variable -> command.add("-D" + variable));
- return command;
-
- }
-}
diff --git a/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/FlinkStarterTest.java b/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/FlinkStarterTest.java
index 32229ceab..2fe75d4e5 100644
--- a/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/FlinkStarterTest.java
+++ b/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/FlinkStarterTest.java
@@ -17,7 +17,6 @@
package org.apache.seatunnel.core.flink;
-import com.beust.jcommander.ParameterException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -25,7 +24,7 @@ public class FlinkStarterTest {
static final String APP_CONF_PATH = ClassLoader.getSystemResource("app.conf").getPath();
@Test
- public void buildCommands() throws Exception {
+ public void testBuildCommands() {
String[] args = {"--config", APP_CONF_PATH, "-m", "yarn-cluster", "-i", "key1=value1", "-i", "key2=value2"};
FlinkStarter flinkStarter = new FlinkStarter(args);
String flinkExecuteCommand = String.join(" ", flinkStarter.buildCommands());
@@ -44,17 +43,7 @@ public class FlinkStarterTest {
String[] args3 = {"--config", APP_CONF_PATH, "-m", "yarn-cluster", "-i", "key1=value1", "-i", "key2=value2", "--run-mode", "run123"};
Assertions.assertThrows(IllegalArgumentException.class, () -> new FlinkStarter(args3), "Run mode run123 not supported");
- }
- @Test
- public void buildCommandsMissingConfig() {
- Assertions.assertThrows(ParameterException.class,
- () -> {
- String[] args = {"-m", "yarn-cluster", "-i", "key1=value1", "-i", "key2=value2"};
- FlinkStarter flinkStarter = new FlinkStarter(args);
- String flinkExecuteCommand = String.join(" ", flinkStarter.buildCommands());
- // since we cannot get the actual jar path, so we just check the command contains the command
- Assertions.assertTrue(flinkExecuteCommand.contains("--config flink.yarn.conf"));
- }, "The following option is required: [-c | --config]");
}
+
}
diff --git a/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/args/FlinkCommandArgsTest.java b/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/args/FlinkCommandArgsTest.java
index 4b1f9f1b8..de33d7519 100644
--- a/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/args/FlinkCommandArgsTest.java
+++ b/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/args/FlinkCommandArgsTest.java
@@ -17,7 +17,9 @@
package org.apache.seatunnel.core.flink.args;
-import com.beust.jcommander.JCommander;
+import org.apache.seatunnel.core.base.utils.CommandLineUtils;
+import org.apache.seatunnel.core.flink.config.FlinkJobType;
+
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -28,11 +30,7 @@ public class FlinkCommandArgsTest {
@Test
public void testParseFlinkArgs() {
String[] args = {"-c", "app.conf", "-ck", "-i", "city=shenyang", "-i", "date=20200202"};
- FlinkCommandArgs flinkArgs = new FlinkCommandArgs();
- JCommander.newBuilder()
- .addObject(flinkArgs)
- .build()
- .parse(args);
+ FlinkCommandArgs flinkArgs = CommandLineUtils.parse(args, new FlinkCommandArgs(), FlinkJobType.JAR.getType(), true);
Assertions.assertEquals("app.conf", flinkArgs.getConfigFile());
Assertions.assertTrue(flinkArgs.isCheckConfig());
Assertions.assertEquals(Arrays.asList("city=shenyang", "date=20200202"), flinkArgs.getVariables());
diff --git a/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/utils/CommandLineUtilsTest.java b/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/utils/CommandLineUtilsTest.java
index 1f6a8a34b..0712cc000 100644
--- a/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/utils/CommandLineUtilsTest.java
+++ b/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/utils/CommandLineUtilsTest.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.core.flink.utils;
+import org.apache.seatunnel.core.base.utils.CommandLineUtils;
import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
import org.apache.seatunnel.core.flink.config.FlinkJobType;
import org.apache.seatunnel.core.flink.config.FlinkRunMode;
@@ -24,60 +25,25 @@ import org.apache.seatunnel.core.flink.config.FlinkRunMode;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
-import java.io.FileNotFoundException;
import java.util.Arrays;
-import java.util.List;
public class CommandLineUtilsTest {
- static final String APP_CONF_PATH = ClassLoader.getSystemResource("app.conf").getPath();
- static final String SQL_CONF_PATH = ClassLoader.getSystemResource("sql.conf").getPath();
@Test
public void testParseCommandArgs() {
String[] args = {"--detached", "-c", "app.conf", "-ck", "-i", "city=shenyang", "-i", "date=20200202",
"-r", "run-application", "--unkown", "unkown-command"};
- FlinkCommandArgs flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.JAR);
- Assertions.assertEquals(flinkCommandArgs.getFlinkParams(), Arrays.asList("--detached", "--unkown", "unkown-command"));
+ FlinkCommandArgs flinkCommandArgs = CommandLineUtils.parse(args, new FlinkCommandArgs(), FlinkJobType.JAR.getType(), true);
+ Assertions.assertEquals(flinkCommandArgs.getOriginalParameters(), Arrays.asList("--detached", "--unkown", "unkown-command"));
Assertions.assertEquals(flinkCommandArgs.getRunMode(), FlinkRunMode.APPLICATION_RUN);
Assertions.assertEquals(flinkCommandArgs.getVariables(), Arrays.asList("city=shenyang", "date=20200202"));
String[] args1 = {"--detached", "-c", "app.conf", "-ck", "-i", "city=shenyang", "-i", "date=20200202",
"-r", "run-application", "--unkown", "unkown-command"};
- flinkCommandArgs = CommandLineUtils.parseCommandArgs(args1, FlinkJobType.SQL);
- Assertions.assertEquals(flinkCommandArgs.getFlinkParams(), Arrays.asList("--detached", "--unkown", "unkown-command"));
+ flinkCommandArgs = CommandLineUtils.parse(args1, new FlinkCommandArgs(), FlinkJobType.SQL.getType(), true);
+ Assertions.assertEquals(flinkCommandArgs.getOriginalParameters(), Arrays.asList("--detached", "--unkown", "unkown-command"));
Assertions.assertEquals(flinkCommandArgs.getRunMode(), FlinkRunMode.APPLICATION_RUN);
Assertions.assertEquals(flinkCommandArgs.getVariables(), Arrays.asList("city=shenyang", "date=20200202"));
}
- @Test
- public void testBuildFlinkJarCommand() throws FileNotFoundException {
- String[] args = {"--detached", "-c", APP_CONF_PATH, "-ck", "-i", "city=shenyang", "-i", "date=20200202",
- "-r", "run-application", "--unkown", "unkown-command"};
- FlinkCommandArgs flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.JAR);
- List<String> commands = CommandLineUtils.buildFlinkCommand(flinkCommandArgs, "CLASS_NAME", "/path/to/jar");
- Assertions.assertEquals(commands,
- Arrays.asList("${FLINK_HOME}/bin/flink", "run-application", "--detached", "--unkown", "unkown-command", "-c",
- "CLASS_NAME", "/path/to/jar", "--config", APP_CONF_PATH, "--check", "-Dcity=shenyang", "-Ddate=20200202"));
-
- flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.JAR);
- commands = CommandLineUtils.buildFlinkCommand(flinkCommandArgs, "CLASS_NAME", "/path/to/jar");
- Assertions.assertEquals(commands,
- Arrays.asList("${FLINK_HOME}/bin/flink", "run-application", "--detached", "--unkown", "unkown-command", "-c",
- "CLASS_NAME", "/path/to/jar", "--config", APP_CONF_PATH, "--check", "-Dcity=shenyang", "-Ddate=20200202"));
-
- String[] args1 = {"--detached", "-c", "app.conf", "-ck", "-i", "city=shenyang", "-i", "date=20200202",
- "-r", "run-application", "--unkown", "unkown-command"};
-
- }
-
- @Test
- public void testBuildFlinkSQLCommand() throws FileNotFoundException{
- String[] args = {"--detached", "-c", SQL_CONF_PATH, "-ck", "-i", "city=shenyang", "-i", "date=20200202",
- "-r", "run-application", "--unkown", "unkown-command"};
- FlinkCommandArgs flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.SQL);
- List<String> commands = CommandLineUtils.buildFlinkCommand(flinkCommandArgs, "CLASS_NAME", "/path/to/jar");
- Assertions.assertEquals(commands,
- Arrays.asList("${FLINK_HOME}/bin/flink", "run-application", "--detached", "--unkown", "unkown-command", "-c",
- "CLASS_NAME", "/path/to/jar", "--config", SQL_CONF_PATH, "--check", "-Dcity=shenyang", "-Ddate=20200202"));
- }
}
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SeatunnelSpark.java b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SeatunnelSpark.java
index 05a1a1d1e..118724ba1 100644
--- a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SeatunnelSpark.java
+++ b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SeatunnelSpark.java
@@ -20,14 +20,14 @@ package org.apache.seatunnel.core.spark;
import org.apache.seatunnel.core.base.Seatunnel;
import org.apache.seatunnel.core.base.command.Command;
import org.apache.seatunnel.core.base.exception.CommandException;
+import org.apache.seatunnel.core.base.utils.CommandLineUtils;
import org.apache.seatunnel.core.spark.args.SparkCommandArgs;
import org.apache.seatunnel.core.spark.command.SparkCommandBuilder;
-import org.apache.seatunnel.core.spark.utils.CommandLineUtils;
public class SeatunnelSpark {
public static void main(String[] args) throws CommandException {
- SparkCommandArgs sparkArgs = CommandLineUtils.parseSparkArgs(args);
+ SparkCommandArgs sparkArgs = CommandLineUtils.parse(args, new SparkCommandArgs(), "start-seatunnel-spark.sh", true);
Command<SparkCommandArgs> sparkCommand =
new SparkCommandBuilder().buildCommand(sparkArgs);
Seatunnel.run(sparkCommand);
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java
index 7eca89b4b..eea811ae9 100644
--- a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java
+++ b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java
@@ -27,6 +27,7 @@ import org.apache.seatunnel.core.base.Starter;
import org.apache.seatunnel.core.base.config.ConfigBuilder;
import org.apache.seatunnel.core.base.config.ConfigParser;
import org.apache.seatunnel.core.base.config.EngineType;
+import org.apache.seatunnel.core.base.utils.CommandLineUtils;
import org.apache.seatunnel.core.base.utils.CompressionUtils;
import org.apache.seatunnel.core.spark.args.SparkCommandArgs;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
@@ -35,8 +36,6 @@ import org.apache.seatunnel.plugin.discovery.spark.SparkSourcePluginDiscovery;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import com.beust.jcommander.JCommander;
-import com.beust.jcommander.UnixStyleUsageFormatter;
import org.apache.commons.lang3.StringUtils;
import java.io.File;
@@ -64,8 +63,6 @@ import java.util.stream.Stream;
*/
public class SparkStarter implements Starter {
- private static final int USAGE_EXIT_CODE = 234;
-
private static final int PLUGIN_LIB_DIR_DEPTH = 3;
/**
@@ -116,7 +113,7 @@ public class SparkStarter implements Starter {
* {@link ClientModeSparkStarter} depending on deploy mode.
*/
static SparkStarter getInstance(String[] args) {
- SparkCommandArgs commandArgs = parseCommandArgs(args);
+ SparkCommandArgs commandArgs = CommandLineUtils.parse(args, new SparkCommandArgs(), "start-seatunnel-spark.sh", true);
DeployMode deployMode = commandArgs.getDeployMode();
switch (deployMode) {
case CLUSTER:
@@ -128,24 +125,6 @@ public class SparkStarter implements Starter {
}
}
- /**
- * parse commandline args
- */
- private static SparkCommandArgs parseCommandArgs(String[] args) {
- SparkCommandArgs commandArgs = new SparkCommandArgs();
- JCommander commander = JCommander.newBuilder()
- .programName("start-seatunnel-spark.sh")
- .addObject(commandArgs)
- .args(args)
- .build();
- if (commandArgs.isHelp()) {
- commander.setUsageFormatter(new UnixStyleUsageFormatter(commander));
- commander.usage();
- System.exit(USAGE_EXIT_CODE);
- }
- return commandArgs;
- }
-
@Override
public List<String> buildCommands() throws IOException {
setSparkConf();
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/utils/CommandLineUtils.java b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/utils/CommandLineUtils.java
deleted file mode 100644
index 811bdddef..000000000
--- a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/utils/CommandLineUtils.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.spark.utils;
-
-import org.apache.seatunnel.core.spark.args.SparkCommandArgs;
-
-import com.beust.jcommander.JCommander;
-
-public class CommandLineUtils {
-
- private CommandLineUtils() {
- throw new UnsupportedOperationException("CommandLineUtils is a utility class and cannot be instantiated");
- }
-
- public static SparkCommandArgs parseSparkArgs(String[] args) {
- SparkCommandArgs sparkCommandArgs = new SparkCommandArgs();
- JCommander.newBuilder()
- .addObject(sparkCommandArgs)
- .build()
- .parse(args);
- return sparkCommandArgs;
- }
-}
diff --git a/seatunnel-core/seatunnel-core-spark/src/test/java/org/apache/seatunnel/core/spark/args/SparkCommandArgsTest.java b/seatunnel-core/seatunnel-core-spark/src/test/java/org/apache/seatunnel/core/spark/args/SparkCommandArgsTest.java
index 19214c92b..9179d0c10 100644
--- a/seatunnel-core/seatunnel-core-spark/src/test/java/org/apache/seatunnel/core/spark/args/SparkCommandArgsTest.java
+++ b/seatunnel-core/seatunnel-core-spark/src/test/java/org/apache/seatunnel/core/spark/args/SparkCommandArgsTest.java
@@ -18,8 +18,8 @@
package org.apache.seatunnel.core.spark.args;
import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.core.base.utils.CommandLineUtils;
-import com.beust.jcommander.JCommander;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -30,38 +30,11 @@ public class SparkCommandArgsTest {
@Test
public void testParseSparkArgs() {
String[] args = {"-c", "app.conf", "-e", "client", "-m", "yarn", "-i", "city=shijiazhuang", "-i", "name=Tom"};
- SparkCommandArgs sparkArgs = new SparkCommandArgs();
- JCommander.newBuilder()
- .addObject(sparkArgs)
- .build()
- .parse(args);
+ SparkCommandArgs sparkArgs = CommandLineUtils.parse(args, new SparkCommandArgs(), "seatunnel-spark", true);
Assertions.assertEquals("app.conf", sparkArgs.getConfigFile());
Assertions.assertEquals(DeployMode.CLIENT, sparkArgs.getDeployMode());
Assertions.assertEquals("yarn", sparkArgs.getMaster());
Assertions.assertEquals(Arrays.asList("city=shijiazhuang", "name=Tom"), sparkArgs.getVariables());
}
- @Test
- public void testHelp() {
- String[] args = {"-h"};
- SparkCommandArgs sparkArgs = new SparkCommandArgs();
- JCommander commander = JCommander.newBuilder()
- .addObject(sparkArgs)
- .build();
- commander.parse(args);
- if (sparkArgs.isHelp()) {
- commander.usage();
- }
- }
-
- @Test
- public void testDashDash() {
- String[] args = {"-c", "app.conf", "-e", "client", "-m", "yarn", "-i", "city=guojizhuang", "--"};
- SparkCommandArgs sparkArgs = new SparkCommandArgs();
- JCommander.newBuilder()
- .addObject(sparkArgs)
- .build()
- .parse(args);
- }
-
}
diff --git a/seatunnel-core/seatunnel-core-spark/src/test/java/org/apache/seatunnel/core/spark/utils/CommandLineUtilsTest.java b/seatunnel-core/seatunnel-core-spark/src/test/java/org/apache/seatunnel/core/spark/utils/CommandLineUtilsTest.java
index a013e53fa..483ae8687 100644
--- a/seatunnel-core/seatunnel-core-spark/src/test/java/org/apache/seatunnel/core/spark/utils/CommandLineUtilsTest.java
+++ b/seatunnel-core/seatunnel-core-spark/src/test/java/org/apache/seatunnel/core/spark/utils/CommandLineUtilsTest.java
@@ -17,26 +17,28 @@
package org.apache.seatunnel.core.spark.utils;
+import org.apache.seatunnel.core.base.utils.CommandLineUtils;
import org.apache.seatunnel.core.spark.args.SparkCommandArgs;
-import com.beust.jcommander.ParameterException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import java.util.Arrays;
+
public class CommandLineUtilsTest {
@Test
public void testParseSparkArgs() {
String[] args = {"-c", "app.conf", "-e", "cluster", "-m", "local[*]"};
- SparkCommandArgs commandLineArgs = CommandLineUtils.parseSparkArgs(args);
+ SparkCommandArgs commandLineArgs = CommandLineUtils.parse(args, new SparkCommandArgs());
Assertions.assertEquals("app.conf", commandLineArgs.getConfigFile());
Assertions.assertEquals("cluster", commandLineArgs.getDeployMode().getName());
- }
- @Test
- public void testParseSparkArgsException() {
- String[] args = {"-c", "app.conf", "-e", "cluster2xxx", "-m", "local[*]"};
- Assertions.assertThrows(ParameterException.class, () -> CommandLineUtils.parseSparkArgs(args));
+ args = new String[]{"-c", "app.conf", "-e", "cluster", "-m", "local[*]", "--queue", "test"};
+ commandLineArgs = CommandLineUtils.parse(args, new SparkCommandArgs(), "seatunnel-spark", true);
+
+ Assertions.assertEquals(Arrays.asList("--queue", "test"), commandLineArgs.getOriginalParameters());
}
+
}
diff --git a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/command/AbstractCommandArgs.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/command/AbstractCommandArgs.java
index 999eeb66e..20dbcf3ff 100644
--- a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/command/AbstractCommandArgs.java
+++ b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/command/AbstractCommandArgs.java
@@ -46,6 +46,11 @@ public abstract class AbstractCommandArgs implements CommandArgs {
description = "Show the usage message")
private boolean help = false;
+ /**
+ * Undefined parameters parsed will be stored here as engine original command parameters.
+ */
+ private List<String> originalParameters;
+
public String getConfigFile() {
return configFile;
}
@@ -78,6 +83,14 @@ public abstract class AbstractCommandArgs implements CommandArgs {
this.help = help;
}
+ public List<String> getOriginalParameters() {
+ return originalParameters;
+ }
+
+ public void setOriginalParameters(List<String> originalParameters) {
+ this.originalParameters = originalParameters;
+ }
+
public EngineType getEngineType() {
throw new UnsupportedOperationException("abstract class CommandArgs not support this method");
}
diff --git a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/constants/CommonParamConstants.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/constants/CommonParamConstants.java
deleted file mode 100644
index 2f12311d2..000000000
--- a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/constants/CommonParamConstants.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.starter.constants;
-
-public class CommonParamConstants {
-
- public static final String RESULT_TABLE_NAME = "result_table_name";
- public static final String SOURCE_TABLE_NAME = "source_table_name";
-}
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/constant/FlinkConstant.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/constants/Constants.java
similarity index 90%
rename from seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/constant/FlinkConstant.java
rename to seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/constants/Constants.java
index 620e5b2e8..8b50b0084 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/constant/FlinkConstant.java
+++ b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/constants/Constants.java
@@ -15,8 +15,8 @@
* limitations under the License.
*/
-package org.apache.seatunnel.core.starter.flink.constant;
+package org.apache.seatunnel.core.starter.constants;
-public class FlinkConstant {
+public class Constants {
public static final int USAGE_EXIT_CODE = 234;
}
diff --git a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/CommandLineUtils.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/CommandLineUtils.java
new file mode 100644
index 000000000..c07d69bb6
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/CommandLineUtils.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.starter.utils;
+
+import static org.apache.seatunnel.core.starter.constants.Constants.USAGE_EXIT_CODE;
+
+import org.apache.seatunnel.core.starter.command.AbstractCommandArgs;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.ParameterException;
+import com.beust.jcommander.UnixStyleUsageFormatter;
+
+public class CommandLineUtils {
+
+ private CommandLineUtils() {
+ throw new UnsupportedOperationException("CommandLineUtils is a utility class and cannot be instantiated");
+ }
+
+ public static <T extends AbstractCommandArgs> T parse(String[] args, T obj) {
+ return parse(args, obj, null, false);
+ }
+
+ public static <T extends AbstractCommandArgs> T parse(String[] args, T obj, String programName, boolean acceptUnknownOptions) {
+ JCommander jCommander = JCommander.newBuilder()
+ .programName(programName)
+ .addObject(obj)
+ .acceptUnknownOptions(acceptUnknownOptions)
+ .build();
+ try {
+ jCommander.parse(args);
+ // Args that do not belong to SeaTunnel are kept as the engine's original parameters
+ obj.setOriginalParameters(jCommander.getUnknownOptions());
+ } catch (ParameterException e) {
+ System.err.println(e.getLocalizedMessage());
+ exit(jCommander);
+ }
+
+ if (obj.isHelp()) {
+ exit(jCommander);
+ }
+ return obj;
+ }
+
+ private static void exit(JCommander jCommander) {
+ jCommander.setUsageFormatter(new UnixStyleUsageFormatter(jCommander));
+ jCommander.usage();
+ System.exit(USAGE_EXIT_CODE);
+ }
+}
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
index 52b207a79..df10fc645 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
+++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
@@ -21,9 +21,11 @@ import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.core.starter.Starter;
import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs;
import org.apache.seatunnel.core.starter.flink.config.FlinkJobType;
-import org.apache.seatunnel.core.starter.flink.utils.CommandLineUtils;
+import org.apache.seatunnel.core.starter.utils.CommandLineUtils;
+import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
/**
* The SeaTunnel flink starter. This class is responsible for generate the final flink job execute command.
@@ -44,7 +46,7 @@ public class FlinkStarter implements Starter {
private final String appJar;
FlinkStarter(String[] args) {
- this.flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.JAR);
+ this.flinkCommandArgs = CommandLineUtils.parse(args, new FlinkCommandArgs(), FlinkJobType.JAR.getType(), true);
// set the deployment mode, used to get the job jar path.
Common.setDeployMode(flinkCommandArgs.getDeployMode());
Common.setStarter(true);
@@ -59,7 +61,24 @@ public class FlinkStarter implements Starter {
@Override
public List<String> buildCommands() {
- return CommandLineUtils.buildFlinkCommand(flinkCommandArgs, APP_NAME, appJar);
+ List<String> command = new ArrayList<>();
+ command.add("${FLINK_HOME}/bin/flink");
+ command.add(flinkCommandArgs.getRunMode().getMode());
+ command.addAll(flinkCommandArgs.getOriginalParameters());
+ command.add("-c");
+ command.add(APP_NAME);
+ command.add(appJar);
+ command.add("--config");
+ command.add(flinkCommandArgs.getConfigFile());
+ if (flinkCommandArgs.isCheckConfig()) {
+ command.add("--check");
+ }
+ // set System properties
+ flinkCommandArgs.getVariables().stream()
+ .filter(Objects::nonNull)
+ .map(String::trim)
+ .forEach(variable -> command.add("-D" + variable));
+ return command;
}
}
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/SeatunnelFlink.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/SeatunnelFlink.java
index 330c2e501..cf01b979d 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/SeatunnelFlink.java
+++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/SeatunnelFlink.java
@@ -23,12 +23,12 @@ import org.apache.seatunnel.core.starter.exception.CommandException;
import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs;
import org.apache.seatunnel.core.starter.flink.command.FlinkCommandBuilder;
import org.apache.seatunnel.core.starter.flink.config.FlinkJobType;
-import org.apache.seatunnel.core.starter.flink.utils.CommandLineUtils;
+import org.apache.seatunnel.core.starter.utils.CommandLineUtils;
public class SeatunnelFlink {
public static void main(String[] args) throws CommandException {
- FlinkCommandArgs flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.JAR);
+ FlinkCommandArgs flinkCommandArgs = CommandLineUtils.parse(args, new FlinkCommandArgs(), FlinkJobType.JAR.getType(), true);
Command<FlinkCommandArgs> flinkCommand = new FlinkCommandBuilder()
.buildCommand(flinkCommandArgs);
Seatunnel.run(flinkCommand);
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java
index 9fd4bdd4e..4c98ff532 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java
+++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java
@@ -25,18 +25,12 @@ import org.apache.seatunnel.core.starter.flink.config.FlinkRunMode;
import com.beust.jcommander.IStringConverter;
import com.beust.jcommander.Parameter;
-import java.util.List;
-
public class FlinkCommandArgs extends AbstractCommandArgs {
@Parameter(names = {"-r", "--run-mode"},
converter = RunModeConverter.class,
description = "job run mode, run or run-application")
private FlinkRunMode runMode = FlinkRunMode.RUN;
- /**
- * Undefined parameters parsed will be stored here as flink command parameters.
- */
- private List<String> flinkParams;
@Override
public EngineType getEngineType() {
@@ -56,14 +50,6 @@ public class FlinkCommandArgs extends AbstractCommandArgs {
this.runMode = runMode;
}
- public List<String> getFlinkParams() {
- return flinkParams;
- }
-
- public void setFlinkParams(List<String> flinkParams) {
- this.flinkParams = flinkParams;
- }
-
/**
* Used to convert the run mode string to the enum value.
*/
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/CommandLineUtils.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/CommandLineUtils.java
deleted file mode 100644
index 9b29fdf16..000000000
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/CommandLineUtils.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.starter.flink.utils;
-
-import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs;
-import org.apache.seatunnel.core.starter.flink.config.FlinkJobType;
-import org.apache.seatunnel.core.starter.flink.constant.FlinkConstant;
-
-import com.beust.jcommander.JCommander;
-import com.beust.jcommander.UnixStyleUsageFormatter;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
-
-public class CommandLineUtils {
-
- private CommandLineUtils() {
- throw new UnsupportedOperationException("CommandLineUtils is a utility class and cannot be instantiated");
- }
-
- public static FlinkCommandArgs parseFlinkArgs(String[] args) {
- FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs();
- JCommander.newBuilder()
- .addObject(flinkCommandArgs)
- .build()
- .parse(args);
- return flinkCommandArgs;
- }
-
- public static FlinkCommandArgs parseCommandArgs(String[] args, FlinkJobType jobType) {
- FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs();
- JCommander jCommander = JCommander.newBuilder()
- .programName(jobType.getType())
- .addObject(flinkCommandArgs)
- .acceptUnknownOptions(true)
- .args(args)
- .build();
- // The args is not belongs to seatunnel, add into flink params
- flinkCommandArgs.setFlinkParams(jCommander.getUnknownOptions());
- if (flinkCommandArgs.isHelp()) {
- jCommander.setUsageFormatter(new UnixStyleUsageFormatter(jCommander));
- jCommander.usage();
- System.exit(FlinkConstant.USAGE_EXIT_CODE);
- }
- return flinkCommandArgs;
-
- }
-
- public static List<String> buildFlinkCommand(FlinkCommandArgs flinkCommandArgs, String className, String jarPath) {
- List<String> command = new ArrayList<>();
- command.add("${FLINK_HOME}/bin/flink");
- command.add(flinkCommandArgs.getRunMode().getMode());
- command.addAll(flinkCommandArgs.getFlinkParams());
- command.add("-c");
- command.add(className);
- command.add(jarPath);
- command.add("--config");
- command.add(flinkCommandArgs.getConfigFile());
- if (flinkCommandArgs.isCheckConfig()) {
- command.add("--check");
- }
- // set System properties
- flinkCommandArgs.getVariables().stream()
- .filter(Objects::nonNull)
- .map(String::trim)
- .forEach(variable -> command.add("-D" + variable));
- return command;
-
- }
-}
diff --git a/seatunnel-core/seatunnel-flink-starter/src/test/java/org/apache/seatunnel/core/starter/flink/FlinkStarterTest.java b/seatunnel-core/seatunnel-flink-starter/src/test/java/org/apache/seatunnel/core/starter/flink/FlinkStarterTest.java
index 6dc0ad756..ac791cf26 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/test/java/org/apache/seatunnel/core/starter/flink/FlinkStarterTest.java
+++ b/seatunnel-core/seatunnel-flink-starter/src/test/java/org/apache/seatunnel/core/starter/flink/FlinkStarterTest.java
@@ -49,17 +49,4 @@ public class FlinkStarterTest {
Assertions.assertEquals("Run mode run123 not supported", e.getMessage());
}
}
-
- @Test
- public void buildCommandsMissingConfig() {
- try {
- String[] args = {"-m", "yarn-cluster", "-i", "key1=value1", "-i", "key2=value2"};
- FlinkStarter flinkStarter = new FlinkStarter(args);
- String flinkExecuteCommand = String.join(" ", flinkStarter.buildCommands());
- // since we cannot get the actual jar path, so we just check the command contains the command
- Assertions.assertTrue(flinkExecuteCommand.contains("--config flink.yarn.conf"));
- } catch (Exception e) {
- Assertions.assertEquals("The following option is required: [-c | --config]", e.getMessage());
- }
- }
}
diff --git a/seatunnel-core/seatunnel-flink-starter/src/test/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgsTest.java b/seatunnel-core/seatunnel-flink-starter/src/test/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgsTest.java
index 80bbdf6d2..2906b5344 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/test/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgsTest.java
+++ b/seatunnel-core/seatunnel-flink-starter/src/test/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgsTest.java
@@ -17,7 +17,8 @@
package org.apache.seatunnel.core.starter.flink.args;
-import com.beust.jcommander.JCommander;
+import org.apache.seatunnel.core.starter.utils.CommandLineUtils;
+
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -28,11 +29,7 @@ public class FlinkCommandArgsTest {
@Test
public void testParseFlinkArgs() {
String[] args = {"-c", "app.conf", "-t", "-i", "city=shenyang", "-i", "date=20200202"};
- FlinkCommandArgs flinkArgs = new FlinkCommandArgs();
- JCommander.newBuilder()
- .addObject(flinkArgs)
- .build()
- .parse(args);
+ FlinkCommandArgs flinkArgs = CommandLineUtils.parse(args, new FlinkCommandArgs(), "seatunnel-flink", true);
Assertions.assertEquals("app.conf", flinkArgs.getConfigFile());
Assertions.assertTrue(flinkArgs.isCheckConfig());
Assertions.assertEquals(Arrays.asList("city=shenyang", "date=20200202"), flinkArgs.getVariables());
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SeatunnelSpark.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SeatunnelSpark.java
index 57fdb5d63..338921ad1 100644
--- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SeatunnelSpark.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SeatunnelSpark.java
@@ -22,12 +22,12 @@ import org.apache.seatunnel.core.starter.command.Command;
import org.apache.seatunnel.core.starter.exception.CommandException;
import org.apache.seatunnel.core.starter.spark.args.SparkCommandArgs;
import org.apache.seatunnel.core.starter.spark.command.SparkCommandBuilder;
-import org.apache.seatunnel.core.starter.spark.utils.CommandLineUtils;
+import org.apache.seatunnel.core.starter.utils.CommandLineUtils;
public class SeatunnelSpark {
public static void main(String[] args) throws CommandException {
- SparkCommandArgs sparkArgs = CommandLineUtils.parseSparkArgs(args);
+ SparkCommandArgs sparkArgs = CommandLineUtils.parse(args, new SparkCommandArgs(), "start-seatunnel-spark.sh", true);
Command<SparkCommandArgs> sparkCommand =
new SparkCommandBuilder().buildCommand(sparkArgs);
Seatunnel.run(sparkCommand);
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
index 00261a80c..19479d0e3 100644
--- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
@@ -26,6 +26,7 @@ import org.apache.seatunnel.core.starter.Starter;
import org.apache.seatunnel.core.starter.config.ConfigBuilder;
import org.apache.seatunnel.core.starter.config.PluginType;
import org.apache.seatunnel.core.starter.spark.args.SparkCommandArgs;
+import org.apache.seatunnel.core.starter.utils.CommandLineUtils;
import org.apache.seatunnel.core.starter.utils.CompressionUtils;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
@@ -35,8 +36,6 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
-import com.beust.jcommander.JCommander;
-import com.beust.jcommander.UnixStyleUsageFormatter;
import org.apache.commons.lang3.StringUtils;
import java.io.File;
@@ -64,8 +63,6 @@ import java.util.stream.Stream;
*/
public class SparkStarter implements Starter {
- private static final int USAGE_EXIT_CODE = 234;
-
private static final int PLUGIN_LIB_DIR_DEPTH = 3;
/**
@@ -116,7 +113,7 @@ public class SparkStarter implements Starter {
* {@link ClientModeSparkStarter} depending on deploy mode.
*/
static SparkStarter getInstance(String[] args) {
- SparkCommandArgs commandArgs = parseCommandArgs(args);
+ SparkCommandArgs commandArgs = CommandLineUtils.parse(args, new SparkCommandArgs(), "start-seatunnel-spark.sh", true);
DeployMode deployMode = commandArgs.getDeployMode();
switch (deployMode) {
case CLUSTER:
@@ -128,24 +125,6 @@ public class SparkStarter implements Starter {
}
}
- /**
- * parse commandline args
- */
- private static SparkCommandArgs parseCommandArgs(String[] args) {
- SparkCommandArgs commandArgs = new SparkCommandArgs();
- JCommander commander = JCommander.newBuilder()
- .programName("start-seatunnel-spark.sh")
- .addObject(commandArgs)
- .args(args)
- .build();
- if (commandArgs.isHelp()) {
- commander.setUsageFormatter(new UnixStyleUsageFormatter(commander));
- commander.usage();
- System.exit(USAGE_EXIT_CODE);
- }
- return commandArgs;
- }
-
@Override
public List<String> buildCommands() throws IOException {
setSparkConf();
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/utils/CommandLineUtils.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/utils/CommandLineUtils.java
deleted file mode 100644
index 9e25030e6..000000000
--- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/utils/CommandLineUtils.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.starter.spark.utils;
-
-import org.apache.seatunnel.core.starter.spark.args.SparkCommandArgs;
-
-import com.beust.jcommander.JCommander;
-
-public class CommandLineUtils {
-
- private CommandLineUtils() {
- throw new UnsupportedOperationException("CommandLineUtils is a utility class and cannot be instantiated");
- }
-
- public static SparkCommandArgs parseSparkArgs(String[] args) {
- SparkCommandArgs sparkCommandArgs = new SparkCommandArgs();
- JCommander.newBuilder()
- .addObject(sparkCommandArgs)
- .build()
- .parse(args);
- return sparkCommandArgs;
- }
-}
diff --git a/seatunnel-core/seatunnel-spark-starter/src/test/java/org/apache/seatunnel/core/starter/spark/args/SparkCommandArgsTest.java b/seatunnel-core/seatunnel-spark-starter/src/test/java/org/apache/seatunnel/core/starter/spark/args/SparkCommandArgsTest.java
index c66201251..dd0cbf2c9 100644
--- a/seatunnel-core/seatunnel-spark-starter/src/test/java/org/apache/seatunnel/core/starter/spark/args/SparkCommandArgsTest.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/test/java/org/apache/seatunnel/core/starter/spark/args/SparkCommandArgsTest.java
@@ -18,8 +18,8 @@
package org.apache.seatunnel.core.starter.spark.args;
import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.core.starter.utils.CommandLineUtils;
-import com.beust.jcommander.JCommander;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -30,38 +30,11 @@ public class SparkCommandArgsTest {
@Test
public void testParseSparkArgs() {
String[] args = {"-c", "app.conf", "-e", "client", "-m", "yarn", "-i", "city=shijiazhuang", "-i", "name=Tom"};
- SparkCommandArgs sparkArgs = new SparkCommandArgs();
- JCommander.newBuilder()
- .addObject(sparkArgs)
- .build()
- .parse(args);
+ SparkCommandArgs sparkArgs = CommandLineUtils.parse(args, new SparkCommandArgs(), "seatunnel-spark", true);
Assertions.assertEquals("app.conf", sparkArgs.getConfigFile());
Assertions.assertEquals(DeployMode.CLIENT, sparkArgs.getDeployMode());
Assertions.assertEquals("yarn", sparkArgs.getMaster());
Assertions.assertEquals(Arrays.asList("city=shijiazhuang", "name=Tom"), sparkArgs.getVariables());
}
- @Test
- public void testHelp() {
- String[] args = {"-h"};
- SparkCommandArgs sparkArgs = new SparkCommandArgs();
- JCommander commander = JCommander.newBuilder()
- .addObject(sparkArgs)
- .build();
- commander.parse(args);
- if (sparkArgs.isHelp()) {
- commander.usage();
- }
- }
-
- @Test
- public void testDashDash() {
- String[] args = {"-c", "app.conf", "-e", "client", "-m", "yarn", "-i", "city=guojizhuang", "--"};
- SparkCommandArgs sparkArgs = new SparkCommandArgs();
- JCommander.newBuilder()
- .addObject(sparkArgs)
- .build()
- .parse(args);
- }
-
}
diff --git a/seatunnel-core/seatunnel-spark-starter/src/test/java/org/apache/seatunnel/core/starter/spark/utils/CommandLineUtilsTest.java b/seatunnel-core/seatunnel-spark-starter/src/test/java/org/apache/seatunnel/core/starter/spark/utils/CommandLineUtilsTest.java
index 81e177c99..575896698 100644
--- a/seatunnel-core/seatunnel-spark-starter/src/test/java/org/apache/seatunnel/core/starter/spark/utils/CommandLineUtilsTest.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/test/java/org/apache/seatunnel/core/starter/spark/utils/CommandLineUtilsTest.java
@@ -18,25 +18,28 @@
package org.apache.seatunnel.core.starter.spark.utils;
import org.apache.seatunnel.core.starter.spark.args.SparkCommandArgs;
+import org.apache.seatunnel.core.starter.utils.CommandLineUtils;
-import com.beust.jcommander.ParameterException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import java.util.Arrays;
+
public class CommandLineUtilsTest {
@Test
public void testParseSparkArgs() {
String[] args = {"-c", "app.conf", "-e", "cluster", "-m", "local[*]"};
- SparkCommandArgs commandLineArgs = CommandLineUtils.parseSparkArgs(args);
+ SparkCommandArgs commandLineArgs = CommandLineUtils.parse(args, new SparkCommandArgs());
Assertions.assertEquals("app.conf", commandLineArgs.getConfigFile());
Assertions.assertEquals("cluster", commandLineArgs.getDeployMode().getName());
- }
- @Test
- public void testParseSparkArgsException() {
- String[] args = {"-c", "app.conf", "-e", "cluster2xxx", "-m", "local[*]"};
- Assertions.assertThrows(ParameterException.class, () -> CommandLineUtils.parseSparkArgs(args));
+ args = new String[]{"-c", "app.conf", "-e", "cluster", "-m", "local[*]", "--queue", "test"};
+ commandLineArgs = CommandLineUtils.parse(args, new SparkCommandArgs(), "seatunnel-spark", true);
+
+ Assertions.assertEquals(Arrays.asList("--queue", "test"), commandLineArgs.getOriginalParameters());
+
}
+
}