You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2022/09/15 09:29:34 UTC

[incubator-seatunnel] branch st-engine updated: [ST-Engine][Starter] Add seatunnel own engine starter and e2e (#2690)

This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/st-engine by this push:
     new c0ba0691f [ST-Engine][Starter] Add seatunnel own engine starter and e2e (#2690)
c0ba0691f is described below

commit c0ba0691ff6ba69190a3837e6747fc646e2bd932
Author: Hisoka <fa...@qq.com>
AuthorDate: Thu Sep 15 17:29:28 2022 +0800

    [ST-Engine][Starter] Add seatunnel own engine starter and e2e (#2690)
---
 pom.xml                                            |   7 ++
 seatunnel-apis/seatunnel-api-base/pom.xml          |   4 -
 seatunnel-core/README.md                           |   5 +-
 seatunnel-core/pom.xml                             |   6 +-
 seatunnel-core/seatunnel-core-starter/pom.xml      |  10 +-
 .../starter/config/AbstractExecutionContext.java   |  92 --------------
 .../core/starter/config/EnvironmentFactory.java    |  25 +---
 .../core/starter/config/ExecutionFactory.java      |  84 -------------
 seatunnel-core/seatunnel-flink-starter/pom.xml     |   6 -
 ...or.sh => start-seatunnel-flink-connector-v2.sh} |   0
 .../seatunnel/core/starter/flink/FlinkStarter.java |   3 +-
 .../core/starter/flink/SeatunnelFlink.java         |   3 +-
 .../starter/flink/config/FlinkApiEnvironment.java  |  75 ------------
 .../flink/config/FlinkEnvironmentFactory.java}     |  16 ++-
 .../starter/flink/execution/FlinkExecution.java    |   7 +-
 .../core/starter/flink/utils/CommandLineUtils.java |   5 +-
 seatunnel-core/seatunnel-spark-starter/pom.xml     |   6 -
 ...or.sh => start-seatunnel-spark-connector-v2.sh} |   0
 .../seatunnel/core/starter/spark/SparkStarter.java |   2 +-
 .../starter/spark/config/SparkEnvironment.java     |  66 ----------
 .../spark/config/SparkEnvironmentFactory.java}     |  15 ++-
 .../starter/spark/execution/SparkExecution.java    |   7 +-
 .../pom.xml                                        |   6 +-
 .../src/main/bin/seatunnel-cluster.sh}             |  19 +--
 .../src/main/bin/seatunnel.sh}                     |  17 +--
 .../core/starter/seatunnel/CommandLineUtils.java   |  74 ++++++++++++
 .../core/starter/seatunnel/SeaTunnelClient.java}   |  12 +-
 .../core/starter/seatunnel/SeaTunnelServer.java}   |  27 ++---
 .../starter/seatunnel/args/ClientCommandArgs.java} |  42 +++----
 .../seatunnel/args/ExecutionModeConverter.java     |   0
 .../starter/seatunnel/args/ServerCommandArgs.java} |  45 ++-----
 .../seatunnel/command/ApiConfValidateCommand.java} |  14 +--
 .../seatunnel/command/ClientCommandBuilder.java}   |  10 +-
 .../seatunnel/command/ClientExecuteCommand.java}   |  25 ++--
 .../seatunnel/command/ServerCommandBuilder.java}   |  11 +-
 .../seatunnel/command/ServerExecuteCommand.java    |  48 ++++++++
 .../config/SeaTunnelApiConfigChecker.java          |   0
 .../seatunnel/constant/SeaTunnelConstant.java      |  19 +--
 seatunnel-dist/release-docs/LICENSE                |   5 -
 .../src/main/assembly/assembly-bin-ci.xml          |  15 +--
 seatunnel-dist/src/main/assembly/assembly-bin.xml  |  15 +--
 .../connector-console-seatunnel-e2e}/pom.xml       |  22 ++--
 .../engine/e2e/console/FakeSourceToConsoleIT.java  |  20 ++--
 .../src/test/resources/fakesource_to_console.conf} |  23 +---
 .../connector-seatunnel-e2e-base}/pom.xml          |  45 ++++---
 .../seatunnel/engine/e2e}/JobExecutionIT.java      |  10 +-
 .../seatunnel/engine/e2e/SeaTunnelContainer.java   | 133 +++++++++++++++++++++
 .../org/apache/seatunnel/engine/e2e/TestUtils.java |   2 +-
 .../test/resources/batch_fakesource_to_file.conf   |   0
 .../batch_fakesource_to_file_complex.conf          |   0
 .../streaming_fakesource_to_file_complex.conf      |   0
 seatunnel-e2e/seatunnel-engine-e2e/pom.xml         |  11 ++
 .../apache/seatunnel/e2e/flink/FlinkContainer.java |   2 +-
 .../apache/seatunnel/e2e/spark/SparkContainer.java |   2 +-
 seatunnel-engine/seatunnel-engine-client/pom.xml   |   9 +-
 .../engine/client/job/ClientJobProxy.java          |   7 +-
 .../engine/client/job/JobConfigParser.java         |   2 +-
 .../src/test/resources/client_test.conf            |  26 ++--
 .../org/apache/seatunnel/engine/core/job/Job.java  |   2 +
 .../engine/server/task/SeaTunnelTask.java          |   6 +-
 .../engine/server/task/flow/SinkFlowLifeCycle.java |  12 +-
 .../seatunnel-engine-examples/pom.xml              |  12 +-
 .../example/engine/SeaTunnelEngineExample.java     |  19 +--
 .../engine/SeaTunnelEngineServerExample.java       |  15 +--
 tools/dependencies/known-dependencies.txt          |   6 -
 65 files changed, 538 insertions(+), 696 deletions(-)

diff --git a/pom.xml b/pom.xml
index 9750e140e..5f5326181 100644
--- a/pom.xml
+++ b/pom.xml
@@ -197,6 +197,7 @@
         <commons-io.version>2.11.0</commons-io.version>
         <commons-collections4.version>4.4</commons-collections4.version>
         <maven-assembly-plugin.version>3.3.0</maven-assembly-plugin.version>
+        <commons-compress.version>1.20</commons-compress.version>
         <protostuff.version>1.8.0</protostuff.version>
         <spark.scope>provided</spark.scope>
         <flink.scope>provided</flink.scope>
@@ -629,6 +630,12 @@
                 <version>${jackson.version}</version>
             </dependency>
 
+            <dependency>
+                <groupId>org.apache.commons</groupId>
+                <artifactId>commons-compress</artifactId>
+                <version>${commons-compress.version}</version>
+            </dependency>
+
             <dependency>
                 <groupId>com.fasterxml.jackson.module</groupId>
                 <artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
diff --git a/seatunnel-apis/seatunnel-api-base/pom.xml b/seatunnel-apis/seatunnel-api-base/pom.xml
index 9fe328718..502cae6c4 100644
--- a/seatunnel-apis/seatunnel-api-base/pom.xml
+++ b/seatunnel-apis/seatunnel-api-base/pom.xml
@@ -36,10 +36,6 @@
             <version>${project.version}</version>
         </dependency>
 
-        <dependency>
-            <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-config-shade</artifactId>
-        </dependency>
     </dependencies>
 
 </project>
diff --git a/seatunnel-core/README.md b/seatunnel-core/README.md
index c4b48e4cb..7ba7c840f 100644
--- a/seatunnel-core/README.md
+++ b/seatunnel-core/README.md
@@ -5,4 +5,7 @@ This module is the seatunnel job entrypoint. Seatunnel jobs are started by the b
 
 - seatunnel-core-flink: The flink job starter.
 - seatunnel-core-flink-sql: The flink sql job starter.
-- seatunnel-core-spark: The spark job starter.
\ No newline at end of file
+- seatunnel-core-spark: The spark job starter.
+- seatunnel-spark-starter: The spark job starter for connector-v2.
+- seatunnel-flink-starter: The flink job starter for connector-v2.
+- seatunnel-starter: The seatunnel engine job starter for connector-v2.
\ No newline at end of file
diff --git a/seatunnel-core/pom.xml b/seatunnel-core/pom.xml
index 9735a18e4..b5a588a83 100644
--- a/seatunnel-core/pom.xml
+++ b/seatunnel-core/pom.xml
@@ -44,7 +44,7 @@
                 <module>seatunnel-core-starter</module>
                 <module>seatunnel-flink-starter</module>
                 <module>seatunnel-spark-starter</module>
-                <module>seatunnel-seatunnel-starter</module>
+                <module>seatunnel-starter</module>
             </modules>
         </profile>
         <profile>
@@ -52,7 +52,7 @@
             <modules>
                 <module>seatunnel-core-base</module>
                 <module>seatunnel-core-starter</module>
-                <module>seatunnel-seatunnel-starter</module>
+                <module>seatunnel-starter</module>
             </modules>
         </profile>
         <profile>
@@ -65,7 +65,7 @@
                 <module>seatunnel-core-starter</module>
                 <module>seatunnel-flink-starter</module>
                 <module>seatunnel-spark-starter</module>
-                <module>seatunnel-seatunnel-starter</module>
+                <module>seatunnel-starter</module>
             </modules>
         </profile>
     </profiles>
diff --git a/seatunnel-core/seatunnel-core-starter/pom.xml b/seatunnel-core/seatunnel-core-starter/pom.xml
index 561879c68..8d05a0545 100644
--- a/seatunnel-core/seatunnel-core-starter/pom.xml
+++ b/seatunnel-core/seatunnel-core-starter/pom.xml
@@ -40,19 +40,19 @@
 
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-api-flink</artifactId>
+            <artifactId>seatunnel-api</artifactId>
             <version>${project.version}</version>
         </dependency>
 
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-api-spark</artifactId>
+            <artifactId>seatunnel-plugin-discovery</artifactId>
             <version>${project.version}</version>
         </dependency>
+
         <dependency>
-            <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-plugin-discovery</artifactId>
-            <version>${project.version}</version>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-compress</artifactId>
         </dependency>
 
         <dependency>
diff --git a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/AbstractExecutionContext.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/AbstractExecutionContext.java
deleted file mode 100644
index a834965c0..000000000
--- a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/AbstractExecutionContext.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.starter.config;
-
-import org.apache.seatunnel.apis.base.api.BaseSink;
-import org.apache.seatunnel.apis.base.api.BaseSource;
-import org.apache.seatunnel.apis.base.api.BaseTransform;
-import org.apache.seatunnel.apis.base.env.RuntimeEnv;
-import org.apache.seatunnel.common.constants.JobMode;
-import org.apache.seatunnel.common.constants.PluginType;
-import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import java.net.URL;
-import java.util.Arrays;
-import java.util.List;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-/**
- * The ExecutionContext contains all configuration needed to run the job.
- *
- * @param <ENVIRONMENT> environment type.
- */
-public abstract class AbstractExecutionContext<ENVIRONMENT extends RuntimeEnv> {
-
-    private final Config config;
-    private final EngineType engine;
-
-    private final ENVIRONMENT environment;
-    private final JobMode jobMode;
-
-    public AbstractExecutionContext(Config config, EngineType engine) {
-        this.config = config;
-        this.engine = engine;
-        this.environment = new EnvironmentFactory<ENVIRONMENT>(config, engine).getEnvironment();
-        this.jobMode = environment.getJobMode();
-    }
-
-    public Config getRootConfig() {
-        return config;
-    }
-
-    public EngineType getEngine() {
-        return engine;
-    }
-
-    public ENVIRONMENT getEnvironment() {
-        return environment;
-    }
-
-    public JobMode getJobMode() {
-        return jobMode;
-    }
-
-    public abstract List<BaseSource<ENVIRONMENT>> getSources();
-
-    public abstract List<BaseTransform<ENVIRONMENT>> getTransforms();
-
-    public abstract List<BaseSink<ENVIRONMENT>> getSinks();
-
-    public abstract List<URL> getPluginJars();
-
-    @SuppressWarnings("checkstyle:Indentation")
-    protected List<PluginIdentifier> getPluginIdentifiers(PluginType... pluginTypes) {
-        return Arrays.stream(pluginTypes).flatMap((Function<PluginType, Stream<PluginIdentifier>>) pluginType -> {
-            List<? extends Config> configList = config.getConfigList(pluginType.getType());
-            return configList.stream()
-                .map(pluginConfig -> PluginIdentifier
-                    .of(engine.getEngine(),
-                        pluginType.getType(),
-                        pluginConfig.getString("plugin_name")));
-        }).collect(Collectors.toList());
-    }
-}
diff --git a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/EnvironmentFactory.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/EnvironmentFactory.java
index 20e77a1db..5b99b7aa0 100644
--- a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/EnvironmentFactory.java
+++ b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/EnvironmentFactory.java
@@ -19,8 +19,6 @@ package org.apache.seatunnel.core.starter.config;
 
 import org.apache.seatunnel.apis.base.env.RuntimeEnv;
 import org.apache.seatunnel.common.constants.JobMode;
-import org.apache.seatunnel.flink.FlinkEnvironment;
-import org.apache.seatunnel.spark.SparkEnvironment;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
@@ -31,39 +29,28 @@ import java.util.List;
  *
  * @param <ENVIRONMENT> environment type
  */
-public class EnvironmentFactory<ENVIRONMENT extends RuntimeEnv> {
+public abstract class EnvironmentFactory<ENVIRONMENT extends RuntimeEnv> {
 
     private static final String PLUGIN_NAME_KEY = "plugin_name";
 
     private final Config config;
-    private final EngineType engine;
 
-    public EnvironmentFactory(Config config, EngineType engine) {
+    public EnvironmentFactory(Config config) {
         this.config = config;
-        this.engine = engine;
     }
 
     // todo:put this method into submodule to avoid dependency on the engine
     public synchronized ENVIRONMENT getEnvironment() {
         Config envConfig = config.getConfig("env");
-        boolean enableHive = checkIsContainHive();
-        ENVIRONMENT env;
-        switch (engine) {
-            case SPARK:
-                env = (ENVIRONMENT) new SparkEnvironment().setEnableHive(enableHive);
-                break;
-            case FLINK:
-                env = (ENVIRONMENT) new FlinkEnvironment();
-                break;
-            default:
-                throw new IllegalArgumentException("Engine: " + engine + " is not supported");
-        }
+        ENVIRONMENT env = newEnvironment();
         env.setConfig(envConfig)
             .setJobMode(getJobMode(envConfig)).prepare();
         return env;
     }
 
-    private boolean checkIsContainHive() {
+    protected abstract ENVIRONMENT newEnvironment();
+
+    protected boolean checkIsContainHive() {
         List<? extends Config> sourceConfigList = config.getConfigList(PluginType.SOURCE.getType());
         for (Config c : sourceConfigList) {
             if (c.getString(PLUGIN_NAME_KEY).toLowerCase().contains("hive")) {
diff --git a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/ExecutionFactory.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/ExecutionFactory.java
deleted file mode 100644
index d0d99cd94..000000000
--- a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/ExecutionFactory.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.starter.config;
-
-import org.apache.seatunnel.apis.base.api.BaseSink;
-import org.apache.seatunnel.apis.base.api.BaseSource;
-import org.apache.seatunnel.apis.base.api.BaseTransform;
-import org.apache.seatunnel.apis.base.env.Execution;
-import org.apache.seatunnel.apis.base.env.RuntimeEnv;
-import org.apache.seatunnel.flink.FlinkEnvironment;
-import org.apache.seatunnel.flink.batch.FlinkBatchExecution;
-import org.apache.seatunnel.flink.stream.FlinkStreamExecution;
-import org.apache.seatunnel.spark.SparkEnvironment;
-import org.apache.seatunnel.spark.batch.SparkBatchExecution;
-import org.apache.seatunnel.spark.stream.SparkStreamingExecution;
-import org.apache.seatunnel.spark.structuredstream.StructuredStreamingExecution;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Used to create {@link Execution}.
- *
- * @param <ENVIRONMENT> environment type
- */
-public class ExecutionFactory<ENVIRONMENT extends RuntimeEnv> {
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(ExecutionFactory.class);
-
-    public AbstractExecutionContext<ENVIRONMENT> executionContext;
-
-    public ExecutionFactory(AbstractExecutionContext<ENVIRONMENT> executionContext) {
-        this.executionContext = executionContext;
-    }
-
-    public Execution<BaseSource<ENVIRONMENT>, BaseTransform<ENVIRONMENT>, BaseSink<ENVIRONMENT>, ENVIRONMENT> createExecution() {
-        Execution execution = null;
-        switch (executionContext.getEngine()) {
-            case SPARK:
-                SparkEnvironment sparkEnvironment = (SparkEnvironment) executionContext.getEnvironment();
-                switch (executionContext.getJobMode()) {
-                    case STREAMING:
-                        execution = new SparkStreamingExecution(sparkEnvironment);
-                        break;
-                    case STRUCTURED_STREAMING:
-                        execution = new StructuredStreamingExecution(sparkEnvironment);
-                        break;
-                    default:
-                        execution = new SparkBatchExecution(sparkEnvironment);
-                }
-                break;
-            case FLINK:
-                FlinkEnvironment flinkEnvironment = (FlinkEnvironment) executionContext.getEnvironment();
-                switch (executionContext.getJobMode()) {
-                    case STREAMING:
-                        execution = new FlinkStreamExecution(flinkEnvironment);
-                        break;
-                    default:
-                        execution = new FlinkBatchExecution(flinkEnvironment);
-                }
-                break;
-            default:
-                throw new IllegalArgumentException("No suitable engine");
-        }
-        LOGGER.info("current execution is [{}]", execution.getClass().getName());
-        return (Execution<BaseSource<ENVIRONMENT>, BaseTransform<ENVIRONMENT>, BaseSink<ENVIRONMENT>, ENVIRONMENT>) execution;
-    }
-
-}
diff --git a/seatunnel-core/seatunnel-flink-starter/pom.xml b/seatunnel-core/seatunnel-flink-starter/pom.xml
index 6385b9c41..15131900b 100644
--- a/seatunnel-core/seatunnel-flink-starter/pom.xml
+++ b/seatunnel-core/seatunnel-flink-starter/pom.xml
@@ -42,12 +42,6 @@
             <version>${project.version}</version>
         </dependency>
 
-        <dependency>
-            <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-api</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
             <artifactId>seatunnel-translation-flink</artifactId>
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/bin/start-seatunnel-flink-new-connector.sh b/seatunnel-core/seatunnel-flink-starter/src/main/bin/start-seatunnel-flink-connector-v2.sh
similarity index 100%
rename from seatunnel-core/seatunnel-flink-starter/src/main/bin/start-seatunnel-flink-new-connector.sh
rename to seatunnel-core/seatunnel-flink-starter/src/main/bin/start-seatunnel-flink-connector-v2.sh
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
index 52b207a79..4de645699 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
+++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
@@ -20,7 +20,6 @@ package org.apache.seatunnel.core.starter.flink;
 import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.core.starter.Starter;
 import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs;
-import org.apache.seatunnel.core.starter.flink.config.FlinkJobType;
 import org.apache.seatunnel.core.starter.flink.utils.CommandLineUtils;
 
 import java.util.List;
@@ -44,7 +43,7 @@ public class FlinkStarter implements Starter {
     private final String appJar;
 
     FlinkStarter(String[] args) {
-        this.flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.JAR);
+        this.flinkCommandArgs = CommandLineUtils.parseCommandArgs(args);
         // set the deployment mode, used to get the job jar path.
         Common.setDeployMode(flinkCommandArgs.getDeployMode());
         Common.setStarter(true);
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/SeatunnelFlink.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/SeatunnelFlink.java
index 330c2e501..20824a901 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/SeatunnelFlink.java
+++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/SeatunnelFlink.java
@@ -22,13 +22,12 @@ import org.apache.seatunnel.core.starter.command.Command;
 import org.apache.seatunnel.core.starter.exception.CommandException;
 import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs;
 import org.apache.seatunnel.core.starter.flink.command.FlinkCommandBuilder;
-import org.apache.seatunnel.core.starter.flink.config.FlinkJobType;
 import org.apache.seatunnel.core.starter.flink.utils.CommandLineUtils;
 
 public class SeatunnelFlink {
 
     public static void main(String[] args) throws CommandException {
-        FlinkCommandArgs flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.JAR);
+        FlinkCommandArgs flinkCommandArgs = CommandLineUtils.parseCommandArgs(args);
         Command<FlinkCommandArgs> flinkCommand = new FlinkCommandBuilder()
             .buildCommand(flinkCommandArgs);
         Seatunnel.run(flinkCommand);
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/config/FlinkApiEnvironment.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/config/FlinkApiEnvironment.java
deleted file mode 100644
index 5b8ba32d8..000000000
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/config/FlinkApiEnvironment.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.starter.flink.config;
-
-import org.apache.seatunnel.apis.base.env.RuntimeEnv;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.JobMode;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.URL;
-import java.util.List;
-
-public class FlinkApiEnvironment implements RuntimeEnv {
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(FlinkApiEnvironment.class);
-
-    private Config config;
-
-    @Override
-    public FlinkApiEnvironment setConfig(Config config) {
-        this.config = config;
-        return this;
-    }
-
-    @Override
-    public Config getConfig() {
-        return config;
-    }
-
-    @Override
-    public CheckResult checkConfig() {
-        // todo
-        return null;
-    }
-
-    @Override
-    public FlinkApiEnvironment prepare() {
-        // todo
-        return null;
-    }
-
-    @Override
-    public FlinkApiEnvironment setJobMode(JobMode mode) {
-        return null;
-    }
-
-    @Override
-    public JobMode getJobMode() {
-        return null;
-    }
-
-    @Override
-    public void registerPlugin(List<URL> pluginPaths) {
-
-    }
-}
diff --git a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/config/SeaTunnelApiConfigChecker.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/config/FlinkEnvironmentFactory.java
similarity index 67%
copy from seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/config/SeaTunnelApiConfigChecker.java
copy to seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/config/FlinkEnvironmentFactory.java
index bfaff2a9c..a3d3c2eec 100644
--- a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/config/SeaTunnelApiConfigChecker.java
+++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/config/FlinkEnvironmentFactory.java
@@ -15,17 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.core.starter.seatunnel.config;
+package org.apache.seatunnel.core.starter.flink.config;
 
-import org.apache.seatunnel.core.starter.config.ConfigChecker;
-import org.apache.seatunnel.core.starter.exception.ConfigCheckException;
+import org.apache.seatunnel.core.starter.config.EnvironmentFactory;
+import org.apache.seatunnel.flink.FlinkEnvironment;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
-public class SeaTunnelApiConfigChecker implements ConfigChecker {
+public class FlinkEnvironmentFactory extends EnvironmentFactory<FlinkEnvironment> {
+
+    public FlinkEnvironmentFactory(Config config) {
+        super(config);
+    }
 
     @Override
-    public void checkConfig(Config config) throws ConfigCheckException {
-        // TODO implement
+    protected FlinkEnvironment newEnvironment() {
+        return new FlinkEnvironment();
     }
 }
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
index ec7d2a199..9447910dc 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
+++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
@@ -18,10 +18,9 @@
 package org.apache.seatunnel.core.starter.flink.execution;
 
 import org.apache.seatunnel.api.common.JobContext;
-import org.apache.seatunnel.core.starter.config.EngineType;
-import org.apache.seatunnel.core.starter.config.EnvironmentFactory;
 import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
 import org.apache.seatunnel.core.starter.execution.TaskExecution;
+import org.apache.seatunnel.core.starter.flink.config.FlinkEnvironmentFactory;
 import org.apache.seatunnel.flink.FlinkEnvironment;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -41,15 +40,13 @@ public class FlinkExecution implements TaskExecution {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(FlinkExecution.class);
 
-    private final Config config;
     private final FlinkEnvironment flinkEnvironment;
     private final PluginExecuteProcessor sourcePluginExecuteProcessor;
     private final PluginExecuteProcessor transformPluginExecuteProcessor;
     private final PluginExecuteProcessor sinkPluginExecuteProcessor;
 
     public FlinkExecution(Config config) {
-        this.config = config;
-        this.flinkEnvironment = new EnvironmentFactory<FlinkEnvironment>(config, EngineType.FLINK).getEnvironment();
+        this.flinkEnvironment = new FlinkEnvironmentFactory(config).getEnvironment();
         JobContext jobContext = new JobContext();
         jobContext.setJobMode(flinkEnvironment.getJobMode());
         this.sourcePluginExecuteProcessor = new SourceExecuteProcessor(flinkEnvironment, jobContext, config.getConfigList("source"));
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/CommandLineUtils.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/CommandLineUtils.java
index 9b29fdf16..9d9a6a43e 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/CommandLineUtils.java
+++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/CommandLineUtils.java
@@ -18,7 +18,6 @@
 package org.apache.seatunnel.core.starter.flink.utils;
 
 import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs;
-import org.apache.seatunnel.core.starter.flink.config.FlinkJobType;
 import org.apache.seatunnel.core.starter.flink.constant.FlinkConstant;
 
 import com.beust.jcommander.JCommander;
@@ -43,10 +42,10 @@ public class CommandLineUtils {
         return flinkCommandArgs;
     }
 
-    public static FlinkCommandArgs parseCommandArgs(String[] args, FlinkJobType jobType) {
+    public static FlinkCommandArgs parseCommandArgs(String[] args) {
         FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs();
         JCommander jCommander = JCommander.newBuilder()
-            .programName(jobType.getType())
+            .programName("start-seatunnel-flink-connector-v2.sh")
             .addObject(flinkCommandArgs)
             .acceptUnknownOptions(true)
             .args(args)
diff --git a/seatunnel-core/seatunnel-spark-starter/pom.xml b/seatunnel-core/seatunnel-spark-starter/pom.xml
index d65b58739..e02470ea2 100644
--- a/seatunnel-core/seatunnel-spark-starter/pom.xml
+++ b/seatunnel-core/seatunnel-spark-starter/pom.xml
@@ -35,12 +35,6 @@
 
     <dependencies>
 
-        <dependency>
-            <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-api</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
             <artifactId>seatunnel-core-starter</artifactId>
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/bin/start-seatunnel-spark-new-connector.sh b/seatunnel-core/seatunnel-spark-starter/src/main/bin/start-seatunnel-spark-connector-v2.sh
similarity index 100%
rename from seatunnel-core/seatunnel-spark-starter/src/main/bin/start-seatunnel-spark-new-connector.sh
rename to seatunnel-core/seatunnel-spark-starter/src/main/bin/start-seatunnel-spark-connector-v2.sh
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
index 00261a80c..e9cd2b41f 100644
--- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
@@ -134,7 +134,7 @@ public class SparkStarter implements Starter {
     private static SparkCommandArgs parseCommandArgs(String[] args) {
         SparkCommandArgs commandArgs = new SparkCommandArgs();
         JCommander commander = JCommander.newBuilder()
-                .programName("start-seatunnel-spark.sh")
+                .programName("start-seatunnel-spark-connector-v2.sh")
                 .addObject(commandArgs)
                 .args(args)
                 .build();
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/config/SparkEnvironment.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/config/SparkEnvironment.java
deleted file mode 100644
index f7d5b83d4..000000000
--- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/config/SparkEnvironment.java
+++ /dev/null
@@ -1,66 +0,0 @@
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.starter.spark.config;
-
-import org.apache.seatunnel.apis.base.env.RuntimeEnv;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.JobMode;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import java.net.URL;
-import java.util.List;
-
-public class SparkEnvironment implements RuntimeEnv {
-
-    @Override
-    public SparkEnvironment setConfig(Config config) {
-        return null;
-    }
-
-    @Override
-    public Config getConfig() {
-        return null;
-    }
-
-    @Override
-    public CheckResult checkConfig() {
-        return null;
-    }
-
-    @Override
-    public SparkEnvironment prepare() {
-        return null;
-    }
-
-    @Override
-    public SparkEnvironment setJobMode(JobMode mode) {
-        return null;
-    }
-
-    @Override
-    public JobMode getJobMode() {
-        return null;
-    }
-
-    @Override
-    public void registerPlugin(List<URL> pluginPaths) {
-
-    }
-}
diff --git a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/config/SeaTunnelApiConfigChecker.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/config/SparkEnvironmentFactory.java
similarity index 65%
copy from seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/config/SeaTunnelApiConfigChecker.java
copy to seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/config/SparkEnvironmentFactory.java
index bfaff2a9c..8ac703ea8 100644
--- a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/config/SeaTunnelApiConfigChecker.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/config/SparkEnvironmentFactory.java
@@ -15,17 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.core.starter.seatunnel.config;
+package org.apache.seatunnel.core.starter.spark.config;
 
-import org.apache.seatunnel.core.starter.config.ConfigChecker;
-import org.apache.seatunnel.core.starter.exception.ConfigCheckException;
+import org.apache.seatunnel.core.starter.config.EnvironmentFactory;
+import org.apache.seatunnel.spark.SparkEnvironment;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
-public class SeaTunnelApiConfigChecker implements ConfigChecker {
+public class SparkEnvironmentFactory extends EnvironmentFactory<SparkEnvironment> {
+    public SparkEnvironmentFactory(Config config) {
+        super(config);
+    }
 
     @Override
-    public void checkConfig(Config config) throws ConfigCheckException {
-        // TODO implement
+    protected SparkEnvironment newEnvironment() {
+        return new SparkEnvironment().setEnableHive(checkIsContainHive());
     }
 }
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
index 0bb57268c..0ec9f6662 100644
--- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
@@ -18,9 +18,8 @@
 package org.apache.seatunnel.core.starter.spark.execution;
 
 import org.apache.seatunnel.api.common.JobContext;
-import org.apache.seatunnel.core.starter.config.EngineType;
-import org.apache.seatunnel.core.starter.config.EnvironmentFactory;
 import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
+import org.apache.seatunnel.core.starter.spark.config.SparkEnvironmentFactory;
 import org.apache.seatunnel.spark.SparkEnvironment;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -37,15 +36,13 @@ public class SparkExecution {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(SparkExecution.class);
 
-    private final Config config;
     private final SparkEnvironment sparkEnvironment;
     private final PluginExecuteProcessor sourcePluginExecuteProcessor;
     private final PluginExecuteProcessor transformPluginExecuteProcessor;
     private final PluginExecuteProcessor sinkPluginExecuteProcessor;
 
     public SparkExecution(Config config) {
-        this.config = config;
-        this.sparkEnvironment = (SparkEnvironment) new EnvironmentFactory<>(config, EngineType.SPARK).getEnvironment();
+        this.sparkEnvironment = new SparkEnvironmentFactory(config).getEnvironment();
         JobContext jobContext = new JobContext();
         jobContext.setJobMode(sparkEnvironment.getJobMode());
         this.sourcePluginExecuteProcessor = new SourceExecuteProcessor(sparkEnvironment, jobContext, config.getConfigList("source"));
diff --git a/seatunnel-core/seatunnel-seatunnel-starter/pom.xml b/seatunnel-core/seatunnel-starter/pom.xml
similarity index 94%
rename from seatunnel-core/seatunnel-seatunnel-starter/pom.xml
rename to seatunnel-core/seatunnel-starter/pom.xml
index b4d7c3bf6..81dccddaf 100644
--- a/seatunnel-core/seatunnel-seatunnel-starter/pom.xml
+++ b/seatunnel-core/seatunnel-starter/pom.xml
@@ -26,7 +26,7 @@
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
-    <artifactId>seatunnel-seatunnel-starter</artifactId>
+    <artifactId>seatunnel-starter</artifactId>
 
     <dependencies>
         <dependency>
@@ -54,4 +54,8 @@
         </dependency>
     </dependencies>
 
+    <build>
+        <finalName>${project.name}</finalName>
+    </build>
+
 </project>
\ No newline at end of file
diff --git a/seatunnel-core/seatunnel-seatunnel-starter/src/main/bin/start-seatunnel-engine-job.sh b/seatunnel-core/seatunnel-starter/src/main/bin/seatunnel-cluster.sh
similarity index 79%
rename from seatunnel-core/seatunnel-seatunnel-starter/src/main/bin/start-seatunnel-engine-job.sh
rename to seatunnel-core/seatunnel-starter/src/main/bin/seatunnel-cluster.sh
index 1cfc741d8..228d07d01 100755
--- a/seatunnel-core/seatunnel-seatunnel-starter/src/main/bin/start-seatunnel-engine-job.sh
+++ b/seatunnel-core/seatunnel-starter/src/main/bin/seatunnel-cluster.sh
@@ -36,8 +36,8 @@ done
 PRG_DIR=`dirname "$PRG"`
 APP_DIR=`cd "$PRG_DIR/.." >/dev/null; pwd`
 CONF_DIR=${APP_DIR}/config
-APP_JAR=${APP_DIR}/lib/seatunnel-seatunnel-starter.jar
-APP_MAIN="org.apache.seatunnel.core.starter.seatunnel.SeaTunnelStarter"
+APP_JAR=${APP_DIR}/lib/seatunnel-starter.jar
+APP_MAIN="org.apache.seatunnel.core.starter.seatunnel.SeaTunnelServer"
 
 if [ -f "${CONF_DIR}/seatunnel-env.sh" ]; then
     . "${CONF_DIR}/seatunnel-env.sh"
@@ -45,20 +45,9 @@ fi
 
 if [ $# == 0 ]
 then
-    args="-h"
+    args=""
 else
     args=$@
 fi
 
-CMD=$(java -cp ${APP_JAR} ${APP_MAIN} ${args}) && EXIT_CODE=$? || EXIT_CODE=$?
-if [ ${EXIT_CODE} -eq 234 ]; then
-    # print usage
-    echo "${CMD}"
-    exit 0
-elif [ ${EXIT_CODE} -eq 0 ]; then
-    echo "Execute SeaTunnel Job: ${CMD}"
-    eval ${CMD}
-else
-    echo "${CMD}"
-    exit ${EXIT_CODE}
-fi
+java -cp ${APP_JAR} ${APP_MAIN} ${args}
diff --git a/seatunnel-core/seatunnel-seatunnel-starter/src/main/bin/start-seatunnel-seatunnel.sh b/seatunnel-core/seatunnel-starter/src/main/bin/seatunnel.sh
similarity index 76%
rename from seatunnel-core/seatunnel-seatunnel-starter/src/main/bin/start-seatunnel-seatunnel.sh
rename to seatunnel-core/seatunnel-starter/src/main/bin/seatunnel.sh
index 1cfc741d8..601bfd4e7 100755
--- a/seatunnel-core/seatunnel-seatunnel-starter/src/main/bin/start-seatunnel-seatunnel.sh
+++ b/seatunnel-core/seatunnel-starter/src/main/bin/seatunnel.sh
@@ -36,8 +36,8 @@ done
 PRG_DIR=`dirname "$PRG"`
 APP_DIR=`cd "$PRG_DIR/.." >/dev/null; pwd`
 CONF_DIR=${APP_DIR}/config
-APP_JAR=${APP_DIR}/lib/seatunnel-seatunnel-starter.jar
-APP_MAIN="org.apache.seatunnel.core.starter.seatunnel.SeaTunnelStarter"
+APP_JAR=${APP_DIR}/lib/seatunnel-starter.jar
+APP_MAIN="org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient"
 
 if [ -f "${CONF_DIR}/seatunnel-env.sh" ]; then
     . "${CONF_DIR}/seatunnel-env.sh"
@@ -50,15 +50,4 @@ else
     args=$@
 fi
 
-CMD=$(java -cp ${APP_JAR} ${APP_MAIN} ${args}) && EXIT_CODE=$? || EXIT_CODE=$?
-if [ ${EXIT_CODE} -eq 234 ]; then
-    # print usage
-    echo "${CMD}"
-    exit 0
-elif [ ${EXIT_CODE} -eq 0 ]; then
-    echo "Execute SeaTunnel Job: ${CMD}"
-    eval ${CMD}
-else
-    echo "${CMD}"
-    exit ${EXIT_CODE}
-fi
+java -cp ${APP_JAR} ${APP_MAIN} ${args}
diff --git a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/CommandLineUtils.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/CommandLineUtils.java
new file mode 100644
index 000000000..5c0e1b650
--- /dev/null
+++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/CommandLineUtils.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.starter.seatunnel;
+
+import org.apache.seatunnel.core.starter.seatunnel.args.ClientCommandArgs;
+import org.apache.seatunnel.core.starter.seatunnel.args.ServerCommandArgs;
+import org.apache.seatunnel.core.starter.seatunnel.constant.SeaTunnelConstant;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.UnixStyleUsageFormatter;
+
+/**
+ * Command line parsing helpers for the SeaTunnel engine client and server starters.
+ */
+public class CommandLineUtils {
+
+    /** Program name shown in the usage message of the client starter. */
+    private static final String CLIENT_SHELL_NAME = "seatunnel.sh";
+
+    /** Program name shown in the usage message of the server (cluster) starter. */
+    private static final String SERVER_SHELL_NAME = "seatunnel-cluster.sh";
+
+    private CommandLineUtils() {
+        throw new UnsupportedOperationException("CommandLineUtils is a utility class and cannot be instantiated");
+    }
+
+    /**
+     * Parses the arguments of the SeaTunnel client starter.
+     * Options that JCommander does not recognize are stored as the seatunnel params of the result.
+     * When help is requested, the usage message is printed and the JVM exits.
+     *
+     * @param args raw command line arguments
+     * @return the populated client arguments
+     */
+    public static ClientCommandArgs parseSeaTunnelClientArgs(String[] args) {
+        ClientCommandArgs clientCommandArgs = new ClientCommandArgs();
+        JCommander jCommander = getJCommander(CLIENT_SHELL_NAME, args, clientCommandArgs);
+        // Options JCommander does not recognize are handed over as seatunnel params.
+        clientCommandArgs.setSeatunnelParams(jCommander.getUnknownOptions());
+        if (clientCommandArgs.isHelp()) {
+            printHelp(jCommander);
+        }
+        return clientCommandArgs;
+    }
+
+    /**
+     * Parses the arguments of the SeaTunnel server starter.
+     * Options that JCommander does not recognize are stored as the seatunnel params of the result.
+     * When help is requested, the usage message is printed and the JVM exits.
+     *
+     * @param args raw command line arguments
+     * @return the populated server arguments
+     */
+    public static ServerCommandArgs parseSeaTunnelServerArgs(String[] args) {
+        ServerCommandArgs serverCommandArgs = new ServerCommandArgs();
+        JCommander jCommander = getJCommander(SERVER_SHELL_NAME, args, serverCommandArgs);
+        // Options JCommander does not recognize are handed over as seatunnel params.
+        serverCommandArgs.setSeatunnelParams(jCommander.getUnknownOptions());
+        if (serverCommandArgs.isHelp()) {
+            printHelp(jCommander);
+        }
+        return serverCommandArgs;
+    }
+
+    /**
+     * Prints the usage message with the Unix style formatter and terminates the JVM with
+     * {@link SeaTunnelConstant#USAGE_EXIT_CODE}.
+     *
+     * @param jCommander the commander whose declared options are printed
+     */
+    private static void printHelp(JCommander jCommander) {
+        jCommander.setUsageFormatter(new UnixStyleUsageFormatter(jCommander));
+        jCommander.usage();
+        System.exit(SeaTunnelConstant.USAGE_EXIT_CODE);
+    }
+
+    /**
+     * Builds a JCommander that parses {@code args} into {@code commandArgs}.
+     * Unknown options are accepted so that the callers can collect them afterwards.
+     *
+     * @param shellName   program name used in the usage message
+     * @param args        raw command line arguments
+     * @param commandArgs object carrying the JCommander parameter annotations (client or server args)
+     * @return the commander, built from the given arguments
+     */
+    private static JCommander getJCommander(String shellName, String[] args, Object commandArgs) {
+        return JCommander.newBuilder()
+            .programName(shellName)
+            .addObject(commandArgs)
+            .acceptUnknownOptions(true)
+            .args(args)
+            .build();
+    }
+
+}
diff --git a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnel.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelClient.java
similarity index 73%
copy from seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnel.java
copy to seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelClient.java
index a3a98a7b1..0f092781b 100644
--- a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnel.java
+++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelClient.java
@@ -20,14 +20,14 @@ package org.apache.seatunnel.core.starter.seatunnel;
 import org.apache.seatunnel.core.starter.Seatunnel;
 import org.apache.seatunnel.core.starter.command.Command;
 import org.apache.seatunnel.core.starter.exception.CommandException;
-import org.apache.seatunnel.core.starter.seatunnel.args.SeaTunnelCommandArgs;
-import org.apache.seatunnel.core.starter.seatunnel.command.SeaTunnelCommandBuilder;
+import org.apache.seatunnel.core.starter.seatunnel.args.ClientCommandArgs;
+import org.apache.seatunnel.core.starter.seatunnel.command.ClientCommandBuilder;
 
-public class SeaTunnel {
+public class SeaTunnelClient {
     public static void main(String[] args) throws CommandException {
-        SeaTunnelCommandArgs seaTunnelCommandArgs = CommandLineUtils.parseSeaTunnelArgs(args);
-        Command<SeaTunnelCommandArgs> command =
-            new SeaTunnelCommandBuilder().buildCommand(seaTunnelCommandArgs);
+        ClientCommandArgs clientCommandArgs = CommandLineUtils.parseSeaTunnelClientArgs(args);
+        Command<ClientCommandArgs> command =
+            new ClientCommandBuilder().buildCommand(clientCommandArgs);
         Seatunnel.run(command);
     }
 }
diff --git a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/CommandLineUtils.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelServer.java
similarity index 56%
rename from seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/CommandLineUtils.java
rename to seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelServer.java
index fd204e87a..a15adeb9f 100644
--- a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/CommandLineUtils.java
+++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelServer.java
@@ -17,22 +17,17 @@
 
 package org.apache.seatunnel.core.starter.seatunnel;
 
-import org.apache.seatunnel.core.starter.seatunnel.args.SeaTunnelCommandArgs;
+import org.apache.seatunnel.core.starter.Seatunnel;
+import org.apache.seatunnel.core.starter.command.Command;
+import org.apache.seatunnel.core.starter.exception.CommandException;
+import org.apache.seatunnel.core.starter.seatunnel.args.ServerCommandArgs;
+import org.apache.seatunnel.core.starter.seatunnel.command.ServerCommandBuilder;
 
-import com.beust.jcommander.JCommander;
-
-public class CommandLineUtils {
-
-    private CommandLineUtils() {
-        throw new UnsupportedOperationException("CommandLineUtils is a utility class and cannot be instantiated");
-    }
-
-    public static SeaTunnelCommandArgs parseSeaTunnelArgs(String[] args) {
-        SeaTunnelCommandArgs seatunnelCommandArgs = new SeaTunnelCommandArgs();
-        JCommander.newBuilder()
-            .addObject(seatunnelCommandArgs)
-            .build()
-            .parse(args);
-        return seatunnelCommandArgs;
+public class SeaTunnelServer {
+    public static void main(String[] args) throws CommandException {
+        ServerCommandArgs serverCommandArgs = CommandLineUtils.parseSeaTunnelServerArgs(args);
+        Command<ServerCommandArgs> command =
+            new ServerCommandBuilder().buildCommand(serverCommandArgs);
+        Seatunnel.run(command);
     }
 }
diff --git a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/SeaTunnelCommandArgs.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java
similarity index 95%
copy from seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/SeaTunnelCommandArgs.java
copy to seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java
index 9e86b2800..d5202b23c 100644
--- a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/SeaTunnelCommandArgs.java
+++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java
@@ -26,7 +26,7 @@ import com.beust.jcommander.Parameter;
 
 import java.util.List;
 
-public class SeaTunnelCommandArgs extends AbstractCommandArgs {
+public class ClientCommandArgs extends AbstractCommandArgs {
 
     /**
      * Undefined parameters parsed will be stored here as seatunnel engine command parameters.
@@ -37,14 +37,30 @@ public class SeaTunnelCommandArgs extends AbstractCommandArgs {
         description = "The name of job")
     private String name = "seatunnel_job";
 
+    @Parameter(names = {"-e", "--deploy-mode"},
+        description = "SeaTunnel deploy mode",
+        converter = ExecutionModeConverter.class)
+    private ExecutionMode executionMode = ExecutionMode.CLUSTER;
+
     @Parameter(names = {"-cn", "--cluster"},
         description = "The name of cluster")
     private String clusterName = "seatunnel_default_cluster";
 
-    @Parameter(names = {"-e", "--deploy-mode"},
-        description = "SeaTunnel deploy mode",
-        converter = ExecutionModeConverter.class)
-    private ExecutionMode executionMode = ExecutionMode.LOCAL;
+    public String getClusterName() {
+        return clusterName;
+    }
+
+    public void setClusterName(String clusterName) {
+        this.clusterName = clusterName;
+    }
+
+    public List<String> getSeatunnelParams() {
+        return seatunnelParams;
+    }
+
+    public void setSeatunnelParams(List<String> seatunnelParams) {
+        this.seatunnelParams = seatunnelParams;
+    }
 
     public ExecutionMode getExecutionMode() {
         return executionMode;
@@ -62,14 +78,6 @@ public class SeaTunnelCommandArgs extends AbstractCommandArgs {
         this.name = name;
     }
 
-    public String getClusterName() {
-        return clusterName;
-    }
-
-    public void setClusterName(String clusterName) {
-        this.clusterName = clusterName;
-    }
-
     @Override
     public EngineType getEngineType() {
         return EngineType.SEATUNNEL;
@@ -79,12 +87,4 @@ public class SeaTunnelCommandArgs extends AbstractCommandArgs {
     public DeployMode getDeployMode() {
         return DeployMode.CLIENT;
     }
-
-    public List<String> getSeatunnelParams() {
-        return seatunnelParams;
-    }
-
-    public void setSeatunnelParams(List<String> seatunnelParams) {
-        this.seatunnelParams = seatunnelParams;
-    }
 }
diff --git a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ExecutionModeConverter.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ExecutionModeConverter.java
similarity index 100%
rename from seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ExecutionModeConverter.java
rename to seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ExecutionModeConverter.java
diff --git a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/SeaTunnelCommandArgs.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ServerCommandArgs.java
similarity index 58%
rename from seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/SeaTunnelCommandArgs.java
rename to seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ServerCommandArgs.java
index 9e86b2800..ec0019eb9 100644
--- a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/SeaTunnelCommandArgs.java
+++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ServerCommandArgs.java
@@ -17,49 +17,34 @@
 
 package org.apache.seatunnel.core.starter.seatunnel.args;
 
-import org.apache.seatunnel.common.config.DeployMode;
-import org.apache.seatunnel.core.starter.command.AbstractCommandArgs;
-import org.apache.seatunnel.core.starter.config.EngineType;
-import org.apache.seatunnel.engine.common.runtime.ExecutionMode;
+import org.apache.seatunnel.core.starter.command.CommandArgs;
 
 import com.beust.jcommander.Parameter;
 
 import java.util.List;
 
-public class SeaTunnelCommandArgs extends AbstractCommandArgs {
+public class ServerCommandArgs implements CommandArgs {
 
     /**
      * Undefined parameters parsed will be stored here as seatunnel engine command parameters.
      */
     private List<String> seatunnelParams;
 
-    @Parameter(names = {"-n", "--name"},
-        description = "The name of job")
-    private String name = "seatunnel_job";
-
     @Parameter(names = {"-cn", "--cluster"},
         description = "The name of cluster")
     private String clusterName = "seatunnel_default_cluster";
 
-    @Parameter(names = {"-e", "--deploy-mode"},
-        description = "SeaTunnel deploy mode",
-        converter = ExecutionModeConverter.class)
-    private ExecutionMode executionMode = ExecutionMode.LOCAL;
-
-    public ExecutionMode getExecutionMode() {
-        return executionMode;
-    }
-
-    public void setExecutionMode(ExecutionMode executionMode) {
-        this.executionMode = executionMode;
-    }
+    @Parameter(names = {"-h", "--help"},
+        help = true,
+        description = "Show the usage message")
+    private boolean help = false;
 
-    public String getName() {
-        return name;
+    public boolean isHelp() {
+        return help;
     }
 
-    public void setName(String name) {
-        this.name = name;
+    public void setHelp(boolean help) {
+        this.help = help;
     }
 
     public String getClusterName() {
@@ -70,16 +55,6 @@ public class SeaTunnelCommandArgs extends AbstractCommandArgs {
         this.clusterName = clusterName;
     }
 
-    @Override
-    public EngineType getEngineType() {
-        return EngineType.SEATUNNEL;
-    }
-
-    @Override
-    public DeployMode getDeployMode() {
-        return DeployMode.CLIENT;
-    }
-
     public List<String> getSeatunnelParams() {
         return seatunnelParams;
     }
diff --git a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/SeaTunnelApiConfValidateCommand.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ApiConfValidateCommand.java
similarity index 77%
rename from seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/SeaTunnelApiConfValidateCommand.java
rename to seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ApiConfValidateCommand.java
index 8036120ae..20bc93659 100644
--- a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/SeaTunnelApiConfValidateCommand.java
+++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ApiConfValidateCommand.java
@@ -20,7 +20,7 @@ package org.apache.seatunnel.core.starter.seatunnel.command;
 import org.apache.seatunnel.core.starter.command.Command;
 import org.apache.seatunnel.core.starter.config.ConfigBuilder;
 import org.apache.seatunnel.core.starter.exception.ConfigCheckException;
-import org.apache.seatunnel.core.starter.seatunnel.args.SeaTunnelCommandArgs;
+import org.apache.seatunnel.core.starter.seatunnel.args.ClientCommandArgs;
 import org.apache.seatunnel.core.starter.seatunnel.config.SeaTunnelApiConfigChecker;
 import org.apache.seatunnel.core.starter.utils.FileUtils;
 
@@ -32,19 +32,19 @@ import java.nio.file.Path;
 /**
  * Use to validate the configuration of the SeaTunnel API.
  */
-public class SeaTunnelApiConfValidateCommand implements Command<SeaTunnelCommandArgs> {
+public class ApiConfValidateCommand implements Command<ClientCommandArgs> {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(SeaTunnelApiConfValidateCommand.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(ApiConfValidateCommand.class);
 
-    private final SeaTunnelCommandArgs seaTunnelCommandArgs;
+    private final ClientCommandArgs clientCommandArgs;
 
-    public SeaTunnelApiConfValidateCommand(SeaTunnelCommandArgs seaTunnelCommandArgs) {
-        this.seaTunnelCommandArgs = seaTunnelCommandArgs;
+    public ApiConfValidateCommand(ClientCommandArgs clientCommandArgs) {
+        this.clientCommandArgs = clientCommandArgs;
     }
 
     @Override
     public void execute() throws ConfigCheckException {
-        Path configPath = FileUtils.getConfigPath(seaTunnelCommandArgs);
+        Path configPath = FileUtils.getConfigPath(clientCommandArgs);
         ConfigBuilder configBuilder = new ConfigBuilder(configPath);
         new SeaTunnelApiConfigChecker().checkConfig(configBuilder.getConfig());
         LOGGER.info("config OK !");
diff --git a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/SeaTunnelCommandBuilder.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientCommandBuilder.java
similarity index 73%
copy from seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/SeaTunnelCommandBuilder.java
copy to seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientCommandBuilder.java
index 722c84e7b..fb9bfc6d0 100644
--- a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/SeaTunnelCommandBuilder.java
+++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientCommandBuilder.java
@@ -20,14 +20,14 @@ package org.apache.seatunnel.core.starter.seatunnel.command;
 import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.core.starter.command.Command;
 import org.apache.seatunnel.core.starter.command.CommandBuilder;
-import org.apache.seatunnel.core.starter.seatunnel.args.SeaTunnelCommandArgs;
+import org.apache.seatunnel.core.starter.seatunnel.args.ClientCommandArgs;
 
-public class SeaTunnelCommandBuilder implements CommandBuilder<SeaTunnelCommandArgs> {
+public class ClientCommandBuilder implements CommandBuilder<ClientCommandArgs> {
 
     @Override
-    public Command<SeaTunnelCommandArgs> buildCommand(SeaTunnelCommandArgs commandArgs) {
+    public Command<ClientCommandArgs> buildCommand(ClientCommandArgs commandArgs) {
         Common.setDeployMode(commandArgs.getDeployMode());
-        return commandArgs.isCheckConfig() ? new SeaTunnelApiConfValidateCommand(commandArgs)
-            : new SeaTunnelApiTaskExecuteCommand(commandArgs);
+        return commandArgs.isCheckConfig() ? new ApiConfValidateCommand(commandArgs)
+            : new ClientExecuteCommand(commandArgs);
     }
 }
diff --git a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/SeaTunnelApiTaskExecuteCommand.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
similarity index 82%
rename from seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/SeaTunnelApiTaskExecuteCommand.java
rename to seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
index d9d8638d9..d99e708e9 100644
--- a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/SeaTunnelApiTaskExecuteCommand.java
+++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
@@ -19,7 +19,7 @@ package org.apache.seatunnel.core.starter.seatunnel.command;
 
 import org.apache.seatunnel.core.starter.command.Command;
 import org.apache.seatunnel.core.starter.exception.CommandExecuteException;
-import org.apache.seatunnel.core.starter.seatunnel.args.SeaTunnelCommandArgs;
+import org.apache.seatunnel.core.starter.seatunnel.args.ClientCommandArgs;
 import org.apache.seatunnel.core.starter.utils.FileUtils;
 import org.apache.seatunnel.engine.client.SeaTunnelClient;
 import org.apache.seatunnel.engine.client.job.ClientJobProxy;
@@ -41,26 +41,25 @@ import java.util.concurrent.ExecutionException;
 /**
  * This command is used to execute the SeaTunnel engine job by SeaTunnel API.
  */
-public class SeaTunnelApiTaskExecuteCommand implements Command<SeaTunnelCommandArgs> {
+public class ClientExecuteCommand implements Command<ClientCommandArgs> {
 
-    private final SeaTunnelCommandArgs seaTunnelCommandArgs;
+    private final ClientCommandArgs clientCommandArgs;
 
-    // TODO custom cluster name on cluster execution mode
-
-    public SeaTunnelApiTaskExecuteCommand(SeaTunnelCommandArgs seaTunnelCommandArgs) {
-        this.seaTunnelCommandArgs = seaTunnelCommandArgs;
+    public ClientExecuteCommand(ClientCommandArgs clientCommandArgs) {
+        this.clientCommandArgs = clientCommandArgs;
     }
 
     @Override
     public void execute() throws CommandExecuteException {
-        Path configFile = FileUtils.getConfigPath(seaTunnelCommandArgs);
+        Path configFile = FileUtils.getConfigPath(clientCommandArgs);
 
         JobConfig jobConfig = new JobConfig();
-        jobConfig.setName(seaTunnelCommandArgs.getName());
+        jobConfig.setName(clientCommandArgs.getName());
         HazelcastInstance instance = null;
+        ClientJobProxy clientJobProxy = null;
         try {
-            String clusterName = seaTunnelCommandArgs.getClusterName();
-            if (seaTunnelCommandArgs.getExecutionMode().equals(ExecutionMode.LOCAL)) {
+            String clusterName = clientCommandArgs.getClusterName();
+            if (clientCommandArgs.getExecutionMode().equals(ExecutionMode.LOCAL)) {
                 clusterName = creatRandomClusterName(clusterName);
                 instance = createServerInLocal(clusterName);
             }
@@ -69,7 +68,6 @@ public class SeaTunnelApiTaskExecuteCommand implements Command<SeaTunnelCommandA
             SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
             JobExecutionEnvironment jobExecutionEnv = engineClient.createExecutionContext(configFile.toString(), jobConfig);
 
-            ClientJobProxy clientJobProxy;
             clientJobProxy = jobExecutionEnv.execute();
             clientJobProxy.waitForJobComplete();
         } catch (ExecutionException | InterruptedException e) {
@@ -78,6 +76,9 @@ public class SeaTunnelApiTaskExecuteCommand implements Command<SeaTunnelCommandA
             if (instance != null) {
                 instance.shutdown();
             }
+            if (clientJobProxy != null) {
+                clientJobProxy.close();
+            }
         }
     }
 
diff --git a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/SeaTunnelCommandBuilder.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ServerCommandBuilder.java
similarity index 65%
rename from seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/SeaTunnelCommandBuilder.java
rename to seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ServerCommandBuilder.java
index 722c84e7b..999bd7893 100644
--- a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/SeaTunnelCommandBuilder.java
+++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ServerCommandBuilder.java
@@ -17,17 +17,14 @@
 
 package org.apache.seatunnel.core.starter.seatunnel.command;
 
-import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.core.starter.command.Command;
 import org.apache.seatunnel.core.starter.command.CommandBuilder;
-import org.apache.seatunnel.core.starter.seatunnel.args.SeaTunnelCommandArgs;
+import org.apache.seatunnel.core.starter.seatunnel.args.ServerCommandArgs;
 
-public class SeaTunnelCommandBuilder implements CommandBuilder<SeaTunnelCommandArgs> {
+public class ServerCommandBuilder implements CommandBuilder<ServerCommandArgs> {
 
     @Override
-    public Command<SeaTunnelCommandArgs> buildCommand(SeaTunnelCommandArgs commandArgs) {
-        Common.setDeployMode(commandArgs.getDeployMode());
-        return commandArgs.isCheckConfig() ? new SeaTunnelApiConfValidateCommand(commandArgs)
-            : new SeaTunnelApiTaskExecuteCommand(commandArgs);
+    public Command<ServerCommandArgs> buildCommand(ServerCommandArgs commandArgs) {
+        return new ServerExecuteCommand(commandArgs);
     }
 }
diff --git a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ServerExecuteCommand.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ServerExecuteCommand.java
new file mode 100644
index 000000000..bbd50fd56
--- /dev/null
+++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ServerExecuteCommand.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.starter.seatunnel.command;
+
+import org.apache.seatunnel.core.starter.command.Command;
+import org.apache.seatunnel.core.starter.seatunnel.args.ServerCommandArgs;
+import org.apache.seatunnel.engine.common.config.ConfigProvider;
+import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.server.SeaTunnelNodeContext;
+
+import com.hazelcast.instance.impl.HazelcastInstanceFactory;
+
+/**
+ * This command is used to start a SeaTunnel engine server node by creating a Hazelcast instance in the configured cluster.
+ */
+public class ServerExecuteCommand implements Command<ServerCommandArgs> {
+
+    private final ServerCommandArgs serverCommandArgs;
+
+    public ServerExecuteCommand(ServerCommandArgs serverCommandArgs) {
+        this.serverCommandArgs = serverCommandArgs;
+    }
+
+    @Override
+    public void execute() {
+        SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
+        seaTunnelConfig.getHazelcastConfig().setClusterName(serverCommandArgs.getClusterName());
+        HazelcastInstanceFactory.newHazelcastInstance(seaTunnelConfig.getHazelcastConfig(),
+            Thread.currentThread().getName(),
+            new SeaTunnelNodeContext(seaTunnelConfig));
+    }
+
+}
diff --git a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/config/SeaTunnelApiConfigChecker.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/config/SeaTunnelApiConfigChecker.java
similarity index 100%
copy from seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/config/SeaTunnelApiConfigChecker.java
copy to seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/config/SeaTunnelApiConfigChecker.java
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/constant/SeaTunnelConstant.java
similarity index 68%
copy from seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java
copy to seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/constant/SeaTunnelConstant.java
index 517d5b1f2..7b032b9f4 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java
+++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/constant/SeaTunnelConstant.java
@@ -15,21 +15,8 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.core.job;
+package org.apache.seatunnel.core.starter.seatunnel.constant;
 
-import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
-
-/**
- * Job interface define the Running job apis
- */
-public interface Job {
-    long getJobId();
-
-    PassiveCompletableFuture<JobStatus> doWaitForJobComplete();
-
-    void cancelJob();
-
-    JobStatus getJobStatus();
-
-    JobStatus waitForJobComplete();
+public class SeaTunnelConstant {
+    public static final int USAGE_EXIT_CODE = 234;
 }
diff --git a/seatunnel-dist/release-docs/LICENSE b/seatunnel-dist/release-docs/LICENSE
index d693eea8c..fb6b6359f 100644
--- a/seatunnel-dist/release-docs/LICENSE
+++ b/seatunnel-dist/release-docs/LICENSE
@@ -345,9 +345,7 @@ The text of each license is the standard Apache 2.0 license.
      (Apache License, Version 2.0) Apache Commons CLI (commons-cli:commons-cli:1.4 - http://commons.apache.org/proper/commons-cli/)
      (Apache License, Version 2.0) Apache Commons Codec (commons-codec:commons-codec:1.13 - https://commons.apache.org/proper/commons-codec/)
      (Apache License, Version 2.0) Apache Commons Collections (org.apache.commons:commons-collections4:4.4 - https://commons.apache.org/proper/commons-collections/)
-     (Apache License, Version 2.0) Apache Commons Compress (org.apache.commons:commons-compress:1.18 - https://commons.apache.org/proper/commons-compress/)
      (Apache License, Version 2.0) Apache Commons Compress (org.apache.commons:commons-compress:1.20 - https://commons.apache.org/proper/commons-compress/)
-     (Apache License, Version 2.0) Apache Commons Compress (org.apache.commons:commons-compress:1.21 - https://commons.apache.org/proper/commons-compress/)
      (Apache License, Version 2.0) Apache Commons Configuration (org.apache.commons:commons-configuration2:2.1.1 - http://commons.apache.org/proper/commons-configuration/)
      (Apache License, Version 2.0) Apache Commons Crypto (org.apache.commons:commons-crypto:1.0.0 - http://commons.apache.org/proper/commons-crypto/)
      (Apache License, Version 2.0) Apache Commons Email (org.apache.commons:commons-email:1.5 - http://commons.apache.org/proper/commons-email/)
@@ -627,7 +625,6 @@ The text of each license is the standard Apache 2.0 license.
      (The Apache Software License, Version 2.0) Apache Avro IPC (org.apache.avro:avro-ipc:1.8.2 - http://avro.apache.org)
      (The Apache Software License, Version 2.0) Apache Avro Mapred API (org.apache.avro:avro-mapred:1.8.2 - http://avro.apache.org/avro-mapred)
      (The Apache Software License, Version 2.0) Apache Commons CSV (org.apache.commons:commons-csv:1.0 - http://commons.apache.org/proper/commons-csv/)
-     (The Apache Software License, Version 2.0) Apache Commons Compress (org.apache.commons:commons-compress:1.8.1 - http://commons.apache.org/proper/commons-compress/)
      (The Apache Software License, Version 2.0) Apache Commons DBCP (org.apache.commons:commons-dbcp2:2.0.1 - http://commons.apache.org/proper/commons-dbcp/)
      (The Apache Software License, Version 2.0) Apache Commons Lang (org.apache.commons:commons-lang3:3.3.2 - http://commons.apache.org/proper/commons-lang/)
      (The Apache Software License, Version 2.0) Apache Commons Logging (commons-logging:commons-logging:1.2 - http://commons.apache.org/proper/commons-logging/)
@@ -678,7 +675,6 @@ The text of each license is the standard Apache 2.0 license.
      (The Apache Software License, Version 2.0) ClassMate (com.fasterxml:classmate:1.1.0 - http://github.com/cowtowncoder/java-classmate)
      (The Apache Software License, Version 2.0) Commons BeanUtils Core (commons-beanutils:commons-beanutils-core:1.8.0 - http://commons.apache.org/beanutils/)
      (The Apache Software License, Version 2.0) Commons CLI (commons-cli:commons-cli:1.2 - http://commons.apache.org/cli/)
-     (The Apache Software License, Version 2.0) Commons Compress (org.apache.commons:commons-compress:1.4.1 - http://commons.apache.org/compress/)
      (The Apache Software License, Version 2.0) Commons Configuration (commons-configuration:commons-configuration:1.6 - http://commons.apache.org/${pom.artifactId.substring(8)}/)
      (The Apache Software License, Version 2.0) Commons Configuration (commons-configuration:commons-configuration:1.7 - http://commons.apache.org/configuration/)
      (The Apache Software License, Version 2.0) Commons DBCP (commons-dbcp:commons-dbcp:1.4 - http://commons.apache.org/dbcp/)
@@ -1211,7 +1207,6 @@ The following components are provided under the Public Domain License. See proje
 The text of each license is also included at licenses/LICENSE-[project].txt.
 
      (Public Domain) AOP alliance (aopalliance:aopalliance:1.0 - http://aopalliance.sourceforge.net)
-     (Public Domain) XZ for Java (org.tukaani:xz:1.0 - http://tukaani.org/xz/java.html)
      (Public Domain) XZ for Java (org.tukaani:xz:1.5 - http://tukaani.org/xz/java.html)
      (Public Domain) XZ for Java (org.tukaani:xz:1.8 - https://tukaani.org/xz/java.html)
      (Public Domain, per Creative Commons CC0) HdrHistogram (org.hdrhistogram:HdrHistogram:2.1.9 - http://hdrhistogram.github.io/HdrHistogram/)
diff --git a/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml b/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml
index d4f7aaffa..87eb3a111 100644
--- a/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml
+++ b/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml
@@ -120,19 +120,14 @@
         </fileSet>
         <!-- seatunnel engine -->
         <fileSet>
-            <directory>../seatunnel-engine/seatunnel-engine-client/target</directory>
-            <includes>
-                <include>seatunnel-engine-*.jar</include>
-            </includes>
-            <excludes>
-                <exclude>%regex[.*((javadoc)|(sources))\.jar]</exclude>
-            </excludes>
-            <outputDirectory>/lib</outputDirectory>
+            <directory>../seatunnel-core/seatunnel-starter/src/main/bin</directory>
+            <outputDirectory>/bin</outputDirectory>
+            <fileMode>0755</fileMode>
         </fileSet>
         <fileSet>
-            <directory>../seatunnel-engine/seatunnel-engine-server/target</directory>
+            <directory>../seatunnel-core/seatunnel-starter/target</directory>
             <includes>
-                <include>seatunnel-engine-*.jar</include>
+                <include>seatunnel-starter*.jar</include>
             </includes>
             <excludes>
                 <exclude>%regex[.*((javadoc)|(sources))\.jar]</exclude>
diff --git a/seatunnel-dist/src/main/assembly/assembly-bin.xml b/seatunnel-dist/src/main/assembly/assembly-bin.xml
index eddaf8129..6da678c6e 100644
--- a/seatunnel-dist/src/main/assembly/assembly-bin.xml
+++ b/seatunnel-dist/src/main/assembly/assembly-bin.xml
@@ -120,19 +120,14 @@
         </fileSet>
         <!-- seatunnel engine -->
         <fileSet>
-            <directory>../seatunnel-engine/seatunnel-engine-client/target</directory>
-            <includes>
-                <include>seatunnel-engine-*.jar</include>
-            </includes>
-            <excludes>
-                <exclude>%regex[.*((javadoc)|(sources))\.jar]</exclude>
-            </excludes>
-            <outputDirectory>/lib</outputDirectory>
+            <directory>../seatunnel-core/seatunnel-starter/src/main/bin</directory>
+            <outputDirectory>/bin</outputDirectory>
+            <fileMode>0755</fileMode>
         </fileSet>
         <fileSet>
-            <directory>../seatunnel-engine/seatunnel-engine-server/target</directory>
+            <directory>../seatunnel-core/seatunnel-starter/target</directory>
             <includes>
-                <include>seatunnel-engine-*.jar</include>
+                <include>seatunnel-starter*.jar</include>
             </includes>
             <excludes>
                 <exclude>%regex[.*((javadoc)|(sources))\.jar]</exclude>
diff --git a/seatunnel-examples/seatunnel-engine-examples/pom.xml b/seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/pom.xml
similarity index 71%
copy from seatunnel-examples/seatunnel-engine-examples/pom.xml
copy to seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/pom.xml
index b75a68b85..afad8f533 100644
--- a/seatunnel-examples/seatunnel-engine-examples/pom.xml
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/pom.xml
@@ -20,29 +20,23 @@
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
-        <artifactId>seatunnel-examples</artifactId>
+        <artifactId>seatunnel-engine-e2e</artifactId>
         <groupId>org.apache.seatunnel</groupId>
         <version>${revision}</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
-    <artifactId>seatunnel-engine-examples</artifactId>
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-seatunnel-starter</artifactId>
-            <version>${project.version}</version>
-        </dependency>
+    <artifactId>connector-console-seatunnel-e2e</artifactId>
 
+    <dependencies>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>connector-fake</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.seatunnel</groupId>
-            <artifactId>connector-console</artifactId>
+            <artifactId>connector-seatunnel-e2e-base</artifactId>
             <version>${project.version}</version>
+            <classifier>tests</classifier>
+            <type>test-jar</type>
+            <scope>test</scope>
         </dependency>
     </dependencies>
+
 </project>
\ No newline at end of file
diff --git a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/config/SeaTunnelApiConfigChecker.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/src/test/java/org/apache/seatunnel/engine/e2e/console/FakeSourceToConsoleIT.java
similarity index 59%
rename from seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/config/SeaTunnelApiConfigChecker.java
rename to seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/src/test/java/org/apache/seatunnel/engine/e2e/console/FakeSourceToConsoleIT.java
index bfaff2a9c..fa63feb1c 100644
--- a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/config/SeaTunnelApiConfigChecker.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/src/test/java/org/apache/seatunnel/engine/e2e/console/FakeSourceToConsoleIT.java
@@ -15,17 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.core.starter.seatunnel.config;
+package org.apache.seatunnel.engine.e2e.console;
 
-import org.apache.seatunnel.core.starter.config.ConfigChecker;
-import org.apache.seatunnel.core.starter.exception.ConfigCheckException;
+import org.apache.seatunnel.engine.e2e.SeaTunnelContainer;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.junit.Assert;
+import org.junit.Test;
+import org.testcontainers.containers.Container;
 
-public class SeaTunnelApiConfigChecker implements ConfigChecker {
+import java.io.IOException;
 
-    @Override
-    public void checkConfig(Config config) throws ConfigCheckException {
-        // TODO implement
+public class FakeSourceToConsoleIT extends SeaTunnelContainer {
+
+    @Test
+    public void testFakeSourceToConsoleSink() throws IOException, InterruptedException {
+        Container.ExecResult execResult = executeSeaTunnelJob("/fakesource_to_console.conf");
+        Assert.assertEquals(0, execResult.getExitCode());
     }
 }
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/src/test/resources/batch_fakesource_to_file.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/src/test/resources/fakesource_to_console.conf
similarity index 63%
copy from seatunnel-e2e/seatunnel-engine-e2e/src/test/resources/batch_fakesource_to_file.conf
copy to seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/src/test/resources/fakesource_to_console.conf
index 1e49074ba..35f8633ef 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/src/test/resources/batch_fakesource_to_file.conf
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/src/test/resources/fakesource_to_console.conf
@@ -19,41 +19,22 @@
 ######
 
 env {
-  # You can set flink configuration here
-  execution.parallelism = 1
   job.mode = "BATCH"
-  execution.checkpoint.interval = 5000
-  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
 }
 
 source {
   # This is a example source plugin **only for test and demonstrate the feature source plugin**
     FakeSource {
       result_table_name = "fake"
-      field_name = "name,age",
-      parallelism = 3
+      field_name = "name,age"
     }
-
 }
 
 transform {
 }
 
 sink {
-  LocalFile {
-    path="file:///tmp/hive/warehouse/test2"
-    field_delimiter="\t"
-    row_delimiter="\n"
-    partition_by=["age"]
-    partition_dir_expression="${k0}=${v0}"
-    is_partition_field_write_in_file=true
-    file_name_expression="${transactionId}_${now}"
-    file_format="text"
-    sink_columns=["name","age"]
-    filename_time_format="yyyy.MM.dd"
-    is_enable_transaction=true
-    save_mode="error"
-
+  console {
   }
 
 }
\ No newline at end of file
diff --git a/seatunnel-examples/seatunnel-engine-examples/pom.xml b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/pom.xml
similarity index 59%
copy from seatunnel-examples/seatunnel-engine-examples/pom.xml
copy to seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/pom.xml
index b75a68b85..03f7c66bf 100644
--- a/seatunnel-examples/seatunnel-engine-examples/pom.xml
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/pom.xml
@@ -20,29 +20,36 @@
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
-        <artifactId>seatunnel-examples</artifactId>
+        <artifactId>seatunnel-engine-e2e</artifactId>
         <groupId>org.apache.seatunnel</groupId>
         <version>${revision}</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
-    <artifactId>seatunnel-engine-examples</artifactId>
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-seatunnel-starter</artifactId>
-            <version>${project.version}</version>
-        </dependency>
+    <artifactId>connector-seatunnel-e2e-base</artifactId>
+
+    <properties>
+        <maven-jar-plugin.version>2.4</maven-jar-plugin.version>
+    </properties>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <version>${maven-jar-plugin.version}</version>
+                <configuration>
+                    <skip>false</skip>
+                </configuration>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
 
-        <dependency>
-            <groupId>org.apache.seatunnel</groupId>
-            <artifactId>connector-fake</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.seatunnel</groupId>
-            <artifactId>connector-console</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-    </dependencies>
 </project>
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/src/test/java/org/apache/seatunnel/engine/e2e/engine/JobExecutionIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
similarity index 95%
rename from seatunnel-e2e/seatunnel-engine-e2e/src/test/java/org/apache/seatunnel/engine/e2e/engine/JobExecutionIT.java
rename to seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
index 8fb21b51b..b1c444ace 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/src/test/java/org/apache/seatunnel/engine/e2e/engine/JobExecutionIT.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
@@ -15,9 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.e2e.engine;
-
-import static org.awaitility.Awaitility.await;
+package org.apache.seatunnel.engine.e2e;
 
 import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.common.config.DeployMode;
@@ -29,12 +27,12 @@ import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.common.config.SeaTunnelClientConfig;
 import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
 import org.apache.seatunnel.engine.core.job.JobStatus;
-import org.apache.seatunnel.engine.e2e.TestUtils;
 import org.apache.seatunnel.engine.server.SeaTunnelNodeContext;
 
 import com.google.common.collect.Lists;
 import com.hazelcast.client.config.ClientConfig;
 import com.hazelcast.instance.impl.HazelcastInstanceFactory;
+import org.awaitility.Awaitility;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -88,7 +86,7 @@ public class JobExecutionIT {
                 return clientJobProxy.waitForJobComplete();
             });
 
-            await().atMost(20000, TimeUnit.MILLISECONDS)
+            Awaitility.await().atMost(20000, TimeUnit.MILLISECONDS)
                 .untilAsserted(() -> Assert.assertTrue(
                     objectCompletableFuture.isDone() && JobStatus.FINISHED.equals(objectCompletableFuture.get())));
 
@@ -121,7 +119,7 @@ public class JobExecutionIT {
             Thread.sleep(1000);
             clientJobProxy.cancelJob();
 
-            await().atMost(20000, TimeUnit.MILLISECONDS)
+            Awaitility.await().atMost(20000, TimeUnit.MILLISECONDS)
                 .untilAsserted(() -> Assert.assertTrue(
                     objectCompletableFuture.isDone() && JobStatus.CANCELED.equals(objectCompletableFuture.get())));
 
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/SeaTunnelContainer.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/SeaTunnelContainer.java
new file mode 100644
index 000000000..9b6495850
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/SeaTunnelContainer.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.e2e;
+
+import org.junit.After;
+import org.junit.Before;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.utility.MountableFile;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+public abstract class SeaTunnelContainer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SeaTunnelContainer.class);
+    private static final String JDK_DOCKER_IMAGE = "openjdk:8";
+    protected static final Network NETWORK = Network.newNetwork();
+    private static final Path PROJECT_ROOT_PATH = Paths.get(System.getProperty("user.dir")).getParent().getParent().getParent();
+
+    private static final String SEATUNNEL_HOME = "/tmp/seatunnel";
+    private static final String PLUGIN_MAPPING_FILE = "plugin-mapping.properties";
+    private static final String SEATUNNEL_BIN = Paths.get(SEATUNNEL_HOME, "bin").toString();
+    private static final String SEATUNNEL_LIB = Paths.get(SEATUNNEL_HOME, "lib").toString();
+    private static final String SEATUNNEL_CONNECTORS = Paths.get(SEATUNNEL_HOME, "connectors").toString();
+    private static final String CLIENT_SHELL = "seatunnel.sh";
+    private static final String SERVER_SHELL = "seatunnel-cluster.sh";
+    private GenericContainer<?> server;
+
+    @Before
+    public void before() {
+        Map<String, String> mountMapping = getFileMountMapping();
+        server = new GenericContainer<>(JDK_DOCKER_IMAGE)
+            .withNetwork(NETWORK)
+            .withCommand(Paths.get(SEATUNNEL_BIN, SERVER_SHELL).toString())
+            .withNetworkAliases("server")
+            .withExposedPorts()
+            .withLogConsumer(new Slf4jLogConsumer(LOG))
+            .waitingFor(Wait.forLogMessage(".*received new worker register.*\\n", 1));
+        mountMapping.forEach(server::withFileSystemBind);
+        server.start();
+    }
+
+    protected Map<String, String> getFileMountMapping() {
+
+        Map<String, String> mountMapping = new HashMap<>();
+        // copy lib
+        mountMapping.put(PROJECT_ROOT_PATH + "/seatunnel-core/seatunnel-starter/target/seatunnel-starter.jar",
+            Paths.get(SEATUNNEL_LIB, "seatunnel-starter.jar").toString());
+
+        // copy bin
+        mountMapping.put(PROJECT_ROOT_PATH + "/seatunnel-core/seatunnel-starter/src/main/bin/",
+            Paths.get(SEATUNNEL_BIN).toString());
+
+        // copy connectors
+        mountMapping.put(PROJECT_ROOT_PATH +
+            "/seatunnel-connectors-v2-dist/target/lib", Paths.get(SEATUNNEL_CONNECTORS, "seatunnel").toString());
+
+        // copy plugin-mapping.properties
+        mountMapping.put(PROJECT_ROOT_PATH + "/plugin-mapping.properties", Paths.get(SEATUNNEL_CONNECTORS, PLUGIN_MAPPING_FILE).toString());
+
+        return mountMapping;
+    }
+
+    @SuppressWarnings("checkstyle:MagicNumber")
+    public Container.ExecResult executeSeaTunnelJob(String confFile) throws IOException, InterruptedException {
+        final String confPath = getResource(confFile);
+        if (!new File(confPath).exists()) {
+            throw new IllegalArgumentException(confFile + " doesn't exist");
+        }
+        final String targetConfInContainer = Paths.get("/tmp", confFile).toString();
+        server.copyFileToContainer(MountableFile.forHostPath(confPath), targetConfInContainer);
+
+        // Running IT use cases under Windows requires replacing \ with /
+        String conf = targetConfInContainer.replaceAll("\\\\", "/");
+        final List<String> command = new ArrayList<>();
+        command.add(Paths.get(SEATUNNEL_BIN, CLIENT_SHELL).toString());
+        command.add("--config " + conf);
+
+        CompletableFuture.runAsync(() -> {
+            try {
+                Thread.sleep(15000);
+                // stop the server once the timeout elapses, so a bash command that never returns cannot block the test
+                server.stop();
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        });
+        Container.ExecResult execResult = server.execInContainer("bash", "-c", String.join(" ", command));
+        LOG.info(execResult.getStdout());
+        LOG.error(execResult.getStderr());
+        return execResult;
+    }
+
+    @After
+    public void after() {
+        if (server != null) {
+            server.close();
+        }
+    }
+
+    private String getResource(String confFile) {
+        return System.getProperty("user.dir") + "/src/test/resources" + confFile;
+    }
+
+}
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/src/test/java/org/apache/seatunnel/engine/e2e/TestUtils.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TestUtils.java
similarity index 96%
rename from seatunnel-e2e/seatunnel-engine-e2e/src/test/java/org/apache/seatunnel/engine/e2e/TestUtils.java
rename to seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TestUtils.java
index 786a34ce6..f44177e09 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/src/test/java/org/apache/seatunnel/engine/e2e/TestUtils.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TestUtils.java
@@ -37,7 +37,7 @@ public class TestUtils {
     public static void initPluginDir() {
         // copy connectors to project_root/connectors dir
         System.setProperty("SEATUNNEL_HOME", System.getProperty("user.dir") +
-            String.format("%s..%s..%s", File.separator, File.separator, File.separator));
+            String.format("%s..%s..%s..%s", File.separator, File.separator, File.separator, File.separator));
         File seatunnelRootDir = new File(System.getProperty("SEATUNNEL_HOME"));
 
         File connectorDir = new File(seatunnelRootDir +
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/src/test/resources/batch_fakesource_to_file.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file.conf
similarity index 100%
rename from seatunnel-e2e/seatunnel-engine-e2e/src/test/resources/batch_fakesource_to_file.conf
rename to seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file.conf
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/src/test/resources/batch_fakesource_to_file_complex.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_complex.conf
similarity index 100%
rename from seatunnel-e2e/seatunnel-engine-e2e/src/test/resources/batch_fakesource_to_file_complex.conf
rename to seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_complex.conf
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/src/test/resources/streaming_fakesource_to_file_complex.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/streaming_fakesource_to_file_complex.conf
similarity index 100%
rename from seatunnel-e2e/seatunnel-engine-e2e/src/test/resources/streaming_fakesource_to_file_complex.conf
rename to seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/streaming_fakesource_to_file_complex.conf
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/pom.xml b/seatunnel-e2e/seatunnel-engine-e2e/pom.xml
index ea8a31f3a..570bbefa7 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-engine-e2e/pom.xml
@@ -27,6 +27,12 @@
     <modelVersion>4.0.0</modelVersion>
 
     <artifactId>seatunnel-engine-e2e</artifactId>
+    <packaging>pom</packaging>
+
+    <modules>
+        <module>connector-seatunnel-e2e-base</module>
+        <module>connector-console-seatunnel-e2e</module>
+    </modules>
 
     <dependencies>
         <dependency>
@@ -66,9 +72,14 @@
             <artifactId>seatunnel-engine-server</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>testcontainers</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.awaitility</groupId>
             <artifactId>awaitility</artifactId>
+            <scope>test</scope>
         </dependency>
     </dependencies>
 </project>
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java
index 0ffa31634..6666bd5ef 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java
@@ -53,7 +53,7 @@ public abstract class FlinkContainer {
     protected GenericContainer<?> jobManager;
     protected GenericContainer<?> taskManager;
     private static final Path PROJECT_ROOT_PATH = Paths.get(System.getProperty("user.dir")).getParent().getParent();
-    private static final String SEATUNNEL_FLINK_BIN = "start-seatunnel-flink-new-connector.sh";
+    private static final String SEATUNNEL_FLINK_BIN = "start-seatunnel-flink-connector-v2.sh";
     private static final String SEATUNNEL_FLINK_JAR = "seatunnel-flink-starter.jar";
     private static final String PLUGIN_MAPPING_FILE = "plugin-mapping.properties";
     private static final String SEATUNNEL_HOME = "/tmp/flink/seatunnel";
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/spark/SparkContainer.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/spark/SparkContainer.java
index de30eeb41..282a74c6a 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/spark/SparkContainer.java
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/spark/SparkContainer.java
@@ -52,7 +52,7 @@ public abstract class SparkContainer {
 
     protected GenericContainer<?> master;
     private static final Path PROJECT_ROOT_PATH = Paths.get(System.getProperty("user.dir")).getParent().getParent();
-    private static final String SEATUNNEL_SPARK_BIN = "start-seatunnel-spark-new-connector.sh";
+    private static final String SEATUNNEL_SPARK_BIN = "start-seatunnel-spark-connector-v2.sh";
     private static final String SEATUNNEL_SPARK_JAR = "seatunnel-spark-starter.jar";
     private static final String PLUGIN_MAPPING_FILE = "plugin-mapping.properties";
     private static final String SEATUNNEL_HOME = "/tmp/spark/seatunnel";
diff --git a/seatunnel-engine/seatunnel-engine-client/pom.xml b/seatunnel-engine/seatunnel-engine-client/pom.xml
index b24ad2088..f8e51c2fc 100644
--- a/seatunnel-engine/seatunnel-engine-client/pom.xml
+++ b/seatunnel-engine/seatunnel-engine-client/pom.xml
@@ -41,7 +41,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-core-base</artifactId>
+            <artifactId>seatunnel-core-starter</artifactId>
             <version>${project.version}</version>
         </dependency>
         <dependency>
@@ -66,6 +66,12 @@
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-console</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
             <artifactId>connector-file-local</artifactId>
@@ -73,6 +79,7 @@
             <scope>test</scope>
         </dependency>
 
+
         <dependency>
             <groupId>com.hazelcast</groupId>
             <artifactId>hazelcast</artifactId>
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java
index 8c372f2d5..044143108 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java
@@ -53,6 +53,7 @@ public class ClientJobProxy implements Job {
     }
 
     private void submitJob() {
+        LOGGER.info(String.format("start submit job, job id: %s, with plugin jar %s", jobImmutableInformation.getJobId(), jobImmutableInformation.getPluginJarsUrls()));
         ClientMessage request = SeaTunnelSubmitJobCodec.encodeRequest(jobImmutableInformation.getJobId(),
             seaTunnelHazelcastClient.getSerializationService().toData(jobImmutableInformation));
         PassiveCompletableFuture<Void> submitJobFuture =
@@ -84,10 +85,14 @@ public class ClientJobProxy implements Job {
             jobImmutableInformation.getJobConfig().getName(),
             jobImmutableInformation.getJobId(),
             jobStatus));
-        this.seaTunnelHazelcastClient.getHazelcastInstance().shutdown();
         return jobStatus;
     }
 
+    @Override
+    public void close() {
+        this.seaTunnelHazelcastClient.getHazelcastInstance().shutdown();
+    }
+
     @Override
     public PassiveCompletableFuture<JobStatus> doWaitForJobComplete() {
         return seaTunnelHazelcastClient.requestOnMasterAndGetCompletableFuture(
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobConfigParser.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobConfigParser.java
index 608838e9b..0d19411d6 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobConfigParser.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobConfigParser.java
@@ -28,7 +28,7 @@ import org.apache.seatunnel.api.transform.SeaTunnelTransform;
 import org.apache.seatunnel.apis.base.plugin.Plugin;
 import org.apache.seatunnel.common.constants.CollectionConstants;
 import org.apache.seatunnel.common.constants.JobMode;
-import org.apache.seatunnel.core.base.config.ConfigBuilder;
+import org.apache.seatunnel.core.starter.config.ConfigBuilder;
 import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.common.exception.JobDefineCheckException;
 import org.apache.seatunnel.engine.common.utils.IdGenerator;
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/resources/client_test.conf b/seatunnel-engine/seatunnel-engine-client/src/test/resources/client_test.conf
index 69ce5614a..e2634dfa8 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/resources/client_test.conf
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/resources/client_test.conf
@@ -46,18 +46,18 @@ transform {
 
 sink {
   LocalFile {
-    path="file:///tmp/hive/warehouse/test2"
-    field_delimiter="\t"
-    row_delimiter="\n"
-    partition_by=["age"]
-    partition_dir_expression="${k0}=${v0}"
-    is_partition_field_write_in_file=true
-    file_name_expression="${transactionId}_${now}"
-    file_format="text"
-    sink_columns=["name","age"]
-    filename_time_format="yyyy.MM.dd"
-    is_enable_transaction=true
-    save_mode="error",
-    source_table_name="fake"
+    path = "file:///tmp/hive/warehouse/test2"
+    field_delimiter = "\t"
+    row_delimiter = "\n"
+    partition_by = ["age"]
+    partition_dir_expression = "${k0}=${v0}"
+    is_partition_field_write_in_file = true
+    file_name_expression = "${transactionId}_${now}"
+    file_format = "text"
+    sink_columns = ["name", "age"]
+    filename_time_format = "yyyy.MM.dd"
+    is_enable_transaction = true
+    save_mode = "error",
+    source_table_name = "fake"
   }
 }
\ No newline at end of file
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java
index 517d5b1f2..69ed8ad81 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java
@@ -32,4 +32,6 @@ public interface Job {
     JobStatus getJobStatus();
 
     JobStatus waitForJobComplete();
+
+    void close();
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
index f7b215820..5379e1d60 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
@@ -231,7 +231,8 @@ public abstract class SeaTunnelTask extends AbstractTask {
 
     @Override
     public Set<URL> getJarsUrl() {
-        List<Flow> now = Collections.singletonList(executionFlow);
+        List<Flow> now = new ArrayList<>();
+        now.add(executionFlow);
         Set<URL> urls = new HashSet<>();
         while (!now.isEmpty()) {
             final List<Flow> next = new ArrayList<>();
@@ -241,7 +242,8 @@ public abstract class SeaTunnelTask extends AbstractTask {
                 }
                 next.addAll(n.getNext());
             });
-            now = next;
+            now.clear();
+            now.addAll(next);
         }
         return urls;
     }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
index f5eeb8e80..684ed7904 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
@@ -124,7 +124,7 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends Serializable, AggregatedCo
                     prepareClose = true;
                 }
                 if (barrier.snapshot()) {
-                    if (writerStateSerializer.isPresent()) {
+                    if (!writerStateSerializer.isPresent()) {
                         runningTask.addState(barrier, sinkAction.getId(), Collections.emptyList());
                     } else {
                         List<StateT> states = writer.snapshotState(barrier.getId());
@@ -136,10 +136,14 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends Serializable, AggregatedCo
                         writer.abortPrepare();
                         throw e;
                     }
-                    lastCommitInfo.ifPresent(commitInfoT -> runningTask.getExecutionContext().sendToMember(new SinkPrepareCommitOperation(barrier, committerTaskLocation,
-                        SerializationUtils.serialize(commitInfoT)), committerTaskAddress).join());
+                    if (containAggCommitter) {
+                        lastCommitInfo.ifPresent(commitInfoT -> runningTask.getExecutionContext().sendToMember(new SinkPrepareCommitOperation(barrier, committerTaskLocation,
+                            SerializationUtils.serialize(commitInfoT)), committerTaskAddress).join());
+                    }
                 } else {
-                    runningTask.getExecutionContext().sendToMember(new CheckpointBarrierTriggerOperation(barrier, committerTaskLocation), committerTaskAddress);
+                    if (containAggCommitter) {
+                        runningTask.getExecutionContext().sendToMember(new CheckpointBarrierTriggerOperation(barrier, committerTaskLocation), committerTaskAddress);
+                    }
                 }
                 runningTask.ack(barrier);
             } else {
diff --git a/seatunnel-examples/seatunnel-engine-examples/pom.xml b/seatunnel-examples/seatunnel-engine-examples/pom.xml
index b75a68b85..a0d952b2c 100644
--- a/seatunnel-examples/seatunnel-engine-examples/pom.xml
+++ b/seatunnel-examples/seatunnel-engine-examples/pom.xml
@@ -30,19 +30,9 @@
     <dependencies>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-seatunnel-starter</artifactId>
+            <artifactId>seatunnel-starter</artifactId>
             <version>${project.version}</version>
         </dependency>
 
-        <dependency>
-            <groupId>org.apache.seatunnel</groupId>
-            <artifactId>connector-fake</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.seatunnel</groupId>
-            <artifactId>connector-console</artifactId>
-            <version>${project.version}</version>
-        </dependency>
     </dependencies>
 </project>
\ No newline at end of file
diff --git a/seatunnel-examples/seatunnel-engine-examples/src/main/java/org/apache/seatunnel/example/engine/SeaTunnelEngineExample.java b/seatunnel-examples/seatunnel-engine-examples/src/main/java/org/apache/seatunnel/example/engine/SeaTunnelEngineExample.java
index a50f31271..02736987a 100644
--- a/seatunnel-examples/seatunnel-engine-examples/src/main/java/org/apache/seatunnel/example/engine/SeaTunnelEngineExample.java
+++ b/seatunnel-examples/seatunnel-engine-examples/src/main/java/org/apache/seatunnel/example/engine/SeaTunnelEngineExample.java
@@ -20,8 +20,9 @@ package org.apache.seatunnel.example.engine;
 import org.apache.seatunnel.core.starter.Seatunnel;
 import org.apache.seatunnel.core.starter.command.Command;
 import org.apache.seatunnel.core.starter.exception.CommandException;
-import org.apache.seatunnel.core.starter.seatunnel.args.SeaTunnelCommandArgs;
-import org.apache.seatunnel.core.starter.seatunnel.command.SeaTunnelCommandBuilder;
+import org.apache.seatunnel.core.starter.seatunnel.args.ClientCommandArgs;
+import org.apache.seatunnel.core.starter.seatunnel.command.ClientCommandBuilder;
+import org.apache.seatunnel.engine.common.runtime.ExecutionMode;
 
 import java.io.FileNotFoundException;
 import java.net.URISyntaxException;
@@ -32,12 +33,14 @@ public class SeaTunnelEngineExample {
 
     public static void main(String[] args) throws FileNotFoundException, URISyntaxException, CommandException {
         String configFile = getTestConfigFile("/examples/fake_to_console.conf");
-        SeaTunnelCommandArgs seaTunnelCommandArgs = new SeaTunnelCommandArgs();
-        seaTunnelCommandArgs.setConfigFile(configFile);
-        seaTunnelCommandArgs.setCheckConfig(false);
-        seaTunnelCommandArgs.setName("fake_to_console");
-        Command<SeaTunnelCommandArgs> command =
-            new SeaTunnelCommandBuilder().buildCommand(seaTunnelCommandArgs);
+        ClientCommandArgs clientCommandArgs = new ClientCommandArgs();
+        clientCommandArgs.setConfigFile(configFile);
+        clientCommandArgs.setCheckConfig(false);
+        clientCommandArgs.setName("fake_to_console");
+        // Change the execution mode to CLUSTER to use client mode; before doing this, start SeaTunnelEngineServerExample first
+        clientCommandArgs.setExecutionMode(ExecutionMode.LOCAL);
+        Command<ClientCommandArgs> command =
+            new ClientCommandBuilder().buildCommand(clientCommandArgs);
         Seatunnel.run(command);
     }
 
diff --git a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnel.java b/seatunnel-examples/seatunnel-engine-examples/src/main/java/org/apache/seatunnel/example/engine/SeaTunnelEngineServerExample.java
similarity index 70%
rename from seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnel.java
rename to seatunnel-examples/seatunnel-engine-examples/src/main/java/org/apache/seatunnel/example/engine/SeaTunnelEngineServerExample.java
index a3a98a7b1..af28f2872 100644
--- a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnel.java
+++ b/seatunnel-examples/seatunnel-engine-examples/src/main/java/org/apache/seatunnel/example/engine/SeaTunnelEngineServerExample.java
@@ -15,19 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.core.starter.seatunnel;
+package org.apache.seatunnel.example.engine;
 
 import org.apache.seatunnel.core.starter.Seatunnel;
 import org.apache.seatunnel.core.starter.command.Command;
 import org.apache.seatunnel.core.starter.exception.CommandException;
-import org.apache.seatunnel.core.starter.seatunnel.args.SeaTunnelCommandArgs;
-import org.apache.seatunnel.core.starter.seatunnel.command.SeaTunnelCommandBuilder;
+import org.apache.seatunnel.core.starter.seatunnel.args.ServerCommandArgs;
+import org.apache.seatunnel.core.starter.seatunnel.command.ServerCommandBuilder;
 
-public class SeaTunnel {
+public class SeaTunnelEngineServerExample {
     public static void main(String[] args) throws CommandException {
-        SeaTunnelCommandArgs seaTunnelCommandArgs = CommandLineUtils.parseSeaTunnelArgs(args);
-        Command<SeaTunnelCommandArgs> command =
-            new SeaTunnelCommandBuilder().buildCommand(seaTunnelCommandArgs);
+        ServerCommandArgs serverCommandArgs = new ServerCommandArgs();
+        Command<ServerCommandArgs> command =
+            new ServerCommandBuilder().buildCommand(serverCommandArgs);
         Seatunnel.run(command);
     }
+
 }
diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt
index 077ddc966..0ddee1d08 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -68,12 +68,7 @@ commons-collections-3.2.2.jar
 commons-collections4-4.4.jar
 commons-compiler-3.0.9.jar
 commons-compiler-3.1.6.jar
-commons-compress-1.18.jar
-commons-compress-1.19.jar
 commons-compress-1.20.jar
-commons-compress-1.21.jar
-commons-compress-1.4.1.jar
-commons-compress-1.8.1.jar
 commons-configuration-1.7.jar
 commons-configuration2-2.1.1.jar
 commons-crypto-1.0.0.jar
@@ -722,7 +717,6 @@ xercesImpl-2.9.1.jar
 xml-apis-1.3.04.jar
 xmlbeans-3.1.0.jar
 xmlenc-0.52.jar
-xz-1.0.jar
 xz-1.5.jar
 xz-1.8.jar
 zkclient-0.3.jar