You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ty...@apache.org on 2023/04/09 12:26:35 UTC

[incubator-seatunnel] branch dev updated: [Improve][Starter] Optimize code structure & remove redundant code (#4525)

This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new c24e39927 [Improve][Starter] Optimize code structure & remove redundant code (#4525)
c24e39927 is described below

commit c24e39927f9946015398855fd9e1907b3bd014da
Author: Tyrantlucifer <Ty...@gmail.com>
AuthorDate: Sun Apr 9 20:26:27 2023 +0800

    [Improve][Starter] Optimize code structure & remove redundant code (#4525)
    
    * [Improve][Starter] Optimize code structure & remove redundant code
    
    * [Improve][Starter] Revert SeaTunnelApiExample
    
    * [Improve][Starter] Fix ci error
---
 seatunnel-core/seatunnel-flink-starter/pom.xml     |   1 +
 .../seatunnel-flink-13-starter/pom.xml             |  14 +-
 .../seatunnel-flink-15-starter/pom.xml             |   6 +
 .../core/starter/flink/args/FlinkCommandArgs.java  | 138 --------------
 .../flink/command/FlinkConfValidateCommand.java    |  44 -----
 .../flink/command/FlinkTaskExecuteCommand.java     |  66 -------
 .../FlinkAbstractPluginExecuteProcessor.java       |  91 ---------
 .../starter/flink/execution/FlinkExecution.java    | 207 ---------------------
 .../flink/execution/SourceExecuteProcessor.java    | 147 ---------------
 .../flink/execution/TransformExecuteProcessor.java | 138 --------------
 .../core/starter/flink/utils/ConfigKeyName.java    |  49 -----
 .../core/starter/flink/utils/EnvironmentUtil.java  | 120 ------------
 .../core/starter/flink/utils/TableUtil.java        |  50 -----
 .../pom.xml                                        |   7 +-
 .../seatunnel/core/starter/flink/FlinkStarter.java |   0
 .../core/starter/flink/SeaTunnelFlink.java         |   0
 .../core/starter/flink/args/FlinkCommandArgs.java  |   0
 .../flink/command/FlinkConfValidateCommand.java    |   0
 .../flink/command/FlinkTaskExecuteCommand.java     |   0
 .../FlinkAbstractPluginExecuteProcessor.java       |   0
 .../starter/flink/execution/FlinkExecution.java    |   0
 .../flink/execution/FlinkRuntimeEnvironment.java   |   0
 .../flink/execution/SinkExecuteProcessor.java      |   0
 .../flink/execution/SourceExecuteProcessor.java    |   0
 .../flink/execution/TransformExecuteProcessor.java |   0
 .../core/starter/flink/utils/ConfigKeyName.java    |   0
 .../core/starter/flink/utils/EnvironmentUtil.java  |   0
 .../core/starter/flink/utils/TableUtil.java        |   0
 seatunnel-core/seatunnel-spark-starter/pom.xml     |   1 +
 .../seatunnel-spark-2-starter/pom.xml              |  12 ++
 .../seatunnel-spark-3-starter/pom.xml              |  18 +-
 .../core/starter/spark/args/SparkCommandArgs.java  |  89 ---------
 .../spark/command/SparkConfValidateCommand.java    |  44 -----
 .../spark/command/SparkTaskExecuteCommand.java     |  66 -------
 .../spark/execution/SourceExecuteProcessor.java    | 111 -----------
 .../SparkAbstractPluginExecuteProcessor.java       |  78 --------
 .../starter/spark/execution/SparkExecution.java    |  81 --------
 .../spark/execution/SparkRuntimeEnvironment.java   | 179 ------------------
 .../spark/execution/TransformExecuteProcessor.java | 173 -----------------
 .../pom.xml                                        |   6 +-
 .../core/starter/spark/SeaTunnelSpark.java         |   0
 .../seatunnel/core/starter/spark/SparkStarter.java |   0
 .../core/starter/spark/args/SparkCommandArgs.java  |   0
 .../spark/command/SparkConfValidateCommand.java    |   0
 .../spark/command/SparkTaskExecuteCommand.java     |   0
 .../spark/execution/SinkExecuteProcessor.java      |   0
 .../spark/execution/SourceExecuteProcessor.java    |   0
 .../SparkAbstractPluginExecuteProcessor.java       |   0
 .../starter/spark/execution/SparkExecution.java    |   0
 .../spark/execution/SparkRuntimeEnvironment.java   |   0
 .../spark/execution/TransformExecuteProcessor.java |   0
 51 files changed, 54 insertions(+), 1882 deletions(-)

diff --git a/seatunnel-core/seatunnel-flink-starter/pom.xml b/seatunnel-core/seatunnel-flink-starter/pom.xml
index 969397088..3b4f5442c 100644
--- a/seatunnel-core/seatunnel-flink-starter/pom.xml
+++ b/seatunnel-core/seatunnel-flink-starter/pom.xml
@@ -35,6 +35,7 @@
     <modules>
         <module>seatunnel-flink-13-starter</module>
         <module>seatunnel-flink-15-starter</module>
+        <module>seatunnel-flink-starter-common</module>
     </modules>
 
     <properties>
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/pom.xml b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/pom.xml
index c41e48bc1..3eb93c5c0 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/pom.xml
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/pom.xml
@@ -31,6 +31,18 @@
 
     <dependencies>
 
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-flink-starter-common</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
         <!-- flink-translation -->
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
@@ -66,7 +78,7 @@
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
             <version>${flink.1.13.6.version}</version>
-            <scope>provided</scope>
+            <scope>${flink.scope}</scope>
         </dependency>
 
     </dependencies>
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/pom.xml b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/pom.xml
index 97fb280c6..c92ae6c96 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/pom.xml
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/pom.xml
@@ -31,6 +31,12 @@
 
     <dependencies>
 
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-flink-starter-common</artifactId>
+            <version>${revision}</version>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
             <artifactId>seatunnel-translation-flink-15</artifactId>
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java
deleted file mode 100644
index ff098b9df..000000000
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.starter.flink.args;
-
-import org.apache.seatunnel.common.config.Common;
-import org.apache.seatunnel.common.config.DeployMode;
-import org.apache.seatunnel.core.starter.command.AbstractCommandArgs;
-import org.apache.seatunnel.core.starter.command.Command;
-import org.apache.seatunnel.core.starter.command.ConfDecryptCommand;
-import org.apache.seatunnel.core.starter.command.ConfEncryptCommand;
-import org.apache.seatunnel.core.starter.enums.MasterType;
-import org.apache.seatunnel.core.starter.flink.command.FlinkConfValidateCommand;
-import org.apache.seatunnel.core.starter.flink.command.FlinkTaskExecuteCommand;
-
-import com.beust.jcommander.IStringConverter;
-import com.beust.jcommander.Parameter;
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-
-import java.util.ArrayList;
-import java.util.List;
-
-@EqualsAndHashCode(callSuper = true)
-@Data
-public class FlinkCommandArgs extends AbstractCommandArgs {
-
-    @Parameter(
-            names = {"-e", "--deploy-mode"},
-            converter = FlinkDeployModeConverter.class,
-            description = "Flink job deploy mode, support [run, run-application]")
-    private DeployMode deployMode = DeployMode.RUN;
-
-    @Parameter(
-            names = {"--master", "--target"},
-            converter = FlinkMasterTargetConverter.class,
-            description =
-                    "Flink job submitted target master, support [local, remote, yarn-session, yarn-per-job, "
-                            + "kubernetes-session, yarn-application, kubernetes-application]")
-    private MasterType masterType;
-
-    @Override
-    public Command<?> buildCommand() {
-        Common.setDeployMode(getDeployMode());
-        if (checkConfig) {
-            return new FlinkConfValidateCommand(this);
-        }
-        if (encrypt) {
-            return new ConfEncryptCommand(this);
-        }
-        if (decrypt) {
-            return new ConfDecryptCommand(this);
-        }
-        return new FlinkTaskExecuteCommand(this);
-    }
-
-    @Override
-    public String toString() {
-        return "FlinkCommandArgs{"
-                + "deployMode="
-                + deployMode
-                + ", masterType="
-                + masterType
-                + ", configFile='"
-                + configFile
-                + '\''
-                + ", variables="
-                + variables
-                + ", jobName='"
-                + jobName
-                + '\''
-                + ", originalParameters="
-                + originalParameters
-                + '}';
-    }
-
-    public static class FlinkMasterTargetConverter implements IStringConverter<MasterType> {
-        private static final List<MasterType> MASTER_TYPE_LIST = new ArrayList<>();
-
-        static {
-            MASTER_TYPE_LIST.add(MasterType.LOCAL);
-            MASTER_TYPE_LIST.add(MasterType.REMOTE);
-            MASTER_TYPE_LIST.add(MasterType.YARN_SESSION);
-            MASTER_TYPE_LIST.add(MasterType.YARN_PER_JOB);
-            MASTER_TYPE_LIST.add(MasterType.KUBERNETES_SESSION);
-            MASTER_TYPE_LIST.add(MasterType.YARN_APPLICATION);
-            MASTER_TYPE_LIST.add(MasterType.KUBERNETES_APPLICATION);
-        }
-
-        @Override
-        public MasterType convert(String value) {
-            MasterType masterType = MasterType.valueOf(value.toUpperCase().replaceAll("-", "_"));
-            if (MASTER_TYPE_LIST.contains(masterType)) {
-                return masterType;
-            } else {
-                throw new IllegalArgumentException(
-                        "SeaTunnel job on flink engine submitted target only "
-                                + "support these options: [local, remote, yarn-session, yarn-per-job, kubernetes-session, "
-                                + "yarn-application, kubernetes-application]");
-            }
-        }
-    }
-
-    public static class FlinkDeployModeConverter implements IStringConverter<DeployMode> {
-        private static final List<DeployMode> DEPLOY_MODE_TYPE_LIST = new ArrayList<>();
-
-        static {
-            DEPLOY_MODE_TYPE_LIST.add(DeployMode.RUN);
-            DEPLOY_MODE_TYPE_LIST.add(DeployMode.RUN_APPLICATION);
-        }
-
-        @Override
-        public DeployMode convert(String value) {
-            DeployMode deployMode = DeployMode.valueOf(value.toUpperCase().replaceAll("-", "_"));
-            if (DEPLOY_MODE_TYPE_LIST.contains(deployMode)) {
-                return deployMode;
-            } else {
-                throw new IllegalArgumentException(
-                        "SeaTunnel job on flink engine deploy mode only "
-                                + "support these options: [run, run-application]");
-            }
-        }
-    }
-}
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkConfValidateCommand.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkConfValidateCommand.java
deleted file mode 100644
index 78921e698..000000000
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkConfValidateCommand.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.starter.flink.command;
-
-import org.apache.seatunnel.core.starter.command.Command;
-import org.apache.seatunnel.core.starter.exception.ConfigCheckException;
-import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs;
-import org.apache.seatunnel.core.starter.utils.FileUtils;
-
-import lombok.extern.slf4j.Slf4j;
-
-import java.nio.file.Path;
-
-/** Use to validate the configuration of the SeaTunnel API. */
-@Slf4j
-public class FlinkConfValidateCommand implements Command<FlinkCommandArgs> {
-
-    private final FlinkCommandArgs flinkCommandArgs;
-
-    public FlinkConfValidateCommand(FlinkCommandArgs flinkCommandArgs) {
-        this.flinkCommandArgs = flinkCommandArgs;
-    }
-
-    @Override
-    public void execute() throws ConfigCheckException {
-        Path configPath = FileUtils.getConfigPath(flinkCommandArgs);
-        // TODO: validate the config by new api
-    }
-}
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkTaskExecuteCommand.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkTaskExecuteCommand.java
deleted file mode 100644
index f8539af75..000000000
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkTaskExecuteCommand.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.starter.flink.command;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigUtil;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
-
-import org.apache.seatunnel.common.Constants;
-import org.apache.seatunnel.core.starter.command.Command;
-import org.apache.seatunnel.core.starter.exception.CommandExecuteException;
-import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs;
-import org.apache.seatunnel.core.starter.flink.execution.FlinkExecution;
-import org.apache.seatunnel.core.starter.utils.ConfigBuilder;
-import org.apache.seatunnel.core.starter.utils.FileUtils;
-
-import lombok.extern.slf4j.Slf4j;
-
-import java.nio.file.Path;
-
-import static org.apache.seatunnel.core.starter.utils.FileUtils.checkConfigExist;
-
-@Slf4j
-public class FlinkTaskExecuteCommand implements Command<FlinkCommandArgs> {
-
-    private final FlinkCommandArgs flinkCommandArgs;
-
-    public FlinkTaskExecuteCommand(FlinkCommandArgs flinkCommandArgs) {
-        this.flinkCommandArgs = flinkCommandArgs;
-    }
-
-    @Override
-    public void execute() throws CommandExecuteException {
-        Path configFile = FileUtils.getConfigPath(flinkCommandArgs);
-        checkConfigExist(configFile);
-        Config config = ConfigBuilder.of(configFile);
-        // if user specified job name using command line arguments, override config option
-        if (!flinkCommandArgs.getJobName().equals(Constants.LOGO)) {
-            config =
-                    config.withValue(
-                            ConfigUtil.joinPath("env", "job.name"),
-                            ConfigValueFactory.fromAnyRef(flinkCommandArgs.getJobName()));
-        }
-        FlinkExecution seaTunnelTaskExecution = new FlinkExecution(config);
-        try {
-            seaTunnelTaskExecution.execute();
-        } catch (Exception e) {
-            throw new CommandExecuteException("Flink job executed failed", e);
-        }
-    }
-}
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
deleted file mode 100644
index e9d36ba06..000000000
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.starter.flink.execution;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.JobContext;
-import org.apache.seatunnel.common.utils.ReflectionUtils;
-import org.apache.seatunnel.core.starter.execution.PluginExecuteProcessor;
-import org.apache.seatunnel.core.starter.flink.utils.TableUtil;
-
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-import org.apache.flink.types.Row;
-
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.List;
-import java.util.Optional;
-import java.util.function.BiConsumer;
-
-public abstract class FlinkAbstractPluginExecuteProcessor<T>
-        implements PluginExecuteProcessor<DataStream<Row>, FlinkRuntimeEnvironment> {
-    protected static final String ENGINE_TYPE = "seatunnel";
-    protected static final String PLUGIN_NAME = "plugin_name";
-    protected static final String SOURCE_TABLE_NAME = "source_table_name";
-
-    protected static final BiConsumer<ClassLoader, URL> ADD_URL_TO_CLASSLOADER =
-            (classLoader, url) -> {
-                if (classLoader.getClass().getName().endsWith("SafetyNetWrapperClassLoader")) {
-                    URLClassLoader c =
-                            (URLClassLoader) ReflectionUtils.getField(classLoader, "inner").get();
-                    ReflectionUtils.invoke(c, "addURL", url);
-                } else if (classLoader instanceof URLClassLoader) {
-                    ReflectionUtils.invoke(classLoader, "addURL", url);
-                } else {
-                    throw new RuntimeException(
-                            "Unsupported classloader: " + classLoader.getClass().getName());
-                }
-            };
-
-    protected FlinkRuntimeEnvironment flinkRuntimeEnvironment;
-    protected final List<? extends Config> pluginConfigs;
-    protected JobContext jobContext;
-    protected final List<T> plugins;
-
-    protected FlinkAbstractPluginExecuteProcessor(
-            List<URL> jarPaths, List<? extends Config> pluginConfigs, JobContext jobContext) {
-        this.pluginConfigs = pluginConfigs;
-        this.jobContext = jobContext;
-        this.plugins = initializePlugins(jarPaths, pluginConfigs);
-    }
-
-    @Override
-    public void setRuntimeEnvironment(FlinkRuntimeEnvironment flinkRuntimeEnvironment) {
-        this.flinkRuntimeEnvironment = flinkRuntimeEnvironment;
-    }
-
-    protected Optional<DataStream<Row>> fromSourceTable(Config pluginConfig) {
-        if (pluginConfig.hasPath(SOURCE_TABLE_NAME)) {
-            StreamTableEnvironment tableEnvironment =
-                    flinkRuntimeEnvironment.getStreamTableEnvironment();
-            Table table = tableEnvironment.from(pluginConfig.getString(SOURCE_TABLE_NAME));
-            return Optional.ofNullable(TableUtil.tableToDataStream(tableEnvironment, table, true));
-        }
-        return Optional.empty();
-    }
-
-    protected void registerResultTable(Config pluginConfig, DataStream<Row> dataStream) {
-        flinkRuntimeEnvironment.registerResultTable(pluginConfig, dataStream);
-    }
-
-    protected abstract List<T> initializePlugins(
-            List<URL> jarPaths, List<? extends Config> pluginConfigs);
-}
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
deleted file mode 100644
index a3282cc4a..000000000
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.starter.flink.execution;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigUtil;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
-
-import org.apache.seatunnel.api.common.JobContext;
-import org.apache.seatunnel.api.env.EnvCommonOptions;
-import org.apache.seatunnel.common.Constants;
-import org.apache.seatunnel.common.config.Common;
-import org.apache.seatunnel.common.config.TypesafeConfigUtils;
-import org.apache.seatunnel.common.utils.SeaTunnelException;
-import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
-import org.apache.seatunnel.core.starter.execution.PluginExecuteProcessor;
-import org.apache.seatunnel.core.starter.execution.RuntimeEnvironment;
-import org.apache.seatunnel.core.starter.execution.TaskExecution;
-import org.apache.seatunnel.core.starter.flink.FlinkStarter;
-
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.types.Row;
-
-import lombok.extern.slf4j.Slf4j;
-
-import java.io.File;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-/** Used to execute a SeaTunnelTask. */
-@Slf4j
-public class FlinkExecution implements TaskExecution {
-    private final FlinkRuntimeEnvironment flinkRuntimeEnvironment;
-    private final PluginExecuteProcessor<DataStream<Row>, FlinkRuntimeEnvironment>
-            sourcePluginExecuteProcessor;
-    private final PluginExecuteProcessor<DataStream<Row>, FlinkRuntimeEnvironment>
-            transformPluginExecuteProcessor;
-    private final PluginExecuteProcessor<DataStream<Row>, FlinkRuntimeEnvironment>
-            sinkPluginExecuteProcessor;
-    private final List<URL> jarPaths;
-
-    public FlinkExecution(Config config) {
-        try {
-            jarPaths =
-                    new ArrayList<>(
-                            Collections.singletonList(
-                                    new File(
-                                                    Common.appStarterDir()
-                                                            .resolve(FlinkStarter.APP_JAR_NAME)
-                                                            .toString())
-                                            .toURI()
-                                            .toURL()));
-        } catch (MalformedURLException e) {
-            throw new SeaTunnelException("load flink starter error.", e);
-        }
-        registerPlugin(config.getConfig("env"));
-        JobContext jobContext = new JobContext();
-        jobContext.setJobMode(RuntimeEnvironment.getJobMode(config));
-
-        this.sourcePluginExecuteProcessor =
-                new SourceExecuteProcessor(
-                        jarPaths, config.getConfigList(Constants.SOURCE), jobContext);
-        this.transformPluginExecuteProcessor =
-                new TransformExecuteProcessor(
-                        jarPaths,
-                        TypesafeConfigUtils.getConfigList(
-                                config, Constants.TRANSFORM, Collections.emptyList()),
-                        jobContext);
-        this.sinkPluginExecuteProcessor =
-                new SinkExecuteProcessor(
-                        jarPaths, config.getConfigList(Constants.SINK), jobContext);
-
-        this.flinkRuntimeEnvironment =
-                FlinkRuntimeEnvironment.getInstance(this.registerPlugin(config, jarPaths));
-
-        this.sourcePluginExecuteProcessor.setRuntimeEnvironment(flinkRuntimeEnvironment);
-        this.transformPluginExecuteProcessor.setRuntimeEnvironment(flinkRuntimeEnvironment);
-        this.sinkPluginExecuteProcessor.setRuntimeEnvironment(flinkRuntimeEnvironment);
-    }
-
-    @Override
-    public void execute() throws TaskExecuteException {
-        List<DataStream<Row>> dataStreams = new ArrayList<>();
-        dataStreams = sourcePluginExecuteProcessor.execute(dataStreams);
-        dataStreams = transformPluginExecuteProcessor.execute(dataStreams);
-        sinkPluginExecuteProcessor.execute(dataStreams);
-        log.info(
-                "Flink Execution Plan: {}",
-                flinkRuntimeEnvironment.getStreamExecutionEnvironment().getExecutionPlan());
-        log.info("Flink job name: {}", flinkRuntimeEnvironment.getJobName());
-        try {
-            flinkRuntimeEnvironment
-                    .getStreamExecutionEnvironment()
-                    .execute(flinkRuntimeEnvironment.getJobName());
-        } catch (Exception e) {
-            throw new TaskExecuteException("Execute Flink job error", e);
-        }
-    }
-
-    private void registerPlugin(Config envConfig) {
-        List<Path> thirdPartyJars = new ArrayList<>();
-        if (envConfig.hasPath(EnvCommonOptions.JARS.key())) {
-            thirdPartyJars =
-                    new ArrayList<>(
-                            Common.getThirdPartyJars(
-                                    envConfig.getString(EnvCommonOptions.JARS.key())));
-        }
-        thirdPartyJars.addAll(Common.getPluginsJarDependencies());
-        List<URL> jarDependencies =
-                Stream.concat(thirdPartyJars.stream(), Common.getLibJars().stream())
-                        .map(Path::toUri)
-                        .map(
-                                uri -> {
-                                    try {
-                                        return uri.toURL();
-                                    } catch (MalformedURLException e) {
-                                        throw new RuntimeException(
-                                                "the uri of jar illegal:" + uri, e);
-                                    }
-                                })
-                        .collect(Collectors.toList());
-        jarDependencies.forEach(
-                url ->
-                        FlinkAbstractPluginExecuteProcessor.ADD_URL_TO_CLASSLOADER.accept(
-                                Thread.currentThread().getContextClassLoader(), url));
-        jarPaths.addAll(jarDependencies);
-    }
-
-    private Config registerPlugin(Config config, List<URL> jars) {
-        config =
-                this.injectJarsToConfig(
-                        config, ConfigUtil.joinPath("env", "pipeline", "jars"), jars);
-        return this.injectJarsToConfig(
-                config, ConfigUtil.joinPath("env", "pipeline", "classpaths"), jars);
-    }
-
-    private Config injectJarsToConfig(Config config, String path, List<URL> jars) {
-        List<URL> validJars = new ArrayList<>();
-        for (URL jarUrl : jars) {
-            if (new File(jarUrl.getFile()).exists()) {
-                validJars.add(jarUrl);
-                log.info("Inject jar to config: {}", jarUrl);
-            } else {
-                log.warn("Remove invalid jar when inject jars into config: {}", jarUrl);
-            }
-        }
-
-        if (config.hasPath(path)) {
-            Set<URL> paths =
-                    Arrays.stream(config.getString(path).split(";"))
-                            .map(
-                                    uri -> {
-                                        try {
-                                            return new URL(uri);
-                                        } catch (MalformedURLException e) {
-                                            throw new RuntimeException(
-                                                    "the uri of jar illegal:" + uri, e);
-                                        }
-                                    })
-                            .collect(Collectors.toSet());
-            paths.addAll(validJars);
-
-            config =
-                    config.withValue(
-                            path,
-                            ConfigValueFactory.fromAnyRef(
-                                    paths.stream()
-                                            .map(URL::toString)
-                                            .distinct()
-                                            .collect(Collectors.joining(";"))));
-
-        } else {
-            config =
-                    config.withValue(
-                            path,
-                            ConfigValueFactory.fromAnyRef(
-                                    validJars.stream()
-                                            .map(URL::toString)
-                                            .distinct()
-                                            .collect(Collectors.joining(";"))));
-        }
-        return config;
-    }
-}
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
deleted file mode 100644
index a3897a526..000000000
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.starter.flink.execution;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.CommonOptions;
-import org.apache.seatunnel.api.common.JobContext;
-import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.source.SupportCoordinate;
-import org.apache.seatunnel.common.constants.JobMode;
-import org.apache.seatunnel.core.starter.enums.PluginType;
-import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
-import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
-import org.apache.seatunnel.translation.flink.source.BaseSeaTunnelSourceFunction;
-import org.apache.seatunnel.translation.flink.source.SeaTunnelCoordinatedSource;
-import org.apache.seatunnel.translation.flink.source.SeaTunnelParallelSource;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.connector.source.Boundedness;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
-import org.apache.flink.streaming.api.operators.StreamSource;
-import org.apache.flink.types.Row;
-
-import com.google.common.collect.Lists;
-
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-public class SourceExecuteProcessor extends FlinkAbstractPluginExecuteProcessor<SeaTunnelSource> {
-    private static final String PLUGIN_TYPE = PluginType.SOURCE.getType();
-
-    public SourceExecuteProcessor(
-            List<URL> jarPaths, List<? extends Config> sourceConfigs, JobContext jobContext) {
-        super(jarPaths, sourceConfigs, jobContext);
-    }
-
-    @Override
-    public List<DataStream<Row>> execute(List<DataStream<Row>> upstreamDataStreams) {
-        StreamExecutionEnvironment executionEnvironment =
-                flinkRuntimeEnvironment.getStreamExecutionEnvironment();
-        List<DataStream<Row>> sources = new ArrayList<>();
-        for (int i = 0; i < plugins.size(); i++) {
-            SeaTunnelSource internalSource = plugins.get(i);
-            BaseSeaTunnelSourceFunction sourceFunction;
-            if (internalSource instanceof SupportCoordinate) {
-                sourceFunction = new SeaTunnelCoordinatedSource(internalSource);
-            } else {
-                sourceFunction = new SeaTunnelParallelSource(internalSource);
-            }
-            DataStreamSource<Row> sourceStream =
-                    addSource(
-                            executionEnvironment,
-                            sourceFunction,
-                            "SeaTunnel " + internalSource.getClass().getSimpleName(),
-                            internalSource.getBoundedness()
-                                    == org.apache.seatunnel.api.source.Boundedness.BOUNDED);
-            Config pluginConfig = pluginConfigs.get(i);
-            if (pluginConfig.hasPath(CommonOptions.PARALLELISM.key())) {
-                int parallelism = pluginConfig.getInt(CommonOptions.PARALLELISM.key());
-                sourceStream.setParallelism(parallelism);
-            }
-            registerResultTable(pluginConfig, sourceStream);
-            sources.add(sourceStream);
-        }
-        return sources;
-    }
-
-    private DataStreamSource<Row> addSource(
-            StreamExecutionEnvironment streamEnv,
-            BaseSeaTunnelSourceFunction function,
-            String sourceName,
-            boolean bounded) {
-        checkNotNull(function);
-        checkNotNull(sourceName);
-        checkNotNull(bounded);
-
-        TypeInformation<Row> resolvedTypeInfo = function.getProducedType();
-
-        boolean isParallel = function instanceof ParallelSourceFunction;
-
-        streamEnv.clean(function);
-
-        final StreamSource<Row, ?> sourceOperator = new StreamSource<>(function);
-        return new DataStreamSource<>(
-                streamEnv,
-                resolvedTypeInfo,
-                sourceOperator,
-                isParallel,
-                sourceName,
-                bounded ? Boundedness.BOUNDED : Boundedness.CONTINUOUS_UNBOUNDED);
-    }
-
-    @Override
-    protected List<SeaTunnelSource> initializePlugins(
-            List<URL> jarPaths, List<? extends Config> pluginConfigs) {
-        SeaTunnelSourcePluginDiscovery sourcePluginDiscovery =
-                new SeaTunnelSourcePluginDiscovery(ADD_URL_TO_CLASSLOADER);
-        List<SeaTunnelSource> sources = new ArrayList<>();
-        Set<URL> jars = new HashSet<>();
-        for (Config sourceConfig : pluginConfigs) {
-            PluginIdentifier pluginIdentifier =
-                    PluginIdentifier.of(
-                            ENGINE_TYPE, PLUGIN_TYPE, sourceConfig.getString(PLUGIN_NAME));
-            jars.addAll(
-                    sourcePluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
-            SeaTunnelSource seaTunnelSource =
-                    sourcePluginDiscovery.createPluginInstance(pluginIdentifier);
-            seaTunnelSource.prepare(sourceConfig);
-            seaTunnelSource.setJobContext(jobContext);
-            if (jobContext.getJobMode() == JobMode.BATCH
-                    && seaTunnelSource.getBoundedness()
-                            == org.apache.seatunnel.api.source.Boundedness.UNBOUNDED) {
-                throw new UnsupportedOperationException(
-                        String.format(
-                                "'%s' source don't support off-line job.",
-                                seaTunnelSource.getPluginName()));
-            }
-            sources.add(seaTunnelSource);
-        }
-        jarPaths.addAll(jars);
-        return sources;
-    }
-}
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
deleted file mode 100644
index a358fb6f3..000000000
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.starter.flink.execution;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.JobContext;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.transform.SeaTunnelTransform;
-import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
-import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
-import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery;
-import org.apache.seatunnel.translation.flink.serialization.FlinkRowConverter;
-import org.apache.seatunnel.translation.flink.utils.TypeConverterUtils;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.Collector;
-
-import com.google.common.collect.Lists;
-
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.stream.Collectors;
-
-public class TransformExecuteProcessor
-        extends FlinkAbstractPluginExecuteProcessor<SeaTunnelTransform> {
-
-    private static final String PLUGIN_TYPE = "transform";
-
-    protected TransformExecuteProcessor(
-            List<URL> jarPaths, List<? extends Config> pluginConfigs, JobContext jobContext) {
-        super(jarPaths, pluginConfigs, jobContext);
-    }
-
-    @Override
-    protected List<SeaTunnelTransform> initializePlugins(
-            List<URL> jarPaths, List<? extends Config> pluginConfigs) {
-        SeaTunnelTransformPluginDiscovery transformPluginDiscovery =
-                new SeaTunnelTransformPluginDiscovery();
-        List<URL> pluginJars = new ArrayList<>();
-        List<SeaTunnelTransform> transforms =
-                pluginConfigs.stream()
-                        .map(
-                                transformConfig -> {
-                                    PluginIdentifier pluginIdentifier =
-                                            PluginIdentifier.of(
-                                                    ENGINE_TYPE,
-                                                    PLUGIN_TYPE,
-                                                    transformConfig.getString(PLUGIN_NAME));
-                                    List<URL> pluginJarPaths =
-                                            transformPluginDiscovery.getPluginJarPaths(
-                                                    Lists.newArrayList(pluginIdentifier));
-                                    SeaTunnelTransform<?> seaTunnelTransform =
-                                            transformPluginDiscovery.createPluginInstance(
-                                                    pluginIdentifier);
-                                    jarPaths.addAll(pluginJarPaths);
-                                    seaTunnelTransform.prepare(transformConfig);
-                                    seaTunnelTransform.setJobContext(jobContext);
-                                    return seaTunnelTransform;
-                                })
-                        .distinct()
-                        .collect(Collectors.toList());
-        jarPaths.addAll(pluginJars);
-        return transforms;
-    }
-
-    @Override
-    public List<DataStream<Row>> execute(List<DataStream<Row>> upstreamDataStreams)
-            throws TaskExecuteException {
-        if (plugins.isEmpty()) {
-            return upstreamDataStreams;
-        }
-        DataStream<Row> input = upstreamDataStreams.get(0);
-        List<DataStream<Row>> result = new ArrayList<>();
-        for (int i = 0; i < plugins.size(); i++) {
-            try {
-                SeaTunnelTransform<SeaTunnelRow> transform = plugins.get(i);
-                Config pluginConfig = pluginConfigs.get(i);
-                DataStream<Row> stream = fromSourceTable(pluginConfig).orElse(input);
-                input = flinkTransform(transform, stream);
-                registerResultTable(pluginConfig, input);
-                result.add(input);
-            } catch (Exception e) {
-                throw new TaskExecuteException(
-                        String.format(
-                                "SeaTunnel transform task: %s execute error",
-                                plugins.get(i).getPluginName()),
-                        e);
-            }
-        }
-        return result;
-    }
-
-    protected DataStream<Row> flinkTransform(SeaTunnelTransform transform, DataStream<Row> stream) {
-        SeaTunnelDataType seaTunnelDataType = TypeConverterUtils.convert(stream.getType());
-        transform.setTypeInfo(seaTunnelDataType);
-        TypeInformation rowTypeInfo = TypeConverterUtils.convert(transform.getProducedType());
-        FlinkRowConverter transformInputRowConverter = new FlinkRowConverter(seaTunnelDataType);
-        FlinkRowConverter transformOutputRowConverter =
-                new FlinkRowConverter(transform.getProducedType());
-        DataStream<Row> output =
-                stream.flatMap(
-                        new FlatMapFunction<Row, Row>() {
-                            @Override
-                            public void flatMap(Row value, Collector<Row> out) throws Exception {
-                                SeaTunnelRow seaTunnelRow =
-                                        transformInputRowConverter.reconvert(value);
-                                SeaTunnelRow dataRow = (SeaTunnelRow) transform.map(seaTunnelRow);
-                                if (dataRow != null) {
-                                    Row copy = transformOutputRowConverter.convert(dataRow);
-                                    out.collect(copy);
-                                }
-                            }
-                        },
-                        rowTypeInfo);
-        return output;
-    }
-}
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/ConfigKeyName.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/ConfigKeyName.java
deleted file mode 100644
index cc8229f26..000000000
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/ConfigKeyName.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.starter.flink.utils;
-
-public class ConfigKeyName {
-
-    private ConfigKeyName() {
-        throw new IllegalStateException("Utility class");
-    }
-
-    public static final String TIME_CHARACTERISTIC = "execution.time-characteristic";
-    public static final String BUFFER_TIMEOUT_MILLIS = "execution.buffer.timeout";
-    public static final String PARALLELISM = "execution.parallelism";
-    public static final String MAX_PARALLELISM = "execution.max-parallelism";
-    @Deprecated public static final String CHECKPOINT_INTERVAL = "execution.checkpoint.interval";
-    public static final String CHECKPOINT_MODE = "execution.checkpoint.mode";
-    public static final String CHECKPOINT_TIMEOUT = "execution.checkpoint.timeout";
-    public static final String CHECKPOINT_DATA_URI = "execution.checkpoint.data-uri";
-    public static final String MAX_CONCURRENT_CHECKPOINTS = "execution.max-concurrent-checkpoints";
-    public static final String CHECKPOINT_CLEANUP_MODE = "execution.checkpoint.cleanup-mode";
-    public static final String MIN_PAUSE_BETWEEN_CHECKPOINTS = "execution.checkpoint.min-pause";
-    public static final String FAIL_ON_CHECKPOINTING_ERRORS = "execution.checkpoint.fail-on-error";
-    public static final String RESTART_STRATEGY = "execution.restart.strategy";
-    public static final String RESTART_ATTEMPTS = "execution.restart.attempts";
-    public static final String RESTART_DELAY_BETWEEN_ATTEMPTS =
-            "execution.restart.delayBetweenAttempts";
-    public static final String RESTART_FAILURE_INTERVAL = "execution.restart.failureInterval";
-    public static final String RESTART_FAILURE_RATE = "execution.restart.failureRate";
-    public static final String RESTART_DELAY_INTERVAL = "execution.restart.delayInterval";
-    public static final String MAX_STATE_RETENTION_TIME = "execution.query.state.max-retention";
-    public static final String MIN_STATE_RETENTION_TIME = "execution.query.state.min-retention";
-    public static final String STATE_BACKEND = "execution.state.backend";
-    public static final String PLANNER = "execution.planner";
-}
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/EnvironmentUtil.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/EnvironmentUtil.java
deleted file mode 100644
index f15638cbb..000000000
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/EnvironmentUtil.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.starter.flink.utils;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.common.config.CheckResult;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.PipelineOptions;
-
-import lombok.extern.slf4j.Slf4j;
-
-import java.util.concurrent.TimeUnit;
-
-@Slf4j
-public final class EnvironmentUtil {
-
-    private EnvironmentUtil() {}
-
-    public static void setRestartStrategy(Config config, ExecutionConfig executionConfig) {
-        try {
-            if (config.hasPath(ConfigKeyName.RESTART_STRATEGY)) {
-                String restartStrategy = config.getString(ConfigKeyName.RESTART_STRATEGY);
-                switch (restartStrategy.toLowerCase()) {
-                    case "no":
-                        executionConfig.setRestartStrategy(RestartStrategies.noRestart());
-                        break;
-                    case "fixed-delay":
-                        int attempts = config.getInt(ConfigKeyName.RESTART_ATTEMPTS);
-                        long delay = config.getLong(ConfigKeyName.RESTART_DELAY_BETWEEN_ATTEMPTS);
-                        executionConfig.setRestartStrategy(
-                                RestartStrategies.fixedDelayRestart(attempts, delay));
-                        break;
-                    case "failure-rate":
-                        long failureInterval =
-                                config.getLong(ConfigKeyName.RESTART_FAILURE_INTERVAL);
-                        int rate = config.getInt(ConfigKeyName.RESTART_FAILURE_RATE);
-                        long delayInterval = config.getLong(ConfigKeyName.RESTART_DELAY_INTERVAL);
-                        executionConfig.setRestartStrategy(
-                                RestartStrategies.failureRateRestart(
-                                        rate,
-                                        Time.of(failureInterval, TimeUnit.MILLISECONDS),
-                                        Time.of(delayInterval, TimeUnit.MILLISECONDS)));
-                        break;
-                    default:
-                        log.warn(
-                                "set restart.strategy failed, unknown restart.strategy [{}],only support no,fixed-delay,failure-rate",
-                                restartStrategy);
-                }
-            }
-        } catch (Exception e) {
-            log.warn("set restart.strategy in config '{}' exception", config, e);
-        }
-    }
-
-    public static CheckResult checkRestartStrategy(Config config) {
-        if (config.hasPath(ConfigKeyName.RESTART_STRATEGY)) {
-            String restartStrategy = config.getString(ConfigKeyName.RESTART_STRATEGY);
-            switch (restartStrategy.toLowerCase()) {
-                case "fixed-delay":
-                    if (!(config.hasPath(ConfigKeyName.RESTART_ATTEMPTS)
-                            && config.hasPath(ConfigKeyName.RESTART_DELAY_BETWEEN_ATTEMPTS))) {
-                        return CheckResult.error(
-                                String.format(
-                                        "fixed-delay restart strategy must set [%s],[%s]",
-                                        ConfigKeyName.RESTART_ATTEMPTS,
-                                        ConfigKeyName.RESTART_DELAY_BETWEEN_ATTEMPTS));
-                    }
-                    break;
-                case "failure-rate":
-                    if (!(config.hasPath(ConfigKeyName.RESTART_FAILURE_INTERVAL)
-                            && config.hasPath(ConfigKeyName.RESTART_FAILURE_RATE)
-                            && config.hasPath(ConfigKeyName.RESTART_DELAY_INTERVAL))) {
-                        return CheckResult.error(
-                                String.format(
-                                        "failure-rate restart strategy must set [%s],[%s],[%s]",
-                                        ConfigKeyName.RESTART_FAILURE_INTERVAL,
-                                        ConfigKeyName.RESTART_FAILURE_RATE,
-                                        ConfigKeyName.RESTART_DELAY_INTERVAL));
-                    }
-                    break;
-                default:
-                    return CheckResult.success();
-            }
-        }
-        return CheckResult.success();
-    }
-
-    public static void initConfiguration(Config config, Configuration configuration) {
-        if (config.hasPath("pipeline")) {
-            Config pipeline = config.getConfig("pipeline");
-            if (pipeline.hasPath("jars")) {
-                configuration.setString(PipelineOptions.JARS.key(), pipeline.getString("jars"));
-            }
-            if (pipeline.hasPath("classpaths")) {
-                configuration.setString(
-                        PipelineOptions.CLASSPATHS.key(), pipeline.getString("classpaths"));
-            }
-        }
-    }
-}
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/TableUtil.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/TableUtil.java
deleted file mode 100644
index ca1603cdf..000000000
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/TableUtil.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.starter.flink.utils;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-import org.apache.flink.types.Row;
-
-import java.util.Arrays;
-
-public final class TableUtil {
-
-    private TableUtil() {}
-
-    public static DataStream<Row> tableToDataStream(
-            StreamTableEnvironment tableEnvironment, Table table, boolean isAppend) {
-
-        TypeInformation<Row> typeInfo = table.getSchema().toRowType();
-        if (isAppend) {
-            return tableEnvironment.toAppendStream(table, typeInfo);
-        }
-        return tableEnvironment
-                .toRetractStream(table, typeInfo)
-                .filter(row -> row.f0)
-                .map(row -> row.f1)
-                .returns(typeInfo);
-    }
-
-    public static boolean tableExists(TableEnvironment tableEnvironment, String name) {
-        return Arrays.asList(tableEnvironment.listTables()).contains(name);
-    }
-}
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/pom.xml b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/pom.xml
similarity index 93%
copy from seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/pom.xml
copy to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/pom.xml
index 97fb280c6..938c088a2 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/pom.xml
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/pom.xml
@@ -26,8 +26,10 @@
         <version>${revision}</version>
     </parent>
 
-    <artifactId>seatunnel-flink-15-starter</artifactId>
-    <name>SeaTunnel : Core : Flink Starter : 1.5</name>
+    <artifactId>seatunnel-flink-starter-common</artifactId>
+    <packaging>jar</packaging>
+
+    <name>SeaTunnel : Core : Flink Starter : Common</name>
 
     <dependencies>
 
@@ -35,6 +37,7 @@
             <groupId>org.apache.seatunnel</groupId>
             <artifactId>seatunnel-translation-flink-15</artifactId>
             <version>${project.version}</version>
+            <scope>provided</scope>
         </dependency>
 
         <dependency>
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
similarity index 100%
rename from seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
rename to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/SeaTunnelFlink.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/SeaTunnelFlink.java
similarity index 100%
rename from seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/SeaTunnelFlink.java
rename to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/SeaTunnelFlink.java
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java
similarity index 100%
rename from seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java
rename to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkConfValidateCommand.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkConfValidateCommand.java
similarity index 100%
rename from seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkConfValidateCommand.java
rename to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkConfValidateCommand.java
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkTaskExecuteCommand.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkTaskExecuteCommand.java
similarity index 100%
rename from seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkTaskExecuteCommand.java
rename to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkTaskExecuteCommand.java
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
similarity index 100%
rename from seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
rename to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
similarity index 100%
rename from seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
rename to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
similarity index 100%
rename from seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
rename to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
similarity index 100%
rename from seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
rename to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
similarity index 100%
rename from seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
rename to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
similarity index 100%
rename from seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
rename to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/ConfigKeyName.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/utils/ConfigKeyName.java
similarity index 100%
rename from seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/ConfigKeyName.java
rename to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/utils/ConfigKeyName.java
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/EnvironmentUtil.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/utils/EnvironmentUtil.java
similarity index 100%
rename from seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/EnvironmentUtil.java
rename to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/utils/EnvironmentUtil.java
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/TableUtil.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/utils/TableUtil.java
similarity index 100%
rename from seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/TableUtil.java
rename to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/utils/TableUtil.java
diff --git a/seatunnel-core/seatunnel-spark-starter/pom.xml b/seatunnel-core/seatunnel-spark-starter/pom.xml
index 13f5ded0c..4502d72c9 100644
--- a/seatunnel-core/seatunnel-spark-starter/pom.xml
+++ b/seatunnel-core/seatunnel-spark-starter/pom.xml
@@ -33,6 +33,7 @@
     <modules>
         <module>seatunnel-spark-2-starter</module>
         <module>seatunnel-spark-3-starter</module>
+        <module>seatunnel-spark-starter-common</module>
     </modules>
 
     <properties>
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/pom.xml b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/pom.xml
index e3773cff9..1dee3c5a3 100644
--- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/pom.xml
+++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/pom.xml
@@ -58,6 +58,18 @@
             <version>${project.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-spark-starter-common</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
     </dependencies>
 
 </project>
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/pom.xml b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/pom.xml
index 0e7fb1bb3..838723c7b 100644
--- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/pom.xml
+++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/pom.xml
@@ -35,12 +35,6 @@
 
     <dependencies>
 
-        <dependency>
-            <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-translation-spark-3.3</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-streaming_${scala.binary.version}</artifactId>
@@ -62,6 +56,18 @@
             <scope>${spark.scope}</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-translation-spark-3.3</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-spark-starter-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
     </dependencies>
 
 </project>
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/args/SparkCommandArgs.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/args/SparkCommandArgs.java
deleted file mode 100644
index 841383927..000000000
--- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/args/SparkCommandArgs.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.starter.spark.args;
-
-import org.apache.seatunnel.common.config.Common;
-import org.apache.seatunnel.common.config.DeployMode;
-import org.apache.seatunnel.core.starter.command.AbstractCommandArgs;
-import org.apache.seatunnel.core.starter.command.Command;
-import org.apache.seatunnel.core.starter.command.ConfDecryptCommand;
-import org.apache.seatunnel.core.starter.command.ConfEncryptCommand;
-import org.apache.seatunnel.core.starter.spark.command.SparkConfValidateCommand;
-import org.apache.seatunnel.core.starter.spark.command.SparkTaskExecuteCommand;
-
-import com.beust.jcommander.IStringConverter;
-import com.beust.jcommander.Parameter;
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-
-import java.util.ArrayList;
-import java.util.List;
-
-@EqualsAndHashCode(callSuper = true)
-@Data
-public class SparkCommandArgs extends AbstractCommandArgs {
-
-    @Parameter(
-            names = {"-e", "--deploy-mode"},
-            description = "Spark deploy mode, support [cluster, client]",
-            converter = SparkDeployModeConverter.class)
-    private DeployMode deployMode = DeployMode.CLIENT;
-
-    @Parameter(
-            names = {"-m", "--master"},
-            description =
-                    "Spark master, support [spark://host:port, mesos://host:port, yarn, "
-                            + "k8s://https://host:port, local], default local[*]")
-    private String master = "local[*]";
-
-    @Override
-    public Command<?> buildCommand() {
-        Common.setDeployMode(getDeployMode());
-        if (checkConfig) {
-            return new SparkConfValidateCommand(this);
-        }
-        if (encrypt) {
-            return new ConfEncryptCommand(this);
-        }
-        if (decrypt) {
-            return new ConfDecryptCommand(this);
-        }
-        return new SparkTaskExecuteCommand(this);
-    }
-
-    public static class SparkDeployModeConverter implements IStringConverter<DeployMode> {
-        private static final List<DeployMode> DEPLOY_MODE_TYPE_LIST = new ArrayList<>();
-
-        static {
-            DEPLOY_MODE_TYPE_LIST.add(DeployMode.CLIENT);
-            DEPLOY_MODE_TYPE_LIST.add(DeployMode.CLUSTER);
-        }
-
-        @Override
-        public DeployMode convert(String value) {
-            DeployMode deployMode = DeployMode.valueOf(value.toUpperCase());
-            if (DEPLOY_MODE_TYPE_LIST.contains(deployMode)) {
-                return deployMode;
-            } else {
-                throw new IllegalArgumentException(
-                        "SeaTunnel job on spark engine deploy mode only "
-                                + "support these options: [cluster, client]");
-            }
-        }
-    }
-}
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkConfValidateCommand.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkConfValidateCommand.java
deleted file mode 100644
index 9da967970..000000000
--- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkConfValidateCommand.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.starter.spark.command;
-
-import org.apache.seatunnel.core.starter.command.Command;
-import org.apache.seatunnel.core.starter.exception.ConfigCheckException;
-import org.apache.seatunnel.core.starter.spark.args.SparkCommandArgs;
-import org.apache.seatunnel.core.starter.utils.FileUtils;
-
-import lombok.extern.slf4j.Slf4j;
-
-import java.nio.file.Path;
-
-/** Use to validate the configuration of the SeaTunnel API. */
-@Slf4j
-public class SparkConfValidateCommand implements Command<SparkCommandArgs> {
-
-    private final SparkCommandArgs sparkCommandArgs;
-
-    public SparkConfValidateCommand(SparkCommandArgs sparkCommandArgs) {
-        this.sparkCommandArgs = sparkCommandArgs;
-    }
-
-    @Override
-    public void execute() throws ConfigCheckException {
-        Path configPath = FileUtils.getConfigPath(sparkCommandArgs);
-        // TODO: validate the config by new api
-    }
-}
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkTaskExecuteCommand.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkTaskExecuteCommand.java
deleted file mode 100644
index 463c1dc65..000000000
--- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkTaskExecuteCommand.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.starter.spark.command;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigUtil;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
-
-import org.apache.seatunnel.common.Constants;
-import org.apache.seatunnel.core.starter.command.Command;
-import org.apache.seatunnel.core.starter.exception.CommandExecuteException;
-import org.apache.seatunnel.core.starter.spark.args.SparkCommandArgs;
-import org.apache.seatunnel.core.starter.spark.execution.SparkExecution;
-import org.apache.seatunnel.core.starter.utils.ConfigBuilder;
-import org.apache.seatunnel.core.starter.utils.FileUtils;
-
-import lombok.extern.slf4j.Slf4j;
-
-import java.nio.file.Path;
-
-import static org.apache.seatunnel.core.starter.utils.FileUtils.checkConfigExist;
-
-@Slf4j
-public class SparkTaskExecuteCommand implements Command<SparkCommandArgs> {
-
-    private final SparkCommandArgs sparkCommandArgs;
-
-    public SparkTaskExecuteCommand(SparkCommandArgs sparkCommandArgs) {
-        this.sparkCommandArgs = sparkCommandArgs;
-    }
-
-    @Override
-    public void execute() throws CommandExecuteException {
-        Path configFile = FileUtils.getConfigPath(sparkCommandArgs);
-        checkConfigExist(configFile);
-        Config config = ConfigBuilder.of(configFile);
-        if (!sparkCommandArgs.getJobName().equals(Constants.LOGO)) {
-            config =
-                    config.withValue(
-                            ConfigUtil.joinPath("env", "job.name"),
-                            ConfigValueFactory.fromAnyRef(sparkCommandArgs.getJobName()));
-        }
-        try {
-            SparkExecution seaTunnelTaskExecution = new SparkExecution(config);
-            seaTunnelTaskExecution.execute();
-        } catch (Exception e) {
-            log.error("Run SeaTunnel on spark failed.", e);
-            throw new CommandExecuteException(e.getMessage());
-        }
-    }
-}
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
deleted file mode 100644
index d68aec3c2..000000000
--- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.starter.spark.execution;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.CommonOptions;
-import org.apache.seatunnel.api.common.JobContext;
-import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.common.Constants;
-import org.apache.seatunnel.common.utils.SerializationUtils;
-import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
-import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
-import org.apache.seatunnel.translation.spark.utils.TypeConverterUtils;
-
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.types.StructType;
-
-import com.google.common.collect.Lists;
-
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-public class SourceExecuteProcessor
-        extends SparkAbstractPluginExecuteProcessor<SeaTunnelSource<?, ?, ?>> {
-    private static final String PLUGIN_TYPE = "source";
-
-    public SourceExecuteProcessor(
-            SparkRuntimeEnvironment sparkEnvironment,
-            JobContext jobContext,
-            List<? extends Config> sourceConfigs) {
-        super(sparkEnvironment, jobContext, sourceConfigs);
-    }
-
-    @Override
-    public List<Dataset<Row>> execute(List<Dataset<Row>> upstreamDataStreams) {
-        List<Dataset<Row>> sources = new ArrayList<>();
-        for (int i = 0; i < plugins.size(); i++) {
-            SeaTunnelSource<?, ?, ?> source = plugins.get(i);
-            Config pluginConfig = pluginConfigs.get(i);
-            int parallelism;
-            if (pluginConfig.hasPath(CommonOptions.PARALLELISM.key())) {
-                parallelism = pluginConfig.getInt(CommonOptions.PARALLELISM.key());
-            } else {
-                parallelism =
-                        sparkRuntimeEnvironment
-                                .getSparkConf()
-                                .getInt(
-                                        CommonOptions.PARALLELISM.key(),
-                                        CommonOptions.PARALLELISM.defaultValue());
-            }
-            Dataset<Row> dataset =
-                    sparkRuntimeEnvironment
-                            .getSparkSession()
-                            .read()
-                            .format(SeaTunnelSource.class.getSimpleName())
-                            .option(CommonOptions.PARALLELISM.key(), parallelism)
-                            .option(
-                                    Constants.SOURCE_SERIALIZATION,
-                                    SerializationUtils.objectToString(source))
-                            .schema(
-                                    (StructType)
-                                            TypeConverterUtils.convert(source.getProducedType()))
-                            .load();
-            sources.add(dataset);
-            registerInputTempView(pluginConfigs.get(i), dataset);
-        }
-        return sources;
-    }
-
-    @Override
-    protected List<SeaTunnelSource<?, ?, ?>> initializePlugins(
-            List<? extends Config> pluginConfigs) {
-        SeaTunnelSourcePluginDiscovery sourcePluginDiscovery = new SeaTunnelSourcePluginDiscovery();
-        List<SeaTunnelSource<?, ?, ?>> sources = new ArrayList<>();
-        Set<URL> jars = new HashSet<>();
-        for (Config sourceConfig : pluginConfigs) {
-            PluginIdentifier pluginIdentifier =
-                    PluginIdentifier.of(
-                            ENGINE_TYPE, PLUGIN_TYPE, sourceConfig.getString(PLUGIN_NAME));
-            jars.addAll(
-                    sourcePluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
-            SeaTunnelSource<?, ?, ?> seaTunnelSource =
-                    sourcePluginDiscovery.createPluginInstance(pluginIdentifier);
-            seaTunnelSource.prepare(sourceConfig);
-            seaTunnelSource.setJobContext(jobContext);
-            sources.add(seaTunnelSource);
-        }
-        sparkRuntimeEnvironment.registerPlugin(new ArrayList<>(jars));
-        return sources;
-    }
-}
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkAbstractPluginExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkAbstractPluginExecuteProcessor.java
deleted file mode 100644
index ebfcaf6e9..000000000
--- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkAbstractPluginExecuteProcessor.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.starter.spark.execution;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.JobContext;
-import org.apache.seatunnel.core.starter.execution.PluginExecuteProcessor;
-
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-
-import java.util.List;
-import java.util.Optional;
-
-public abstract class SparkAbstractPluginExecuteProcessor<T>
-        implements PluginExecuteProcessor<Dataset<Row>, SparkRuntimeEnvironment> {
-    protected SparkRuntimeEnvironment sparkRuntimeEnvironment;
-    protected final List<? extends Config> pluginConfigs;
-    protected final JobContext jobContext;
-    protected final List<T> plugins;
-    protected static final String ENGINE_TYPE = "seatunnel";
-    protected static final String PLUGIN_NAME = "plugin_name";
-    protected static final String RESULT_TABLE_NAME = "result_table_name";
-    protected static final String SOURCE_TABLE_NAME = "source_table_name";
-
-    protected SparkAbstractPluginExecuteProcessor(
-            SparkRuntimeEnvironment sparkRuntimeEnvironment,
-            JobContext jobContext,
-            List<? extends Config> pluginConfigs) {
-        this.sparkRuntimeEnvironment = sparkRuntimeEnvironment;
-        this.jobContext = jobContext;
-        this.pluginConfigs = pluginConfigs;
-        this.plugins = initializePlugins(pluginConfigs);
-    }
-
-    @Override
-    public void setRuntimeEnvironment(SparkRuntimeEnvironment sparkRuntimeEnvironment) {
-        this.sparkRuntimeEnvironment = sparkRuntimeEnvironment;
-    }
-
-    protected abstract List<T> initializePlugins(List<? extends Config> pluginConfigs);
-
-    protected void registerInputTempView(Config pluginConfig, Dataset<Row> dataStream) {
-        if (pluginConfig.hasPath(RESULT_TABLE_NAME)) {
-            String tableName = pluginConfig.getString(RESULT_TABLE_NAME);
-            registerTempView(tableName, dataStream);
-        }
-    }
-
-    protected Optional<Dataset<Row>> fromSourceTable(
-            Config pluginConfig, SparkRuntimeEnvironment sparkRuntimeEnvironment) {
-        if (!pluginConfig.hasPath(SOURCE_TABLE_NAME)) {
-            return Optional.empty();
-        }
-        String sourceTableName = pluginConfig.getString(SOURCE_TABLE_NAME);
-        return Optional.of(sparkRuntimeEnvironment.getSparkSession().read().table(sourceTableName));
-    }
-
-    private void registerTempView(String tableName, Dataset<Row> ds) {
-        ds.createOrReplaceTempView(tableName);
-    }
-}
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
deleted file mode 100644
index dab00cbdb..000000000
--- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.starter.spark.execution;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.JobContext;
-import org.apache.seatunnel.common.Constants;
-import org.apache.seatunnel.common.config.TypesafeConfigUtils;
-import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
-import org.apache.seatunnel.core.starter.execution.PluginExecuteProcessor;
-import org.apache.seatunnel.core.starter.execution.RuntimeEnvironment;
-import org.apache.seatunnel.core.starter.execution.TaskExecution;
-
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-
-import lombok.extern.slf4j.Slf4j;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-@Slf4j
-public class SparkExecution implements TaskExecution {
-    private final SparkRuntimeEnvironment sparkRuntimeEnvironment;
-    private final PluginExecuteProcessor<Dataset<Row>, SparkRuntimeEnvironment>
-            sourcePluginExecuteProcessor;
-    private final PluginExecuteProcessor<Dataset<Row>, SparkRuntimeEnvironment>
-            transformPluginExecuteProcessor;
-    private final PluginExecuteProcessor<Dataset<Row>, SparkRuntimeEnvironment>
-            sinkPluginExecuteProcessor;
-
-    public SparkExecution(Config config) {
-        this.sparkRuntimeEnvironment = SparkRuntimeEnvironment.getInstance(config);
-        JobContext jobContext = new JobContext();
-        jobContext.setJobMode(RuntimeEnvironment.getJobMode(config));
-        this.sourcePluginExecuteProcessor =
-                new SourceExecuteProcessor(
-                        sparkRuntimeEnvironment,
-                        jobContext,
-                        config.getConfigList(Constants.SOURCE));
-        this.transformPluginExecuteProcessor =
-                new TransformExecuteProcessor(
-                        sparkRuntimeEnvironment,
-                        jobContext,
-                        TypesafeConfigUtils.getConfigList(
-                                config, Constants.TRANSFORM, Collections.emptyList()));
-        this.sinkPluginExecuteProcessor =
-                new SinkExecuteProcessor(
-                        sparkRuntimeEnvironment, jobContext, config.getConfigList(Constants.SINK));
-    }
-
-    @Override
-    public void execute() throws TaskExecuteException {
-        List<Dataset<Row>> datasets = new ArrayList<>();
-        datasets = sourcePluginExecuteProcessor.execute(datasets);
-        datasets = transformPluginExecuteProcessor.execute(datasets);
-        sinkPluginExecuteProcessor.execute(datasets);
-        log.info("Spark Execution started");
-    }
-
-    public SparkRuntimeEnvironment getSparkRuntimeEnvironment() {
-        return sparkRuntimeEnvironment;
-    }
-}
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkRuntimeEnvironment.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkRuntimeEnvironment.java
deleted file mode 100644
index 7e31ca463..000000000
--- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkRuntimeEnvironment.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.starter.spark.execution;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.common.Constants;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.JobMode;
-import org.apache.seatunnel.core.starter.enums.PluginType;
-import org.apache.seatunnel.core.starter.execution.RuntimeEnvironment;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.sql.SparkSession;
-import org.apache.spark.streaming.Seconds;
-import org.apache.spark.streaming.StreamingContext;
-
-import lombok.extern.slf4j.Slf4j;
-
-import java.net.URL;
-import java.util.List;
-
-@Slf4j
-public class SparkRuntimeEnvironment implements RuntimeEnvironment {
-    private static final long DEFAULT_SPARK_STREAMING_DURATION = 5;
-    private static final String PLUGIN_NAME_KEY = "plugin_name";
-    private static volatile SparkRuntimeEnvironment INSTANCE = null;
-
-    private SparkConf sparkConf;
-
-    private SparkSession sparkSession;
-
-    private StreamingContext streamingContext;
-
-    private Config config;
-
-    private boolean enableHive = false;
-
-    private JobMode jobMode;
-
-    private String jobName = Constants.LOGO;
-
-    private SparkRuntimeEnvironment(Config config) {
-        this.setEnableHive(checkIsContainHive(config));
-        this.initialize(config);
-    }
-
-    public void setEnableHive(boolean enableHive) {
-        this.enableHive = enableHive;
-    }
-
-    @Override
-    public RuntimeEnvironment setConfig(Config config) {
-        this.config = config;
-        return this;
-    }
-
-    @Override
-    public RuntimeEnvironment setJobMode(JobMode mode) {
-        this.jobMode = mode;
-        return this;
-    }
-
-    @Override
-    public JobMode getJobMode() {
-        return jobMode;
-    }
-
-    @Override
-    public Config getConfig() {
-        return this.config;
-    }
-
-    @Override
-    public CheckResult checkConfig() {
-        return CheckResult.success();
-    }
-
-    @Override
-    public void registerPlugin(List<URL> pluginPaths) {
-        log.info("register plugins :" + pluginPaths);
-        // TODO we use --jar parameter to support submit multi-jar in spark cluster at now. Refactor
-        // it to
-        //  support submit multi-jar in code or remove this logic.
-        // this.sparkSession.conf().set("spark.jars",pluginPaths.stream().map(URL::getPath).collect(Collectors.joining(",")));
-    }
-
-    @Override
-    public SparkRuntimeEnvironment prepare() {
-        if (config.hasPath("job.name")) {
-            this.jobName = config.getString("job.name");
-        }
-        sparkConf = createSparkConf();
-        SparkSession.Builder builder = SparkSession.builder().config(sparkConf);
-        if (enableHive) {
-            builder.enableHiveSupport();
-        }
-        this.sparkSession = builder.getOrCreate();
-        createStreamingContext();
-        return this;
-    }
-
-    public SparkSession getSparkSession() {
-        return this.sparkSession;
-    }
-
-    public StreamingContext getStreamingContext() {
-        return this.streamingContext;
-    }
-
-    public SparkConf getSparkConf() {
-        return this.sparkConf;
-    }
-
-    private SparkConf createSparkConf() {
-        SparkConf sparkConf = new SparkConf();
-        this.config
-                .entrySet()
-                .forEach(
-                        entry ->
-                                sparkConf.set(
-                                        entry.getKey(),
-                                        String.valueOf(entry.getValue().unwrapped())));
-        sparkConf.setAppName(jobName);
-        return sparkConf;
-    }
-
-    private void createStreamingContext() {
-        SparkConf conf = this.sparkSession.sparkContext().getConf();
-        long duration =
-                conf.getLong("spark.stream.batchDuration", DEFAULT_SPARK_STREAMING_DURATION);
-        if (this.streamingContext == null) {
-            this.streamingContext =
-                    new StreamingContext(sparkSession.sparkContext(), Seconds.apply(duration));
-        }
-    }
-
-    protected boolean checkIsContainHive(Config config) {
-        List<? extends Config> sourceConfigList = config.getConfigList(PluginType.SOURCE.getType());
-        for (Config c : sourceConfigList) {
-            if (c.getString(PLUGIN_NAME_KEY).toLowerCase().contains("hive")) {
-                return true;
-            }
-        }
-        List<? extends Config> sinkConfigList = config.getConfigList(PluginType.SINK.getType());
-        for (Config c : sinkConfigList) {
-            if (c.getString(PLUGIN_NAME_KEY).toLowerCase().contains("hive")) {
-                return true;
-            }
-        }
-        return false;
-    }
-
-    public static SparkRuntimeEnvironment getInstance(Config config) {
-        if (INSTANCE == null) {
-            synchronized (SparkRuntimeEnvironment.class) {
-                if (INSTANCE == null) {
-                    INSTANCE = new SparkRuntimeEnvironment(config);
-                }
-            }
-        }
-        return INSTANCE;
-    }
-}
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
deleted file mode 100644
index f82c465ec..000000000
--- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.starter.spark.execution;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.JobContext;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.transform.SeaTunnelTransform;
-import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
-import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
-import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery;
-import org.apache.seatunnel.translation.spark.utils.TypeConverterUtils;
-
-import org.apache.spark.api.java.function.MapPartitionsFunction;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
-import org.apache.spark.sql.catalyst.encoders.RowEncoder;
-import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
-import org.apache.spark.sql.types.StructType;
-
-import com.google.common.collect.Lists;
-import lombok.extern.slf4j.Slf4j;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.stream.Collectors;
-
-@Slf4j
-public class TransformExecuteProcessor
-        extends SparkAbstractPluginExecuteProcessor<SeaTunnelTransform> {
-
-    private static final String PLUGIN_TYPE = "transform";
-
-    protected TransformExecuteProcessor(
-            SparkRuntimeEnvironment sparkRuntimeEnvironment,
-            JobContext jobContext,
-            List<? extends Config> pluginConfigs) {
-        super(sparkRuntimeEnvironment, jobContext, pluginConfigs);
-    }
-
-    @Override
-    protected List<SeaTunnelTransform> initializePlugins(List<? extends Config> pluginConfigs) {
-        SeaTunnelTransformPluginDiscovery transformPluginDiscovery =
-                new SeaTunnelTransformPluginDiscovery();
-        List<URL> pluginJars = new ArrayList<>();
-        List<SeaTunnelTransform> transforms =
-                pluginConfigs.stream()
-                        .map(
-                                transformConfig -> {
-                                    PluginIdentifier pluginIdentifier =
-                                            PluginIdentifier.of(
-                                                    ENGINE_TYPE,
-                                                    PLUGIN_TYPE,
-                                                    transformConfig.getString(PLUGIN_NAME));
-                                    pluginJars.addAll(
-                                            transformPluginDiscovery.getPluginJarPaths(
-                                                    Lists.newArrayList(pluginIdentifier)));
-                                    SeaTunnelTransform pluginInstance =
-                                            transformPluginDiscovery.createPluginInstance(
-                                                    pluginIdentifier);
-                                    pluginInstance.prepare(transformConfig);
-                                    pluginInstance.setJobContext(jobContext);
-                                    return pluginInstance;
-                                })
-                        .distinct()
-                        .collect(Collectors.toList());
-        sparkRuntimeEnvironment.registerPlugin(pluginJars);
-        return transforms;
-    }
-
-    @Override
-    public List<Dataset<Row>> execute(List<Dataset<Row>> upstreamDataStreams)
-            throws TaskExecuteException {
-        if (plugins.isEmpty()) {
-            return upstreamDataStreams;
-        }
-        Dataset<Row> input = upstreamDataStreams.get(0);
-        List<Dataset<Row>> result = new ArrayList<>();
-        for (int i = 0; i < plugins.size(); i++) {
-            try {
-                SeaTunnelTransform<SeaTunnelRow> transform = plugins.get(i);
-                Config pluginConfig = pluginConfigs.get(i);
-                Dataset<Row> stream =
-                        fromSourceTable(pluginConfig, sparkRuntimeEnvironment).orElse(input);
-                input = sparkTransform(transform, stream);
-                registerInputTempView(pluginConfig, input);
-                result.add(input);
-            } catch (Exception e) {
-                throw new TaskExecuteException(
-                        String.format(
-                                "SeaTunnel transform task: %s execute error",
-                                plugins.get(i).getPluginName()),
-                        e);
-            }
-        }
-        return result;
-    }
-
-    private Dataset<Row> sparkTransform(SeaTunnelTransform transform, Dataset<Row> stream)
-            throws IOException {
-        SeaTunnelDataType<?> seaTunnelDataType = TypeConverterUtils.convert(stream.schema());
-        transform.setTypeInfo(seaTunnelDataType);
-        StructType structType =
-                (StructType) TypeConverterUtils.convert(transform.getProducedType());
-        ExpressionEncoder<Row> encoder = RowEncoder.apply(structType);
-        return stream.mapPartitions(
-                        (MapPartitionsFunction<Row, Row>)
-                                (Iterator<Row> rowIterator) -> {
-                                    TransformIterator iterator =
-                                            new TransformIterator(
-                                                    rowIterator, transform, structType);
-                                    return iterator;
-                                },
-                        encoder)
-                .filter(
-                        (Row row) -> {
-                            return row != null;
-                        });
-    }
-
-    private static class TransformIterator implements Iterator<Row>, Serializable {
-        private Iterator<Row> sourceIterator;
-        private SeaTunnelTransform<SeaTunnelRow> transform;
-        private StructType structType;
-
-        public TransformIterator(
-                Iterator<Row> sourceIterator,
-                SeaTunnelTransform<SeaTunnelRow> transform,
-                StructType structType) {
-            this.sourceIterator = sourceIterator;
-            this.transform = transform;
-            this.structType = structType;
-        }
-
-        @Override
-        public boolean hasNext() {
-            return sourceIterator.hasNext();
-        }
-
-        @Override
-        public Row next() {
-            Row row = sourceIterator.next();
-            SeaTunnelRow seaTunnelRow = new SeaTunnelRow(((GenericRowWithSchema) row).values());
-            seaTunnelRow = (SeaTunnelRow) transform.map(seaTunnelRow);
-            if (seaTunnelRow == null) {
-                return null;
-            }
-            return new GenericRowWithSchema(seaTunnelRow.getFields(), structType);
-        }
-    }
-}
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/pom.xml b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/pom.xml
similarity index 92%
copy from seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/pom.xml
copy to seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/pom.xml
index 0e7fb1bb3..ec4df4bfa 100644
--- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/pom.xml
+++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/pom.xml
@@ -26,8 +26,9 @@
         <version>${revision}</version>
     </parent>
 
-    <artifactId>seatunnel-spark-3-starter</artifactId>
-    <name>SeaTunnel : Core : Spark Starter : 3.3</name>
+    <artifactId>seatunnel-spark-starter-common</artifactId>
+    <packaging>jar</packaging>
+    <name>SeaTunnel : Core : Spark Starter : Common</name>
 
     <properties>
         <scala.binary.version>2.12</scala.binary.version>
@@ -39,6 +40,7 @@
             <groupId>org.apache.seatunnel</groupId>
             <artifactId>seatunnel-translation-spark-3.3</artifactId>
             <version>${project.version}</version>
+            <scope>provided</scope>
         </dependency>
 
         <dependency>
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SeaTunnelSpark.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SeaTunnelSpark.java
similarity index 100%
rename from seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SeaTunnelSpark.java
rename to seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SeaTunnelSpark.java
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
similarity index 100%
rename from seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
rename to seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/args/SparkCommandArgs.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/args/SparkCommandArgs.java
similarity index 100%
rename from seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/args/SparkCommandArgs.java
rename to seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/args/SparkCommandArgs.java
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkConfValidateCommand.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkConfValidateCommand.java
similarity index 100%
rename from seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkConfValidateCommand.java
rename to seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkConfValidateCommand.java
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkTaskExecuteCommand.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkTaskExecuteCommand.java
similarity index 100%
rename from seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkTaskExecuteCommand.java
rename to seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkTaskExecuteCommand.java
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
similarity index 100%
rename from seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
rename to seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
similarity index 100%
rename from seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
rename to seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkAbstractPluginExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkAbstractPluginExecuteProcessor.java
similarity index 100%
rename from seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkAbstractPluginExecuteProcessor.java
rename to seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkAbstractPluginExecuteProcessor.java
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
similarity index 100%
rename from seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
rename to seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkRuntimeEnvironment.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkRuntimeEnvironment.java
similarity index 100%
rename from seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkRuntimeEnvironment.java
rename to seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkRuntimeEnvironment.java
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
similarity index 100%
rename from seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
rename to seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java