You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/08/23 02:36:54 UTC

[incubator-seatunnel] branch dev updated: [Improvement][Starter] Use the public CommandLine util class to parse the args (#2470)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new db9dc1281 [Improvement][Starter] Use the public CommandLine util class to parse the args  (#2470)
db9dc1281 is described below

commit db9dc128189eebcf7824b06fd687e9e87e194bd9
Author: Kerwin <37...@users.noreply.github.com>
AuthorDate: Tue Aug 23 10:36:48 2022 +0800

    [Improvement][Starter] Use the public CommandLine util class to parse the args  (#2470)
    
    * parse args using the public CommandLine utility class.
    
    * code clean
    
    * update FlinkEnvParameterParser in the flink sql module
    
    * Complete test class.
    
    * Complete test class.
---
 .../core/base/command/AbstractCommandArgs.java     | 13 ++++
 .../seatunnel/core/base/constants/Constants.java}  |  4 +-
 .../core/base/utils/CommandLineUtils.java          | 64 ++++++++++++++++
 .../core/sql/FlinkEnvParameterParser.java          |  4 +-
 .../apache/seatunnel/core/sql/FlinkSqlStarter.java | 25 ++++++-
 .../apache/seatunnel/core/sql/SeatunnelSql.java    |  4 +-
 .../core/sql/SqlVariableSubstitutionTest.java      |  4 +-
 .../core/flink/FlinkEnvParameterParser.java        |  4 +-
 .../apache/seatunnel/core/flink/FlinkStarter.java  | 25 ++++++-
 .../seatunnel/core/flink/SeatunnelFlink.java       |  4 +-
 .../core/flink/args/FlinkCommandArgs.java          | 15 ----
 .../core/flink/utils/CommandLineUtils.java         | 87 ----------------------
 .../seatunnel/core/flink/FlinkStarterTest.java     | 15 +---
 .../core/flink/args/FlinkCommandArgsTest.java      | 10 +--
 .../core/flink/utils/CommandLineUtilsTest.java     | 44 ++---------
 .../seatunnel/core/spark/SeatunnelSpark.java       |  4 +-
 .../apache/seatunnel/core/spark/SparkStarter.java  | 25 +------
 .../core/spark/utils/CommandLineUtils.java         | 38 ----------
 .../core/spark/args/SparkCommandArgsTest.java      | 31 +-------
 .../core/spark/utils/CommandLineUtilsTest.java     | 16 ++--
 .../core/starter/command/AbstractCommandArgs.java  | 13 ++++
 .../starter/constants/CommonParamConstants.java    | 24 ------
 .../core/starter/constants/Constants.java}         |  4 +-
 .../core/starter/utils/CommandLineUtils.java       | 64 ++++++++++++++++
 .../seatunnel/core/starter/flink/FlinkStarter.java | 25 ++++++-
 .../core/starter/flink/SeatunnelFlink.java         |  4 +-
 .../core/starter/flink/args/FlinkCommandArgs.java  | 14 ----
 .../core/starter/flink/utils/CommandLineUtils.java | 86 ---------------------
 .../core/starter/flink/FlinkStarterTest.java       | 13 ----
 .../starter/flink/args/FlinkCommandArgsTest.java   |  9 +--
 .../core/starter/spark/SeatunnelSpark.java         |  4 +-
 .../seatunnel/core/starter/spark/SparkStarter.java | 25 +------
 .../core/starter/spark/utils/CommandLineUtils.java | 38 ----------
 .../starter/spark/args/SparkCommandArgsTest.java   | 31 +-------
 .../starter/spark/utils/CommandLineUtilsTest.java  | 17 +++--
 35 files changed, 281 insertions(+), 526 deletions(-)

diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/AbstractCommandArgs.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/AbstractCommandArgs.java
index 9042b2c14..07f7772ef 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/AbstractCommandArgs.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/AbstractCommandArgs.java
@@ -47,6 +47,11 @@ public abstract class AbstractCommandArgs implements CommandArgs {
             description = "Show the usage message")
     private boolean help = false;
 
+    /**
+     * Undefined parameters parsed will be stored here as engine original command parameters.
+     */
+    private List<String> originalParameters;
+
     public String getConfigFile() {
         return configFile;
     }
@@ -79,6 +84,14 @@ public abstract class AbstractCommandArgs implements CommandArgs {
         this.help = help;
     }
 
+    public List<String> getOriginalParameters() {
+        return originalParameters;
+    }
+
+    public void setOriginalParameters(List<String> originalParameters) {
+        this.originalParameters = originalParameters;
+    }
+
     public EngineType getEngineType() {
         throw new UnsupportedOperationException("abstract class CommandArgs not support this method");
     }
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/constant/FlinkConstant.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/constants/Constants.java
similarity index 91%
rename from seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/constant/FlinkConstant.java
rename to seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/constants/Constants.java
index b57c62930..4ebb0792f 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/constant/FlinkConstant.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/constants/Constants.java
@@ -15,8 +15,8 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.core.flink.constant;
+package org.apache.seatunnel.core.base.constants;
 
-public class FlinkConstant {
+public class Constants {
     public static final int USAGE_EXIT_CODE = 234;
 }
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/utils/CommandLineUtils.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/utils/CommandLineUtils.java
new file mode 100644
index 000000000..748933b2b
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/utils/CommandLineUtils.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.base.utils;
+
+import static org.apache.seatunnel.core.base.constants.Constants.USAGE_EXIT_CODE;
+
+import org.apache.seatunnel.core.base.command.AbstractCommandArgs;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.ParameterException;
+import com.beust.jcommander.UnixStyleUsageFormatter;
+
+public class CommandLineUtils {
+
+    private CommandLineUtils() {
+        throw new UnsupportedOperationException("CommandLineUtils is a utility class and cannot be instantiated");
+    }
+
+    public static <T extends AbstractCommandArgs> T parse(String[] args, T obj) {
+        return parse(args, obj, null, false);
+    }
+
+    public static <T extends AbstractCommandArgs> T parse(String[] args, T obj, String programName, boolean acceptUnknownOptions) {
+        JCommander jCommander = JCommander.newBuilder()
+                .programName(programName)
+                .addObject(obj)
+                .acceptUnknownOptions(acceptUnknownOptions)
+                .build();
+        try {
+            jCommander.parse(args);
+            // The args is not belongs to SeaTunnel, add into engine original parameters
+            obj.setOriginalParameters(jCommander.getUnknownOptions());
+        } catch (ParameterException e) {
+            System.err.println(e.getLocalizedMessage());
+            exit(jCommander);
+        }
+
+        if (obj.isHelp()) {
+            exit(jCommander);
+        }
+        return obj;
+    }
+
+    private static void exit(JCommander jCommander) {
+        jCommander.setUsageFormatter(new UnixStyleUsageFormatter(jCommander));
+        jCommander.usage();
+        System.exit(USAGE_EXIT_CODE);
+    }
+}
diff --git a/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/FlinkEnvParameterParser.java b/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/FlinkEnvParameterParser.java
index b14838e86..9d41043f6 100644
--- a/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/FlinkEnvParameterParser.java
+++ b/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/FlinkEnvParameterParser.java
@@ -17,9 +17,9 @@
 
 package org.apache.seatunnel.core.sql;
 
+import org.apache.seatunnel.core.base.utils.CommandLineUtils;
 import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
 import org.apache.seatunnel.core.flink.config.FlinkJobType;
-import org.apache.seatunnel.core.flink.utils.CommandLineUtils;
 
 import org.apache.commons.lang3.StringUtils;
 
@@ -30,7 +30,7 @@ public class FlinkEnvParameterParser {
 
     @SuppressWarnings("checkstyle:RegexpSingleline")
     public static void main(String[] args) {
-        FlinkCommandArgs flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.SQL);
+        FlinkCommandArgs flinkCommandArgs = CommandLineUtils.parse(args, new FlinkCommandArgs(), FlinkJobType.SQL.getType(), true);
         List<String> envParameters = getEnvParameters(flinkCommandArgs);
         System.out.println(String.join(" ", envParameters));
     }
diff --git a/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/FlinkSqlStarter.java b/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/FlinkSqlStarter.java
index 141a2fd92..78ec4b32c 100644
--- a/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/FlinkSqlStarter.java
+++ b/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/FlinkSqlStarter.java
@@ -19,11 +19,13 @@ package org.apache.seatunnel.core.sql;
 
 import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.core.base.Starter;
+import org.apache.seatunnel.core.base.utils.CommandLineUtils;
 import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
 import org.apache.seatunnel.core.flink.config.FlinkJobType;
-import org.apache.seatunnel.core.flink.utils.CommandLineUtils;
 
+import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 
 public class FlinkSqlStarter implements Starter {
 
@@ -37,7 +39,7 @@ public class FlinkSqlStarter implements Starter {
     private final String appJar;
 
     FlinkSqlStarter(String[] args) {
-        this.flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.SQL);
+        this.flinkCommandArgs = CommandLineUtils.parse(args, new FlinkCommandArgs(), FlinkJobType.SQL.getType(), true);
         // set the deployment mode, used to get the job jar path.
         Common.setStarter(true);
         Common.setDeployMode(flinkCommandArgs.getDeployMode());
@@ -46,7 +48,24 @@ public class FlinkSqlStarter implements Starter {
 
     @Override
     public List<String> buildCommands() {
-        return CommandLineUtils.buildFlinkCommand(flinkCommandArgs, CLASS_NAME, appJar);
+        List<String> command = new ArrayList<>();
+        command.add("${FLINK_HOME}/bin/flink");
+        command.add(flinkCommandArgs.getRunMode().getMode());
+        command.addAll(flinkCommandArgs.getOriginalParameters());
+        command.add("-c");
+        command.add(CLASS_NAME);
+        command.add(appJar);
+        command.add("--config");
+        command.add(flinkCommandArgs.getConfigFile());
+        if (flinkCommandArgs.isCheckConfig()) {
+            command.add("--check");
+        }
+        // set System properties
+        flinkCommandArgs.getVariables().stream()
+                .filter(Objects::nonNull)
+                .map(String::trim)
+                .forEach(variable -> command.add("-D" + variable));
+        return command;
     }
 
     @SuppressWarnings("checkstyle:RegexpSingleline")
diff --git a/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/SeatunnelSql.java b/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/SeatunnelSql.java
index 47a83268e..a1dead1fe 100644
--- a/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/SeatunnelSql.java
+++ b/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/SeatunnelSql.java
@@ -17,9 +17,9 @@
 
 package org.apache.seatunnel.core.sql;
 
+import org.apache.seatunnel.core.base.utils.CommandLineUtils;
 import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
 import org.apache.seatunnel.core.flink.config.FlinkJobType;
-import org.apache.seatunnel.core.flink.utils.CommandLineUtils;
 import org.apache.seatunnel.core.sql.job.Executor;
 import org.apache.seatunnel.core.sql.job.JobInfo;
 
@@ -37,7 +37,7 @@ public class SeatunnelSql {
     }
 
     private static JobInfo parseJob(String[] args) throws IOException {
-        FlinkCommandArgs flinkArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.SQL);
+        FlinkCommandArgs flinkArgs = CommandLineUtils.parse(args, new FlinkCommandArgs(), FlinkJobType.SQL.getType(), true);
         String configFilePath = flinkArgs.getConfigFile();
         String jobContent = FileUtils.readFileToString(new File(configFilePath), StandardCharsets.UTF_8);
         JobInfo jobInfo = new JobInfo(jobContent);
diff --git a/seatunnel-core/seatunnel-core-flink-sql/src/test/java/org/apache/seatunnel/core/sql/SqlVariableSubstitutionTest.java b/seatunnel-core/seatunnel-core-flink-sql/src/test/java/org/apache/seatunnel/core/sql/SqlVariableSubstitutionTest.java
index e17b6e5fb..fafeda292 100644
--- a/seatunnel-core/seatunnel-core-flink-sql/src/test/java/org/apache/seatunnel/core/sql/SqlVariableSubstitutionTest.java
+++ b/seatunnel-core/seatunnel-core-flink-sql/src/test/java/org/apache/seatunnel/core/sql/SqlVariableSubstitutionTest.java
@@ -17,9 +17,9 @@
 
 package org.apache.seatunnel.core.sql;
 
+import org.apache.seatunnel.core.base.utils.CommandLineUtils;
 import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
 import org.apache.seatunnel.core.flink.config.FlinkJobType;
-import org.apache.seatunnel.core.flink.utils.CommandLineUtils;
 import org.apache.seatunnel.core.sql.job.JobInfo;
 
 import org.apache.commons.io.FileUtils;
@@ -37,7 +37,7 @@ public class SqlVariableSubstitutionTest {
         String[] args = {"-c", System.getProperty("user.dir") + TEST_RESOURCE_DIR + "flink.sql.conf.template",
             "-t", "-i", "table_name=events", "-i", "table_name2=print_table"};
 
-        FlinkCommandArgs flinkArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.JAR);
+        FlinkCommandArgs flinkArgs = CommandLineUtils.parse(args, new FlinkCommandArgs(), FlinkJobType.SQL.getType(), true);
         String configFilePath = flinkArgs.getConfigFile();
         String jobContent = FileUtils.readFileToString(new File(configFilePath), StandardCharsets.UTF_8);
         JobInfo jobInfo = new JobInfo(jobContent);
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/FlinkEnvParameterParser.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/FlinkEnvParameterParser.java
index 7da5437f1..bf4f0ce3c 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/FlinkEnvParameterParser.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/FlinkEnvParameterParser.java
@@ -18,9 +18,9 @@
 package org.apache.seatunnel.core.flink;
 
 import org.apache.seatunnel.core.base.config.ConfigParser;
+import org.apache.seatunnel.core.base.utils.CommandLineUtils;
 import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
 import org.apache.seatunnel.core.flink.config.FlinkJobType;
-import org.apache.seatunnel.core.flink.utils.CommandLineUtils;
 
 import org.apache.commons.lang3.StringUtils;
 
@@ -37,7 +37,7 @@ public class FlinkEnvParameterParser {
 
     @SuppressWarnings("checkstyle:RegexpSingleline")
     public static void main(String[] args) throws FileNotFoundException {
-        FlinkCommandArgs flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.JAR);
+        FlinkCommandArgs flinkCommandArgs = CommandLineUtils.parse(args, new FlinkCommandArgs(), FlinkJobType.JAR.getType(), true);
         List<String> envParameters = getEnvParameters(flinkCommandArgs);
         System.out.println(String.join(" ", envParameters));
     }
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/FlinkStarter.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/FlinkStarter.java
index cd4a2be7a..e7d26ae38 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/FlinkStarter.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/FlinkStarter.java
@@ -19,11 +19,13 @@ package org.apache.seatunnel.core.flink;
 
 import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.core.base.Starter;
+import org.apache.seatunnel.core.base.utils.CommandLineUtils;
 import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
 import org.apache.seatunnel.core.flink.config.FlinkJobType;
-import org.apache.seatunnel.core.flink.utils.CommandLineUtils;
 
+import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 
 /**
  * The SeaTunnel flink starter. This class is responsible for generate the final flink job execute command.
@@ -44,7 +46,7 @@ public class FlinkStarter implements Starter {
     private final String appJar;
 
     FlinkStarter(String[] args) {
-        this.flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.JAR);
+        this.flinkCommandArgs = CommandLineUtils.parse(args, new FlinkCommandArgs(), FlinkJobType.JAR.getType(), true);
         // set the deployment mode, used to get the job jar path.
         Common.setDeployMode(flinkCommandArgs.getDeployMode());
         Common.setStarter(true);
@@ -59,7 +61,24 @@ public class FlinkStarter implements Starter {
 
     @Override
     public List<String> buildCommands() {
-        return CommandLineUtils.buildFlinkCommand(flinkCommandArgs, APP_NAME, appJar);
+        List<String> command = new ArrayList<>();
+        command.add("${FLINK_HOME}/bin/flink");
+        command.add(flinkCommandArgs.getRunMode().getMode());
+        command.addAll(flinkCommandArgs.getOriginalParameters());
+        command.add("-c");
+        command.add(APP_NAME);
+        command.add(appJar);
+        command.add("--config");
+        command.add(flinkCommandArgs.getConfigFile());
+        if (flinkCommandArgs.isCheckConfig()) {
+            command.add("--check");
+        }
+        // set System properties
+        flinkCommandArgs.getVariables().stream()
+                .filter(Objects::nonNull)
+                .map(String::trim)
+                .forEach(variable -> command.add("-D" + variable));
+        return command;
     }
 
 }
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/SeatunnelFlink.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/SeatunnelFlink.java
index f1db4a15a..7afce7171 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/SeatunnelFlink.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/SeatunnelFlink.java
@@ -20,15 +20,15 @@ package org.apache.seatunnel.core.flink;
 import org.apache.seatunnel.core.base.Seatunnel;
 import org.apache.seatunnel.core.base.command.Command;
 import org.apache.seatunnel.core.base.exception.CommandException;
+import org.apache.seatunnel.core.base.utils.CommandLineUtils;
 import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
 import org.apache.seatunnel.core.flink.command.FlinkCommandBuilder;
 import org.apache.seatunnel.core.flink.config.FlinkJobType;
-import org.apache.seatunnel.core.flink.utils.CommandLineUtils;
 
 public class SeatunnelFlink {
 
     public static void main(String[] args) throws CommandException {
-        FlinkCommandArgs flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.JAR);
+        FlinkCommandArgs flinkCommandArgs = CommandLineUtils.parse(args, new FlinkCommandArgs(), FlinkJobType.JAR.getType(), true);
         Command<FlinkCommandArgs> flinkCommand = new FlinkCommandBuilder()
             .buildCommand(flinkCommandArgs);
         Seatunnel.run(flinkCommand);
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/args/FlinkCommandArgs.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/args/FlinkCommandArgs.java
index f6221c803..90e4ec5a9 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/args/FlinkCommandArgs.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/args/FlinkCommandArgs.java
@@ -25,8 +25,6 @@ import org.apache.seatunnel.core.flink.config.FlinkRunMode;
 import com.beust.jcommander.IStringConverter;
 import com.beust.jcommander.Parameter;
 
-import java.util.List;
-
 public class FlinkCommandArgs extends AbstractCommandArgs {
 
     @Parameter(names = {"-r", "--run-mode"},
@@ -34,11 +32,6 @@ public class FlinkCommandArgs extends AbstractCommandArgs {
             description = "job run mode, run or run-application")
     private FlinkRunMode runMode = FlinkRunMode.RUN;
 
-    /**
-     * Undefined parameters parsed will be stored here as flink command parameters.
-     */
-    private List<String> flinkParams;
-
     @Override
     public EngineType getEngineType() {
         return EngineType.FLINK;
@@ -57,14 +50,6 @@ public class FlinkCommandArgs extends AbstractCommandArgs {
         this.runMode = runMode;
     }
 
-    public List<String> getFlinkParams() {
-        return flinkParams;
-    }
-
-    public void setFlinkParams(List<String> flinkParams) {
-        this.flinkParams = flinkParams;
-    }
-
     /**
      * Used to convert the run mode string to the enum value.
      */
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/utils/CommandLineUtils.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/utils/CommandLineUtils.java
deleted file mode 100644
index cdbf610ae..000000000
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/utils/CommandLineUtils.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.flink.utils;
-
-import static org.apache.seatunnel.core.flink.constant.FlinkConstant.USAGE_EXIT_CODE;
-
-import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
-import org.apache.seatunnel.core.flink.config.FlinkJobType;
-
-import com.beust.jcommander.JCommander;
-import com.beust.jcommander.UnixStyleUsageFormatter;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
-
-public class CommandLineUtils {
-
-    private CommandLineUtils() {
-        throw new UnsupportedOperationException("CommandLineUtils is a utility class and cannot be instantiated");
-    }
-
-    public static FlinkCommandArgs parseFlinkArgs(String[] args) {
-        FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs();
-        JCommander.newBuilder()
-            .addObject(flinkCommandArgs)
-            .build()
-            .parse(args);
-        return flinkCommandArgs;
-    }
-
-    public static FlinkCommandArgs parseCommandArgs(String[] args, FlinkJobType jobType) {
-        FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs();
-        JCommander jCommander = JCommander.newBuilder()
-            .programName(jobType.getType())
-            .addObject(flinkCommandArgs)
-            .acceptUnknownOptions(true)
-            .args(args)
-            .build();
-        // The args is not belongs to seatunnel, add into flink params
-        flinkCommandArgs.setFlinkParams(jCommander.getUnknownOptions());
-        if (flinkCommandArgs.isHelp()) {
-            jCommander.setUsageFormatter(new UnixStyleUsageFormatter(jCommander));
-            jCommander.usage();
-            System.exit(USAGE_EXIT_CODE);
-        }
-        return flinkCommandArgs;
-
-    }
-
-    public static List<String> buildFlinkCommand(FlinkCommandArgs flinkCommandArgs, String className, String jarPath) {
-        List<String> command = new ArrayList<>();
-        command.add("${FLINK_HOME}/bin/flink");
-        command.add(flinkCommandArgs.getRunMode().getMode());
-        command.addAll(flinkCommandArgs.getFlinkParams());
-        command.add("-c");
-        command.add(className);
-        command.add(jarPath);
-        command.add("--config");
-        command.add(flinkCommandArgs.getConfigFile());
-        if (flinkCommandArgs.isCheckConfig()) {
-            command.add("--check");
-        }
-        // set System properties
-        flinkCommandArgs.getVariables().stream()
-          .filter(Objects::nonNull)
-          .map(String::trim)
-          .forEach(variable -> command.add("-D" + variable));
-        return command;
-
-    }
-}
diff --git a/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/FlinkStarterTest.java b/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/FlinkStarterTest.java
index 32229ceab..2fe75d4e5 100644
--- a/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/FlinkStarterTest.java
+++ b/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/FlinkStarterTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.seatunnel.core.flink;
 
-import com.beust.jcommander.ParameterException;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
@@ -25,7 +24,7 @@ public class FlinkStarterTest {
     static final String APP_CONF_PATH = ClassLoader.getSystemResource("app.conf").getPath();
 
     @Test
-    public void buildCommands() throws Exception {
+    public void testBuildCommands() {
         String[] args = {"--config", APP_CONF_PATH, "-m", "yarn-cluster", "-i", "key1=value1", "-i", "key2=value2"};
         FlinkStarter flinkStarter = new FlinkStarter(args);
         String flinkExecuteCommand = String.join(" ", flinkStarter.buildCommands());
@@ -44,17 +43,7 @@ public class FlinkStarterTest {
 
         String[] args3 = {"--config", APP_CONF_PATH, "-m", "yarn-cluster", "-i", "key1=value1", "-i", "key2=value2", "--run-mode", "run123"};
         Assertions.assertThrows(IllegalArgumentException.class, () -> new FlinkStarter(args3), "Run mode run123 not supported");
-    }
 
-    @Test
-    public void buildCommandsMissingConfig() {
-        Assertions.assertThrows(ParameterException.class,
-            () -> {
-                String[] args = {"-m", "yarn-cluster", "-i", "key1=value1", "-i", "key2=value2"};
-                FlinkStarter flinkStarter = new FlinkStarter(args);
-                String flinkExecuteCommand = String.join(" ", flinkStarter.buildCommands());
-                // since we cannot get the actual jar path, so we just check the command contains the command
-                Assertions.assertTrue(flinkExecuteCommand.contains("--config flink.yarn.conf"));
-            }, "The following option is required: [-c | --config]");
     }
+
 }
diff --git a/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/args/FlinkCommandArgsTest.java b/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/args/FlinkCommandArgsTest.java
index 4b1f9f1b8..de33d7519 100644
--- a/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/args/FlinkCommandArgsTest.java
+++ b/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/args/FlinkCommandArgsTest.java
@@ -17,7 +17,9 @@
 
 package org.apache.seatunnel.core.flink.args;
 
-import com.beust.jcommander.JCommander;
+import org.apache.seatunnel.core.base.utils.CommandLineUtils;
+import org.apache.seatunnel.core.flink.config.FlinkJobType;
+
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
@@ -28,11 +30,7 @@ public class FlinkCommandArgsTest {
     @Test
     public void testParseFlinkArgs() {
         String[] args = {"-c", "app.conf", "-ck", "-i", "city=shenyang", "-i", "date=20200202"};
-        FlinkCommandArgs flinkArgs = new FlinkCommandArgs();
-        JCommander.newBuilder()
-            .addObject(flinkArgs)
-            .build()
-            .parse(args);
+        FlinkCommandArgs flinkArgs = CommandLineUtils.parse(args, new FlinkCommandArgs(), FlinkJobType.JAR.getType(), true);
         Assertions.assertEquals("app.conf", flinkArgs.getConfigFile());
         Assertions.assertTrue(flinkArgs.isCheckConfig());
         Assertions.assertEquals(Arrays.asList("city=shenyang", "date=20200202"), flinkArgs.getVariables());
diff --git a/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/utils/CommandLineUtilsTest.java b/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/utils/CommandLineUtilsTest.java
index 1f6a8a34b..0712cc000 100644
--- a/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/utils/CommandLineUtilsTest.java
+++ b/seatunnel-core/seatunnel-core-flink/src/test/java/org/apache/seatunnel/core/flink/utils/CommandLineUtilsTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.core.flink.utils;
 
+import org.apache.seatunnel.core.base.utils.CommandLineUtils;
 import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
 import org.apache.seatunnel.core.flink.config.FlinkJobType;
 import org.apache.seatunnel.core.flink.config.FlinkRunMode;
@@ -24,60 +25,25 @@ import org.apache.seatunnel.core.flink.config.FlinkRunMode;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
-import java.io.FileNotFoundException;
 import java.util.Arrays;
-import java.util.List;
 
 public class CommandLineUtilsTest {
-    static final String APP_CONF_PATH = ClassLoader.getSystemResource("app.conf").getPath();
-    static final String SQL_CONF_PATH = ClassLoader.getSystemResource("sql.conf").getPath();
 
     @Test
     public void testParseCommandArgs() {
         String[] args = {"--detached", "-c", "app.conf", "-ck", "-i", "city=shenyang", "-i", "date=20200202",
             "-r", "run-application", "--unkown", "unkown-command"};
-        FlinkCommandArgs flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.JAR);
-        Assertions.assertEquals(flinkCommandArgs.getFlinkParams(), Arrays.asList("--detached", "--unkown", "unkown-command"));
+        FlinkCommandArgs flinkCommandArgs = CommandLineUtils.parse(args, new FlinkCommandArgs(), FlinkJobType.JAR.getType(), true);
+        Assertions.assertEquals(flinkCommandArgs.getOriginalParameters(), Arrays.asList("--detached", "--unkown", "unkown-command"));
         Assertions.assertEquals(flinkCommandArgs.getRunMode(), FlinkRunMode.APPLICATION_RUN);
         Assertions.assertEquals(flinkCommandArgs.getVariables(), Arrays.asList("city=shenyang", "date=20200202"));
 
         String[] args1 = {"--detached", "-c", "app.conf", "-ck", "-i", "city=shenyang", "-i", "date=20200202",
             "-r", "run-application", "--unkown", "unkown-command"};
-        flinkCommandArgs = CommandLineUtils.parseCommandArgs(args1, FlinkJobType.SQL);
-        Assertions.assertEquals(flinkCommandArgs.getFlinkParams(), Arrays.asList("--detached", "--unkown", "unkown-command"));
+        flinkCommandArgs = CommandLineUtils.parse(args1, new FlinkCommandArgs(), FlinkJobType.SQL.getType(), true);
+        Assertions.assertEquals(flinkCommandArgs.getOriginalParameters(), Arrays.asList("--detached", "--unkown", "unkown-command"));
         Assertions.assertEquals(flinkCommandArgs.getRunMode(), FlinkRunMode.APPLICATION_RUN);
         Assertions.assertEquals(flinkCommandArgs.getVariables(), Arrays.asList("city=shenyang", "date=20200202"));
     }
 
-    @Test
-    public void testBuildFlinkJarCommand() throws FileNotFoundException {
-        String[] args = {"--detached", "-c", APP_CONF_PATH, "-ck", "-i", "city=shenyang", "-i", "date=20200202",
-            "-r", "run-application", "--unkown", "unkown-command"};
-        FlinkCommandArgs flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.JAR);
-        List<String> commands = CommandLineUtils.buildFlinkCommand(flinkCommandArgs, "CLASS_NAME", "/path/to/jar");
-        Assertions.assertEquals(commands,
-                Arrays.asList("${FLINK_HOME}/bin/flink", "run-application", "--detached", "--unkown", "unkown-command", "-c",
-                        "CLASS_NAME", "/path/to/jar", "--config", APP_CONF_PATH, "--check", "-Dcity=shenyang", "-Ddate=20200202"));
-
-        flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.JAR);
-        commands = CommandLineUtils.buildFlinkCommand(flinkCommandArgs, "CLASS_NAME", "/path/to/jar");
-        Assertions.assertEquals(commands,
-            Arrays.asList("${FLINK_HOME}/bin/flink", "run-application", "--detached", "--unkown", "unkown-command", "-c",
-                "CLASS_NAME", "/path/to/jar", "--config", APP_CONF_PATH, "--check", "-Dcity=shenyang", "-Ddate=20200202"));
-
-        String[] args1 = {"--detached", "-c", "app.conf", "-ck", "-i", "city=shenyang", "-i", "date=20200202",
-            "-r", "run-application", "--unkown", "unkown-command"};
-
-    }
-
-    @Test
-    public void testBuildFlinkSQLCommand() throws FileNotFoundException{
-        String[] args = {"--detached", "-c", SQL_CONF_PATH, "-ck", "-i", "city=shenyang", "-i", "date=20200202",
-            "-r", "run-application", "--unkown", "unkown-command"};
-        FlinkCommandArgs flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.SQL);
-        List<String> commands = CommandLineUtils.buildFlinkCommand(flinkCommandArgs, "CLASS_NAME", "/path/to/jar");
-        Assertions.assertEquals(commands,
-                Arrays.asList("${FLINK_HOME}/bin/flink", "run-application", "--detached", "--unkown", "unkown-command", "-c",
-                        "CLASS_NAME", "/path/to/jar", "--config", SQL_CONF_PATH, "--check", "-Dcity=shenyang", "-Ddate=20200202"));
-    }
 }
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SeatunnelSpark.java b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SeatunnelSpark.java
index 05a1a1d1e..118724ba1 100644
--- a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SeatunnelSpark.java
+++ b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SeatunnelSpark.java
@@ -20,14 +20,14 @@ package org.apache.seatunnel.core.spark;
 import org.apache.seatunnel.core.base.Seatunnel;
 import org.apache.seatunnel.core.base.command.Command;
 import org.apache.seatunnel.core.base.exception.CommandException;
+import org.apache.seatunnel.core.base.utils.CommandLineUtils;
 import org.apache.seatunnel.core.spark.args.SparkCommandArgs;
 import org.apache.seatunnel.core.spark.command.SparkCommandBuilder;
-import org.apache.seatunnel.core.spark.utils.CommandLineUtils;
 
 public class SeatunnelSpark {
 
     public static void main(String[] args) throws CommandException {
-        SparkCommandArgs sparkArgs = CommandLineUtils.parseSparkArgs(args);
+        SparkCommandArgs sparkArgs = CommandLineUtils.parse(args, new SparkCommandArgs(), "start-seatunnel-spark.sh", true);
         Command<SparkCommandArgs> sparkCommand =
             new SparkCommandBuilder().buildCommand(sparkArgs);
         Seatunnel.run(sparkCommand);
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java
index 7eca89b4b..eea811ae9 100644
--- a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java
+++ b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java
@@ -27,6 +27,7 @@ import org.apache.seatunnel.core.base.Starter;
 import org.apache.seatunnel.core.base.config.ConfigBuilder;
 import org.apache.seatunnel.core.base.config.ConfigParser;
 import org.apache.seatunnel.core.base.config.EngineType;
+import org.apache.seatunnel.core.base.utils.CommandLineUtils;
 import org.apache.seatunnel.core.base.utils.CompressionUtils;
 import org.apache.seatunnel.core.spark.args.SparkCommandArgs;
 import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
@@ -35,8 +36,6 @@ import org.apache.seatunnel.plugin.discovery.spark.SparkSourcePluginDiscovery;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
-import com.beust.jcommander.JCommander;
-import com.beust.jcommander.UnixStyleUsageFormatter;
 import org.apache.commons.lang3.StringUtils;
 
 import java.io.File;
@@ -64,8 +63,6 @@ import java.util.stream.Stream;
  */
 public class SparkStarter implements Starter {
 
-    private static final int USAGE_EXIT_CODE = 234;
-
     private static final int PLUGIN_LIB_DIR_DEPTH = 3;
 
     /**
@@ -116,7 +113,7 @@ public class SparkStarter implements Starter {
      * {@link ClientModeSparkStarter} depending on deploy mode.
      */
     static SparkStarter getInstance(String[] args) {
-        SparkCommandArgs commandArgs = parseCommandArgs(args);
+        SparkCommandArgs commandArgs = CommandLineUtils.parse(args, new SparkCommandArgs(), "start-seatunnel-spark.sh", true);
         DeployMode deployMode = commandArgs.getDeployMode();
         switch (deployMode) {
             case CLUSTER:
@@ -128,24 +125,6 @@ public class SparkStarter implements Starter {
         }
     }
 
-    /**
-     * parse commandline args
-     */
-    private static SparkCommandArgs parseCommandArgs(String[] args) {
-        SparkCommandArgs commandArgs = new SparkCommandArgs();
-        JCommander commander = JCommander.newBuilder()
-                .programName("start-seatunnel-spark.sh")
-                .addObject(commandArgs)
-                .args(args)
-                .build();
-        if (commandArgs.isHelp()) {
-            commander.setUsageFormatter(new UnixStyleUsageFormatter(commander));
-            commander.usage();
-            System.exit(USAGE_EXIT_CODE);
-        }
-        return commandArgs;
-    }
-
     @Override
     public List<String> buildCommands() throws IOException {
         setSparkConf();
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/utils/CommandLineUtils.java b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/utils/CommandLineUtils.java
deleted file mode 100644
index 811bdddef..000000000
--- a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/utils/CommandLineUtils.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.spark.utils;
-
-import org.apache.seatunnel.core.spark.args.SparkCommandArgs;
-
-import com.beust.jcommander.JCommander;
-
-public class CommandLineUtils {
-
-    private CommandLineUtils() {
-        throw new UnsupportedOperationException("CommandLineUtils is a utility class and cannot be instantiated");
-    }
-
-    public static SparkCommandArgs parseSparkArgs(String[] args) {
-        SparkCommandArgs sparkCommandArgs = new SparkCommandArgs();
-        JCommander.newBuilder()
-            .addObject(sparkCommandArgs)
-            .build()
-            .parse(args);
-        return sparkCommandArgs;
-    }
-}
diff --git a/seatunnel-core/seatunnel-core-spark/src/test/java/org/apache/seatunnel/core/spark/args/SparkCommandArgsTest.java b/seatunnel-core/seatunnel-core-spark/src/test/java/org/apache/seatunnel/core/spark/args/SparkCommandArgsTest.java
index 19214c92b..9179d0c10 100644
--- a/seatunnel-core/seatunnel-core-spark/src/test/java/org/apache/seatunnel/core/spark/args/SparkCommandArgsTest.java
+++ b/seatunnel-core/seatunnel-core-spark/src/test/java/org/apache/seatunnel/core/spark/args/SparkCommandArgsTest.java
@@ -18,8 +18,8 @@
 package org.apache.seatunnel.core.spark.args;
 
 import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.core.base.utils.CommandLineUtils;
 
-import com.beust.jcommander.JCommander;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
@@ -30,38 +30,11 @@ public class SparkCommandArgsTest {
     @Test
     public void testParseSparkArgs() {
         String[] args = {"-c", "app.conf", "-e", "client", "-m", "yarn", "-i", "city=shijiazhuang", "-i", "name=Tom"};
-        SparkCommandArgs sparkArgs = new SparkCommandArgs();
-        JCommander.newBuilder()
-            .addObject(sparkArgs)
-            .build()
-            .parse(args);
+        SparkCommandArgs sparkArgs = CommandLineUtils.parse(args, new SparkCommandArgs(), "seatunnel-spark", true);
         Assertions.assertEquals("app.conf", sparkArgs.getConfigFile());
         Assertions.assertEquals(DeployMode.CLIENT, sparkArgs.getDeployMode());
         Assertions.assertEquals("yarn", sparkArgs.getMaster());
         Assertions.assertEquals(Arrays.asList("city=shijiazhuang", "name=Tom"), sparkArgs.getVariables());
     }
 
-    @Test
-    public void testHelp() {
-        String[] args = {"-h"};
-        SparkCommandArgs sparkArgs = new SparkCommandArgs();
-        JCommander commander = JCommander.newBuilder()
-            .addObject(sparkArgs)
-            .build();
-        commander.parse(args);
-        if (sparkArgs.isHelp()) {
-            commander.usage();
-        }
-    }
-
-    @Test
-    public void testDashDash() {
-        String[] args = {"-c", "app.conf", "-e", "client", "-m", "yarn", "-i", "city=guojizhuang", "--"};
-        SparkCommandArgs sparkArgs = new SparkCommandArgs();
-        JCommander.newBuilder()
-            .addObject(sparkArgs)
-            .build()
-            .parse(args);
-    }
-
 }
diff --git a/seatunnel-core/seatunnel-core-spark/src/test/java/org/apache/seatunnel/core/spark/utils/CommandLineUtilsTest.java b/seatunnel-core/seatunnel-core-spark/src/test/java/org/apache/seatunnel/core/spark/utils/CommandLineUtilsTest.java
index a013e53fa..483ae8687 100644
--- a/seatunnel-core/seatunnel-core-spark/src/test/java/org/apache/seatunnel/core/spark/utils/CommandLineUtilsTest.java
+++ b/seatunnel-core/seatunnel-core-spark/src/test/java/org/apache/seatunnel/core/spark/utils/CommandLineUtilsTest.java
@@ -17,26 +17,28 @@
 
 package org.apache.seatunnel.core.spark.utils;
 
+import org.apache.seatunnel.core.base.utils.CommandLineUtils;
 import org.apache.seatunnel.core.spark.args.SparkCommandArgs;
 
-import com.beust.jcommander.ParameterException;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import java.util.Arrays;
+
 public class CommandLineUtilsTest {
 
     @Test
     public void testParseSparkArgs() {
         String[] args = {"-c", "app.conf", "-e", "cluster", "-m", "local[*]"};
-        SparkCommandArgs commandLineArgs = CommandLineUtils.parseSparkArgs(args);
+        SparkCommandArgs commandLineArgs = CommandLineUtils.parse(args, new SparkCommandArgs());
 
         Assertions.assertEquals("app.conf", commandLineArgs.getConfigFile());
         Assertions.assertEquals("cluster", commandLineArgs.getDeployMode().getName());
-    }
 
-    @Test
-    public void testParseSparkArgsException() {
-        String[] args = {"-c", "app.conf", "-e", "cluster2xxx", "-m", "local[*]"};
-        Assertions.assertThrows(ParameterException.class, () -> CommandLineUtils.parseSparkArgs(args));
+        args = new String[]{"-c", "app.conf", "-e", "cluster", "-m", "local[*]", "--queue", "test"};
+        commandLineArgs = CommandLineUtils.parse(args, new SparkCommandArgs(), "seatunnel-spark", true);
+
+        Assertions.assertEquals(Arrays.asList("--queue", "test"), commandLineArgs.getOriginalParameters());
     }
+
 }
diff --git a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/command/AbstractCommandArgs.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/command/AbstractCommandArgs.java
index 999eeb66e..20dbcf3ff 100644
--- a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/command/AbstractCommandArgs.java
+++ b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/command/AbstractCommandArgs.java
@@ -46,6 +46,11 @@ public abstract class AbstractCommandArgs implements CommandArgs {
             description = "Show the usage message")
     private boolean help = false;
 
+    /**
+     * Undefined parameters parsed will be stored here as engine original command parameters.
+     */
+    private List<String> originalParameters;
+
     public String getConfigFile() {
         return configFile;
     }
@@ -78,6 +83,14 @@ public abstract class AbstractCommandArgs implements CommandArgs {
         this.help = help;
     }
 
+    public List<String> getOriginalParameters() {
+        return originalParameters;
+    }
+
+    public void setOriginalParameters(List<String> originalParameters) {
+        this.originalParameters = originalParameters;
+    }
+
     public EngineType getEngineType() {
         throw new UnsupportedOperationException("abstract class CommandArgs not support this method");
     }
diff --git a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/constants/CommonParamConstants.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/constants/CommonParamConstants.java
deleted file mode 100644
index 2f12311d2..000000000
--- a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/constants/CommonParamConstants.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.starter.constants;
-
-public class CommonParamConstants {
-
-    public static final String RESULT_TABLE_NAME = "result_table_name";
-    public static final String SOURCE_TABLE_NAME = "source_table_name";
-}
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/constant/FlinkConstant.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/constants/Constants.java
similarity index 90%
rename from seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/constant/FlinkConstant.java
rename to seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/constants/Constants.java
index 620e5b2e8..8b50b0084 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/constant/FlinkConstant.java
+++ b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/constants/Constants.java
@@ -15,8 +15,8 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.core.starter.flink.constant;
+package org.apache.seatunnel.core.starter.constants;
 
-public class FlinkConstant {
+public class Constants {
     public static final int USAGE_EXIT_CODE = 234;
 }
diff --git a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/CommandLineUtils.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/CommandLineUtils.java
new file mode 100644
index 000000000..c07d69bb6
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/CommandLineUtils.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.starter.utils;
+
+import static org.apache.seatunnel.core.starter.constants.Constants.USAGE_EXIT_CODE;
+
+import org.apache.seatunnel.core.starter.command.AbstractCommandArgs;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.ParameterException;
+import com.beust.jcommander.UnixStyleUsageFormatter;
+
+public class CommandLineUtils {
+
+    private CommandLineUtils() {
+        throw new UnsupportedOperationException("CommandLineUtils is a utility class and cannot be instantiated");
+    }
+
+    public static <T extends AbstractCommandArgs> T parse(String[] args, T obj) {
+        return parse(args, obj, null, false);
+    }
+
+    public static <T extends AbstractCommandArgs> T parse(String[] args, T obj, String programName, boolean acceptUnknownOptions) {
+        JCommander jCommander = JCommander.newBuilder()
+                .programName(programName)
+                .addObject(obj)
+                .acceptUnknownOptions(acceptUnknownOptions)
+                .build();
+        try {
+            jCommander.parse(args);
+            // The args is not belongs to SeaTunnel, add into engine original parameters
+            obj.setOriginalParameters(jCommander.getUnknownOptions());
+        } catch (ParameterException e) {
+            System.err.println(e.getLocalizedMessage());
+            exit(jCommander);
+        }
+
+        if (obj.isHelp()) {
+            exit(jCommander);
+        }
+        return obj;
+    }
+
+    private static void exit(JCommander jCommander) {
+        jCommander.setUsageFormatter(new UnixStyleUsageFormatter(jCommander));
+        jCommander.usage();
+        System.exit(USAGE_EXIT_CODE);
+    }
+}
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
index 52b207a79..df10fc645 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
+++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
@@ -21,9 +21,11 @@ import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.core.starter.Starter;
 import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs;
 import org.apache.seatunnel.core.starter.flink.config.FlinkJobType;
-import org.apache.seatunnel.core.starter.flink.utils.CommandLineUtils;
+import org.apache.seatunnel.core.starter.utils.CommandLineUtils;
 
+import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 
 /**
  * The SeaTunnel flink starter. This class is responsible for generate the final flink job execute command.
@@ -44,7 +46,7 @@ public class FlinkStarter implements Starter {
     private final String appJar;
 
     FlinkStarter(String[] args) {
-        this.flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.JAR);
+        this.flinkCommandArgs = CommandLineUtils.parse(args, new FlinkCommandArgs(), FlinkJobType.JAR.getType(), true);
         // set the deployment mode, used to get the job jar path.
         Common.setDeployMode(flinkCommandArgs.getDeployMode());
         Common.setStarter(true);
@@ -59,7 +61,24 @@ public class FlinkStarter implements Starter {
 
     @Override
     public List<String> buildCommands() {
-        return CommandLineUtils.buildFlinkCommand(flinkCommandArgs, APP_NAME, appJar);
+        List<String> command = new ArrayList<>();
+        command.add("${FLINK_HOME}/bin/flink");
+        command.add(flinkCommandArgs.getRunMode().getMode());
+        command.addAll(flinkCommandArgs.getOriginalParameters());
+        command.add("-c");
+        command.add(APP_NAME);
+        command.add(appJar);
+        command.add("--config");
+        command.add(flinkCommandArgs.getConfigFile());
+        if (flinkCommandArgs.isCheckConfig()) {
+            command.add("--check");
+        }
+        // set System properties
+        flinkCommandArgs.getVariables().stream()
+                .filter(Objects::nonNull)
+                .map(String::trim)
+                .forEach(variable -> command.add("-D" + variable));
+        return command;
     }
 
 }
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/SeatunnelFlink.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/SeatunnelFlink.java
index 330c2e501..cf01b979d 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/SeatunnelFlink.java
+++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/SeatunnelFlink.java
@@ -23,12 +23,12 @@ import org.apache.seatunnel.core.starter.exception.CommandException;
 import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs;
 import org.apache.seatunnel.core.starter.flink.command.FlinkCommandBuilder;
 import org.apache.seatunnel.core.starter.flink.config.FlinkJobType;
-import org.apache.seatunnel.core.starter.flink.utils.CommandLineUtils;
+import org.apache.seatunnel.core.starter.utils.CommandLineUtils;
 
 public class SeatunnelFlink {
 
     public static void main(String[] args) throws CommandException {
-        FlinkCommandArgs flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.JAR);
+        FlinkCommandArgs flinkCommandArgs = CommandLineUtils.parse(args, new FlinkCommandArgs(), FlinkJobType.JAR.getType(), true);
         Command<FlinkCommandArgs> flinkCommand = new FlinkCommandBuilder()
             .buildCommand(flinkCommandArgs);
         Seatunnel.run(flinkCommand);
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java
index 9fd4bdd4e..4c98ff532 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java
+++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java
@@ -25,18 +25,12 @@ import org.apache.seatunnel.core.starter.flink.config.FlinkRunMode;
 import com.beust.jcommander.IStringConverter;
 import com.beust.jcommander.Parameter;
 
-import java.util.List;
-
 public class FlinkCommandArgs extends AbstractCommandArgs {
 
     @Parameter(names = {"-r", "--run-mode"},
         converter = RunModeConverter.class,
         description = "job run mode, run or run-application")
     private FlinkRunMode runMode = FlinkRunMode.RUN;
-    /**
-     * Undefined parameters parsed will be stored here as flink command parameters.
-     */
-    private List<String> flinkParams;
 
     @Override
     public EngineType getEngineType() {
@@ -56,14 +50,6 @@ public class FlinkCommandArgs extends AbstractCommandArgs {
         this.runMode = runMode;
     }
 
-    public List<String> getFlinkParams() {
-        return flinkParams;
-    }
-
-    public void setFlinkParams(List<String> flinkParams) {
-        this.flinkParams = flinkParams;
-    }
-
     /**
      * Used to convert the run mode string to the enum value.
      */
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/CommandLineUtils.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/CommandLineUtils.java
deleted file mode 100644
index 9b29fdf16..000000000
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/CommandLineUtils.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.starter.flink.utils;
-
-import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs;
-import org.apache.seatunnel.core.starter.flink.config.FlinkJobType;
-import org.apache.seatunnel.core.starter.flink.constant.FlinkConstant;
-
-import com.beust.jcommander.JCommander;
-import com.beust.jcommander.UnixStyleUsageFormatter;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
-
-public class CommandLineUtils {
-
-    private CommandLineUtils() {
-        throw new UnsupportedOperationException("CommandLineUtils is a utility class and cannot be instantiated");
-    }
-
-    public static FlinkCommandArgs parseFlinkArgs(String[] args) {
-        FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs();
-        JCommander.newBuilder()
-            .addObject(flinkCommandArgs)
-            .build()
-            .parse(args);
-        return flinkCommandArgs;
-    }
-
-    public static FlinkCommandArgs parseCommandArgs(String[] args, FlinkJobType jobType) {
-        FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs();
-        JCommander jCommander = JCommander.newBuilder()
-            .programName(jobType.getType())
-            .addObject(flinkCommandArgs)
-            .acceptUnknownOptions(true)
-            .args(args)
-            .build();
-        // The args is not belongs to seatunnel, add into flink params
-        flinkCommandArgs.setFlinkParams(jCommander.getUnknownOptions());
-        if (flinkCommandArgs.isHelp()) {
-            jCommander.setUsageFormatter(new UnixStyleUsageFormatter(jCommander));
-            jCommander.usage();
-            System.exit(FlinkConstant.USAGE_EXIT_CODE);
-        }
-        return flinkCommandArgs;
-
-    }
-
-    public static List<String> buildFlinkCommand(FlinkCommandArgs flinkCommandArgs, String className, String jarPath) {
-        List<String> command = new ArrayList<>();
-        command.add("${FLINK_HOME}/bin/flink");
-        command.add(flinkCommandArgs.getRunMode().getMode());
-        command.addAll(flinkCommandArgs.getFlinkParams());
-        command.add("-c");
-        command.add(className);
-        command.add(jarPath);
-        command.add("--config");
-        command.add(flinkCommandArgs.getConfigFile());
-        if (flinkCommandArgs.isCheckConfig()) {
-            command.add("--check");
-        }
-        // set System properties
-        flinkCommandArgs.getVariables().stream()
-          .filter(Objects::nonNull)
-          .map(String::trim)
-          .forEach(variable -> command.add("-D" + variable));
-        return command;
-
-    }
-}
diff --git a/seatunnel-core/seatunnel-flink-starter/src/test/java/org/apache/seatunnel/core/starter/flink/FlinkStarterTest.java b/seatunnel-core/seatunnel-flink-starter/src/test/java/org/apache/seatunnel/core/starter/flink/FlinkStarterTest.java
index 6dc0ad756..ac791cf26 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/test/java/org/apache/seatunnel/core/starter/flink/FlinkStarterTest.java
+++ b/seatunnel-core/seatunnel-flink-starter/src/test/java/org/apache/seatunnel/core/starter/flink/FlinkStarterTest.java
@@ -49,17 +49,4 @@ public class FlinkStarterTest {
             Assertions.assertEquals("Run mode run123 not supported", e.getMessage());
         }
     }
-
-    @Test
-    public void buildCommandsMissingConfig() {
-        try {
-            String[] args = {"-m", "yarn-cluster", "-i", "key1=value1", "-i", "key2=value2"};
-            FlinkStarter flinkStarter = new FlinkStarter(args);
-            String flinkExecuteCommand = String.join(" ", flinkStarter.buildCommands());
-            // since we cannot get the actual jar path, so we just check the command contains the command
-            Assertions.assertTrue(flinkExecuteCommand.contains("--config flink.yarn.conf"));
-        } catch (Exception e) {
-            Assertions.assertEquals("The following option is required: [-c | --config]", e.getMessage());
-        }
-    }
 }
diff --git a/seatunnel-core/seatunnel-flink-starter/src/test/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgsTest.java b/seatunnel-core/seatunnel-flink-starter/src/test/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgsTest.java
index 80bbdf6d2..2906b5344 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/test/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgsTest.java
+++ b/seatunnel-core/seatunnel-flink-starter/src/test/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgsTest.java
@@ -17,7 +17,8 @@
 
 package org.apache.seatunnel.core.starter.flink.args;
 
-import com.beust.jcommander.JCommander;
+import org.apache.seatunnel.core.starter.utils.CommandLineUtils;
+
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
@@ -28,11 +29,7 @@ public class FlinkCommandArgsTest {
     @Test
     public void testParseFlinkArgs() {
         String[] args = {"-c", "app.conf", "-t", "-i", "city=shenyang", "-i", "date=20200202"};
-        FlinkCommandArgs flinkArgs = new FlinkCommandArgs();
-        JCommander.newBuilder()
-            .addObject(flinkArgs)
-            .build()
-            .parse(args);
+        FlinkCommandArgs flinkArgs = CommandLineUtils.parse(args, new FlinkCommandArgs(), "seatunnel-flink", true);
         Assertions.assertEquals("app.conf", flinkArgs.getConfigFile());
         Assertions.assertTrue(flinkArgs.isCheckConfig());
         Assertions.assertEquals(Arrays.asList("city=shenyang", "date=20200202"), flinkArgs.getVariables());
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SeatunnelSpark.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SeatunnelSpark.java
index 57fdb5d63..338921ad1 100644
--- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SeatunnelSpark.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SeatunnelSpark.java
@@ -22,12 +22,12 @@ import org.apache.seatunnel.core.starter.command.Command;
 import org.apache.seatunnel.core.starter.exception.CommandException;
 import org.apache.seatunnel.core.starter.spark.args.SparkCommandArgs;
 import org.apache.seatunnel.core.starter.spark.command.SparkCommandBuilder;
-import org.apache.seatunnel.core.starter.spark.utils.CommandLineUtils;
+import org.apache.seatunnel.core.starter.utils.CommandLineUtils;
 
 public class SeatunnelSpark {
 
     public static void main(String[] args) throws CommandException {
-        SparkCommandArgs sparkArgs = CommandLineUtils.parseSparkArgs(args);
+        SparkCommandArgs sparkArgs = CommandLineUtils.parse(args, new SparkCommandArgs(), "start-seatunnel-spark.sh", true);
         Command<SparkCommandArgs> sparkCommand =
             new SparkCommandBuilder().buildCommand(sparkArgs);
         Seatunnel.run(sparkCommand);
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
index 00261a80c..19479d0e3 100644
--- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
@@ -26,6 +26,7 @@ import org.apache.seatunnel.core.starter.Starter;
 import org.apache.seatunnel.core.starter.config.ConfigBuilder;
 import org.apache.seatunnel.core.starter.config.PluginType;
 import org.apache.seatunnel.core.starter.spark.args.SparkCommandArgs;
+import org.apache.seatunnel.core.starter.utils.CommandLineUtils;
 import org.apache.seatunnel.core.starter.utils.CompressionUtils;
 import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
 import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
@@ -35,8 +36,6 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
 
-import com.beust.jcommander.JCommander;
-import com.beust.jcommander.UnixStyleUsageFormatter;
 import org.apache.commons.lang3.StringUtils;
 
 import java.io.File;
@@ -64,8 +63,6 @@ import java.util.stream.Stream;
  */
 public class SparkStarter implements Starter {
 
-    private static final int USAGE_EXIT_CODE = 234;
-
     private static final int PLUGIN_LIB_DIR_DEPTH = 3;
 
     /**
@@ -116,7 +113,7 @@ public class SparkStarter implements Starter {
      * {@link ClientModeSparkStarter} depending on deploy mode.
      */
     static SparkStarter getInstance(String[] args) {
-        SparkCommandArgs commandArgs = parseCommandArgs(args);
+        SparkCommandArgs commandArgs = CommandLineUtils.parse(args, new SparkCommandArgs(), "start-seatunnel-spark.sh", true);
         DeployMode deployMode = commandArgs.getDeployMode();
         switch (deployMode) {
             case CLUSTER:
@@ -128,24 +125,6 @@ public class SparkStarter implements Starter {
         }
     }
 
-    /**
-     * parse commandline args
-     */
-    private static SparkCommandArgs parseCommandArgs(String[] args) {
-        SparkCommandArgs commandArgs = new SparkCommandArgs();
-        JCommander commander = JCommander.newBuilder()
-                .programName("start-seatunnel-spark.sh")
-                .addObject(commandArgs)
-                .args(args)
-                .build();
-        if (commandArgs.isHelp()) {
-            commander.setUsageFormatter(new UnixStyleUsageFormatter(commander));
-            commander.usage();
-            System.exit(USAGE_EXIT_CODE);
-        }
-        return commandArgs;
-    }
-
     @Override
     public List<String> buildCommands() throws IOException {
         setSparkConf();
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/utils/CommandLineUtils.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/utils/CommandLineUtils.java
deleted file mode 100644
index 9e25030e6..000000000
--- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/utils/CommandLineUtils.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.starter.spark.utils;
-
-import org.apache.seatunnel.core.starter.spark.args.SparkCommandArgs;
-
-import com.beust.jcommander.JCommander;
-
-public class CommandLineUtils {
-
-    private CommandLineUtils() {
-        throw new UnsupportedOperationException("CommandLineUtils is a utility class and cannot be instantiated");
-    }
-
-    public static SparkCommandArgs parseSparkArgs(String[] args) {
-        SparkCommandArgs sparkCommandArgs = new SparkCommandArgs();
-        JCommander.newBuilder()
-            .addObject(sparkCommandArgs)
-            .build()
-            .parse(args);
-        return sparkCommandArgs;
-    }
-}
diff --git a/seatunnel-core/seatunnel-spark-starter/src/test/java/org/apache/seatunnel/core/starter/spark/args/SparkCommandArgsTest.java b/seatunnel-core/seatunnel-spark-starter/src/test/java/org/apache/seatunnel/core/starter/spark/args/SparkCommandArgsTest.java
index c66201251..dd0cbf2c9 100644
--- a/seatunnel-core/seatunnel-spark-starter/src/test/java/org/apache/seatunnel/core/starter/spark/args/SparkCommandArgsTest.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/test/java/org/apache/seatunnel/core/starter/spark/args/SparkCommandArgsTest.java
@@ -18,8 +18,8 @@
 package org.apache.seatunnel.core.starter.spark.args;
 
 import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.core.starter.utils.CommandLineUtils;
 
-import com.beust.jcommander.JCommander;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
@@ -30,38 +30,11 @@ public class SparkCommandArgsTest {
     @Test
     public void testParseSparkArgs() {
         String[] args = {"-c", "app.conf", "-e", "client", "-m", "yarn", "-i", "city=shijiazhuang", "-i", "name=Tom"};
-        SparkCommandArgs sparkArgs = new SparkCommandArgs();
-        JCommander.newBuilder()
-                .addObject(sparkArgs)
-                .build()
-                .parse(args);
+        SparkCommandArgs sparkArgs = CommandLineUtils.parse(args, new SparkCommandArgs(), "seatunnel-spark", true);
         Assertions.assertEquals("app.conf", sparkArgs.getConfigFile());
         Assertions.assertEquals(DeployMode.CLIENT, sparkArgs.getDeployMode());
         Assertions.assertEquals("yarn", sparkArgs.getMaster());
         Assertions.assertEquals(Arrays.asList("city=shijiazhuang", "name=Tom"), sparkArgs.getVariables());
     }
 
-    @Test
-    public void testHelp() {
-        String[] args = {"-h"};
-        SparkCommandArgs sparkArgs = new SparkCommandArgs();
-        JCommander commander = JCommander.newBuilder()
-                .addObject(sparkArgs)
-                .build();
-        commander.parse(args);
-        if (sparkArgs.isHelp()) {
-            commander.usage();
-        }
-    }
-
-    @Test
-    public void testDashDash() {
-        String[] args = {"-c", "app.conf", "-e", "client", "-m", "yarn", "-i", "city=guojizhuang", "--"};
-        SparkCommandArgs sparkArgs = new SparkCommandArgs();
-        JCommander.newBuilder()
-                .addObject(sparkArgs)
-                .build()
-                .parse(args);
-    }
-
 }
diff --git a/seatunnel-core/seatunnel-spark-starter/src/test/java/org/apache/seatunnel/core/starter/spark/utils/CommandLineUtilsTest.java b/seatunnel-core/seatunnel-spark-starter/src/test/java/org/apache/seatunnel/core/starter/spark/utils/CommandLineUtilsTest.java
index 81e177c99..575896698 100644
--- a/seatunnel-core/seatunnel-spark-starter/src/test/java/org/apache/seatunnel/core/starter/spark/utils/CommandLineUtilsTest.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/test/java/org/apache/seatunnel/core/starter/spark/utils/CommandLineUtilsTest.java
@@ -18,25 +18,28 @@
 package org.apache.seatunnel.core.starter.spark.utils;
 
 import org.apache.seatunnel.core.starter.spark.args.SparkCommandArgs;
+import org.apache.seatunnel.core.starter.utils.CommandLineUtils;
 
-import com.beust.jcommander.ParameterException;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import java.util.Arrays;
+
 public class CommandLineUtilsTest {
 
     @Test
     public void testParseSparkArgs() {
         String[] args = {"-c", "app.conf", "-e", "cluster", "-m", "local[*]"};
-        SparkCommandArgs commandLineArgs = CommandLineUtils.parseSparkArgs(args);
+        SparkCommandArgs commandLineArgs = CommandLineUtils.parse(args, new SparkCommandArgs());
 
         Assertions.assertEquals("app.conf", commandLineArgs.getConfigFile());
         Assertions.assertEquals("cluster", commandLineArgs.getDeployMode().getName());
-    }
 
-    @Test
-    public void testParseSparkArgsException() {
-        String[] args = {"-c", "app.conf", "-e", "cluster2xxx", "-m", "local[*]"};
-        Assertions.assertThrows(ParameterException.class, () -> CommandLineUtils.parseSparkArgs(args));
+        args = new String[]{"-c", "app.conf", "-e", "cluster", "-m", "local[*]", "--queue", "test"};
+        commandLineArgs = CommandLineUtils.parse(args, new SparkCommandArgs(), "seatunnel-spark", true);
+
+        Assertions.assertEquals(Arrays.asList("--queue", "test"), commandLineArgs.getOriginalParameters());
+
     }
+
 }