You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/03/14 11:26:13 UTC

[incubator-seatunnel] branch dev updated: [Feature #1421][core] Use different command to execute task (#1422)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 2ff6131  [Feature #1421][core] Use different command to execute task (#1422)
2ff6131 is described below

commit 2ff61311d2d04e43243399e1409c5c061b884b03
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Mon Mar 14 19:26:07 2022 +0800

    [Feature #1421][core] Use different command to execute task (#1422)
---
 .../apache/seatunnel/common/config/DeployMode.java |  11 ++
 .../main/java/org/apache/seatunnel/Seatunnel.java  | 158 +++------------------
 .../seatunnel/command/BaseTaskExecuteCommand.java  | 135 ++++++++++++++++++
 .../java/org/apache/seatunnel/command/Command.java |  28 ++--
 .../Engine.java => command/CommandArgs.java}       |  17 +--
 .../Engine.java => command/CommandBuilder.java}    |  17 +--
 .../apache/seatunnel/command/CommandFactory.java   |  45 ++++++
 .../{config => }/command/DeployModeValidator.java  |   2 +-
 .../FlinkCommandArgs.java}                         |  48 ++++---
 .../SparkCommandArgs.java}                         |  57 +++++++-
 .../command/flink/FlinkCommandBuilder.java         |  21 ++-
 .../flink/FlinkConfValidateCommand.java}           |  27 ++--
 .../command/flink/FlinkTaskExecuteCommand.java     |  55 +++++++
 .../command/spark/SparkCommandBuilder.java         |  21 ++-
 .../command/spark/SparkConfValidateCommand.java    |  49 +++++++
 .../command/spark/SparkTaskExecuteCommand.java     |  53 +++++++
 .../org/apache/seatunnel/config/ConfigBuilder.java |   6 +-
 .../{utils/Engine.java => config/EngineType.java}  |  12 +-
 .../seatunnel/{utils => config}/PluginType.java    |   4 +-
 .../seatunnel/config/command/CommandLineArgs.java  |  59 --------
 .../command => utils}/CommandLineUtils.java        |  31 ++--
 .../seatunnel/command/CommandFactoryTest.java      |  64 +++++++++
 .../FlinkCommandArgsTest.java}                     |   8 +-
 .../SparkCommandArgsTest.java}                     |   8 +-
 .../command => utils}/CommandLineUtilsTest.java    |   8 +-
 .../java/org/apache/seatunnel/SeatunnelFlink.java  |  10 +-
 .../java/org/apache/seatunnel/SeatunnelSpark.java  |  10 +-
 .../apache/seatunnel/core/sql/SeatunnelSql.java    |   6 +-
 .../core/sql/SqlVariableSubstitutionTest.java      |   6 +-
 .../seatunnel/example/flink/LocalFlinkExample.java |  13 +-
 .../seatunnel/example/spark/LocalSparkExample.java |  15 +-
 ...{spark.batch.conf.template => spark.batch.conf} |   4 +-
 32 files changed, 639 insertions(+), 369 deletions(-)

diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/DeployMode.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/DeployMode.java
index 44beef8..ce621ac 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/DeployMode.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/DeployMode.java
@@ -17,11 +17,18 @@
 
 package org.apache.seatunnel.common.config;
 
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
 public enum DeployMode {
     CLIENT("client"),
     CLUSTER("cluster"),
     ;
 
+    private static final Map<String, DeployMode> NAME_MAP = Arrays.stream(DeployMode.values())
+        .collect(HashMap::new, (map, deployMode) -> map.put(deployMode.getName(), deployMode), Map::putAll);
+
     private final String name;
 
     DeployMode(String name) {
@@ -32,4 +39,8 @@ public enum DeployMode {
         return name;
     }
 
+    public static DeployMode from(String name) {
+        return NAME_MAP.get(name);
+    }
+
 }
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/Seatunnel.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/Seatunnel.java
index c11905b..e73187a 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/Seatunnel.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/Seatunnel.java
@@ -17,155 +17,41 @@
 
 package org.apache.seatunnel;
 
-import org.apache.seatunnel.apis.BaseSink;
-import org.apache.seatunnel.apis.BaseSource;
-import org.apache.seatunnel.apis.BaseTransform;
-import org.apache.seatunnel.common.Constants;
-import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.command.Command;
+import org.apache.seatunnel.command.CommandArgs;
+import org.apache.seatunnel.command.CommandFactory;
 import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.common.config.ConfigRuntimeException;
-import org.apache.seatunnel.common.config.DeployMode;
-import org.apache.seatunnel.config.ConfigBuilder;
-import org.apache.seatunnel.config.command.CommandLineArgs;
-import org.apache.seatunnel.env.Execution;
-import org.apache.seatunnel.env.RuntimeEnv;
-import org.apache.seatunnel.plugin.Plugin;
-import org.apache.seatunnel.utils.AsciiArtUtils;
-import org.apache.seatunnel.utils.CompressionUtils;
-import org.apache.seatunnel.utils.Engine;
-import org.apache.seatunnel.utils.PluginType;
 
-import org.apache.commons.compress.archivers.ArchiveException;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Paths;
-import java.util.List;
-import java.util.Objects;
-import java.util.Optional;
-
 public class Seatunnel {
     private static final Logger LOGGER = LoggerFactory.getLogger(Seatunnel.class);
 
-    public static void run(CommandLineArgs commandLineArgs, Engine engine) throws Exception {
-        if (!Common.setDeployMode(commandLineArgs.getDeployMode())) {
-            throw new IllegalArgumentException(
-                String.format("Deploy mode: %s is Illegal", commandLineArgs.getDeployMode()));
-        }
-
-        String configFilePath = getConfigFilePath(commandLineArgs, engine);
-        if (commandLineArgs.isTestConfig()) {
-            new ConfigBuilder(configFilePath, engine).checkConfig();
-            LOGGER.info("config OK !");
-        } else {
-            try {
-                entryPoint(configFilePath, engine);
-            } catch (ConfigRuntimeException e) {
-                showConfigError(e);
-                throw e;
-            } catch (Exception e) {
-                showFatalError(e);
-                throw e;
-            }
-        }
-    }
-
-    private static String getConfigFilePath(CommandLineArgs cmdArgs, Engine engine) {
-        String path = null;
-        switch (engine) {
-            case FLINK:
-                path = cmdArgs.getConfigFile();
-                break;
-            case SPARK:
-                final Optional<String> mode = Common.getDeployMode();
-                if (mode.isPresent() && DeployMode.CLUSTER.getName().equals(mode.get())) {
-                    path = Paths.get(cmdArgs.getConfigFile()).getFileName().toString();
-                } else {
-                    path = cmdArgs.getConfigFile();
-                }
-                break;
-            default:
-                break;
-        }
-        return path;
-    }
-
-    private static void entryPoint(String configFile, Engine engine) throws Exception {
-
-        ConfigBuilder configBuilder = new ConfigBuilder(configFile, engine);
-        List<BaseSource> sources = configBuilder.createPlugins(PluginType.SOURCE);
-        List<BaseTransform> transforms = configBuilder.createPlugins(PluginType.TRANSFORM);
-        List<BaseSink> sinks = configBuilder.createPlugins(PluginType.SINK);
-        Execution execution = configBuilder.createExecution();
-        baseCheckConfig(sources, transforms, sinks);
-        prepare(configBuilder.getEnv(), sources, transforms, sinks);
-        showAsciiLogo();
-
-        execution.start(sources, transforms, sinks);
-    }
+    /**
+     * This method is the entrypoint of SeaTunnel.
+     *
+     * @param commandArgs commandArgs
+     * @param <T> commandType
+     */
+    public static <T extends CommandArgs> void run(T commandArgs) {
 
-    @SafeVarargs
-    private static void baseCheckConfig(List<? extends Plugin>... plugins) {
-        for (List<? extends Plugin> pluginList : plugins) {
-            for (Plugin plugin : pluginList) {
-                CheckResult checkResult;
-                try {
-                    checkResult = plugin.checkConfig();
-                } catch (Exception e) {
-                    checkResult = CheckResult.error(e.getMessage());
-                }
-                if (!checkResult.isSuccess()) {
-                    LOGGER.error("Plugin[{}] contains invalid config, error: {} \n", plugin.getClass().getName(), checkResult.getMsg());
-                    System.exit(-1); // invalid configuration
-                }
-            }
-        }
-        deployModeCheck();
-    }
-
-    private static void deployModeCheck() {
-        final Optional<String> mode = Common.getDeployMode();
-        if (mode.isPresent() && DeployMode.CLUSTER.getName().equals(mode.get())) {
-
-            LOGGER.info("preparing cluster mode work dir files...");
-            File workDir = new File(".");
-
-            for (File file : Objects.requireNonNull(workDir.listFiles())) {
-                LOGGER.warn("\t list file: " + file.getAbsolutePath());
-            }
-            // decompress plugin dir
-            File compressedFile = new File("plugins.tar.gz");
-
-            try {
-                File tempFile = CompressionUtils.unGzip(compressedFile, workDir);
-                try {
-                    CompressionUtils.unTar(tempFile, workDir);
-                    LOGGER.info("succeeded to decompress plugins.tar.gz");
-                } catch (ArchiveException e) {
-                    LOGGER.error("failed to decompress plugins.tar.gz", e);
-                    System.exit(-1);
-                }
-            } catch (IOException e) {
-                LOGGER.error("failed to decompress plugins.tar.gz", e);
-                System.exit(-1);
-            }
-        }
-    }
-
-    private static void prepare(RuntimeEnv env, List<? extends Plugin>... plugins) {
-        for (List<? extends Plugin> pluginList : plugins) {
-            pluginList.forEach(plugin -> plugin.prepare(env));
+        if (!Common.setDeployMode(commandArgs.getDeployMode().getName())) {
+            throw new IllegalArgumentException(
+                String.format("Deploy mode: %s is Illegal", commandArgs.getDeployMode()));
         }
 
-    }
-
-    private static void showAsciiLogo() {
-        String printAsciiLogo = System.getenv("SEATUNNEL_PRINT_ASCII_LOGO");
-        if ("true".equalsIgnoreCase(printAsciiLogo)) {
-            AsciiArtUtils.printAsciiArt(Constants.LOGO);
+        try {
+            Command<T> command = CommandFactory.createCommand(commandArgs);
+            command.execute(commandArgs);
+        } catch (ConfigRuntimeException e) {
+            showConfigError(e);
+            throw e;
+        } catch (Exception e) {
+            showFatalError(e);
+            throw e;
         }
     }
 
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/BaseTaskExecuteCommand.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/BaseTaskExecuteCommand.java
new file mode 100644
index 0000000..ff00e4b
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/BaseTaskExecuteCommand.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.command;
+
+import org.apache.seatunnel.common.Constants;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.env.RuntimeEnv;
+import org.apache.seatunnel.plugin.Plugin;
+import org.apache.seatunnel.utils.AsciiArtUtils;
+import org.apache.seatunnel.utils.CompressionUtils;
+
+import org.apache.commons.compress.archivers.ArchiveException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Base task execute command. More details see:
+ * <ul>
+ *     <li>{@link org.apache.seatunnel.command.flink.FlinkTaskExecuteCommand}</li>
+ *     <li>{@link org.apache.seatunnel.command.spark.SparkTaskExecuteCommand}</li>
+ * </ul>
+ *
+ * @param <T> command args.
+ */
+public abstract class BaseTaskExecuteCommand<T extends CommandArgs> implements Command<T> {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(BaseTaskExecuteCommand.class);
+
+    /**
+     * Check the plugin config.
+     *
+     * @param plugins plugin list.
+     */
+    protected void baseCheckConfig(List<? extends Plugin>... plugins) {
+        pluginCheck(plugins);
+        deployModeCheck();
+    }
+
+    /**
+     * Execute prepare method defined in {@link Plugin}.
+     *
+     * @param env runtimeEnv
+     * @param plugins plugin list
+     */
+    protected void prepare(RuntimeEnv env, List<? extends Plugin>... plugins) {
+        for (List<? extends Plugin> pluginList : plugins) {
+            pluginList.forEach(plugin -> plugin.prepare(env));
+        }
+    }
+
+    /**
+     * Print the logo.
+     */
+    protected void showAsciiLogo() {
+        String printAsciiLogo = System.getenv("SEATUNNEL_PRINT_ASCII_LOGO");
+        if ("true".equalsIgnoreCase(printAsciiLogo)) {
+            AsciiArtUtils.printAsciiArt(Constants.LOGO);
+        }
+    }
+
+    /**
+     * Execute the checkConfig method defined in {@link Plugin}.
+     *
+     * @param plugins plugin list
+     */
+    private void pluginCheck(List<? extends Plugin>... plugins) {
+        for (List<? extends Plugin> pluginList : plugins) {
+            for (Plugin plugin : pluginList) {
+                CheckResult checkResult;
+                try {
+                    checkResult = plugin.checkConfig();
+                } catch (Exception e) {
+                    checkResult = CheckResult.error(e.getMessage());
+                }
+                if (!checkResult.isSuccess()) {
+                    LOGGER.error("Plugin[{}] contains invalid config, error: {} \n", plugin.getClass().getName(), checkResult.getMsg());
+                    System.exit(-1); // invalid configuration
+                }
+            }
+        }
+    }
+
+    private void deployModeCheck() {
+        final Optional<String> mode = Common.getDeployMode();
+        if (mode.isPresent() && DeployMode.CLUSTER.getName().equals(mode.get())) {
+
+            LOGGER.info("preparing cluster mode work dir files...");
+            File workDir = new File(".");
+
+            for (File file : Objects.requireNonNull(workDir.listFiles())) {
+                LOGGER.warn("\t list file: " + file.getAbsolutePath());
+            }
+            // decompress plugin dir
+            File compressedFile = new File("plugins.tar.gz");
+
+            try {
+                File tempFile = CompressionUtils.unGzip(compressedFile, workDir);
+                try {
+                    CompressionUtils.unTar(tempFile, workDir);
+                    LOGGER.info("succeeded to decompress plugins.tar.gz");
+                } catch (ArchiveException e) {
+                    LOGGER.error("failed to decompress plugins.tar.gz", e);
+                    System.exit(-1);
+                }
+            } catch (IOException e) {
+                LOGGER.error("failed to decompress plugins.tar.gz", e);
+                System.exit(-1);
+            }
+        }
+    }
+
+}
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/DeployMode.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/Command.java
similarity index 74%
copy from seatunnel-common/src/main/java/org/apache/seatunnel/common/config/DeployMode.java
copy to seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/Command.java
index 44beef8..767435c 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/DeployMode.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/Command.java
@@ -15,21 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.common.config;
+package org.apache.seatunnel.command;
 
-public enum DeployMode {
-    CLIENT("client"),
-    CLUSTER("cluster"),
-    ;
-
-    private final String name;
-
-    DeployMode(String name) {
-        this.name = name;
-    }
+/**
+ * Command interface.
+ *
+ * @param <T> args type
+ */
+@FunctionalInterface
+public interface Command<T extends CommandArgs> {
 
-    public String getName() {
-        return name;
-    }
+    /**
+     * Execute command
+     *
+     * @param commandArgs args
+     */
+    void execute(T commandArgs);
 
 }
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/utils/Engine.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/CommandArgs.java
similarity index 76%
copy from seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/utils/Engine.java
copy to seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/CommandArgs.java
index 7670fba..7f3516e 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/utils/Engine.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/CommandArgs.java
@@ -15,18 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.utils;
+package org.apache.seatunnel.command;
 
-public enum Engine {
-    SPARK("spark"), FLINK("flink"), NULL("");
+import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.config.EngineType;
 
-    private String engine;
+public interface CommandArgs {
 
-    Engine(String engine) {
-        this.engine = engine;
-    }
+    EngineType getEngineType();
+
+    DeployMode getDeployMode();
 
-    public String getEngine() {
-        return engine;
-    }
 }
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/utils/Engine.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/CommandBuilder.java
similarity index 75%
copy from seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/utils/Engine.java
copy to seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/CommandBuilder.java
index 7670fba..f4fc8e5 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/utils/Engine.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/CommandBuilder.java
@@ -15,18 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.utils;
+package org.apache.seatunnel.command;
 
-public enum Engine {
-    SPARK("spark"), FLINK("flink"), NULL("");
-
-    private String engine;
-
-    Engine(String engine) {
-        this.engine = engine;
-    }
-
-    public String getEngine() {
-        return engine;
-    }
+@FunctionalInterface
+public interface CommandBuilder<T extends CommandArgs> {
+    Command<T> buildCommand(T commandArgs);
 }
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/CommandFactory.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/CommandFactory.java
new file mode 100644
index 0000000..0264446
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/CommandFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.command;
+
+import org.apache.seatunnel.command.flink.FlinkCommandBuilder;
+import org.apache.seatunnel.command.spark.SparkCommandBuilder;
+
+public class CommandFactory {
+
+    private CommandFactory() {
+    }
+
+    /**
+     * Create seatunnel command.
+     *
+     * @param commandArgs command args.
+     * @return Special command.s
+     */
+    @SuppressWarnings("unchecked")
+    public static <T extends CommandArgs> Command<T> createCommand(T commandArgs) {
+        switch (commandArgs.getEngineType()) {
+            case FLINK:
+                return (Command<T>) new FlinkCommandBuilder().buildCommand((FlinkCommandArgs) commandArgs);
+            case SPARK:
+                return (Command<T>) new SparkCommandBuilder().buildCommand((SparkCommandArgs) commandArgs);
+            default:
+                throw new RuntimeException(String.format("engine type: %s is not supported", commandArgs.getEngineType()));
+        }
+    }
+}
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/command/DeployModeValidator.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/DeployModeValidator.java
similarity index 96%
copy from seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/command/DeployModeValidator.java
copy to seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/DeployModeValidator.java
index 19422dc..de1c207 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/command/DeployModeValidator.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/DeployModeValidator.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.config.command;
+package org.apache.seatunnel.command;
 
 import org.apache.seatunnel.common.config.Common;
 
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/command/CommandSparkArgs.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/FlinkCommandArgs.java
similarity index 67%
rename from seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/command/CommandSparkArgs.java
rename to seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/FlinkCommandArgs.java
index c847590..ecc886b 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/command/CommandSparkArgs.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/FlinkCommandArgs.java
@@ -15,57 +15,61 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.config.command;
+package org.apache.seatunnel.command;
 
 import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.config.EngineType;
 
 import com.beust.jcommander.Parameter;
 
 import java.util.List;
 
-public class CommandSparkArgs {
+public class FlinkCommandArgs implements CommandArgs {
 
     @Parameter(names = {"-c", "--config"},
         description = "config file",
         required = true)
     private String configFile = "application.conf";
 
-    @Parameter(names = {"-e", "--deploy-mode"},
-        description = "spark deploy mode",
-        required = true,
-        validateWith = org.apache.seatunnel.config.command.DeployModeValidator.class)
-    private String deployMode = DeployMode.CLIENT.getName();
-
-    @Parameter(names = {"-m", "--master"},
-        description = "spark master",
-        required = true)
-    private String master = null;
-
     @Parameter(names = {"-i", "--variable"},
         description = "variable substitution, such as -i city=beijing, or -i date=20190318")
     private List<String> variables = null;
 
     @Parameter(names = {"-t", "--check"},
         description = "check config")
-    private boolean testConfig = false;
+    private boolean checkConfig = false;
 
     public String getConfigFile() {
         return configFile;
     }
 
-    public String getDeployMode() {
-        return deployMode;
+    public boolean isCheckConfig() {
+        return checkConfig;
     }
 
-    public boolean isTestConfig() {
-        return testConfig;
+    public List<String> getVariables() {
+        return variables;
     }
 
-    public String getMaster() {
-        return master;
+    @Override
+    public EngineType getEngineType() {
+        return EngineType.FLINK;
     }
 
-    public List<String> getVariables() {
-        return variables;
+    @Override
+    public DeployMode getDeployMode() {
+        return DeployMode.CLIENT;
+    }
+
+    public void setConfigFile(String configFile) {
+        this.configFile = configFile;
+    }
+
+    public void setVariables(List<String> variables) {
+        this.variables = variables;
+    }
+
+    public void setCheckConfig(boolean checkConfig) {
+        this.checkConfig = checkConfig;
     }
 }
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/command/CommandFlinkArgs.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/SparkCommandArgs.java
similarity index 50%
rename from seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/command/CommandFlinkArgs.java
rename to seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/SparkCommandArgs.java
index 95cf813..fcfa7dc 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/command/CommandFlinkArgs.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/SparkCommandArgs.java
@@ -15,36 +15,83 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.config.command;
+package org.apache.seatunnel.command;
+
+import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.config.EngineType;
 
 import com.beust.jcommander.Parameter;
 
 import java.util.List;
 
-public class CommandFlinkArgs {
+public class SparkCommandArgs implements CommandArgs {
 
     @Parameter(names = {"-c", "--config"},
         description = "config file",
         required = true)
     private String configFile = "application.conf";
 
+    @Parameter(names = {"-e", "--deploy-mode"},
+        description = "spark deploy mode",
+        required = true,
+        validateWith = DeployModeValidator.class)
+    private String deployMode = DeployMode.CLIENT.getName();
+
+    @Parameter(names = {"-m", "--master"},
+        description = "spark master",
+        required = true)
+    private String master = null;
+
     @Parameter(names = {"-i", "--variable"},
         description = "variable substitution, such as -i city=beijing, or -i date=20190318")
     private List<String> variables = null;
 
     @Parameter(names = {"-t", "--check"},
         description = "check config")
-    private boolean testConfig = false;
+    private boolean checkConfig = false;
 
     public String getConfigFile() {
         return configFile;
     }
 
-    public boolean isTestConfig() {
-        return testConfig;
+    public DeployMode getDeployMode() {
+        return DeployMode.from(deployMode);
+    }
+
+    public boolean isCheckConfig() {
+        return checkConfig;
+    }
+
+    public String getMaster() {
+        return master;
     }
 
     public List<String> getVariables() {
         return variables;
     }
+
+    @Override
+    public EngineType getEngineType() {
+        return EngineType.SPARK;
+    }
+
+    public void setConfigFile(String configFile) {
+        this.configFile = configFile;
+    }
+
+    public void setDeployMode(String deployMode) {
+        this.deployMode = deployMode;
+    }
+
+    public void setMaster(String master) {
+        this.master = master;
+    }
+
+    public void setVariables(List<String> variables) {
+        this.variables = variables;
+    }
+
+    public void setCheckConfig(boolean checkConfig) {
+        this.checkConfig = checkConfig;
+    }
 }
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/DeployMode.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkCommandBuilder.java
similarity index 62%
copy from seatunnel-common/src/main/java/org/apache/seatunnel/common/config/DeployMode.java
copy to seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkCommandBuilder.java
index 44beef8..5eedfdc 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/DeployMode.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkCommandBuilder.java
@@ -15,21 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.common.config;
+package org.apache.seatunnel.command.flink;
 
-public enum DeployMode {
-    CLIENT("client"),
-    CLUSTER("cluster"),
-    ;
+import org.apache.seatunnel.command.Command;
+import org.apache.seatunnel.command.CommandBuilder;
+import org.apache.seatunnel.command.FlinkCommandArgs;
 
-    private final String name;
+public class FlinkCommandBuilder implements CommandBuilder<FlinkCommandArgs> {
 
-    DeployMode(String name) {
-        this.name = name;
+    @Override
+    public Command<FlinkCommandArgs> buildCommand(FlinkCommandArgs commandArgs) {
+        return commandArgs.isCheckConfig() ? new FlinkConfValidateCommand() : new FlinkTaskExecuteCommand();
     }
-
-    public String getName() {
-        return name;
-    }
-
 }
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/command/DeployModeValidator.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkConfValidateCommand.java
similarity index 53%
rename from seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/command/DeployModeValidator.java
rename to seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkConfValidateCommand.java
index 19422dc..02086b2 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/command/DeployModeValidator.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkConfValidateCommand.java
@@ -15,19 +15,26 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.config.command;
+package org.apache.seatunnel.command.flink;
 
-import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.command.Command;
+import org.apache.seatunnel.command.FlinkCommandArgs;
+import org.apache.seatunnel.config.ConfigBuilder;
 
-import com.beust.jcommander.IParameterValidator;
-import com.beust.jcommander.ParameterException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Used to check the Flink conf is validated.
+ */
+public class FlinkConfValidateCommand implements Command<FlinkCommandArgs> {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(FlinkConfValidateCommand.class);
 
-public class DeployModeValidator implements IParameterValidator {
     @Override
-    public void validate(String name, String value)
-        throws ParameterException {
-        if (!Common.isModeAllowed(value)) {
-            throw new ParameterException("deploy-mode: " + value + " is not allowed.");
-        }
+    public void execute(FlinkCommandArgs flinkCommandArgs) {
+        String configPath = flinkCommandArgs.getConfigFile();
+        new ConfigBuilder(configPath, flinkCommandArgs.getEngineType()).checkConfig();
+        LOGGER.info("config OK !");
     }
 }
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkTaskExecuteCommand.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkTaskExecuteCommand.java
new file mode 100644
index 0000000..d112515
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkTaskExecuteCommand.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.command.flink;
+
+import org.apache.seatunnel.apis.BaseSink;
+import org.apache.seatunnel.apis.BaseSource;
+import org.apache.seatunnel.apis.BaseTransform;
+import org.apache.seatunnel.command.BaseTaskExecuteCommand;
+import org.apache.seatunnel.command.FlinkCommandArgs;
+import org.apache.seatunnel.config.ConfigBuilder;
+import org.apache.seatunnel.config.PluginType;
+import org.apache.seatunnel.env.Execution;
+
+import java.util.List;
+
+/**
+ * Used to execute Flink Job.
+ */
+public class FlinkTaskExecuteCommand extends BaseTaskExecuteCommand<FlinkCommandArgs> {
+
+    @Override
+    public void execute(FlinkCommandArgs flinkCommandArgs) {
+        ConfigBuilder configBuilder = new ConfigBuilder(flinkCommandArgs.getConfigFile(), flinkCommandArgs.getEngineType());
+
+        List<BaseSource> sources = configBuilder.createPlugins(PluginType.SOURCE);
+        List<BaseTransform> transforms = configBuilder.createPlugins(PluginType.TRANSFORM);
+        List<BaseSink> sinks = configBuilder.createPlugins(PluginType.SINK);
+        Execution execution = configBuilder.createExecution();
+        baseCheckConfig(sources, transforms, sinks);
+        prepare(configBuilder.getEnv(), sources, transforms, sinks);
+        showAsciiLogo();
+
+        try {
+            execution.start(sources, transforms, sinks);
+        } catch (Exception e) {
+            throw new RuntimeException("Execute Flink task error", e);
+        }
+    }
+
+}
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/DeployMode.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkCommandBuilder.java
similarity index 62%
copy from seatunnel-common/src/main/java/org/apache/seatunnel/common/config/DeployMode.java
copy to seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkCommandBuilder.java
index 44beef8..ee3972e 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/DeployMode.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkCommandBuilder.java
@@ -15,21 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.common.config;
+package org.apache.seatunnel.command.spark;
 
-public enum DeployMode {
-    CLIENT("client"),
-    CLUSTER("cluster"),
-    ;
+import org.apache.seatunnel.command.Command;
+import org.apache.seatunnel.command.CommandBuilder;
+import org.apache.seatunnel.command.SparkCommandArgs;
 
-    private final String name;
+public class SparkCommandBuilder implements CommandBuilder<SparkCommandArgs> {
 
-    DeployMode(String name) {
-        this.name = name;
-    }
-
-    public String getName() {
-        return name;
+    @Override
+    public Command<SparkCommandArgs> buildCommand(SparkCommandArgs commandArgs) {
+        return commandArgs.isCheckConfig() ? new SparkConfValidateCommand() : new SparkTaskExecuteCommand();
     }
 
 }
+
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkConfValidateCommand.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkConfValidateCommand.java
new file mode 100644
index 0000000..bfa9f78
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkConfValidateCommand.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.command.spark;
+
+import org.apache.seatunnel.command.Command;
+import org.apache.seatunnel.command.SparkCommandArgs;
+import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.config.ConfigBuilder;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.file.Paths;
+
+/**
+ * Used to validate the spark task conf is validated.
+ */
+public class SparkConfValidateCommand implements Command<SparkCommandArgs> {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(SparkConfValidateCommand.class);
+
+    @Override
+    public void execute(SparkCommandArgs sparkCommandArgs) {
+        String confPath;
+        if (DeployMode.CLUSTER.equals(sparkCommandArgs.getDeployMode())) {
+            confPath = Paths.get(sparkCommandArgs.getConfigFile()).getFileName().toString();
+        } else {
+            confPath = sparkCommandArgs.getConfigFile();
+        }
+        new ConfigBuilder(confPath, sparkCommandArgs.getEngineType()).checkConfig();
+        LOGGER.info("config OK !");
+    }
+
+}
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkTaskExecuteCommand.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkTaskExecuteCommand.java
new file mode 100644
index 0000000..f155839
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkTaskExecuteCommand.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.command.spark;
+
+import org.apache.seatunnel.apis.BaseSink;
+import org.apache.seatunnel.apis.BaseSource;
+import org.apache.seatunnel.apis.BaseTransform;
+import org.apache.seatunnel.command.BaseTaskExecuteCommand;
+import org.apache.seatunnel.command.SparkCommandArgs;
+import org.apache.seatunnel.config.ConfigBuilder;
+import org.apache.seatunnel.config.PluginType;
+import org.apache.seatunnel.env.Execution;
+
+import java.util.List;
+
+public class SparkTaskExecuteCommand extends BaseTaskExecuteCommand<SparkCommandArgs> {
+
+    @Override
+    public void execute(SparkCommandArgs sparkCommandArgs) {
+        String confFile = sparkCommandArgs.getConfigFile();
+
+        ConfigBuilder configBuilder = new ConfigBuilder(confFile, sparkCommandArgs.getEngineType());
+        List<BaseSource> sources = configBuilder.createPlugins(PluginType.SOURCE);
+        List<BaseTransform> transforms = configBuilder.createPlugins(PluginType.TRANSFORM);
+        List<BaseSink> sinks = configBuilder.createPlugins(PluginType.SINK);
+        Execution execution = configBuilder.createExecution();
+        baseCheckConfig(sources, transforms, sinks);
+        prepare(configBuilder.getEnv(), sources, transforms, sinks);
+        showAsciiLogo();
+
+        try {
+            execution.start(sources, transforms, sinks);
+        } catch (Exception e) {
+            throw new RuntimeException("Execute Spark task error", e);
+        }
+    }
+
+}
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/ConfigBuilder.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/ConfigBuilder.java
index 7bb8c7a..3a7c295 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/ConfigBuilder.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/ConfigBuilder.java
@@ -27,8 +27,6 @@ import org.apache.seatunnel.plugin.Plugin;
 import org.apache.seatunnel.spark.SparkEnvironment;
 import org.apache.seatunnel.spark.batch.SparkBatchExecution;
 import org.apache.seatunnel.spark.stream.SparkStreamingExecution;
-import org.apache.seatunnel.utils.Engine;
-import org.apache.seatunnel.utils.PluginType;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
@@ -52,14 +50,14 @@ public class ConfigBuilder {
 
     private static final String PLUGIN_NAME_KEY = "plugin_name";
     private final String configFile;
-    private final Engine engine;
+    private final EngineType engine;
     private ConfigPackage configPackage;
     private final Config config;
     private boolean streaming;
     private Config envConfig;
     private final RuntimeEnv env;
 
-    public ConfigBuilder(String configFile, Engine engine) {
+    public ConfigBuilder(String configFile, EngineType engine) {
         this.configFile = configFile;
         this.engine = engine;
         this.config = load();
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/utils/Engine.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/EngineType.java
similarity index 83%
rename from seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/utils/Engine.java
rename to seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/EngineType.java
index 7670fba..5b8d239 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/utils/Engine.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/EngineType.java
@@ -15,14 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.utils;
+package org.apache.seatunnel.config;
 
-public enum Engine {
-    SPARK("spark"), FLINK("flink"), NULL("");
+public enum EngineType {
+    SPARK("spark"),
+    FLINK("flink"),
+    ;
 
-    private String engine;
+    private final String engine;
 
-    Engine(String engine) {
+    EngineType(String engine) {
         this.engine = engine;
     }
 
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/utils/PluginType.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/PluginType.java
similarity index 93%
rename from seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/utils/PluginType.java
rename to seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/PluginType.java
index 9de7975..e8c0705 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/utils/PluginType.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/PluginType.java
@@ -15,12 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.utils;
+package org.apache.seatunnel.config;
 
 public enum PluginType {
     SOURCE("source"), TRANSFORM("transform"), SINK("sink");
 
-    private String type;
+    private final String type;
 
     PluginType(String type) {
         this.type = type;
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/command/CommandLineArgs.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/command/CommandLineArgs.java
deleted file mode 100644
index 1fe0ced..0000000
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/command/CommandLineArgs.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.config.command;
-
-import org.apache.seatunnel.common.config.DeployMode;
-
-import java.util.List;
-
-public class CommandLineArgs {
-
-    private String deployMode = DeployMode.CLIENT.getName();
-    private final String configFile;
-    private final boolean testConfig;
-    private List<String> variables;
-
-    public CommandLineArgs(String configFile, boolean testConfig, List<String> variables) {
-        this.configFile = configFile;
-        this.testConfig = testConfig;
-        this.variables = variables;
-    }
-
-    public CommandLineArgs(String deployMode, String configFile, boolean testConfig) {
-        this.deployMode = deployMode;
-        this.configFile = configFile;
-        this.testConfig = testConfig;
-    }
-
-    public String getDeployMode() {
-        return deployMode;
-    }
-
-    public String getConfigFile() {
-        return configFile;
-    }
-
-    public boolean isTestConfig() {
-        return testConfig;
-    }
-
-    public List<String> getVariables() {
-        return variables;
-    }
-
-}
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/command/CommandLineUtils.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/utils/CommandLineUtils.java
similarity index 59%
rename from seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/command/CommandLineUtils.java
rename to seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/utils/CommandLineUtils.java
index 412f5d4..540d4ce 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/command/CommandLineUtils.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/utils/CommandLineUtils.java
@@ -15,7 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.config.command;
+package org.apache.seatunnel.utils;
+
+import org.apache.seatunnel.command.FlinkCommandArgs;
+import org.apache.seatunnel.command.SparkCommandArgs;
 
 import com.beust.jcommander.JCommander;
 
@@ -24,32 +27,22 @@ public final class CommandLineUtils {
     private CommandLineUtils() {
     }
 
-    public static CommandLineArgs parseSparkArgs(String[] args) {
-        CommandSparkArgs commandSparkArgs = new CommandSparkArgs();
+    public static SparkCommandArgs parseSparkArgs(String[] args) {
+        SparkCommandArgs sparkCommandArgs = new SparkCommandArgs();
         JCommander.newBuilder()
-            .addObject(commandSparkArgs)
+            .addObject(sparkCommandArgs)
             .build()
             .parse(args);
-
-        return new CommandLineArgs(
-            commandSparkArgs.getDeployMode(),
-            commandSparkArgs.getConfigFile(),
-            commandSparkArgs.isTestConfig()
-        );
+        return sparkCommandArgs;
     }
 
-    public static CommandLineArgs parseFlinkArgs(String[] args) {
-        CommandFlinkArgs commandFlinkArgs = new CommandFlinkArgs();
+    public static FlinkCommandArgs parseFlinkArgs(String[] args) {
+        FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs();
         JCommander.newBuilder()
-            .addObject(commandFlinkArgs)
+            .addObject(flinkCommandArgs)
             .build()
             .parse(args);
-
-        return new CommandLineArgs(
-            commandFlinkArgs.getConfigFile(),
-            commandFlinkArgs.isTestConfig(),
-            commandFlinkArgs.getVariables()
-        );
+        return flinkCommandArgs;
     }
 
 }
diff --git a/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/command/CommandFactoryTest.java b/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/command/CommandFactoryTest.java
new file mode 100644
index 0000000..a5bed87
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/command/CommandFactoryTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.command;
+
+import org.apache.seatunnel.command.flink.FlinkConfValidateCommand;
+import org.apache.seatunnel.command.flink.FlinkTaskExecuteCommand;
+import org.apache.seatunnel.command.spark.SparkConfValidateCommand;
+import org.apache.seatunnel.command.spark.SparkTaskExecuteCommand;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class CommandFactoryTest {
+
+    @Test
+    public void testCreateSparkConfValidateCommand() {
+        SparkCommandArgs sparkCommandArgs = new SparkCommandArgs();
+        sparkCommandArgs.setCheckConfig(true);
+        Command<SparkCommandArgs> sparkCommand = CommandFactory.createCommand(sparkCommandArgs);
+        Assert.assertEquals(SparkConfValidateCommand.class, sparkCommand.getClass());
+    }
+
+    @Test
+    public void testCreateSparkExecuteTaskCommand() {
+        SparkCommandArgs sparkCommandArgs = new SparkCommandArgs();
+        sparkCommandArgs.setCheckConfig(false);
+        Command<SparkCommandArgs> sparkCommand = CommandFactory.createCommand(sparkCommandArgs);
+        Assert.assertEquals(SparkTaskExecuteCommand.class, sparkCommand.getClass());
+    }
+
+    @Test
+    public void testCreateFlinkConfValidateCommand() {
+        FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs();
+        flinkCommandArgs.setCheckConfig(true);
+
+        Command<FlinkCommandArgs> flinkCommand = CommandFactory.createCommand(flinkCommandArgs);
+        Assert.assertEquals(FlinkConfValidateCommand.class, flinkCommand.getClass());
+    }
+
+    @Test
+    public void testCreateFlinkExecuteTaskCommand() {
+        FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs();
+        flinkCommandArgs.setCheckConfig(false);
+
+        Command<FlinkCommandArgs> flinkCommand = CommandFactory.createCommand(flinkCommandArgs);
+        Assert.assertEquals(FlinkTaskExecuteCommand.class, flinkCommand.getClass());
+
+    }
+}
diff --git a/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/config/command/CommandFlinkArgsTest.java b/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/command/FlinkCommandArgsTest.java
similarity index 87%
rename from seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/config/command/CommandFlinkArgsTest.java
rename to seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/command/FlinkCommandArgsTest.java
index e446bb4..56e990d 100644
--- a/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/config/command/CommandFlinkArgsTest.java
+++ b/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/command/FlinkCommandArgsTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.config.command;
+package org.apache.seatunnel.command;
 
 import com.beust.jcommander.JCommander;
 import org.junit.Assert;
@@ -23,18 +23,18 @@ import org.junit.Test;
 
 import java.util.Arrays;
 
-public class CommandFlinkArgsTest {
+public class FlinkCommandArgsTest {
 
     @Test
     public void testParseFlinkArgs() {
         String[] args = {"-c", "app.conf", "-t", "-i", "city=shenyang", "-i", "date=20200202"};
-        CommandFlinkArgs flinkArgs = new CommandFlinkArgs();
+        FlinkCommandArgs flinkArgs = new FlinkCommandArgs();
         JCommander.newBuilder()
             .addObject(flinkArgs)
             .build()
             .parse(args);
         Assert.assertEquals("app.conf", flinkArgs.getConfigFile());
-        Assert.assertTrue(flinkArgs.isTestConfig());
+        Assert.assertTrue(flinkArgs.isCheckConfig());
         Assert.assertEquals(Arrays.asList("city=shenyang", "date=20200202"), flinkArgs.getVariables());
     }
 }
diff --git a/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/config/command/CommandSparkArgsTest.java b/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/command/SparkCommandArgsTest.java
similarity index 86%
rename from seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/config/command/CommandSparkArgsTest.java
rename to seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/command/SparkCommandArgsTest.java
index f9af86f..2b2baba 100644
--- a/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/config/command/CommandSparkArgsTest.java
+++ b/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/command/SparkCommandArgsTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.config.command;
+package org.apache.seatunnel.command;
 
 import org.apache.seatunnel.common.config.DeployMode;
 
@@ -25,18 +25,18 @@ import org.junit.Test;
 
 import java.util.Arrays;
 
-public class CommandSparkArgsTest {
+public class SparkCommandArgsTest {
 
     @Test
     public void testParseSparkArgs() {
         String[] args = {"-c", "app.conf", "-e", "client", "-m", "yarn", "-i", "city=shijiazhuang", "-i", "name=Tom"};
-        CommandSparkArgs sparkArgs = new CommandSparkArgs();
+        SparkCommandArgs sparkArgs = new SparkCommandArgs();
         JCommander.newBuilder()
             .addObject(sparkArgs)
             .build()
             .parse(args);
         Assert.assertEquals("app.conf", sparkArgs.getConfigFile());
-        Assert.assertEquals(DeployMode.CLIENT.getName(), sparkArgs.getDeployMode());
+        Assert.assertEquals(DeployMode.CLIENT, sparkArgs.getDeployMode());
         Assert.assertEquals("yarn", sparkArgs.getMaster());
         Assert.assertEquals(Arrays.asList("city=shijiazhuang", "name=Tom"), sparkArgs.getVariables());
     }
diff --git a/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/config/command/CommandLineUtilsTest.java b/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/utils/CommandLineUtilsTest.java
similarity index 88%
rename from seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/config/command/CommandLineUtilsTest.java
rename to seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/utils/CommandLineUtilsTest.java
index 7231b89..f24fd3a 100644
--- a/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/config/command/CommandLineUtilsTest.java
+++ b/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/utils/CommandLineUtilsTest.java
@@ -15,7 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.config.command;
+package org.apache.seatunnel.utils;
+
+import org.apache.seatunnel.command.SparkCommandArgs;
 
 import com.beust.jcommander.ParameterException;
 import org.junit.Assert;
@@ -26,10 +28,10 @@ public class CommandLineUtilsTest {
     @Test
     public void testParseSparkArgs() {
         String[] args = {"-c", "app.conf", "-e", "cluster", "-m", "local[*]"};
-        CommandLineArgs commandLineArgs = CommandLineUtils.parseSparkArgs(args);
+        SparkCommandArgs commandLineArgs = CommandLineUtils.parseSparkArgs(args);
 
         Assert.assertEquals("app.conf", commandLineArgs.getConfigFile());
-        Assert.assertEquals("cluster", commandLineArgs.getDeployMode());
+        Assert.assertEquals("cluster", commandLineArgs.getDeployMode().getName());
     }
 
     @Test
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/SeatunnelFlink.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/SeatunnelFlink.java
index b3f9b5b..c240a43 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/SeatunnelFlink.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/SeatunnelFlink.java
@@ -17,16 +17,14 @@
 
 package org.apache.seatunnel;
 
-import static org.apache.seatunnel.utils.Engine.FLINK;
-
-import org.apache.seatunnel.config.command.CommandLineArgs;
-import org.apache.seatunnel.config.command.CommandLineUtils;
+import org.apache.seatunnel.command.FlinkCommandArgs;
+import org.apache.seatunnel.utils.CommandLineUtils;
 
 public class SeatunnelFlink {
 
     public static void main(String[] args) throws Exception {
-        CommandLineArgs flinkArgs = CommandLineUtils.parseFlinkArgs(args);
-        Seatunnel.run(flinkArgs, FLINK);
+        FlinkCommandArgs flinkArgs = CommandLineUtils.parseFlinkArgs(args);
+        Seatunnel.run(flinkArgs);
     }
 
 }
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/SeatunnelSpark.java b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/SeatunnelSpark.java
index 68c352f..7d6eb82 100644
--- a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/SeatunnelSpark.java
+++ b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/SeatunnelSpark.java
@@ -17,15 +17,13 @@
 
 package org.apache.seatunnel;
 
-import static org.apache.seatunnel.utils.Engine.SPARK;
-
-import org.apache.seatunnel.config.command.CommandLineArgs;
-import org.apache.seatunnel.config.command.CommandLineUtils;
+import org.apache.seatunnel.command.SparkCommandArgs;
+import org.apache.seatunnel.utils.CommandLineUtils;
 
 public class SeatunnelSpark {
 
     public static void main(String[] args) throws Exception {
-        CommandLineArgs sparkArgs = CommandLineUtils.parseSparkArgs(args);
-        Seatunnel.run(sparkArgs, SPARK);
+        SparkCommandArgs sparkArgs = CommandLineUtils.parseSparkArgs(args);
+        Seatunnel.run(sparkArgs);
     }
 }
diff --git a/seatunnel-core/seatunnel-core-sql/src/main/java/org/apache/seatunnel/core/sql/SeatunnelSql.java b/seatunnel-core/seatunnel-core-sql/src/main/java/org/apache/seatunnel/core/sql/SeatunnelSql.java
index 9e67f1a..ea8475b 100644
--- a/seatunnel-core/seatunnel-core-sql/src/main/java/org/apache/seatunnel/core/sql/SeatunnelSql.java
+++ b/seatunnel-core/seatunnel-core-sql/src/main/java/org/apache/seatunnel/core/sql/SeatunnelSql.java
@@ -17,10 +17,10 @@
 
 package org.apache.seatunnel.core.sql;
 
-import org.apache.seatunnel.config.command.CommandLineArgs;
-import org.apache.seatunnel.config.command.CommandLineUtils;
+import org.apache.seatunnel.command.FlinkCommandArgs;
 import org.apache.seatunnel.core.sql.job.Executor;
 import org.apache.seatunnel.core.sql.job.JobInfo;
+import org.apache.seatunnel.utils.CommandLineUtils;
 
 import org.apache.commons.io.FileUtils;
 
@@ -36,7 +36,7 @@ public class SeatunnelSql {
     }
 
     private static JobInfo parseJob(String[] args) throws IOException {
-        CommandLineArgs flinkArgs = CommandLineUtils.parseFlinkArgs(args);
+        FlinkCommandArgs flinkArgs = CommandLineUtils.parseFlinkArgs(args);
         String configFilePath = flinkArgs.getConfigFile();
         String jobContent = FileUtils.readFileToString(new File(configFilePath), StandardCharsets.UTF_8);
         JobInfo jobInfo = new JobInfo(jobContent);
diff --git a/seatunnel-core/seatunnel-core-sql/src/test/java/org/apache/seatunnel/core/sql/SqlVariableSubstitutionTest.java b/seatunnel-core/seatunnel-core-sql/src/test/java/org/apache/seatunnel/core/sql/SqlVariableSubstitutionTest.java
index 30ada8a..63f8ed8 100644
--- a/seatunnel-core/seatunnel-core-sql/src/test/java/org/apache/seatunnel/core/sql/SqlVariableSubstitutionTest.java
+++ b/seatunnel-core/seatunnel-core-sql/src/test/java/org/apache/seatunnel/core/sql/SqlVariableSubstitutionTest.java
@@ -17,9 +17,9 @@
 
 package org.apache.seatunnel.core.sql;
 
-import org.apache.seatunnel.config.command.CommandLineArgs;
-import org.apache.seatunnel.config.command.CommandLineUtils;
+import org.apache.seatunnel.command.FlinkCommandArgs;
 import org.apache.seatunnel.core.sql.job.JobInfo;
+import org.apache.seatunnel.utils.CommandLineUtils;
 
 import org.apache.commons.io.FileUtils;
 import org.junit.Assert;
@@ -36,7 +36,7 @@ public class SqlVariableSubstitutionTest {
         String[] args = {"-c", System.getProperty("user.dir") + TEST_RESOURCE_DIR + "flink.sql.conf.template",
             "-t", "-i", "table_name=events"};
 
-        CommandLineArgs flinkArgs = CommandLineUtils.parseFlinkArgs(args);
+        FlinkCommandArgs flinkArgs = CommandLineUtils.parseFlinkArgs(args);
         String configFilePath = flinkArgs.getConfigFile();
         String jobContent = FileUtils.readFileToString(new File(configFilePath), StandardCharsets.UTF_8);
         JobInfo jobInfo = new JobInfo(jobContent);
diff --git a/seatunnel-examples/seatunnel-flink-examples/src/main/java/org/apache/seatunnel/example/flink/LocalFlinkExample.java b/seatunnel-examples/seatunnel-flink-examples/src/main/java/org/apache/seatunnel/example/flink/LocalFlinkExample.java
index 25316e4..e42c7e0 100644
--- a/seatunnel-examples/seatunnel-flink-examples/src/main/java/org/apache/seatunnel/example/flink/LocalFlinkExample.java
+++ b/seatunnel-examples/seatunnel-flink-examples/src/main/java/org/apache/seatunnel/example/flink/LocalFlinkExample.java
@@ -17,19 +17,20 @@
 
 package org.apache.seatunnel.example.flink;
 
-import static org.apache.seatunnel.utils.Engine.FLINK;
-
 import org.apache.seatunnel.Seatunnel;
-import org.apache.seatunnel.config.command.CommandLineArgs;
+import org.apache.seatunnel.command.FlinkCommandArgs;
 
 public class LocalFlinkExample {
 
     public static final String TEST_RESOURCE_DIR = "/seatunnel-examples/seatunnel-flink-examples/src/main/resources/examples/";
 
-    public static void main(String[] args) throws Exception {
+    public static void main(String[] args) {
         String configFile = getTestConfigFile("fake_to_console.conf");
-        CommandLineArgs flinkArgs = new CommandLineArgs(configFile, false, null);
-        Seatunnel.run(flinkArgs, FLINK);
+        FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs();
+        flinkCommandArgs.setConfigFile(configFile);
+        flinkCommandArgs.setCheckConfig(false);
+        flinkCommandArgs.setVariables(null);
+        Seatunnel.run(flinkCommandArgs);
     }
 
     public static String getTestConfigFile(String configFile) {
diff --git a/seatunnel-examples/seatunnel-spark-examples/src/main/java/org/apache/seatunnel/example/spark/LocalSparkExample.java b/seatunnel-examples/seatunnel-spark-examples/src/main/java/org/apache/seatunnel/example/spark/LocalSparkExample.java
index d4b1e5e..5fbf353 100644
--- a/seatunnel-examples/seatunnel-spark-examples/src/main/java/org/apache/seatunnel/example/spark/LocalSparkExample.java
+++ b/seatunnel-examples/seatunnel-spark-examples/src/main/java/org/apache/seatunnel/example/spark/LocalSparkExample.java
@@ -17,19 +17,20 @@
 
 package org.apache.seatunnel.example.spark;
 
-import static org.apache.seatunnel.utils.Engine.SPARK;
-
 import org.apache.seatunnel.Seatunnel;
-import org.apache.seatunnel.config.command.CommandLineArgs;
+import org.apache.seatunnel.command.SparkCommandArgs;
 
 public class LocalSparkExample {
 
     public static final String TEST_RESOURCE_DIR = "/seatunnel-examples/seatunnel-spark-examples/src/main/resources/examples/";
 
-    public static void main(String[] args) throws Exception {
-        String configFile = getTestConfigFile("spark.batch.conf.template");
-        CommandLineArgs sparkArgs = new CommandLineArgs(configFile, false, null);
-        Seatunnel.run(sparkArgs, SPARK);
+    public static void main(String[] args) {
+        String configFile = getTestConfigFile("spark.batch.conf");
+        SparkCommandArgs sparkArgs = new SparkCommandArgs();
+        sparkArgs.setConfigFile(configFile);
+        sparkArgs.setCheckConfig(false);
+        sparkArgs.setVariables(null);
+        Seatunnel.run(sparkArgs);
     }
 
     public static String getTestConfigFile(String configFile) {
diff --git a/seatunnel-examples/seatunnel-spark-examples/src/main/resources/examples/spark.batch.conf.template b/seatunnel-examples/seatunnel-spark-examples/src/main/resources/examples/spark.batch.conf
similarity index 95%
rename from seatunnel-examples/seatunnel-spark-examples/src/main/resources/examples/spark.batch.conf.template
rename to seatunnel-examples/seatunnel-spark-examples/src/main/resources/examples/spark.batch.conf
index c703d09..b7c6aa4 100644
--- a/seatunnel-examples/seatunnel-spark-examples/src/main/resources/examples/spark.batch.conf.template
+++ b/seatunnel-examples/seatunnel-spark-examples/src/main/resources/examples/spark.batch.conf
@@ -49,12 +49,12 @@ source {
 transform {
   # split data by specific delimiter
 
-  # you can also you other filter plugins, such as sql
+  # you can also use other transform plugins, such as sql
   # sql {
   #   sql = "select * from accesslog where request_time > 1000"
   # }
 
-  # If you would like to get more information about how to configure seatunnel and see full list of filter plugins,
+  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
   # please go to https://seatunnel.apache.org/docs/spark/configuration/transform-plugins/Split
 }