You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/05/25 11:26:40 UTC

[incubator-seatunnel] branch api-draft updated: [API-DRAFT]Add spark-core-starter module Code splitting for old and new engines (#1945)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/api-draft by this push:
     new 62d3468a [API-DRAFT]Add spark-core-starter module Code splitting for old and new engines (#1945)
62d3468a is described below

commit 62d3468aef075d53aede6b9a15bd6f5f82ef2ace
Author: Kirs <ki...@apache.org>
AuthorDate: Wed May 25 19:26:35 2022 +0800

    [API-DRAFT]Add spark-core-starter module Code splitting for old and new engines (#1945)
    
    * Add spark-core-starter module, code splitting for old and new engines
    * Separation of old and new engines
---
 seatunnel-core/pom.xml                             |   1 +
 .../core/base/command/AbstractCommandArgs.java     |  37 --
 .../core/spark/command/SparkCommandBuilder.java    |  21 +-
 .../starter/config/AbstractExecutionContext.java   |  17 +-
 .../starter/flink/command/FlinkCommandBuilder.java |   2 +-
 seatunnel-core/seatunnel-spark-starter/pom.xml     | 189 +++++++++
 .../src/main/bin/start-seatunnel-spark.sh          |  45 +++
 .../src/main/docker/Dockerfile                     |  37 ++
 .../core/starter/spark/SeatunnelSpark.java}        |  22 +-
 .../seatunnel/core/starter/spark/SparkStarter.java | 429 +++++++++++++++++++++
 .../core/starter/spark/args/SparkCommandArgs.java  |  62 +++
 .../command/SparkApiConfValidateCommand.java}      |  22 +-
 .../spark/command/SparkApiTaskExecuteCommand.java} |  22 +-
 .../spark/command/SparkCommandBuilder.java}        |  21 +-
 .../spark/config/SparkApiConfigChecker.java}       |   8 +-
 .../starter/spark/config/SparkEnvironment.java}    |  10 +-
 .../execution/AbstractPluginExecuteProcessor.java  |   2 +-
 .../spark/execution/PluginExecuteProcessor.java    |   2 +-
 .../spark/execution/SinkExecuteProcessor.java      |   2 +-
 .../spark/execution/SourceExecuteProcessor.java    |   2 +-
 .../spark/execution/SparkTaskExecution.java}       |  12 +-
 .../spark/execution/TransformExecuteProcessor.java |   2 +-
 .../starter/spark/utils/CommandLineUtils.java}     |  25 +-
 .../core/starter}/spark/SparkStarterTest.java      |   2 +-
 .../starter/spark/args/SparkCommandArgsTest.java   |  67 ++++
 .../starter/spark/utils/CommandLineUtilsTest.java} |  30 +-
 .../src/test/resources/spark_application.conf      |  66 ++++
 .../seatunnel-spark-examples/pom.xml               |   2 +-
 .../seatunnel/example/spark/LocalSparkExample.java |  52 ---
 .../example/spark/SeaTunnelApiExample.java         |  12 +-
 .../flink/FlinkTransformPluginDiscovery.java       |   1 +
 .../spark/SparkTransformPluginDiscovery.java       |   1 +
 32 files changed, 1013 insertions(+), 212 deletions(-)

diff --git a/seatunnel-core/pom.xml b/seatunnel-core/pom.xml
index e6c5cf0e..937fe767 100644
--- a/seatunnel-core/pom.xml
+++ b/seatunnel-core/pom.xml
@@ -37,5 +37,6 @@
         <module>seatunnel-core-flink-sql</module>
         <module>seatunnel-core-starter</module>
         <module>seatunnel-flink-starter</module>
+        <module>seatunnel-spark-starter</module>
     </modules>
 </project>
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/AbstractCommandArgs.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/AbstractCommandArgs.java
index 107ec2b1..258b7456 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/AbstractCommandArgs.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/AbstractCommandArgs.java
@@ -19,10 +19,8 @@ package org.apache.seatunnel.core.base.command;
 
 import org.apache.seatunnel.apis.base.command.CommandArgs;
 import org.apache.seatunnel.common.config.DeployMode;
-import org.apache.seatunnel.core.base.config.ApiType;
 import org.apache.seatunnel.core.base.config.EngineType;
 
-import com.beust.jcommander.IStringConverter;
 import com.beust.jcommander.Parameter;
 
 import java.util.Collections;
@@ -39,11 +37,6 @@ public abstract class AbstractCommandArgs implements CommandArgs {
             description = "variable substitution, such as -i city=beijing, or -i date=20190318")
     private List<String> variables = Collections.emptyList();
 
-    @Parameter(names = {"-api", "--api-type"},
-            converter = ApiTypeConverter.class,
-            description = "Api type, engine or seatunnel")
-    private ApiType apiType = ApiType.ENGINE_API;
-
     // todo: use command type enum
     @Parameter(names = {"-t", "--check"},
             description = "check config")
@@ -86,14 +79,6 @@ public abstract class AbstractCommandArgs implements CommandArgs {
         this.help = help;
     }
 
-    public ApiType getApiType() {
-        return apiType;
-    }
-
-    public void setApiType(ApiType apiType) {
-        this.apiType = apiType;
-    }
-
     public EngineType getEngineType() {
         throw new UnsupportedOperationException("abstract class CommandArgs not support this method");
     }
@@ -102,26 +87,4 @@ public abstract class AbstractCommandArgs implements CommandArgs {
         throw new UnsupportedOperationException("abstract class CommandArgs not support this method");
     }
 
-    /**
-     * Used to convert the api type string to the enum value.
-     */
-    private static class ApiTypeConverter implements IStringConverter<ApiType> {
-
-        /**
-         * If the '-api' is not set, then will not go into this convert method.
-         *
-         * @param value input value set by '-api' or '--api-type'
-         * @return api type enum value
-         */
-        @Override
-        public ApiType convert(String value) {
-            for (ApiType apiType : ApiType.values()) {
-                if (apiType.getApiType().equalsIgnoreCase(value)) {
-                    return apiType;
-                }
-            }
-            throw new IllegalArgumentException(String.format("API type %s not supported", value));
-        }
-    }
-
 }
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkCommandBuilder.java b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkCommandBuilder.java
index 39512391..65c9582e 100644
--- a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkCommandBuilder.java
+++ b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkCommandBuilder.java
@@ -30,14 +30,8 @@ public class SparkCommandBuilder implements CommandBuilder<SparkCommandArgs> {
             throw new IllegalArgumentException(
                     String.format("Deploy mode: %s is Illegal", commandArgs.getDeployMode()));
         }
-        switch (commandArgs.getApiType()) {
-            case ENGINE_API:
-                return new SparkApiCommandBuilder().buildCommand(commandArgs);
-            case SEATUNNEL_API:
-                return new SeaTunnelApiCommandBuilder().buildCommand(commandArgs);
-            default:
-                throw new IllegalArgumentException("Unsupported API type: " + commandArgs.getApiType());
-        }
+
+        return new SparkApiCommandBuilder().buildCommand(commandArgs);
     }
 
     /**
@@ -55,16 +49,5 @@ public class SparkCommandBuilder implements CommandBuilder<SparkCommandArgs> {
         }
     }
 
-    /**
-     * Used to generate command for seaTunnel API.
-     */
-    private static class SeaTunnelApiCommandBuilder extends SparkCommandBuilder {
-        @Override
-        public Command<SparkCommandArgs> buildCommand(SparkCommandArgs commandArgs) {
-            return commandArgs.isCheckConfig() ? new SeaTunnelApiConfValidateCommand(commandArgs)
-                    : new SeaTunnelApiTaskExecuteCommand(commandArgs);
-        }
-    }
-
 }
 
diff --git a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/AbstractExecutionContext.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/AbstractExecutionContext.java
index 6099594e..9538e545 100644
--- a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/AbstractExecutionContext.java
+++ b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/AbstractExecutionContext.java
@@ -79,16 +79,13 @@ public abstract class AbstractExecutionContext<ENVIRONMENT extends RuntimeEnv> {
 
     @SuppressWarnings("checkstyle:Indentation")
     protected List<PluginIdentifier> getPluginIdentifiers(PluginType... pluginTypes) {
-        return Arrays.stream(pluginTypes).flatMap(new Function<PluginType, Stream<PluginIdentifier>>() {
-            @Override
-            public Stream<PluginIdentifier> apply(PluginType pluginType) {
-                List<? extends Config> configList = config.getConfigList(pluginType.getType());
-                return configList.stream()
-                    .map(pluginConfig -> PluginIdentifier
-                        .of(engine.getEngine(),
-                            pluginType.getType(),
-                            pluginConfig.getString("plugin_name")));
-            }
+        return Arrays.stream(pluginTypes).flatMap((Function<PluginType, Stream<PluginIdentifier>>) pluginType -> {
+            List<? extends Config> configList = config.getConfigList(pluginType.getType());
+            return configList.stream()
+                .map(pluginConfig -> PluginIdentifier
+                    .of(engine.getEngine(),
+                        pluginType.getType(),
+                        pluginConfig.getString("plugin_name")));
         }).collect(Collectors.toList());
     }
 }
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkCommandBuilder.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkCommandBuilder.java
index 24661acf..f6e8e4fb 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkCommandBuilder.java
+++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkCommandBuilder.java
@@ -26,7 +26,7 @@ public class FlinkCommandBuilder implements CommandBuilder<FlinkCommandArgs> {
 
     @Override
     public Command<FlinkCommandArgs> buildCommand(FlinkCommandArgs commandArgs) {
-        if (!Common.setDeployMode(commandArgs.getDeployMode().getName())) {
+        if (Boolean.FALSE.equals(Common.setDeployMode(commandArgs.getDeployMode().getName()))) {
             throw new IllegalArgumentException(
                     String.format("Deploy mode: %s is Illegal", commandArgs.getDeployMode()));
         }
diff --git a/seatunnel-core/seatunnel-spark-starter/pom.xml b/seatunnel-core/seatunnel-spark-starter/pom.xml
new file mode 100644
index 00000000..d65b5873
--- /dev/null
+++ b/seatunnel-core/seatunnel-spark-starter/pom.xml
@@ -0,0 +1,189 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-core</artifactId>
+        <version>${revision}</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>seatunnel-spark-starter</artifactId>
+
+    <properties>
+        <docker.repo>seatunnel-spark</docker.repo>
+    </properties>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-core-starter</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-api-spark</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_${scala.binary.version}</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_${scala.binary.version}</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-translation-spark</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-transform-spark-split</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-transform-spark-sql</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-transform-spark-json</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-transform-spark-replace</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-transform-spark-uuid</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <finalName>${project.name}</finalName>
+    </build>
+
+    <!-- todo <profiles>
+        <profile>
+            <id>docker</id>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.codehaus.mojo</groupId>
+                        <artifactId>exec-maven-plugin</artifactId>
+                        <version>${exec-maven-plugin.version}</version>
+                        <executions>
+                            <execution>
+                                <id>docker-build</id>
+                                <phase>package</phase>
+                                <goals>
+                                    <goal>exec</goal>
+                                </goals>
+                                <configuration>
+                                    <environmentVariables>
+                                        <DOCKER_BUILDKIT>1</DOCKER_BUILDKIT>
+                                    </environmentVariables>
+                                    <executable>docker</executable>
+                                    <workingDirectory>${project.basedir}</workingDirectory>
+                                    <arguments>
+                                        <argument>build</argument>
+                                        <argument>&#45;&#45;no-cache</argument>
+                                        <argument>&#45;&#45;build-arg</argument>
+                                        <argument>SPARK_VERSION=${spark.version}</argument>
+                                        <argument>&#45;&#45;build-arg</argument>
+                                        <argument>HADOOP_VERSION=${hadoop.binary.version}</argument>
+                                        <argument>-t</argument>
+                                        <argument>${docker.hub}/${docker.repo}:${docker.tag}</argument>
+                                        <argument>-t</argument>
+                                        <argument>${docker.hub}/${docker.repo}:latest</argument>
+                                        <argument>.</argument>
+                                        <argument>&#45;&#45;file=src/main/docker/Dockerfile</argument>
+                                    </arguments>
+                                </configuration>
+                            </execution>
+                            <execution>
+                                <id>docker-push</id>
+                                <phase>deploy</phase>
+                                <goals>
+                                    <goal>exec</goal>
+                                </goals>
+                                <configuration>
+                                    <environmentVariables>
+                                        <DOCKER_BUILDKIT>1</DOCKER_BUILDKIT>
+                                    </environmentVariables>
+                                    <executable>docker</executable>
+                                    <workingDirectory>${project.basedir}</workingDirectory>
+                                    <arguments>
+                                        <argument>buildx</argument>
+                                        <argument>build</argument>
+                                        <argument>&#45;&#45;no-cache</argument>
+                                        <argument>&#45;&#45;build-arg</argument>
+                                        <argument>SPARK_VERSION=${spark.version}</argument>
+                                        <argument>&#45;&#45;build-arg</argument>
+                                        <argument>HADOOP_VERSION=${hadoop.binary.version}</argument>
+                                        <argument>&#45;&#45;push</argument>
+                                        <argument>-t</argument>
+                                        <argument>${docker.hub}/${docker.repo}:${docker.tag}</argument>
+                                        <argument>-t</argument>
+                                        <argument>${docker.hub}/${docker.repo}:latest</argument>
+                                        <argument>.</argument>
+                                        <argument>&#45;&#45;file=src/main/docker/Dockerfile</argument>
+                                    </arguments>
+                                </configuration>
+                            </execution>
+                        </executions>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+    </profiles>-->
+</project>
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/bin/start-seatunnel-spark.sh b/seatunnel-core/seatunnel-spark-starter/src/main/bin/start-seatunnel-spark.sh
new file mode 100755
index 00000000..ca6082bb
--- /dev/null
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/bin/start-seatunnel-spark.sh
@@ -0,0 +1,45 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+set -eu
+APP_DIR=$(cd $(dirname ${0})/../;pwd)
+CONF_DIR=${APP_DIR}/config
+APP_JAR=${APP_DIR}/lib/seatunnel-core-spark.jar
+
+if [ -f "${CONF_DIR}/seatunnel-env.sh" ]; then
+    . "${CONF_DIR}/seatunnel-env.sh"
+fi
+
+if [ $# == 0 ]
+then
+    args="-h"
+else
+    args=$@
+fi
+
+CMD=$(java -cp ${APP_JAR} org.apache.seatunnel.core.starter.spark.SparkStarter ${args} | tail -n 1) && EXIT_CODE=$? || EXIT_CODE=$?
+if [ ${EXIT_CODE} -eq 234 ]; then
+    # print usage
+    echo ${CMD}
+    exit 0
+elif [ ${EXIT_CODE} -eq 0 ]; then
+    echo "Execute SeaTunnel Spark Job: ${CMD}"
+    eval ${CMD}
+else
+    echo ${CMD}
+    exit ${EXIT_CODE}
+fi
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/docker/Dockerfile b/seatunnel-core/seatunnel-spark-starter/src/main/docker/Dockerfile
new file mode 100644
index 00000000..9a3b996d
--- /dev/null
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/docker/Dockerfile
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+ARG BASE_IMAGE=adoptopenjdk/openjdk8:jre
+
+FROM $BASE_IMAGE
+
+ARG SPARK_VERSION
+ARG HADOOP_VERSION
+
+RUN mkdir -p /spark ; cd /spark ; \
+    tar_file=spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz ; \
+    curl -LsO https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/$tar_file ; \
+    tar -zxf $tar_file --strip 1 -C . ; \
+    rm $tar_file
+
+ENV SPARK_HOME=/spark
+
+WORKDIR /seatunnel
+
+COPY target/seatunnel-core-spark.jar /seatunnel/lib/
+COPY src/main/bin /seatunnel/bin/
+
+ENTRYPOINT [ "/seatunnel/bin/start-seatunnel-spark.sh" ]
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SeaTunnelApiConfigChecker.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SeatunnelSpark.java
similarity index 51%
copy from seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SeaTunnelApiConfigChecker.java
copy to seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SeatunnelSpark.java
index 3715c14b..57fdb5d6 100644
--- a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SeaTunnelApiConfigChecker.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SeatunnelSpark.java
@@ -15,17 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.core.spark.config;
+package org.apache.seatunnel.core.starter.spark;
 
-import org.apache.seatunnel.core.base.config.ConfigChecker;
-import org.apache.seatunnel.core.base.exception.ConfigCheckException;
+import org.apache.seatunnel.core.starter.Seatunnel;
+import org.apache.seatunnel.core.starter.command.Command;
+import org.apache.seatunnel.core.starter.exception.CommandException;
+import org.apache.seatunnel.core.starter.spark.args.SparkCommandArgs;
+import org.apache.seatunnel.core.starter.spark.command.SparkCommandBuilder;
+import org.apache.seatunnel.core.starter.spark.utils.CommandLineUtils;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
+public class SeatunnelSpark {
 
-public class SeaTunnelApiConfigChecker implements ConfigChecker<SeaTunnelEnvironment> {
-
-    @Override
-    public void checkConfig(Config config) throws ConfigCheckException {
-        // todo: implement
+    public static void main(String[] args) throws CommandException {
+        SparkCommandArgs sparkArgs = CommandLineUtils.parseSparkArgs(args);
+        Command<SparkCommandArgs> sparkCommand =
+            new SparkCommandBuilder().buildCommand(sparkArgs);
+        Seatunnel.run(sparkCommand);
     }
 }
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
new file mode 100644
index 00000000..5ddcb268
--- /dev/null
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.starter.spark;
+
+import static java.nio.file.FileVisitOption.FOLLOW_LINKS;
+
+import org.apache.seatunnel.common.Constants;
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.core.starter.Starter;
+import org.apache.seatunnel.core.starter.config.ConfigBuilder;
+import org.apache.seatunnel.core.starter.config.EngineType;
+import org.apache.seatunnel.core.starter.config.PluginType;
+import org.apache.seatunnel.core.starter.spark.args.SparkCommandArgs;
+import org.apache.seatunnel.core.starter.utils.CompressionUtils;
+import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
+import org.apache.seatunnel.plugin.discovery.spark.SparkSinkPluginDiscovery;
+import org.apache.seatunnel.plugin.discovery.spark.SparkSourcePluginDiscovery;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.UnixStyleUsageFormatter;
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A Starter to generate spark-submit command for SeaTunnel job on spark.
+ */
+public class SparkStarter implements Starter {
+
+    private static final int USAGE_EXIT_CODE = 234;
+
+    private static final int PLUGIN_LIB_DIR_DEPTH = 3;
+
+    /**
+     * original commandline args
+     */
+    protected String[] args;
+
+    /**
+     * args parsed from {@link #args}
+     */
+    protected SparkCommandArgs commandArgs;
+
+    /**
+     * the spark application name
+     */
+    protected String appName;
+
+    /**
+     * jars to include on the spark driver and executor classpaths
+     */
+    protected List<Path> jars = new ArrayList<>();
+
+    /**
+     * files to be placed in the working directory of each spark executor
+     */
+    protected List<Path> files = new ArrayList<>();
+
+    /**
+     * spark configuration properties
+     */
+    protected Map<String, String> sparkConf;
+
+    private SparkStarter(String[] args, SparkCommandArgs commandArgs) {
+        this.args = args;
+        this.commandArgs = commandArgs;
+    }
+
+    @SuppressWarnings("checkstyle:RegexpSingleline")
+    public static void main(String[] args) throws IOException {
+        SparkStarter starter = getInstance(args);
+        List<String> command = starter.buildCommands();
+        System.out.println(String.join(" ", command));
+    }
+
+    /**
+     * method to get SparkStarter instance, will return
+     * {@link ClusterModeSparkStarter} or
+     * {@link ClientModeSparkStarter} depending on deploy mode.
+     */
+    static SparkStarter getInstance(String[] args) {
+        SparkCommandArgs commandArgs = parseCommandArgs(args);
+        DeployMode deployMode = commandArgs.getDeployMode();
+        switch (deployMode) {
+            case CLUSTER:
+                return new ClusterModeSparkStarter(args, commandArgs);
+            case CLIENT:
+                return new ClientModeSparkStarter(args, commandArgs);
+            default:
+                throw new IllegalArgumentException("Deploy mode " + deployMode + " not supported");
+        }
+    }
+
+    /**
+     * parse commandline args
+     */
+    private static SparkCommandArgs parseCommandArgs(String[] args) {
+        SparkCommandArgs commandArgs = new SparkCommandArgs();
+        JCommander commander = JCommander.newBuilder()
+                .programName("start-seatunnel-spark.sh")
+                .addObject(commandArgs)
+                .args(args)
+                .build();
+        if (commandArgs.isHelp()) {
+            commander.setUsageFormatter(new UnixStyleUsageFormatter(commander));
+            commander.usage();
+            System.exit(USAGE_EXIT_CODE);
+        }
+        return commandArgs;
+    }
+
+    @Override
+    public List<String> buildCommands() throws IOException {
+        setSparkConf();
+        Common.setDeployMode(commandArgs.getDeployMode().getName());
+        this.jars.addAll(getPluginsJarDependencies());
+        this.jars.addAll(listJars(Common.appLibDir()));
+        this.jars.addAll(getConnectorJarDependencies());
+        this.appName = this.sparkConf.getOrDefault("spark.app.name", Constants.LOGO);
+        return buildFinal();
+    }
+
+    /**
+     * parse spark configurations from SeaTunnel config file.
+     * Side effects: every 'key=value' variable from the command line is exported as a
+     * JVM system property (later used by getSparkConf to resolve the config), and all
+     * variables are also appended as '-D' options to both
+     * 'spark.driver.extraJavaOptions' and 'spark.executor.extraJavaOptions'.
+     */
+    private void setSparkConf() throws FileNotFoundException {
+        commandArgs.getVariables()
+                .stream()
+                .filter(Objects::nonNull)
+                .map(variable -> variable.split("=", 2))
+                .filter(pair -> pair.length == 2)
+                .forEach(pair -> System.setProperty(pair[0], pair[1]));
+        this.sparkConf = getSparkConf(commandArgs.getConfigFile());
+        String driverJavaOpts = this.sparkConf.getOrDefault("spark.driver.extraJavaOptions", "");
+        String executorJavaOpts = this.sparkConf.getOrDefault("spark.executor.extraJavaOptions", "");
+        if (!commandArgs.getVariables().isEmpty()) {
+            String properties = commandArgs.getVariables()
+                    .stream()
+                    .map(v -> "-D" + v)
+                    .collect(Collectors.joining(" "));
+            driverJavaOpts += " " + properties;
+            executorJavaOpts += " " + properties;
+            this.sparkConf.put("spark.driver.extraJavaOptions", driverJavaOpts.trim());
+            this.sparkConf.put("spark.executor.extraJavaOptions", executorJavaOpts.trim());
+        }
+    }
+
+    /**
+     * Get spark configurations from SeaTunnel job config file.
+     * Resolves the config first against itself, then against JVM system properties
+     * (so variables exported by setSparkConf are substituted), and flattens the
+     * 'env' section into a String-to-String map.
+     *
+     * @throws FileNotFoundException if the config file does not exist
+     */
+    static Map<String, String> getSparkConf(String configFile) throws FileNotFoundException {
+        File file = new File(configFile);
+        if (!file.exists()) {
+            throw new FileNotFoundException("config file '" + file + "' does not exist!");
+        }
+        Config appConfig = ConfigFactory.parseFile(file)
+                .resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true))
+                .resolveWith(ConfigFactory.systemProperties(), ConfigResolveOptions.defaults().setAllowUnresolved(true));
+
+        return appConfig.getConfig("env")
+                .entrySet()
+                .stream()
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().unwrapped().toString()));
+    }
+
+    /**
+     * return plugin's dependent jars, which are located in 'plugins/${pluginName}/lib/*'.
+     */
+    private List<Path> getPluginsJarDependencies() throws IOException {
+        Path pluginRootDir = Common.pluginRootDir();
+        if (!Files.exists(pluginRootDir) || !Files.isDirectory(pluginRootDir)) {
+            return Collections.emptyList();
+        }
+        try (Stream<Path> stream = Files.walk(pluginRootDir, PLUGIN_LIB_DIR_DEPTH, FOLLOW_LINKS)) {
+            return stream
+                    .filter(it -> pluginRootDir.relativize(it).getNameCount() == PLUGIN_LIB_DIR_DEPTH)
+                    .filter(it -> it.getParent().endsWith("lib"))
+                    // Path#endsWith matches whole name elements, so getFileName().endsWith("jar")
+                    // never matches '*.jar' files; compare the file name as a String instead.
+                    .filter(it -> it.getFileName().toString().endsWith(".jar"))
+                    .collect(Collectors.toList());
+        }
+    }
+
+    /**
+     * return connector's jars, which are located in 'connectors/spark/*'.
+     * Only jars for connectors actually referenced by the job config (source and
+     * sink plugin names) are returned, via the Spark plugin discovery classes.
+     * NOTE(review): the local 'pluginRootDir' actually points at the connector jar
+     * dir, not the plugin root — consider renaming for clarity.
+     */
+    private List<Path> getConnectorJarDependencies() {
+        Path pluginRootDir = Common.connectorJarDir("spark");
+        if (!Files.exists(pluginRootDir) || !Files.isDirectory(pluginRootDir)) {
+            return Collections.emptyList();
+        }
+        Config config = new ConfigBuilder(Paths.get(commandArgs.getConfigFile())).getConfig();
+        List<URL> pluginJars = new ArrayList<>();
+        SparkSourcePluginDiscovery sparkSourcePluginDiscovery = new SparkSourcePluginDiscovery();
+        SparkSinkPluginDiscovery sparkSinkPluginDiscovery = new SparkSinkPluginDiscovery();
+        pluginJars.addAll(sparkSourcePluginDiscovery.getPluginJarPaths(getPluginIdentifiers(config, PluginType.SOURCE)));
+        pluginJars.addAll(sparkSinkPluginDiscovery.getPluginJarPaths(getPluginIdentifiers(config, PluginType.SINK)));
+        return pluginJars.stream().map(url -> new File(url.getPath()).toPath()).collect(Collectors.toList());
+    }
+
+    /**
+     * list the jar files (regular files whose name ends in '.jar') in the given directory
+     */
+    private List<Path> listJars(Path dir) throws IOException {
+        try (Stream<Path> stream = Files.list(dir)) {
+            return stream
+                    .filter(it -> !Files.isDirectory(it))
+                    // Path#endsWith matches whole name elements, not string suffixes;
+                    // use the String form so '*.jar' files are actually matched.
+                    .filter(it -> it.getFileName().toString().endsWith(".jar"))
+                    .collect(Collectors.toList());
+        }
+    }
+
+    /**
+     * build final spark-submit commands.
+     * The first element keeps the literal '${SPARK_HOME}' placeholder: the command
+     * list is printed and executed by a shell, which performs the expansion.
+     */
+    protected List<String> buildFinal() {
+        List<String> commands = new ArrayList<>();
+        commands.add("${SPARK_HOME}/bin/spark-submit");
+        appendOption(commands, "--class", SeatunnelSpark.class.getName());
+        appendOption(commands, "--name", this.appName);
+        appendOption(commands, "--master", this.commandArgs.getMaster());
+        appendOption(commands, "--deploy-mode", this.commandArgs.getDeployMode().getName());
+        appendJars(commands, this.jars);
+        appendFiles(commands, this.files);
+        appendSparkConf(commands, this.sparkConf);
+        appendAppJar(commands);
+        appendArgs(commands, args);
+        return commands;
+    }
+
+    /**
+     * append an option and its value to the command list;
+     * the value is wrapped in double quotes with embedded quotes escaped,
+     * since the commands are later joined into a single shell line.
+     */
+    protected void appendOption(List<String> commands, String option, String value) {
+        commands.add(option);
+        commands.add("\"" + value.replace("\"", "\\\"") + "\"");
+    }
+
+    /**
+     * append the '--jars' option (comma-separated jar paths) to the command list
+     */
+    protected void appendJars(List<String> commands, List<Path> paths) {
+        appendPaths(commands, "--jars", paths);
+    }
+
+    /**
+     * append the '--files' option (comma-separated file paths) to the command list
+     */
+    protected void appendFiles(List<String> commands, List<Path> paths) {
+        appendPaths(commands, "--files", paths);
+    }
+
+    /**
+     * append a comma-joined list of paths as a single option to the command list;
+     * nothing is appended when {@code paths} is empty.
+     */
+    protected void appendPaths(List<String> commands, String option, List<Path> paths) {
+        if (!paths.isEmpty()) {
+            String values = paths.stream()
+                    .map(Path::toString)
+                    .collect(Collectors.joining(","));
+            appendOption(commands, option, values);
+        }
+    }
+
+    /**
+     * append each spark configuration entry as a '--conf key=value' option
+     * to the command list
+     */
+    protected void appendSparkConf(List<String> commands, Map<String, String> sparkConf) {
+        for (Map.Entry<String, String> entry : sparkConf.entrySet()) {
+            String key = entry.getKey();
+            String value = entry.getValue();
+            appendOption(commands, "--conf", key + "=" + value);
+        }
+    }
+
+    /**
+     * append the original commandline args to the command list, so they are
+     * forwarded verbatim to the main class run by spark-submit
+     */
+    protected void appendArgs(List<String> commands, String[] args) {
+        commands.addAll(Arrays.asList(args));
+    }
+
+    /**
+     * append the application jar (the spark-submit primary resource) to the command list.
+     * NOTE(review): the hard-coded name 'seatunnel-core-spark.jar' predates the rename
+     * to seatunnel-spark-starter — confirm it matches the artifact shipped in the lib dir.
+     */
+    protected void appendAppJar(List<String> commands) {
+        commands.add(Common.appLibDir().resolve("seatunnel-core-spark.jar").toString());
+    }
+
+    /**
+     * collect a {@link PluginIdentifier} (SPARK engine) for every plugin of the given
+     * types declared in the job config, keyed on each entry's 'plugin_name' field
+     */
+    @SuppressWarnings("checkstyle:Indentation")
+    private List<PluginIdentifier> getPluginIdentifiers(Config config, PluginType... pluginTypes) {
+        return Arrays.stream(pluginTypes).flatMap((Function<PluginType, Stream<PluginIdentifier>>) pluginType -> {
+            List<? extends Config> configList = config.getConfigList(pluginType.getType());
+            return configList.stream()
+                    .map(pluginConfig -> PluginIdentifier
+                            .of(EngineType.SPARK.getEngine(),
+                                    pluginType.getType(),
+                                    pluginConfig.getString("plugin_name")));
+        }).collect(Collectors.toList());
+    }
+
+    /**
+     * a Starter for building spark-submit commands with client mode options
+     */
+    private static class ClientModeSparkStarter extends SparkStarter {
+
+        /**
+         * spark options that are specific to client mode
+         */
+        private enum ClientModeSparkConfigs {
+
+            /**
+             * Memory for driver in client mode
+             */
+            DriverMemory("--driver-memory", "spark.driver.memory"),
+
+            /**
+             * Extra Java options to pass to the driver in client mode
+             */
+            DriverJavaOptions("--driver-java-options", "spark.driver.extraJavaOptions"),
+
+            /**
+             * Extra library path entries to pass to the driver in client mode.
+             * (fixed: the option name previously contained a leading space,
+             * which produced a broken spark-submit option)
+             */
+            DriverLibraryPath("--driver-library-path", "spark.driver.extraLibraryPath"),
+
+            /**
+             * Extra class path entries to pass to the driver in client mode
+             */
+            DriverClassPath("--driver-class-path", "spark.driver.extraClassPath");
+
+            private final String optionName;
+
+            private final String propertyName;
+
+            private static final Map<String, ClientModeSparkConfigs> PROPERTY_NAME_MAP = new HashMap<>();
+
+            static {
+                for (ClientModeSparkConfigs config : values()) {
+                    PROPERTY_NAME_MAP.put(config.propertyName, config);
+                }
+            }
+
+            ClientModeSparkConfigs(String optionName, String propertyName) {
+                this.optionName = optionName;
+                this.propertyName = propertyName;
+            }
+        }
+
+        private ClientModeSparkStarter(String[] args, SparkCommandArgs commandArgs) {
+            super(args, commandArgs);
+        }
+
+        @Override
+        protected void appendSparkConf(List<String> commands, Map<String, String> sparkConf) {
+            // In client mode the driver JVM is the launcher itself, so driver-side
+            // settings must be passed as dedicated options instead of '--conf'.
+            for (ClientModeSparkConfigs config : ClientModeSparkConfigs.values()) {
+                String propertyValue = this.sparkConf.get(config.propertyName);
+                if (StringUtils.isNotBlank(propertyValue)) {
+                    appendOption(commands, config.optionName, propertyValue);
+                }
+            }
+            for (Map.Entry<String, String> entry : sparkConf.entrySet()) {
+                String key = entry.getKey();
+                String value = entry.getValue();
+                if (ClientModeSparkConfigs.PROPERTY_NAME_MAP.containsKey(key)) {
+                    continue;
+                }
+                appendOption(commands, "--conf", key + "=" + value);
+            }
+        }
+    }
+
+    /**
+     * a Starter for building spark-submit commands with cluster mode options
+     */
+    private static class ClusterModeSparkStarter extends SparkStarter {
+
+        private ClusterModeSparkStarter(String[] args, SparkCommandArgs commandArgs) {
+            super(args, commandArgs);
+        }
+
+        /**
+         * In cluster mode the plugin directory is packed into a tarball (created once
+         * and reused on later runs) and shipped to the cluster, together with the job
+         * config file, through the '--files' option before the base command is built.
+         */
+        @Override
+        public List<String> buildCommands() throws IOException {
+            Common.setDeployMode(commandArgs.getDeployMode().getName());
+            Path pluginTarball = Common.pluginTarball();
+            if (Files.notExists(pluginTarball)) {
+                CompressionUtils.tarGzip(Common.pluginRootDir(), pluginTarball);
+            }
+            this.files.add(pluginTarball);
+            this.files.add(Paths.get(commandArgs.getConfigFile()));
+            return super.buildCommands();
+        }
+    }
+}
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/args/SparkCommandArgs.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/args/SparkCommandArgs.java
new file mode 100644
index 00000000..eb5ef929
--- /dev/null
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/args/SparkCommandArgs.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.starter.spark.args;
+
+import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.core.starter.command.AbstractCommandArgs;
+import org.apache.seatunnel.core.starter.command.DeployModeConverter;
+import org.apache.seatunnel.core.starter.config.EngineType;
+
+import com.beust.jcommander.Parameter;
+
+/**
+ * Command line arguments for the Spark starter, adding the Spark-specific
+ * '--deploy-mode' and '--master' options on top of {@link AbstractCommandArgs}.
+ */
+public class SparkCommandArgs extends AbstractCommandArgs {
+
+    // Required; converted from its string form (e.g. "client"/"cluster") by DeployModeConverter.
+    @Parameter(names = {"-e", "--deploy-mode"},
+        description = "Spark deploy mode",
+        required = true,
+        converter = DeployModeConverter.class)
+    private DeployMode deployMode;
+
+    // Required; passed straight through to spark-submit's '--master' option.
+    @Parameter(names = {"-m", "--master"},
+        description = "Spark master",
+        required = true)
+    private String master = null;
+
+    public String getMaster() {
+        return master;
+    }
+
+    @Override
+    public EngineType getEngineType() {
+        return EngineType.SPARK;
+    }
+
+    @Override
+    public DeployMode getDeployMode() {
+        return deployMode;
+    }
+
+    public void setDeployMode(DeployMode deployMode) {
+        this.deployMode = deployMode;
+    }
+
+    public void setMaster(String master) {
+        this.master = master;
+    }
+
+}
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SeaTunnelApiConfValidateCommand.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkApiConfValidateCommand.java
similarity index 66%
rename from seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SeaTunnelApiConfValidateCommand.java
rename to seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkApiConfValidateCommand.java
index 624ee545..7b9d6782 100644
--- a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SeaTunnelApiConfValidateCommand.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkApiConfValidateCommand.java
@@ -15,14 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.core.spark.command;
+package org.apache.seatunnel.core.starter.spark.command;
 
-import org.apache.seatunnel.core.base.command.Command;
-import org.apache.seatunnel.core.base.config.ConfigBuilder;
-import org.apache.seatunnel.core.base.exception.ConfigCheckException;
-import org.apache.seatunnel.core.base.utils.FileUtils;
-import org.apache.seatunnel.core.spark.args.SparkCommandArgs;
-import org.apache.seatunnel.core.spark.config.SeaTunnelApiConfigChecker;
+import org.apache.seatunnel.core.starter.command.Command;
+import org.apache.seatunnel.core.starter.config.ConfigBuilder;
+import org.apache.seatunnel.core.starter.exception.ConfigCheckException;
+import org.apache.seatunnel.core.starter.spark.args.SparkCommandArgs;
+import org.apache.seatunnel.core.starter.spark.config.SparkApiConfigChecker;
+import org.apache.seatunnel.core.starter.utils.FileUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,13 +32,13 @@ import java.nio.file.Path;
 /**
  * Used to validate the configuration of the SeaTunnel API.
  */
-public class SeaTunnelApiConfValidateCommand implements Command<SparkCommandArgs> {
+public class SparkApiConfValidateCommand implements Command<SparkCommandArgs> {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(SeaTunnelApiConfValidateCommand.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(SparkApiConfValidateCommand.class);
 
     private final SparkCommandArgs sparkCommandArgs;
 
-    public SeaTunnelApiConfValidateCommand(SparkCommandArgs sparkCommandArgs) {
+    public SparkApiConfValidateCommand(SparkCommandArgs sparkCommandArgs) {
         this.sparkCommandArgs = sparkCommandArgs;
     }
 
@@ -47,7 +47,7 @@ public class SeaTunnelApiConfValidateCommand implements Command<SparkCommandArgs
         Path configPath = FileUtils.getConfigPath(sparkCommandArgs);
         // todo: validate the config by new api
         ConfigBuilder configBuilder = new ConfigBuilder(configPath);
-        new SeaTunnelApiConfigChecker().checkConfig(configBuilder.getConfig());
+        new SparkApiConfigChecker().checkConfig(configBuilder.getConfig());
         LOGGER.info("config OK !");
     }
 }
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SeaTunnelApiTaskExecuteCommand.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkApiTaskExecuteCommand.java
similarity index 69%
rename from seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SeaTunnelApiTaskExecuteCommand.java
rename to seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkApiTaskExecuteCommand.java
index a0e30a7b..ecdc8a92 100644
--- a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SeaTunnelApiTaskExecuteCommand.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkApiTaskExecuteCommand.java
@@ -15,14 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.core.spark.command;
+package org.apache.seatunnel.core.starter.spark.command;
 
-import org.apache.seatunnel.core.base.command.Command;
-import org.apache.seatunnel.core.base.config.ConfigBuilder;
-import org.apache.seatunnel.core.base.exception.CommandExecuteException;
-import org.apache.seatunnel.core.base.utils.FileUtils;
-import org.apache.seatunnel.core.spark.args.SparkCommandArgs;
-import org.apache.seatunnel.core.spark.execution.SeaTunnelTaskExecution;
+import org.apache.seatunnel.core.starter.command.Command;
+import org.apache.seatunnel.core.starter.config.ConfigBuilder;
+import org.apache.seatunnel.core.starter.exception.CommandExecuteException;
+import org.apache.seatunnel.core.starter.spark.args.SparkCommandArgs;
+import org.apache.seatunnel.core.starter.spark.execution.SparkTaskExecution;
+import org.apache.seatunnel.core.starter.utils.FileUtils;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
@@ -35,13 +35,13 @@ import java.nio.file.Path;
  * todo: do we need to move these classes to a new module? since this may cause version conflict with the old spark version.
  * This command is used to execute the Spark job by SeaTunnel new API.
  */
-public class SeaTunnelApiTaskExecuteCommand implements Command<SparkCommandArgs> {
+public class SparkApiTaskExecuteCommand implements Command<SparkCommandArgs> {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(SeaTunnelApiTaskExecuteCommand.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(SparkApiTaskExecuteCommand.class);
 
     private final SparkCommandArgs sparkCommandArgs;
 
-    public SeaTunnelApiTaskExecuteCommand(SparkCommandArgs sparkCommandArgs) {
+    public SparkApiTaskExecuteCommand(SparkCommandArgs sparkCommandArgs) {
         this.sparkCommandArgs = sparkCommandArgs;
     }
 
@@ -50,7 +50,7 @@ public class SeaTunnelApiTaskExecuteCommand implements Command<SparkCommandArgs>
         Path configFile = FileUtils.getConfigPath(sparkCommandArgs);
         Config config = new ConfigBuilder(configFile).getConfig();
         try {
-            SeaTunnelTaskExecution seaTunnelTaskExecution = new SeaTunnelTaskExecution(config);
+            SparkTaskExecution seaTunnelTaskExecution = new SparkTaskExecution(config);
             seaTunnelTaskExecution.execute();
         } catch (Exception e) {
             LOGGER.error("Run SeaTunnel on spark failed.", e);
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkCommandBuilder.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkCommandBuilder.java
similarity index 65%
copy from seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkCommandBuilder.java
copy to seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkCommandBuilder.java
index 24661acf..65d6e729 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkCommandBuilder.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkCommandBuilder.java
@@ -15,32 +15,33 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.core.starter.flink.command;
+package org.apache.seatunnel.core.starter.spark.command;
 
 import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.core.starter.command.Command;
 import org.apache.seatunnel.core.starter.command.CommandBuilder;
-import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs;
+import org.apache.seatunnel.core.starter.spark.args.SparkCommandArgs;
 
-public class FlinkCommandBuilder implements CommandBuilder<FlinkCommandArgs> {
+public class SparkCommandBuilder implements CommandBuilder<SparkCommandArgs> {
 
     @Override
-    public Command<FlinkCommandArgs> buildCommand(FlinkCommandArgs commandArgs) {
-        if (!Common.setDeployMode(commandArgs.getDeployMode().getName())) {
+    public Command<SparkCommandArgs> buildCommand(SparkCommandArgs commandArgs) {
+        if (Boolean.FALSE.equals(Common.setDeployMode(commandArgs.getDeployMode().getName()))) {
             throw new IllegalArgumentException(
                     String.format("Deploy mode: %s is Illegal", commandArgs.getDeployMode()));
         }
-        return new FlinkApiCommandBuilder().buildCommand(commandArgs);
+        return new SeaTunnelApiCommandBuilder().buildCommand(commandArgs);
     }
 
     /**
      * Used to generate command for seaTunnel API.
      */
-    private static class FlinkApiCommandBuilder extends FlinkCommandBuilder {
+    private static class SeaTunnelApiCommandBuilder extends SparkCommandBuilder {
         @Override
-        public Command<FlinkCommandArgs> buildCommand(FlinkCommandArgs commandArgs) {
-            return commandArgs.isCheckConfig() ? new FlinkApiConfValidateCommand(commandArgs)
-                    : new FlinkApiTaskExecuteCommand(commandArgs);
+        public Command<SparkCommandArgs> buildCommand(SparkCommandArgs commandArgs) {
+            return commandArgs.isCheckConfig() ? new SparkApiConfValidateCommand(commandArgs)
+                    : new SparkApiTaskExecuteCommand(commandArgs);
         }
     }
 }
+
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SeaTunnelApiConfigChecker.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/config/SparkApiConfigChecker.java
similarity index 78%
rename from seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SeaTunnelApiConfigChecker.java
rename to seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/config/SparkApiConfigChecker.java
index 3715c14b..b08a4a75 100644
--- a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SeaTunnelApiConfigChecker.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/config/SparkApiConfigChecker.java
@@ -15,14 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.core.spark.config;
+package org.apache.seatunnel.core.starter.spark.config;
 
-import org.apache.seatunnel.core.base.config.ConfigChecker;
-import org.apache.seatunnel.core.base.exception.ConfigCheckException;
+import org.apache.seatunnel.core.starter.config.ConfigChecker;
+import org.apache.seatunnel.core.starter.exception.ConfigCheckException;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
-public class SeaTunnelApiConfigChecker implements ConfigChecker<SeaTunnelEnvironment> {
+public class SparkApiConfigChecker implements ConfigChecker<SparkEnvironment> {
 
     @Override
     public void checkConfig(Config config) throws ConfigCheckException {
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SeaTunnelEnvironment.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/config/SparkEnvironment.java
similarity index 84%
rename from seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SeaTunnelEnvironment.java
rename to seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/config/SparkEnvironment.java
index 6f9e0135..f7d5b83d 100644
--- a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SeaTunnelEnvironment.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/config/SparkEnvironment.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.core.spark.config;
+package org.apache.seatunnel.core.starter.spark.config;
 
 import org.apache.seatunnel.apis.base.env.RuntimeEnv;
 import org.apache.seatunnel.common.config.CheckResult;
@@ -27,10 +27,10 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import java.net.URL;
 import java.util.List;
 
-public class SeaTunnelEnvironment implements RuntimeEnv {
+public class SparkEnvironment implements RuntimeEnv {
 
     @Override
-    public SeaTunnelEnvironment setConfig(Config config) {
+    public SparkEnvironment setConfig(Config config) {
         return null;
     }
 
@@ -45,12 +45,12 @@ public class SeaTunnelEnvironment implements RuntimeEnv {
     }
 
     @Override
-    public SeaTunnelEnvironment prepare() {
+    public SparkEnvironment prepare() {
         return null;
     }
 
     @Override
-    public SeaTunnelEnvironment setJobMode(JobMode mode) {
+    public SparkEnvironment setJobMode(JobMode mode) {
         return null;
     }
 
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/execution/AbstractPluginExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/AbstractPluginExecuteProcessor.java
similarity index 97%
rename from seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/execution/AbstractPluginExecuteProcessor.java
rename to seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/AbstractPluginExecuteProcessor.java
index 2d222eb4..0dad6f68 100644
--- a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/execution/AbstractPluginExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/AbstractPluginExecuteProcessor.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.core.spark.execution;
+package org.apache.seatunnel.core.starter.spark.execution;
 
 import static org.apache.seatunnel.apis.base.plugin.Plugin.RESULT_TABLE_NAME;
 import static org.apache.seatunnel.apis.base.plugin.Plugin.SOURCE_TABLE_NAME;
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/execution/PluginExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/PluginExecuteProcessor.java
similarity index 95%
rename from seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/execution/PluginExecuteProcessor.java
rename to seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/PluginExecuteProcessor.java
index 48d18488..51fc752c 100644
--- a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/execution/PluginExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/PluginExecuteProcessor.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.core.spark.execution;
+package org.apache.seatunnel.core.starter.spark.execution;
 
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
similarity index 98%
rename from seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/execution/SinkExecuteProcessor.java
rename to seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index 08807f7c..2a5a4597 100644
--- a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/execution/SinkExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.core.spark.execution;
+package org.apache.seatunnel.core.starter.spark.execution;
 
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.common.config.Common;
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/execution/SourceExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
similarity index 98%
rename from seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/execution/SourceExecuteProcessor.java
rename to seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
index c6e42a0a..dd7cec03 100644
--- a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/execution/SourceExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.core.spark.execution;
+package org.apache.seatunnel.core.starter.spark.execution;
 
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.common.utils.SerializationUtils;
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/execution/SeaTunnelTaskExecution.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkTaskExecution.java
similarity index 91%
rename from seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/execution/SeaTunnelTaskExecution.java
rename to seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkTaskExecution.java
index d25d1764..1e2b5aa4 100644
--- a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/execution/SeaTunnelTaskExecution.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkTaskExecution.java
@@ -15,12 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.core.spark.execution;
+package org.apache.seatunnel.core.starter.spark.execution;
 
 import org.apache.seatunnel.api.common.SeaTunnelContext;
 import org.apache.seatunnel.common.constants.JobMode;
-import org.apache.seatunnel.core.base.config.EngineType;
-import org.apache.seatunnel.core.base.config.EnvironmentFactory;
+import org.apache.seatunnel.core.starter.config.EngineType;
+import org.apache.seatunnel.core.starter.config.EnvironmentFactory;
 import org.apache.seatunnel.spark.SparkEnvironment;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -33,9 +33,9 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.List;
 
-public class SeaTunnelTaskExecution {
+public class SparkTaskExecution {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(SeaTunnelTaskExecution.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(SparkTaskExecution.class);
 
     private final Config config;
     private final SparkEnvironment sparkEnvironment;
@@ -43,7 +43,7 @@ public class SeaTunnelTaskExecution {
     private final PluginExecuteProcessor transformPluginExecuteProcessor;
     private final PluginExecuteProcessor sinkPluginExecuteProcessor;
 
-    public SeaTunnelTaskExecution(Config config) {
+    public SparkTaskExecution(Config config) {
         this.config = config;
         this.sparkEnvironment = getSparkEnvironment(config);
         this.sourcePluginExecuteProcessor = new SourceExecuteProcessor(sparkEnvironment, config.getConfigList("source"));
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/execution/TransformExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
similarity index 98%
rename from seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/execution/TransformExecuteProcessor.java
rename to seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
index aa6e56c8..deec43b9 100644
--- a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/execution/TransformExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.core.spark.execution;
+package org.apache.seatunnel.core.starter.spark.execution;
 
 import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
 import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSparkTransformPluginDiscovery;
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ApiType.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/utils/CommandLineUtils.java
similarity index 56%
rename from seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ApiType.java
rename to seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/utils/CommandLineUtils.java
index 5468eee3..9e25030e 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ApiType.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/utils/CommandLineUtils.java
@@ -15,19 +15,24 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.core.base.config;
+package org.apache.seatunnel.core.starter.spark.utils;
 
-public enum ApiType {
-    ENGINE_API("engine"),
-    SEATUNNEL_API("seatunnel"),
-    ;
-    private final String apiType;
+import org.apache.seatunnel.core.starter.spark.args.SparkCommandArgs;
 
-    ApiType(String apiType) {
-        this.apiType = apiType;
+import com.beust.jcommander.JCommander;
+
+public class CommandLineUtils {
+
+    private CommandLineUtils() {
+        throw new UnsupportedOperationException("CommandLineUtils is a utility class and cannot be instantiated");
     }
 
-    public String getApiType() {
-        return apiType;
+    public static SparkCommandArgs parseSparkArgs(String[] args) {
+        SparkCommandArgs sparkCommandArgs = new SparkCommandArgs();
+        JCommander.newBuilder()
+            .addObject(sparkCommandArgs)
+            .build()
+            .parse(args);
+        return sparkCommandArgs;
     }
 }
diff --git a/seatunnel-core/seatunnel-core-spark/src/test/java/org/apache/seatunnel/core/spark/SparkStarterTest.java b/seatunnel-core/seatunnel-spark-starter/src/test/java/org/apache/seatunnel/core/starter/spark/SparkStarterTest.java
similarity index 96%
copy from seatunnel-core/seatunnel-core-spark/src/test/java/org/apache/seatunnel/core/spark/SparkStarterTest.java
copy to seatunnel-core/seatunnel-spark-starter/src/test/java/org/apache/seatunnel/core/starter/spark/SparkStarterTest.java
index 20d35b92..3b0bef3a 100644
--- a/seatunnel-core/seatunnel-core-spark/src/test/java/org/apache/seatunnel/core/spark/SparkStarterTest.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/test/java/org/apache/seatunnel/core/starter/spark/SparkStarterTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.core.spark;
+package org.apache.seatunnel.core.starter.spark;
 
 import static org.junit.Assert.assertEquals;
 
diff --git a/seatunnel-core/seatunnel-spark-starter/src/test/java/org/apache/seatunnel/core/starter/spark/args/SparkCommandArgsTest.java b/seatunnel-core/seatunnel-spark-starter/src/test/java/org/apache/seatunnel/core/starter/spark/args/SparkCommandArgsTest.java
new file mode 100644
index 00000000..c36b1ce4
--- /dev/null
+++ b/seatunnel-core/seatunnel-spark-starter/src/test/java/org/apache/seatunnel/core/starter/spark/args/SparkCommandArgsTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.starter.spark.args;
+
+import org.apache.seatunnel.common.config.DeployMode;
+
+import com.beust.jcommander.JCommander;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+public class SparkCommandArgsTest {
+
+    @Test
+    public void testParseSparkArgs() {
+        String[] args = {"-c", "app.conf", "-e", "client", "-m", "yarn", "-i", "city=shijiazhuang", "-i", "name=Tom"};
+        SparkCommandArgs sparkArgs = new SparkCommandArgs();
+        JCommander.newBuilder()
+                .addObject(sparkArgs)
+                .build()
+                .parse(args);
+        Assert.assertEquals("app.conf", sparkArgs.getConfigFile());
+        Assert.assertEquals(DeployMode.CLIENT, sparkArgs.getDeployMode());
+        Assert.assertEquals("yarn", sparkArgs.getMaster());
+        Assert.assertEquals(Arrays.asList("city=shijiazhuang", "name=Tom"), sparkArgs.getVariables());
+    }
+
+    @Test
+    public void testHelp() {
+        String[] args = {"-h"};
+        SparkCommandArgs sparkArgs = new SparkCommandArgs();
+        JCommander commander = JCommander.newBuilder()
+                .addObject(sparkArgs)
+                .build();
+        commander.parse(args);
+        if (sparkArgs.isHelp()) {
+            commander.usage();
+        }
+    }
+
+    @Test
+    public void testDashDash() {
+        String[] args = {"-c", "app.conf", "-e", "client", "-m", "yarn", "-i", "city=guojizhuang", "--"};
+        SparkCommandArgs sparkArgs = new SparkCommandArgs();
+        JCommander.newBuilder()
+                .addObject(sparkArgs)
+                .build()
+                .parse(args);
+    }
+
+}
diff --git a/seatunnel-core/seatunnel-core-spark/src/test/java/org/apache/seatunnel/core/spark/SparkStarterTest.java b/seatunnel-core/seatunnel-spark-starter/src/test/java/org/apache/seatunnel/core/starter/spark/utils/CommandLineUtilsTest.java
similarity index 50%
rename from seatunnel-core/seatunnel-core-spark/src/test/java/org/apache/seatunnel/core/spark/SparkStarterTest.java
rename to seatunnel-core/seatunnel-spark-starter/src/test/java/org/apache/seatunnel/core/starter/spark/utils/CommandLineUtilsTest.java
index 20d35b92..61cafeeb 100644
--- a/seatunnel-core/seatunnel-core-spark/src/test/java/org/apache/seatunnel/core/spark/SparkStarterTest.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/test/java/org/apache/seatunnel/core/starter/spark/utils/CommandLineUtilsTest.java
@@ -15,26 +15,28 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.core.spark;
+package org.apache.seatunnel.core.starter.spark.utils;
 
-import static org.junit.Assert.assertEquals;
+import org.apache.seatunnel.core.starter.spark.args.SparkCommandArgs;
 
+import com.beust.jcommander.ParameterException;
+import org.junit.Assert;
 import org.junit.Test;
 
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Map;
+public class CommandLineUtilsTest {
 
-public class SparkStarterTest {
+    @Test
+    public void testParseSparkArgs() {
+        String[] args = {"-c", "app.conf", "-e", "cluster", "-m", "local[*]"};
+        SparkCommandArgs commandLineArgs = CommandLineUtils.parseSparkArgs(args);
+
+        Assert.assertEquals("app.conf", commandLineArgs.getConfigFile());
+        Assert.assertEquals("cluster", commandLineArgs.getDeployMode().getName());
+    }
 
     @Test
-    public void testGetSparkConf() throws URISyntaxException, FileNotFoundException {
-        URI uri = ClassLoader.getSystemResource("spark_application.conf").toURI();
-        String file = new File(uri).toString();
-        Map<String, String> sparkConf = SparkStarter.getSparkConf(file);
-        assertEquals("SeaTunnel", sparkConf.get("spark.app.name"));
-        assertEquals("1", sparkConf.get("spark.executor.cores"));
+    public void testParseSparkArgsException() {
+        String[] args = {"-c", "app.conf", "-e", "cluster2xxx", "-m", "local[*]"};
+        Assert.assertThrows(ParameterException.class, () -> CommandLineUtils.parseSparkArgs(args));
     }
 }
diff --git a/seatunnel-core/seatunnel-spark-starter/src/test/resources/spark_application.conf b/seatunnel-core/seatunnel-spark-starter/src/test/resources/spark_application.conf
new file mode 100644
index 00000000..bf853a4c
--- /dev/null
+++ b/seatunnel-core/seatunnel-spark-starter/src/test/resources/spark_application.conf
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  "spark.executor.cores" = 1
+  "spark.executor.memory" = "1g"
+  "spark.streaming.batchDuration" = 5
+}
+
+source {
+
+  fakeStream {
+    content = ["Hello World, InterestingLab"]
+  }
+
+}
+
+transform {
+
+  split {
+    fields = ["msg", "name"]
+    delimiter = ","
+  }
+
+  sql1 {
+    sql = "sql1"
+  }
+
+  sql2 {
+    sql = "sql2"
+  }
+
+  sql3 {
+    sql = "sql3"
+  }
+
+  json {
+    sql = "sql3"
+  }
+
+}
+
+sink {
+  Console {}
+  c.Console {}
+}
diff --git a/seatunnel-examples/seatunnel-spark-examples/pom.xml b/seatunnel-examples/seatunnel-spark-examples/pom.xml
index 7f586827..1c3b3dfd 100644
--- a/seatunnel-examples/seatunnel-spark-examples/pom.xml
+++ b/seatunnel-examples/seatunnel-spark-examples/pom.xml
@@ -34,7 +34,7 @@
     <dependencies>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-core-spark</artifactId>
+            <artifactId>seatunnel-spark-starter</artifactId>
             <version>${project.version}</version>
         </dependency>
 
diff --git a/seatunnel-examples/seatunnel-spark-examples/src/main/java/org/apache/seatunnel/example/spark/LocalSparkExample.java b/seatunnel-examples/seatunnel-spark-examples/src/main/java/org/apache/seatunnel/example/spark/LocalSparkExample.java
deleted file mode 100644
index aa121a90..00000000
--- a/seatunnel-examples/seatunnel-spark-examples/src/main/java/org/apache/seatunnel/example/spark/LocalSparkExample.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.example.spark;
-
-import org.apache.seatunnel.common.config.DeployMode;
-import org.apache.seatunnel.core.base.Seatunnel;
-import org.apache.seatunnel.core.base.command.Command;
-import org.apache.seatunnel.core.base.exception.CommandException;
-import org.apache.seatunnel.core.spark.args.SparkCommandArgs;
-import org.apache.seatunnel.core.spark.command.SparkCommandBuilder;
-
-import java.io.FileNotFoundException;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.nio.file.Paths;
-
-public class LocalSparkExample {
-
-    public static void main(String[] args) throws URISyntaxException, FileNotFoundException, CommandException {
-        String configFile = getTestConfigFile("/examples/spark.batch.conf");
-        SparkCommandArgs sparkArgs = new SparkCommandArgs();
-        sparkArgs.setConfigFile(configFile);
-        sparkArgs.setCheckConfig(false);
-        sparkArgs.setVariables(null);
-        sparkArgs.setDeployMode(DeployMode.CLIENT);
-        Command<SparkCommandArgs> sparkCommand = new SparkCommandBuilder().buildCommand(sparkArgs);
-        Seatunnel.run(sparkCommand);
-    }
-
-    public static String getTestConfigFile(String configFile) throws URISyntaxException, FileNotFoundException {
-        URL resource = LocalSparkExample.class.getResource(configFile);
-        if (resource == null) {
-            throw new FileNotFoundException("Could not find config file: " + configFile);
-        }
-        return Paths.get(resource.toURI()).toString();
-    }
-}
diff --git a/seatunnel-examples/seatunnel-spark-examples/src/main/java/org/apache/seatunnel/example/spark/SeaTunnelApiExample.java b/seatunnel-examples/seatunnel-spark-examples/src/main/java/org/apache/seatunnel/example/spark/SeaTunnelApiExample.java
index 1cdf40f8..970da462 100644
--- a/seatunnel-examples/seatunnel-spark-examples/src/main/java/org/apache/seatunnel/example/spark/SeaTunnelApiExample.java
+++ b/seatunnel-examples/seatunnel-spark-examples/src/main/java/org/apache/seatunnel/example/spark/SeaTunnelApiExample.java
@@ -18,11 +18,11 @@
 package org.apache.seatunnel.example.spark;
 
 import org.apache.seatunnel.common.config.DeployMode;
-import org.apache.seatunnel.core.base.Seatunnel;
-import org.apache.seatunnel.core.base.command.Command;
-import org.apache.seatunnel.core.base.exception.CommandException;
-import org.apache.seatunnel.core.spark.args.SparkCommandArgs;
-import org.apache.seatunnel.core.spark.command.SparkCommandBuilder;
+import org.apache.seatunnel.core.starter.Seatunnel;
+import org.apache.seatunnel.core.starter.command.Command;
+import org.apache.seatunnel.core.starter.exception.CommandException;
+import org.apache.seatunnel.core.starter.spark.args.SparkCommandArgs;
+import org.apache.seatunnel.core.starter.spark.command.SparkCommandBuilder;
 
 import java.io.FileNotFoundException;
 import java.net.URISyntaxException;
@@ -44,7 +44,7 @@ public class SeaTunnelApiExample {
     }
 
     public static String getTestConfigFile(String configFile) throws FileNotFoundException, URISyntaxException {
-        URL resource = LocalSparkExample.class.getResource(configFile);
+        URL resource = SeaTunnelApiExample.class.getResource(configFile);
         if (resource == null) {
             throw new FileNotFoundException("Can't find config file: " + configFile);
         }
diff --git a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkTransformPluginDiscovery.java b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkTransformPluginDiscovery.java
index cda5ab6b..cc77b49a 100644
--- a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkTransformPluginDiscovery.java
+++ b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkTransformPluginDiscovery.java
@@ -31,6 +31,7 @@ public class FlinkTransformPluginDiscovery extends AbstractPluginDiscovery<BaseF
         super("flink");
     }
 
+    @Override
     public List<URL> getPluginJarPaths(List<PluginIdentifier> pluginIdentifiers) {
         return new ArrayList<>();
     }
diff --git a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/spark/SparkTransformPluginDiscovery.java b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/spark/SparkTransformPluginDiscovery.java
index 849fabbd..cc125e26 100644
--- a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/spark/SparkTransformPluginDiscovery.java
+++ b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/spark/SparkTransformPluginDiscovery.java
@@ -34,6 +34,7 @@ public class SparkTransformPluginDiscovery extends AbstractPluginDiscovery<BaseS
         super("spark");
     }
 
+    @Override
     public List<URL> getPluginJarPaths(List<PluginIdentifier> pluginIdentifiers) {
         return Collections.emptyList();
     }