Posted to commits@seatunnel.apache.org by zo...@apache.org on 2022/09/09 08:00:54 UTC

[incubator-seatunnel] branch dev updated: [Improve][e2e] Container only copy required connector jars (#2675)

This is an automated email from the ASF dual-hosted git repository.

zongwen pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 5039752ea [Improve][e2e] Container only copy required connector jars (#2675)
5039752ea is described below

commit 5039752eacc8702477b77e5b9b0946ad87125f9b
Author: Zongwen Li <zo...@gmail.com>
AuthorDate: Fri Sep 9 16:00:48 2022 +0800

    [Improve][e2e] Container only copy required connector jars (#2675)
    
    * [Improve][e2e] flink container only copy required connector jars
    
    * rename flink-e2e-common
    
    * remove useless imports
    
    * [Improve][e2e] flink sql container refactoring
    
    * remove useless imports
    
    * remove useless
    
    * [Improve][e2e] spark container only copy required connector jars
    
    * change for code review
    
    * Use e2e-common module directly
    
    * checkstyle
    
    * code format
---
 .../api/configuration/ReadonlyConfig.java          |  12 +-
 seatunnel-e2e/pom.xml                              |   8 +-
 .../pom.xml                                        |  14 +-
 .../seatunnel/e2e/common/AbstractContainer.java    | 103 +++++++++++++
 .../e2e/common/AbstractFlinkContainer.java         | 121 +++++++++++++++
 .../e2e/common/AbstractSparkContainer.java         |  87 +++++++++++
 .../apache/seatunnel/e2e/common/ContainerUtil.java | 165 +++++++++++++++++++++
 .../connector-flink-e2e-base/pom.xml               |  11 +-
 .../apache/seatunnel/e2e/flink/FlinkContainer.java | 150 +++----------------
 .../seatunnel-flink-connector-v2-e2e/pom.xml       |   2 +-
 .../seatunnel-connector-flink-e2e-base/pom.xml     |  12 +-
 .../apache/seatunnel/e2e/flink/FlinkContainer.java | 149 +++----------------
 .../setunnel-connector-flink-sql-e2e-base/pom.xml  |  11 +-
 .../seatunnel/e2e/flink/sql/FlinkContainer.java    | 129 +++-------------
 .../e2e/flink/sql/fake/DatagenToConsoleIT.java     |   5 +-
 .../connector-spark-e2e-base/pom.xml               |  11 +-
 .../apache/seatunnel/e2e/spark/SparkContainer.java | 143 +++---------------
 .../seatunnel-spark-connector-v2-e2e/pom.xml       |   2 +-
 .../seatunnel-connector-spark-e2e-base/pom.xml     |  11 +-
 .../apache/seatunnel/e2e/spark/SparkContainer.java | 139 +++--------------
 20 files changed, 633 insertions(+), 652 deletions(-)

diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/ReadonlyConfig.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/ReadonlyConfig.java
index beb9d08a9..32e8db273 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/ReadonlyConfig.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/ReadonlyConfig.java
@@ -66,18 +66,24 @@ public class ReadonlyConfig {
         return getOptional(option).orElseGet(option::defaultValue);
     }
 
-    @SuppressWarnings("MagicNumber")
     public Map<String, String> toMap() {
         if (confData.isEmpty()) {
             return Collections.emptyMap();
         }
 
+        Map<String, String> result = new HashMap<>();
+        toMap(result);
+        return result;
+    }
+
+    public void toMap(Map<String, String> result) {
+        if (confData.isEmpty()) {
+            return;
+        }
         Map<String, Object> flatteningMap = flatteningMap(confData);
-        Map<String, String> result = new HashMap<>((flatteningMap.size() << 2) / 3 + 1);
         for (Map.Entry<String, Object> entry : flatteningMap.entrySet()) {
             result.put(entry.getKey(), convertToJsonString(entry.getValue()));
         }
-        return result;
     }
 
     @SuppressWarnings("unchecked")
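
The new toMap(Map) overload writes the flattened options into a map supplied by the caller, so call sites that need a particular map implementation (for example a case-insensitive TreeMap, as ContainerUtil.getConnectors does later in this diff) avoid building and copying an intermediate HashMap. A minimal sketch, assuming a shaded typesafe Config named pluginMapping is already loaded (the variable name is illustrative):

    // Populate a caller-owned, case-insensitive map in place.
    Map<String, String> connectors = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
    ReadonlyConfig.fromConfig(pluginMapping.getConfig("source")).toMap(connectors);
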
diff --git a/seatunnel-e2e/pom.xml b/seatunnel-e2e/pom.xml
index c9b0b975a..c8bb6374e 100644
--- a/seatunnel-e2e/pom.xml
+++ b/seatunnel-e2e/pom.xml
@@ -27,15 +27,17 @@
     <packaging>pom</packaging>
 
     <modules>
-        <module>seatunnel-flink-e2e</module>
-        <module>seatunnel-spark-e2e</module>
+        <module>seatunnel-e2e-common</module>
         <module>seatunnel-flink-connector-v2-e2e</module>
-        <module>seatunnel-spark-connector-v2-e2e</module>
+        <module>seatunnel-flink-e2e</module>
         <module>seatunnel-flink-sql-e2e</module>
+        <module>seatunnel-spark-connector-v2-e2e</module>
+        <module>seatunnel-spark-e2e</module>
     </modules>
 
     <properties>
         <junit4.version>4.13.2</junit4.version>
+        <maven-jar-plugin.version>2.4</maven-jar-plugin.version>
     </properties>
     <dependencies>
         <dependency>
diff --git a/seatunnel-e2e/seatunnel-flink-sql-e2e/setunnel-connector-flink-sql-e2e-base/pom.xml b/seatunnel-e2e/seatunnel-e2e-common/pom.xml
similarity index 84%
copy from seatunnel-e2e/seatunnel-flink-sql-e2e/setunnel-connector-flink-sql-e2e-base/pom.xml
copy to seatunnel-e2e/seatunnel-e2e-common/pom.xml
index 2cd8cacdb..3ddc46cb5 100644
--- a/seatunnel-e2e/seatunnel-flink-sql-e2e/setunnel-connector-flink-sql-e2e-base/pom.xml
+++ b/seatunnel-e2e/seatunnel-e2e-common/pom.xml
@@ -17,17 +17,21 @@
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
-        <artifactId>seatunnel-flink-sql-e2e</artifactId>
+        <artifactId>seatunnel-e2e</artifactId>
         <groupId>org.apache.seatunnel</groupId>
         <version>${revision}</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
-    <artifactId>setunnel-connector-flink-sql-e2e-base</artifactId>
+    <artifactId>seatunnel-e2e-common</artifactId>
 
-    <properties>
-        <maven-jar-plugin.version>2.4</maven-jar-plugin.version>
-    </properties>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
 
     <build>
         <plugins>
diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/AbstractContainer.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/AbstractContainer.java
new file mode 100644
index 000000000..f4fb9d116
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/AbstractContainer.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.common;
+
+import static org.apache.seatunnel.e2e.common.ContainerUtil.PROJECT_ROOT_PATH;
+import static org.apache.seatunnel.e2e.common.ContainerUtil.adaptPathForWin;
+import static org.apache.seatunnel.e2e.common.ContainerUtil.copyConfigFileToContainer;
+import static org.apache.seatunnel.e2e.common.ContainerUtil.copyConnectorJarToContainer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+
+public abstract class AbstractContainer {
+    protected static final Logger LOG = LoggerFactory.getLogger(AbstractContainer.class);
+    protected static final String START_ROOT_MODULE_NAME = "seatunnel-core";
+
+    protected final String startModuleName;
+
+    protected final String startModuleFullPath;
+
+    public AbstractContainer() {
+        String[] modules = getStartModulePath().split(File.separator);
+        this.startModuleName = modules[modules.length - 1];
+        this.startModuleFullPath = PROJECT_ROOT_PATH + File.separator +
+            START_ROOT_MODULE_NAME + File.separator + getStartModulePath();
+    }
+
+    protected abstract String getDockerImage();
+
+    protected abstract String getStartModulePath();
+
+    protected abstract String getStartShellName();
+
+    protected abstract String getConnectorModulePath();
+
+    protected abstract String getConnectorType();
+
+    protected abstract String getConnectorNamePrefix();
+
+    protected abstract String getSeaTunnelHomeInContainer();
+
+    protected abstract List<String> getExtraStartShellCommands();
+
+    protected void copySeaTunnelStarter(GenericContainer<?> container) {
+        String[] modules = getStartModulePath().split(File.separator);
+        final String startModuleName = modules[modules.length - 1];
+        ContainerUtil.copySeaTunnelStarter(container,
+            startModuleName,
+            PROJECT_ROOT_PATH + File.separator + START_ROOT_MODULE_NAME + File.separator + getStartModulePath(),
+            getSeaTunnelHomeInContainer(),
+            getStartShellName());
+    }
+
+    protected Container.ExecResult executeJob(GenericContainer<?> container, String confFile) throws IOException, InterruptedException {
+        final String confInContainerPath = copyConfigFileToContainer(container, confFile);
+        // copy connectors
+        copyConnectorJarToContainer(container,
+            confFile,
+            getConnectorModulePath(),
+            getConnectorNamePrefix(),
+            getConnectorType(),
+            getSeaTunnelHomeInContainer());
+        return executeCommand(container, confInContainerPath);
+    }
+
+    protected Container.ExecResult executeCommand(GenericContainer<?> container, String configPath) throws IOException, InterruptedException {
+        final List<String> command = new ArrayList<>();
+        String binPath = Paths.get(getSeaTunnelHomeInContainer(), "bin", getStartShellName()).toString();
+        // base command
+        command.add(adaptPathForWin(binPath));
+        command.add("--config");
+        command.add(adaptPathForWin(configPath));
+        command.addAll(getExtraStartShellCommands());
+
+        Container.ExecResult execResult = container.execInContainer("bash", "-c", String.join(" ", command));
+        LOG.info(execResult.getStdout());
+        LOG.error(execResult.getStderr());
+        return execResult;
+    }
+}
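
AbstractContainer is a template-method base class: the shared logic above copies the starter jar, the start shell script and only the connector jars a job actually references, while subclasses supply the engine-specific image, paths and names. A hypothetical subclass sketch (the image, module and path values here are illustrative; the real Flink and Spark base classes follow later in this diff):

    import org.apache.seatunnel.e2e.common.AbstractContainer;

    import java.util.Collections;
    import java.util.List;

    public abstract class MyEngineContainer extends AbstractContainer {
        @Override protected String getDockerImage() { return "my-engine:latest"; }           // illustrative image
        @Override protected String getStartModulePath() { return "seatunnel-my-starter"; }   // illustrative starter module
        @Override protected String getStartShellName() { return "start-seatunnel-my.sh"; }   // illustrative shell name
        @Override protected String getConnectorModulePath() { return "seatunnel-connectors-v2"; }
        @Override protected String getConnectorType() { return "seatunnel"; }
        @Override protected String getConnectorNamePrefix() { return "connector-"; }
        @Override protected String getSeaTunnelHomeInContainer() { return "/tmp/my/seatunnel"; }
        @Override protected List<String> getExtraStartShellCommands() { return Collections.emptyList(); }
    }
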
diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/AbstractFlinkContainer.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/AbstractFlinkContainer.java
new file mode 100644
index 000000000..1f06648eb
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/AbstractFlinkContainer.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.common;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Stream;
+
+/**
+ * This class is the base class of FlinkEnvironment test.
+ * The before method will create a Flink cluster, and the after method will close the Flink cluster.
+ * You can use {@link AbstractFlinkContainer#executeSeaTunnelFlinkJob} to submit a seatunnel config and run a seatunnel job.
+ */
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public abstract class AbstractFlinkContainer extends AbstractContainer {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(AbstractFlinkContainer.class);
+
+    protected static final String FLINK_SEATUNNEL_HOME = "/tmp/flink/seatunnel";
+
+    protected static final Network NETWORK = Network.newNetwork();
+
+    protected static final List<String> DEFAULT_FLINK_PROPERTIES = Arrays.asList(
+        "jobmanager.rpc.address: jobmanager",
+        "taskmanager.numberOfTaskSlots: 10",
+        "parallelism.default: 4",
+        "env.java.opts: -Doracle.jdbc.timezoneAsRegion=false");
+
+    protected static final String DEFAULT_DOCKER_IMAGE = "flink:1.13.6-scala_2.11";
+
+    protected GenericContainer<?> jobManager;
+    protected GenericContainer<?> taskManager;
+
+    @Override
+    protected String getDockerImage() {
+        return DEFAULT_DOCKER_IMAGE;
+    }
+
+    @Override
+    protected String getSeaTunnelHomeInContainer() {
+        return FLINK_SEATUNNEL_HOME;
+    }
+
+    @BeforeAll
+    public void before() {
+        final String dockerImage = getDockerImage();
+        final String properties = String.join("\n", getFlinkProperties());
+        jobManager = new GenericContainer<>(dockerImage)
+            .withCommand("jobmanager")
+            .withNetwork(NETWORK)
+            .withNetworkAliases("jobmanager")
+            .withExposedPorts()
+            .withEnv("FLINK_PROPERTIES", properties)
+            .withLogConsumer(new Slf4jLogConsumer(LOG));
+
+        taskManager =
+            new GenericContainer<>(dockerImage)
+                .withCommand("taskmanager")
+                .withNetwork(NETWORK)
+                .withNetworkAliases("taskmanager")
+                .withEnv("FLINK_PROPERTIES", properties)
+                .dependsOn(jobManager)
+                .withLogConsumer(new Slf4jLogConsumer(LOG));
+
+        Startables.deepStart(Stream.of(jobManager)).join();
+        Startables.deepStart(Stream.of(taskManager)).join();
+        copySeaTunnelStarter(jobManager);
+        LOG.info("Flink containers are started.");
+    }
+
+    protected List<String> getFlinkProperties() {
+        return DEFAULT_FLINK_PROPERTIES;
+    }
+
+    @AfterAll
+    public void close() {
+        if (taskManager != null) {
+            taskManager.stop();
+        }
+        if (jobManager != null) {
+            jobManager.stop();
+        }
+    }
+
+    @Override
+    protected List<String> getExtraStartShellCommands() {
+        return Collections.emptyList();
+    }
+
+    public Container.ExecResult executeSeaTunnelFlinkJob(String confFile) throws IOException, InterruptedException {
+        return executeJob(jobManager, confFile);
+    }
+}
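
From a connector test's point of view the cluster lifecycle is handled by the @BeforeAll/@AfterAll methods above; an IT only extends the concrete per-engine container class and submits its config. A sketch of such a test, assuming a hypothetical FakeSourceToConsoleIT and config path, and extending the concrete FlinkContainer shown later in this diff:

    import org.apache.seatunnel.e2e.flink.FlinkContainer;

    import org.junit.jupiter.api.Assertions;
    import org.junit.jupiter.api.Test;
    import org.testcontainers.containers.Container;

    import java.io.IOException;

    public class FakeSourceToConsoleIT extends FlinkContainer {
        @Test
        public void testFakeSourceToConsole() throws IOException, InterruptedException {
            // Resolved against this module's src/test/resources; the file name is illustrative.
            Container.ExecResult execResult = executeSeaTunnelFlinkJob("/examples/fake_to_console.conf");
            Assertions.assertEquals(0, execResult.getExitCode());
        }
    }
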
diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/AbstractSparkContainer.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/AbstractSparkContainer.java
new file mode 100644
index 000000000..fc16846c1
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/AbstractSparkContainer.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.common;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Stream;
+
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public abstract class AbstractSparkContainer extends AbstractContainer {
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractSparkContainer.class);
+
+    private static final String SPARK_SEATUNNEL_HOME = "/tmp/spark/seatunnel";
+    private static final String DEFAULT_DOCKER_IMAGE = "bitnami/spark:2.4.3";
+    public static final Network NETWORK = Network.newNetwork();
+
+    protected GenericContainer<?> master;
+
+    @Override
+    protected String getDockerImage() {
+        return DEFAULT_DOCKER_IMAGE;
+    }
+
+    @Override
+    protected String getSeaTunnelHomeInContainer() {
+        return SPARK_SEATUNNEL_HOME;
+    }
+
+    @BeforeAll
+    public void before() {
+        master = new GenericContainer<>(getDockerImage())
+            .withNetwork(NETWORK)
+            .withNetworkAliases("spark-master")
+            .withExposedPorts()
+            .withEnv("SPARK_MODE", "master")
+            .withLogConsumer(new Slf4jLogConsumer(LOG));
+        // In most cases we can just use standalone mode to execute a Spark job; if we want to use cluster mode, we need to
+        // start a worker.
+        Startables.deepStart(Stream.of(master)).join();
+        copySeaTunnelStarter(master);
+        LOG.info("Spark container started");
+    }
+
+    @AfterAll
+    public void close() {
+        if (master != null) {
+            master.stop();
+        }
+    }
+
+    @Override
+    protected List<String> getExtraStartShellCommands() {
+        return Arrays.asList("--master local",
+            "--deploy-mode client");
+    }
+
+    public Container.ExecResult executeSeaTunnelSparkJob(String confFile) throws IOException, InterruptedException {
+        return executeJob(master, confFile);
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/ContainerUtil.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/ContainerUtil.java
new file mode 100644
index 000000000..237e30f09
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/ContainerUtil.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.common;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
+
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.utility.MountableFile;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+public final class ContainerUtil {
+
+    public static final String PLUGIN_MAPPING_FILE = "plugin-mapping.properties";
+
+    /**
+     * An error occurs when the current module is not a submodule of seatunnel-e2e.
+     */
+    public static final String PROJECT_ROOT_PATH = System.getProperty("user.dir").split("/seatunnel-e2e/")[0];
+
+    public static void copyConnectorJarToContainer(GenericContainer<?> container,
+                                                   String confFile,
+                                                   String connectorsRootPath,
+                                                   String connectorPrefix,
+                                                   String connectorType,
+                                                   String seatunnelHome) {
+        Config jobConfig = getConfig(getConfigFile(confFile));
+        Config connectorsMapping = getConfig(new File(PROJECT_ROOT_PATH + File.separator + PLUGIN_MAPPING_FILE));
+        if (!connectorsMapping.hasPath(connectorType) || connectorsMapping.getConfig(connectorType).isEmpty()) {
+            return;
+        }
+        Config connectors = connectorsMapping.getConfig(connectorType);
+        Set<String> connectorNames = getConnectors(jobConfig, connectors, "source");
+        connectorNames.addAll(getConnectors(jobConfig, connectors, "sink"));
+        File module = new File(PROJECT_ROOT_PATH + File.separator + connectorsRootPath);
+
+        List<File> connectorFiles = getConnectorFiles(module, connectorNames, connectorPrefix);
+        connectorFiles.forEach(jar ->
+            container.copyFileToContainer(
+                MountableFile.forHostPath(jar.getAbsolutePath()),
+                Paths.get(Paths.get(seatunnelHome, "connectors").toString(), connectorType, jar.getName()).toString()));
+    }
+
+    public static String copyConfigFileToContainer(GenericContainer<?> container, String confFile) {
+        final String targetConfInContainer = Paths.get("/tmp", confFile).toString();
+        container.copyFileToContainer(MountableFile.forHostPath(getConfigFile(confFile).getAbsolutePath()), targetConfInContainer);
+        return targetConfInContainer;
+    }
+
+    public static void copySeaTunnelStarter(GenericContainer<?> container,
+                                            String startModuleName,
+                                            String startModulePath,
+                                            String seatunnelHomeInContainer,
+                                            String startShellName) {
+        final String startJarName = startModuleName + ".jar";
+        // copy lib
+        final String startJarPath = startModulePath + File.separator + "target" + File.separator + startJarName;
+        container.copyFileToContainer(
+            MountableFile.forHostPath(startJarPath),
+            Paths.get(Paths.get(seatunnelHomeInContainer, "lib").toString(), startJarName).toString());
+
+        // copy bin
+        final String startBinPath = startModulePath + File.separator + "/src/main/bin/" + startShellName;
+        container.copyFileToContainer(
+            MountableFile.forHostPath(startBinPath),
+            Paths.get(Paths.get(seatunnelHomeInContainer, "bin").toString(), startShellName).toString());
+
+        // copy plugin-mapping.properties
+        container.copyFileToContainer(
+            MountableFile.forHostPath(PROJECT_ROOT_PATH + "/plugin-mapping.properties"),
+            Paths.get(Paths.get(seatunnelHomeInContainer, "connectors").toString(), PLUGIN_MAPPING_FILE).toString());
+    }
+
+    public static String adaptPathForWin(String path) {
+        // Running IT use cases under Windows requires replacing \ with /
+        return path == null ? "" : path.replaceAll("\\\\", "/");
+    }
+
+    private static List<File> getConnectorFiles(File currentModule, Set<String> connectorNames, String connectorPrefix) {
+        List<File> connectorFiles = new ArrayList<>();
+        for (File file : Objects.requireNonNull(currentModule.listFiles())) {
+            getConnectorFiles(file, connectorNames, connectorPrefix, connectorFiles);
+        }
+        return connectorFiles;
+    }
+
+    private static void getConnectorFiles(File currentModule, Set<String> connectorNames, String connectorPrefix, List<File> connectors) {
+        if (currentModule.isFile() || connectorNames.size() == connectors.size()) {
+            return;
+        }
+        if (connectorNames.contains(currentModule.getName())) {
+            File targetPath = new File(currentModule.getAbsolutePath() + File.separator + "target");
+            for (File file : Objects.requireNonNull(targetPath.listFiles())) {
+                if (file.getName().startsWith(currentModule.getName()) && !file.getName().endsWith("javadoc.jar")) {
+                    connectors.add(file);
+                    return;
+                }
+            }
+        }
+
+        if (currentModule.getName().startsWith(connectorPrefix)) {
+            for (File file : Objects.requireNonNull(currentModule.listFiles())) {
+                getConnectorFiles(file, connectorNames, connectorPrefix, connectors);
+            }
+        }
+    }
+
+    private static Set<String> getConnectors(Config jobConfig, Config connectorsMap, String pluginType) {
+        List<? extends Config> connectorConfigList = jobConfig.getConfigList(pluginType);
+        Map<String, String> connectors = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
+        ReadonlyConfig.fromConfig(connectorsMap.getConfig(pluginType)).toMap(connectors);
+        return connectorConfigList.stream()
+            .map(config -> config.getString("plugin_name"))
+            .filter(connectors::containsKey)
+            .map(connectors::get)
+            .collect(Collectors.toSet());
+    }
+
+    public static Path getCurrentModulePath() {
+        return Paths.get(System.getProperty("user.dir"));
+    }
+
+    private static File getConfigFile(String confFile) {
+        File file = new File(getCurrentModulePath() + "/src/test/resources" + confFile);
+        if (file.exists()) {
+            return file;
+        }
+        throw new IllegalArgumentException(confFile + " doesn't exist");
+    }
+
+    private static Config getConfig(File file) {
+        return ConfigFactory
+            .parseFile(file)
+            .resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true))
+            .resolveWith(ConfigFactory.systemProperties(), ConfigResolveOptions.defaults().setAllowUnresolved(true));
+    }
+}
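
Tying it together: copyConnectorJarToContainer reads the source and sink plugin_name values from the job config, maps them to connector module names through plugin-mapping.properties (keys of the form <connectorType>.<source|sink>.<PluginName>, as implied by getConnectors above), and then walks only the matching modules' target directories, so unrelated connector jars are never copied into the container. A hedged sketch of a single call, with illustrative config and mapping values:

    // Illustrative plugin-mapping.properties entry (format inferred from getConnectors above):
    //   seatunnel.source.FakeSource = connector-fake
    // Illustrative job config fragment under src/test/resources:
    //   source { FakeSource { ... } }
    // With those inputs, only connector-fake-<version>.jar from
    // seatunnel-connectors-v2/connector-fake/target (resolved against the project root)
    // is copied to <seatunnelHome>/connectors/seatunnel/ inside the container.
    ContainerUtil.copyConnectorJarToContainer(
        container,                          // a started GenericContainer<?>, e.g. the Flink jobmanager
        "/examples/fake_to_console.conf",   // hypothetical config file name
        "seatunnel-connectors-v2",          // connector module root, as FlinkContainer returns below
        "connector-",                       // connector artifact prefix
        "seatunnel",                        // connector type / mapping section
        "/tmp/flink/seatunnel");            // SeaTunnel home in the container
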
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-flink-e2e-base/pom.xml b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-flink-e2e-base/pom.xml
index 57a12fa6c..d2f48c420 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-flink-e2e-base/pom.xml
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-flink-e2e-base/pom.xml
@@ -25,9 +25,14 @@
 
     <artifactId>connector-flink-e2e-base</artifactId>
 
-    <properties>
-        <maven-jar-plugin.version>2.4</maven-jar-plugin.version>
-    </properties>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-e2e-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+        </dependency>
+    </dependencies>
 
     <build>
         <plugins>
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-flink-e2e-base/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-flink-e2e-base/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java
index 4fe612a09..6ecaa77cb 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-flink-e2e-base/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-flink-e2e-base/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java
@@ -17,152 +17,42 @@
 
 package org.apache.seatunnel.e2e.flink;
 
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.Container;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.Network;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.lifecycle.Startables;
-import org.testcontainers.utility.MountableFile;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Objects;
-import java.util.stream.Stream;
+import org.apache.seatunnel.e2e.common.AbstractFlinkContainer;
 
 /**
  * This class is the base class of FlinkEnvironment test for new seatunnel connector API.
  * The before method will create a Flink cluster, and after method will close the Flink cluster.
  * You can use {@link FlinkContainer#executeSeaTunnelFlinkJob} to submit a seatunnel config and run a seatunnel job.
  */
-public abstract class FlinkContainer {
-
-    private static final Logger LOG = LoggerFactory.getLogger(FlinkContainer.class);
-
-    private static final String FLINK_DOCKER_IMAGE = "tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27";
-    protected static final Network NETWORK = Network.newNetwork();
-
-    protected GenericContainer<?> jobManager;
-    protected GenericContainer<?> taskManager;
-    private static final Path PROJECT_ROOT_PATH = Paths.get(System.getProperty("user.dir")).getParent().getParent().getParent();
-    private static final String SEATUNNEL_FLINK_BIN = "start-seatunnel-flink-new-connector.sh";
-    private static final String SEATUNNEL_FLINK_JAR = "seatunnel-flink-starter.jar";
-    private static final String PLUGIN_MAPPING_FILE = "plugin-mapping.properties";
-    private static final String SEATUNNEL_HOME = "/tmp/flink/seatunnel";
-    private static final String SEATUNNEL_BIN = Paths.get(SEATUNNEL_HOME, "bin").toString();
-    private static final String SEATUNNEL_LIB = Paths.get(SEATUNNEL_HOME, "lib").toString();
-    private static final String SEATUNNEL_CONNECTORS = Paths.get(SEATUNNEL_HOME, "connectors").toString();
-
-    private static final int WAIT_FLINK_JOB_SUBMIT = 5000;
-
-    private static final String FLINK_PROPERTIES = String.join(
-        "\n",
-        Arrays.asList(
-            "jobmanager.rpc.address: jobmanager",
-            "taskmanager.numberOfTaskSlots: 10",
-            "parallelism.default: 4",
-            "env.java.opts: -Doracle.jdbc.timezoneAsRegion=false"));
-
-    @BeforeEach
-    public void before() {
-        jobManager = new GenericContainer<>(FLINK_DOCKER_IMAGE)
-            .withCommand("jobmanager")
-            .withNetwork(NETWORK)
-            .withNetworkAliases("jobmanager")
-            .withExposedPorts()
-            .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
-            .withLogConsumer(new Slf4jLogConsumer(LOG));
+public abstract class FlinkContainer extends AbstractFlinkContainer {
 
-        taskManager =
-            new GenericContainer<>(FLINK_DOCKER_IMAGE)
-                .withCommand("taskmanager")
-                .withNetwork(NETWORK)
-                .withNetworkAliases("taskmanager")
-                .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
-                .dependsOn(jobManager)
-                .withLogConsumer(new Slf4jLogConsumer(LOG));
-
-        Startables.deepStart(Stream.of(jobManager)).join();
-        Startables.deepStart(Stream.of(taskManager)).join();
-        copySeaTunnelFlinkFile();
-        LOG.info("Flink containers are started.");
+    @Override
+    protected String getDockerImage() {
+        return "tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27";
     }
 
-    @AfterEach
-    public void close() {
-        if (taskManager != null) {
-            taskManager.stop();
-        }
-        if (jobManager != null) {
-            jobManager.stop();
-        }
+    @Override
+    protected String getStartModulePath() {
+        return "seatunnel-flink-starter";
     }
 
-    public Container.ExecResult executeSeaTunnelFlinkJob(String confFile) throws IOException, InterruptedException {
-        final String confPath = getResource(confFile);
-        if (!new File(confPath).exists()) {
-            throw new IllegalArgumentException(confFile + " doesn't exist");
-        }
-        final String targetConfInContainer = Paths.get("/tmp", confFile).toString();
-        jobManager.copyFileToContainer(MountableFile.forHostPath(confPath), targetConfInContainer);
-
-        // Running IT use cases under Windows requires replacing \ with /
-        String conf = targetConfInContainer.replaceAll("\\\\", "/");
-        String binPath = Paths.get(SEATUNNEL_HOME, "bin", SEATUNNEL_FLINK_BIN).toString().replaceAll("\\\\", "/");
-        final List<String> command = new ArrayList<>();
-        command.add(binPath);
-        command.add("--config " + conf);
-
-        Container.ExecResult execResult = jobManager.execInContainer("bash", "-c", String.join(" ", command));
-        LOG.info(execResult.getStdout());
-        LOG.error(execResult.getStderr());
-        // wait job start
-        Thread.sleep(WAIT_FLINK_JOB_SUBMIT);
-        return execResult;
+    @Override
+    protected String getStartShellName() {
+        return "start-seatunnel-flink-new-connector.sh";
     }
 
-    protected void copySeaTunnelFlinkFile() {
-        // copy lib
-        String seatunnelCoreFlinkJarPath = PROJECT_ROOT_PATH
-            + "/seatunnel-core/seatunnel-flink-starter/target/" + SEATUNNEL_FLINK_JAR;
-        jobManager.copyFileToContainer(
-            MountableFile.forHostPath(seatunnelCoreFlinkJarPath),
-            Paths.get(SEATUNNEL_LIB, SEATUNNEL_FLINK_JAR).toString());
-
-        // copy bin
-        String seatunnelFlinkBinPath = PROJECT_ROOT_PATH + "/seatunnel-core/seatunnel-flink-starter/src/main/bin/" + SEATUNNEL_FLINK_BIN;
-        jobManager.copyFileToContainer(
-            MountableFile.forHostPath(seatunnelFlinkBinPath),
-            Paths.get(SEATUNNEL_BIN, SEATUNNEL_FLINK_BIN).toString());
-
-        // copy connectors
-        File jars = new File(PROJECT_ROOT_PATH +
-            "/seatunnel-connectors-v2-dist/target/lib");
-        Arrays.stream(Objects.requireNonNull(jars.listFiles(f -> f.getName().startsWith("connector-"))))
-            .forEach(jar ->
-                jobManager.copyFileToContainer(
-                    MountableFile.forHostPath(jar.getAbsolutePath()),
-                    getConnectorPath(jar.getName())));
-
-        // copy plugin-mapping.properties
-        jobManager.copyFileToContainer(
-            MountableFile.forHostPath(PROJECT_ROOT_PATH + "/plugin-mapping.properties"),
-            Paths.get(SEATUNNEL_CONNECTORS, PLUGIN_MAPPING_FILE).toString());
+    @Override
+    protected String getConnectorType() {
+        return "seatunnel";
     }
 
-    private String getResource(String confFile) {
-        return System.getProperty("user.dir") + "/src/test/resources" + confFile;
+    @Override
+    protected String getConnectorModulePath() {
+        return "seatunnel-connectors-v2";
     }
 
-    private String getConnectorPath(String fileName) {
-        return Paths.get(SEATUNNEL_CONNECTORS, "seatunnel", fileName).toString();
+    @Override
+    protected String getConnectorNamePrefix() {
+        return "connector-";
     }
 }
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
index 4f5f037dd..045579f37 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
@@ -41,7 +41,7 @@
     <dependencies>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-core-flink</artifactId>
+            <artifactId>seatunnel-flink-starter</artifactId>
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
diff --git a/seatunnel-e2e/seatunnel-flink-e2e/seatunnel-connector-flink-e2e-base/pom.xml b/seatunnel-e2e/seatunnel-flink-e2e/seatunnel-connector-flink-e2e-base/pom.xml
index 32f26486d..7968e8526 100644
--- a/seatunnel-e2e/seatunnel-flink-e2e/seatunnel-connector-flink-e2e-base/pom.xml
+++ b/seatunnel-e2e/seatunnel-flink-e2e/seatunnel-connector-flink-e2e-base/pom.xml
@@ -25,10 +25,14 @@
 
     <artifactId>seatunnel-connector-flink-e2e-base</artifactId>
 
-    <properties>
-        <maven-jar-plugin.version>2.4</maven-jar-plugin.version>
-    </properties>
-
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-e2e-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+        </dependency>
+    </dependencies>
     <build>
         <plugins>
             <plugin>
diff --git a/seatunnel-e2e/seatunnel-flink-e2e/seatunnel-connector-flink-e2e-base/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java b/seatunnel-e2e/seatunnel-flink-e2e/seatunnel-connector-flink-e2e-base/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java
index f3308dead..a1ec70269 100644
--- a/seatunnel-e2e/seatunnel-flink-e2e/seatunnel-connector-flink-e2e-base/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java
+++ b/seatunnel-e2e/seatunnel-flink-e2e/seatunnel-connector-flink-e2e-base/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java
@@ -17,152 +17,37 @@
 
 package org.apache.seatunnel.e2e.flink;
 
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.Container;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.Network;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.lifecycle.Startables;
-import org.testcontainers.utility.MountableFile;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Objects;
-import java.util.stream.Stream;
+import org.apache.seatunnel.e2e.common.AbstractFlinkContainer;
 
 /**
  * This class is the base class of FlinkEnvironment test.
  * The before method will create a Flink cluster, and after method will close the Flink cluster.
  * You can use {@link FlinkContainer#executeSeaTunnelFlinkJob} to submit a seatunnel config and run a seatunnel job.
  */
-public abstract class FlinkContainer {
-
-    private static final Logger LOG = LoggerFactory.getLogger(FlinkContainer.class);
-
-    private static final String FLINK_DOCKER_IMAGE = "flink:1.13.6-scala_2.11";
-    protected static final Network NETWORK = Network.newNetwork();
-
-    protected GenericContainer<?> jobManager;
-    protected GenericContainer<?> taskManager;
-    private static final Path PROJECT_ROOT_PATH = Paths.get(System.getProperty("user.dir")).getParent().getParent().getParent();
-    private static final String SEATUNNEL_FLINK_BIN = "start-seatunnel-flink.sh";
-    private static final String SEATUNNEL_FLINK_JAR = "seatunnel-core-flink.jar";
-    private static final String PLUGIN_MAPPING_FILE = "plugin-mapping.properties";
-    private static final String SEATUNNEL_HOME = "/tmp/flink/seatunnel";
-    private static final String SEATUNNEL_BIN = Paths.get(SEATUNNEL_HOME, "bin").toString();
-    private static final String SEATUNNEL_LIB = Paths.get(SEATUNNEL_HOME, "lib").toString();
-    private static final String SEATUNNEL_CONNECTORS = Paths.get(SEATUNNEL_HOME, "connectors").toString();
-
-    private static final int WAIT_FLINK_JOB_SUBMIT = 5000;
-
-    private static final String FLINK_PROPERTIES = String.join(
-        "\n",
-        Arrays.asList(
-            "jobmanager.rpc.address: jobmanager",
-            "taskmanager.numberOfTaskSlots: 10",
-            "parallelism.default: 4",
-            "env.java.opts: -Doracle.jdbc.timezoneAsRegion=false"));
-
-    @BeforeEach
-    public void before() {
-        jobManager = new GenericContainer<>(FLINK_DOCKER_IMAGE)
-                .withCommand("jobmanager")
-                .withNetwork(NETWORK)
-                .withNetworkAliases("jobmanager")
-                .withExposedPorts()
-                .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
-                .withLogConsumer(new Slf4jLogConsumer(LOG));
-
-        taskManager =
-                new GenericContainer<>(FLINK_DOCKER_IMAGE)
-                        .withCommand("taskmanager")
-                        .withNetwork(NETWORK)
-                        .withNetworkAliases("taskmanager")
-                        .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
-                        .dependsOn(jobManager)
-                        .withLogConsumer(new Slf4jLogConsumer(LOG));
+public abstract class FlinkContainer extends AbstractFlinkContainer {
 
-        Startables.deepStart(Stream.of(jobManager)).join();
-        Startables.deepStart(Stream.of(taskManager)).join();
-        copySeaTunnelFlinkFile();
-        LOG.info("Flink containers are started.");
+    @Override
+    protected String getStartModulePath() {
+        return "seatunnel-core-flink";
     }
 
-    @AfterEach
-    public void close() {
-        if (taskManager != null) {
-            taskManager.stop();
-        }
-        if (jobManager != null) {
-            jobManager.stop();
-        }
+    @Override
+    protected String getStartShellName() {
+        return "start-seatunnel-flink.sh";
     }
 
-    public Container.ExecResult executeSeaTunnelFlinkJob(String confFile) throws IOException, InterruptedException {
-        final String confPath = getResource(confFile);
-        if (!new File(confPath).exists()) {
-            throw new IllegalArgumentException(confFile + " doesn't exist");
-        }
-        final String targetConfInContainer = Paths.get("/tmp", confFile).toString();
-        jobManager.copyFileToContainer(MountableFile.forHostPath(confPath), targetConfInContainer);
-
-        // Running IT use cases under Windows requires replacing \ with /
-        String conf = targetConfInContainer.replaceAll("\\\\", "/");
-        final List<String> command = new ArrayList<>();
-        command.add(Paths.get(SEATUNNEL_HOME, "bin/start-seatunnel-flink.sh").toString());
-        command.add("--config " + conf);
-
-        Container.ExecResult execResult = jobManager.execInContainer("bash", "-c", String.join(" ", command));
-        LOG.info(execResult.getStdout());
-        LOG.error(execResult.getStderr());
-        // wait job start
-        Thread.sleep(WAIT_FLINK_JOB_SUBMIT);
-        return execResult;
-    }
-
-    protected void copySeaTunnelFlinkFile() {
-        // copy lib
-        String seatunnelCoreFlinkJarPath = PROJECT_ROOT_PATH
-            + "/seatunnel-core/seatunnel-core-flink/target/seatunnel-core-flink.jar";
-        jobManager.copyFileToContainer(
-            MountableFile.forHostPath(seatunnelCoreFlinkJarPath),
-            Paths.get(SEATUNNEL_LIB, SEATUNNEL_FLINK_JAR).toString());
-
-        // copy bin
-        String seatunnelFlinkBinPath = PROJECT_ROOT_PATH + "/seatunnel-core/seatunnel-core-flink/src/main/bin/start-seatunnel-flink.sh";
-        jobManager.copyFileToContainer(
-            MountableFile.forHostPath(seatunnelFlinkBinPath),
-            Paths.get(SEATUNNEL_BIN, SEATUNNEL_FLINK_BIN).toString());
-
-        // copy connectors
-        File jars = new File(PROJECT_ROOT_PATH +
-            "/seatunnel-connectors/seatunnel-connectors-flink-dist/target/lib");
-        Arrays.stream(Objects.requireNonNull(jars.listFiles(f -> f.getName().startsWith("seatunnel-connector-flink"))))
-            .forEach(jar ->
-                jobManager.copyFileToContainer(
-                    MountableFile.forHostPath(jar.getAbsolutePath()),
-                    getConnectorPath(jar.getName())));
-
-        // copy plugin-mapping.properties
-        jobManager.copyFileToContainer(
-            MountableFile.forHostPath(PROJECT_ROOT_PATH + "/plugin-mapping.properties"),
-            Paths.get(SEATUNNEL_CONNECTORS, PLUGIN_MAPPING_FILE).toString());
+    @Override
+    protected String getConnectorType() {
+        return "seatunnel";
     }
 
-    private String getResource(String confFile) {
-        return System.getProperty("user.dir") + "/src/test/resources" + confFile;
+    @Override
+    protected String getConnectorModulePath() {
+        return "seatunnel-connectors/seatunnel-connectors-flink";
     }
 
-    private String getConnectorPath(String fileName) {
-        return Paths.get(SEATUNNEL_CONNECTORS, "flink", fileName).toString();
+    @Override
+    protected String getConnectorNamePrefix() {
+        return "seatunnel-connector-flink-";
     }
-
 }
diff --git a/seatunnel-e2e/seatunnel-flink-sql-e2e/setunnel-connector-flink-sql-e2e-base/pom.xml b/seatunnel-e2e/seatunnel-flink-sql-e2e/setunnel-connector-flink-sql-e2e-base/pom.xml
index 2cd8cacdb..5de92cec7 100644
--- a/seatunnel-e2e/seatunnel-flink-sql-e2e/setunnel-connector-flink-sql-e2e-base/pom.xml
+++ b/seatunnel-e2e/seatunnel-flink-sql-e2e/setunnel-connector-flink-sql-e2e-base/pom.xml
@@ -25,9 +25,14 @@
 
     <artifactId>setunnel-connector-flink-sql-e2e-base</artifactId>
 
-    <properties>
-        <maven-jar-plugin.version>2.4</maven-jar-plugin.version>
-    </properties>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-e2e-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+        </dependency>
+    </dependencies>
 
     <build>
         <plugins>
diff --git a/seatunnel-e2e/seatunnel-flink-sql-e2e/setunnel-connector-flink-sql-e2e-base/src/test/java/org/apache/seatunnel/e2e/flink/sql/FlinkContainer.java b/seatunnel-e2e/seatunnel-flink-sql-e2e/setunnel-connector-flink-sql-e2e-base/src/test/java/org/apache/seatunnel/e2e/flink/sql/FlinkContainer.java
index 082500856..82a7159c1 100644
--- a/seatunnel-e2e/seatunnel-flink-sql-e2e/setunnel-connector-flink-sql-e2e-base/src/test/java/org/apache/seatunnel/e2e/flink/sql/FlinkContainer.java
+++ b/seatunnel-e2e/seatunnel-flink-sql-e2e/setunnel-connector-flink-sql-e2e-base/src/test/java/org/apache/seatunnel/e2e/flink/sql/FlinkContainer.java
@@ -17,128 +17,37 @@
 
 package org.apache.seatunnel.e2e.flink.sql;
 
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.Container;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.Network;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.lifecycle.Startables;
-import org.testcontainers.utility.MountableFile;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.stream.Stream;
+import org.apache.seatunnel.e2e.common.AbstractFlinkContainer;
 
 /**
  * This class is the base class of FlinkEnvironment test.
  * The before method will create a Flink cluster, and after method will close the Flink cluster.
- * You can use {@link FlinkContainer#executeSeaTunnelFlinkSqlJob(String)} to submit a seatunnel config and run a seatunnel job.
+ * You can use {@link FlinkContainer#executeSeaTunnelFlinkJob(String)} to submit a seatunnel config and run a seatunnel job.
  */
-public abstract class FlinkContainer {
-
-    private static final Logger LOG = LoggerFactory.getLogger(FlinkContainer.class);
-
-    private static final String FLINK_DOCKER_IMAGE = "flink:1.13.6-scala_2.11";
-    protected static final Network NETWORK = Network.newNetwork();
-
-    protected GenericContainer<?> jobManager;
-    protected GenericContainer<?> taskManager;
-    private static final Path PROJECT_ROOT_PATH = Paths.get(System.getProperty("user.dir")).getParent().getParent().getParent();
-    private static final String SEATUNNEL_FLINK_BIN = "start-seatunnel-sql.sh";
-    private static final String SEATUNNEL_FLINK_SQL_JAR = "seatunnel-core-flink-sql.jar";
-    private static final String SEATUNNEL_HOME = "/tmp/flink/seatunnel";
-    private static final String SEATUNNEL_BIN = Paths.get(SEATUNNEL_HOME, "bin", SEATUNNEL_FLINK_BIN).toString();
-    private static final String FLINK_JAR_PATH = Paths.get(SEATUNNEL_HOME, "lib", SEATUNNEL_FLINK_SQL_JAR).toString();
-
-    private static final int WAIT_FLINK_JOB_SUBMIT = 5000;
-
-    private static final String FLINK_PROPERTIES = String.join(
-        "\n",
-        Arrays.asList(
-            "jobmanager.rpc.address: jobmanager",
-            "taskmanager.numberOfTaskSlots: 10",
-            "parallelism.default: 4",
-            "env.java.opts: -Doracle.jdbc.timezoneAsRegion=false"));
+public abstract class FlinkContainer extends AbstractFlinkContainer {
 
-    @BeforeEach
-    public void before() {
-        jobManager = new GenericContainer<>(FLINK_DOCKER_IMAGE)
-            .withCommand("jobmanager")
-            .withNetwork(NETWORK)
-            .withNetworkAliases("jobmanager")
-            .withExposedPorts()
-            .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
-            .withLogConsumer(new Slf4jLogConsumer(LOG));
-
-        taskManager =
-            new GenericContainer<>(FLINK_DOCKER_IMAGE)
-                .withCommand("taskmanager")
-                .withNetwork(NETWORK)
-                .withNetworkAliases("taskmanager")
-                .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
-                .dependsOn(jobManager)
-                .withLogConsumer(new Slf4jLogConsumer(LOG));
-
-        Startables.deepStart(Stream.of(jobManager)).join();
-        Startables.deepStart(Stream.of(taskManager)).join();
-        copySeaTunnelFlinkFile();
-        LOG.info("Flink containers are started.");
+    @Override
+    protected String getStartModulePath() {
+        return "seatunnel-core-flink-sql";
     }
 
-    @AfterEach
-    public void close() {
-        if (taskManager != null) {
-            taskManager.stop();
-        }
-        if (jobManager != null) {
-            jobManager.stop();
-        }
+    @Override
+    protected String getStartShellName() {
+        return "start-seatunnel-sql.sh";
     }
 
-    public Container.ExecResult executeSeaTunnelFlinkSqlJob(String confFile)
-        throws IOException, InterruptedException, URISyntaxException {
-        final String confPath = Paths.get(FlinkContainer.class.getResource(confFile).toURI()).toString();
-        if (!new File(confPath).exists()) {
-            throw new IllegalArgumentException(confFile + " doesn't exist");
-        }
-        final String targetConfInContainer = Paths.get("/tmp", confFile).toString();
-        jobManager.copyFileToContainer(MountableFile.forHostPath(confPath), targetConfInContainer);
-
-        // Running IT use cases under Windows requires replacing \ with /
-        String conf = targetConfInContainer.replaceAll("\\\\", "/");
-        final List<String> command = new ArrayList<>();
-        command.add(Paths.get(SEATUNNEL_HOME, "bin/start-seatunnel-sql.sh").toString());
-        command.add("--config " + conf);
-
-        Container.ExecResult execResult = jobManager.execInContainer("bash", "-c", String.join(" ", command));
-        LOG.info(execResult.getStdout());
-        LOG.error(execResult.getStderr());
-        // wait job start
-        Thread.sleep(WAIT_FLINK_JOB_SUBMIT);
-        return execResult;
+    @Override
+    protected String getConnectorType() {
+        return "flink-sql";
     }
 
-    protected void copySeaTunnelFlinkFile() {
-        // copy lib
-        String seatunnelCoreFlinkJarPath = PROJECT_ROOT_PATH + "/seatunnel-core/seatunnel-core-flink-sql/target/" + SEATUNNEL_FLINK_SQL_JAR;
-        jobManager.copyFileToContainer(
-            MountableFile.forHostPath(seatunnelCoreFlinkJarPath),
-            FLINK_JAR_PATH);
-
-        // copy bin
-        String seatunnelFlinkBinPath = PROJECT_ROOT_PATH + "/seatunnel-core/seatunnel-core-flink-sql/src/main/bin/start-seatunnel-sql.sh";
-        jobManager.copyFileToContainer(
-            MountableFile.forHostPath(seatunnelFlinkBinPath),
-            Paths.get(SEATUNNEL_BIN).toString());
+    @Override
+    protected String getConnectorModulePath() {
+        return "seatunnel-connectors/seatunnel-connectors-flink-sql";
     }
 
+    @Override
+    protected String getConnectorNamePrefix() {
+        return "flink-sql-connector-";
+    }
 }
diff --git a/seatunnel-e2e/seatunnel-flink-sql-e2e/setunnel-connector-flink-sql-fake-e2e/src/test/java/org/apache/seatunnel/e2e/flink/sql/fake/DatagenToConsoleIT.java b/seatunnel-e2e/seatunnel-flink-sql-e2e/setunnel-connector-flink-sql-fake-e2e/src/test/java/org/apache/seatunnel/e2e/flink/sql/fake/DatagenToConsoleIT.java
index 05c5eb09c..b9f57920c 100644
--- a/seatunnel-e2e/seatunnel-flink-sql-e2e/setunnel-connector-flink-sql-fake-e2e/src/test/java/org/apache/seatunnel/e2e/flink/sql/fake/DatagenToConsoleIT.java
+++ b/seatunnel-e2e/seatunnel-flink-sql-e2e/setunnel-connector-flink-sql-fake-e2e/src/test/java/org/apache/seatunnel/e2e/flink/sql/fake/DatagenToConsoleIT.java
@@ -24,14 +24,13 @@ import org.junit.jupiter.api.Test;
 import org.testcontainers.containers.Container;
 
 import java.io.IOException;
-import java.net.URISyntaxException;
 
 public class DatagenToConsoleIT extends FlinkContainer {
 
     @Test
-    public void testDatagenToConsole() throws IOException, URISyntaxException, InterruptedException {
+    public void testDatagenToConsole() throws IOException, InterruptedException {
         final String configFile = "/fake/flink.sql.conf";
-        Container.ExecResult execResult = executeSeaTunnelFlinkSqlJob(configFile);
+        Container.ExecResult execResult = executeSeaTunnelFlinkJob(configFile);
         Assertions.assertEquals(0, execResult.getExitCode());
     }
 }
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-spark-e2e-base/pom.xml b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-spark-e2e-base/pom.xml
index cbc8495c5..fcb3569ed 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-spark-e2e-base/pom.xml
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-spark-e2e-base/pom.xml
@@ -25,9 +25,14 @@
 
     <artifactId>connector-spark-e2e-base</artifactId>
 
-    <properties>
-        <maven-jar-plugin.version>2.4</maven-jar-plugin.version>
-    </properties>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-e2e-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+        </dependency>
+    </dependencies>
 
     <build>
         <plugins>
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-spark-e2e-base/src/test/java/org/apache/seatunnel/e2e/spark/SparkContainer.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-spark-e2e-base/src/test/java/org/apache/seatunnel/e2e/spark/SparkContainer.java
index 2f74483f0..7ad7c69f5 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-spark-e2e-base/src/test/java/org/apache/seatunnel/e2e/spark/SparkContainer.java
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-spark-e2e-base/src/test/java/org/apache/seatunnel/e2e/spark/SparkContainer.java
@@ -17,145 +17,36 @@
 
 package org.apache.seatunnel.e2e.spark;
 
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.Container;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.Network;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.lifecycle.Startables;
-import org.testcontainers.utility.MountableFile;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Objects;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
+import org.apache.seatunnel.e2e.common.AbstractSparkContainer;
 
 /**
 * This class is the base class of the SparkEnvironment tests. The before method creates a Spark master, and the after method closes it.
 * You can use {@link SparkContainer#executeSeaTunnelSparkJob} to submit a SeaTunnel config and run a SeaTunnel Spark job.
  */
-public abstract class SparkContainer {
-
-    private static final Logger LOG = LoggerFactory.getLogger(SparkContainer.class);
-
-    private static final String SPARK_DOCKER_IMAGE = "bitnami/spark:2.4.3";
-    public static final Network NETWORK = Network.newNetwork();
-
-    protected GenericContainer<?> master;
-    private static final Path PROJECT_ROOT_PATH = Paths.get(System.getProperty("user.dir")).getParent().getParent().getParent();
-    private static final String SEATUNNEL_SPARK_BIN = "start-seatunnel-spark-new-connector.sh";
-    private static final String SEATUNNEL_SPARK_JAR = "seatunnel-spark-starter.jar";
-    private static final String PLUGIN_MAPPING_FILE = "plugin-mapping.properties";
-    private static final String SEATUNNEL_HOME = "/tmp/spark/seatunnel";
-    private static final String SEATUNNEL_BIN = Paths.get(SEATUNNEL_HOME, "bin").toString();
-    private static final String SPARK_JAR_PATH = Paths.get(SEATUNNEL_HOME, "lib", SEATUNNEL_SPARK_JAR).toString();
-    private static final String CONNECTORS_PATH = Paths.get(SEATUNNEL_HOME, "connectors").toString();
-
-    private static final int WAIT_SPARK_JOB_SUBMIT = 5000;
-
-    @BeforeEach
-    public void before() {
-        master = new GenericContainer<>(SPARK_DOCKER_IMAGE)
-            .withNetwork(NETWORK)
-            .withNetworkAliases("spark-master")
-            .withExposedPorts()
-            .withEnv("SPARK_MODE", "master")
-            .withLogConsumer(new Slf4jLogConsumer(LOG));
-        // In most case we can just use standalone mode to execute a spark job, if we want to use cluster mode, we need to
-        // start a worker.
-
-        Startables.deepStart(Stream.of(master)).join();
-        copySeaTunnelSparkFile();
-        LOG.info("Spark container started");
-    }
-
-    @AfterEach
-    public void close() {
-        if (master != null) {
-            master.stop();
-        }
-    }
-
-    public Container.ExecResult executeSeaTunnelSparkJob(String confFile) throws IOException, InterruptedException {
-        final String confPath = getResource(confFile);
-        if (!new File(confPath).exists()) {
-            throw new IllegalArgumentException(confFile + " doesn't exist");
-        }
-        final String targetConfInContainer = Paths.get("/tmp", confFile).toString();
-        master.copyFileToContainer(MountableFile.forHostPath(confPath), targetConfInContainer);
-
-        // TODO: use start-seatunnel-spark.sh to run the spark job. Need to modified the SparkStarter can find the seatunnel-core-spark.jar.
-        final List<String> command = new ArrayList<>();
-        String sparkBinPath = Paths.get(SEATUNNEL_HOME, "bin", SEATUNNEL_SPARK_BIN).toString();
-        command.add(adaptPathForWin(sparkBinPath));
-        command.add("--master");
-        command.add("local");
-        command.add("--deploy-mode");
-        command.add("client");
-        command.add("--config " + adaptPathForWin(targetConfInContainer));
-
-        Container.ExecResult execResult = master.execInContainer("bash", "-c", String.join(" ", command));
-        LOG.info(execResult.getStdout());
-        LOG.error(execResult.getStderr());
-        // wait job start
-        Thread.sleep(WAIT_SPARK_JOB_SUBMIT);
-        return execResult;
-    }
-
-    protected void copySeaTunnelSparkFile() {
-        // copy lib
-        String seatunnelCoreSparkJarPath = Paths.get(PROJECT_ROOT_PATH.toString(),
-            "seatunnel-core", "seatunnel-spark-starter", "target", SEATUNNEL_SPARK_JAR).toString();
-        master.copyFileToContainer(MountableFile.forHostPath(seatunnelCoreSparkJarPath), SPARK_JAR_PATH);
-
-        // copy bin
-        String seatunnelSparkBinPath = Paths.get(PROJECT_ROOT_PATH.toString(),
-            "seatunnel-core", "seatunnel-spark-starter", "src", "main", "bin", SEATUNNEL_SPARK_BIN).toString();
-        master.copyFileToContainer(
-            MountableFile.forHostPath(seatunnelSparkBinPath),
-            Paths.get(SEATUNNEL_BIN, SEATUNNEL_SPARK_BIN).toString());
-
-        // copy connectors
-        getConnectorJarFiles()
-            .forEach(jar ->
-                master.copyFileToContainer(
-                    MountableFile.forHostPath(jar.getAbsolutePath()),
-                    getConnectorPath(jar.getName())));
+public abstract class SparkContainer extends AbstractSparkContainer {
 
-        // copy plugin-mapping.properties
-        master.copyFileToContainer(
-            MountableFile.forHostPath(PROJECT_ROOT_PATH + "/plugin-mapping.properties"),
-            Paths.get(CONNECTORS_PATH, PLUGIN_MAPPING_FILE).toString());
+    @Override
+    protected String getStartModulePath() {
+        return "seatunnel-spark-starter";
     }
 
-    private String getResource(String confFile) {
-        return System.getProperty("user.dir") + "/src/test/resources" + confFile;
+    @Override
+    protected String getStartShellName() {
+        return "start-seatunnel-spark-new-connector.sh";
     }
 
-    private String getConnectorPath(String fileName) {
-        return Paths.get(CONNECTORS_PATH, "seatunnel", fileName).toString();
+    @Override
+    protected String getConnectorType() {
+        return "seatunnel";
     }
 
-    private List<File> getConnectorJarFiles() {
-        File jars = new File(PROJECT_ROOT_PATH +
-            "/seatunnel-connectors-v2-dist/target/lib");
-        return Arrays.stream(
-                Objects.requireNonNull(
-                    jars.listFiles(
-                        f -> f.getName().contains("connector-"))))
-            .collect(Collectors.toList());
+    @Override
+    protected String getConnectorModulePath() {
+        return "seatunnel-connectors-v2";
     }
 
-    private String adaptPathForWin(String path) {
-        return path == null ? "" : path.replaceAll("\\\\", "/");
+    @Override
+    protected String getConnectorNamePrefix() {
+        return "connector-";
     }
 }
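
As the Javadoc above notes, concrete ITs submit a SeaTunnel config through executeSeaTunnelSparkJob. A hypothetical usage sketch, mirroring the Flink DatagenToConsoleIT shown earlier (the test class and config resource path are illustrative only, not part of this commit):

    // Hypothetical usage sketch: a concrete IT built on the refactored SparkContainer.
    import java.io.IOException;

    import org.junit.jupiter.api.Assertions;
    import org.junit.jupiter.api.Test;
    import org.testcontainers.containers.Container;

    public class FakeSourceToConsoleIT extends SparkContainer {

        @Test
        public void testFakeSourceToConsole() throws IOException, InterruptedException {
            Container.ExecResult execResult =
                executeSeaTunnelSparkJob("/fakesource_to_console.conf"); // assumed resource
            Assertions.assertEquals(0, execResult.getExitCode());
        }
    }
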
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
index a7a084bac..b5aef0ec7 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
@@ -39,7 +39,7 @@
     <dependencies>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-core-spark</artifactId>
+            <artifactId>seatunnel-spark-starter</artifactId>
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
diff --git a/seatunnel-e2e/seatunnel-spark-e2e/seatunnel-connector-spark-e2e-base/pom.xml b/seatunnel-e2e/seatunnel-spark-e2e/seatunnel-connector-spark-e2e-base/pom.xml
index 7fa7dfaa0..2ff76f7db 100644
--- a/seatunnel-e2e/seatunnel-spark-e2e/seatunnel-connector-spark-e2e-base/pom.xml
+++ b/seatunnel-e2e/seatunnel-spark-e2e/seatunnel-connector-spark-e2e-base/pom.xml
@@ -25,9 +25,14 @@
 
     <artifactId>seatunnel-connector-spark-e2e-base</artifactId>
 
-    <properties>
-        <maven-jar-plugin.version>2.4</maven-jar-plugin.version>
-    </properties>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-e2e-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+        </dependency>
+    </dependencies>
 
     <build>
         <plugins>
diff --git a/seatunnel-e2e/seatunnel-spark-e2e/seatunnel-connector-spark-e2e-base/src/test/java/org/apache/seatunnel/e2e/spark/SparkContainer.java b/seatunnel-e2e/seatunnel-spark-e2e/seatunnel-connector-spark-e2e-base/src/test/java/org/apache/seatunnel/e2e/spark/SparkContainer.java
index 6dca627ac..8811c06a4 100644
--- a/seatunnel-e2e/seatunnel-spark-e2e/seatunnel-connector-spark-e2e-base/src/test/java/org/apache/seatunnel/e2e/spark/SparkContainer.java
+++ b/seatunnel-e2e/seatunnel-spark-e2e/seatunnel-connector-spark-e2e-base/src/test/java/org/apache/seatunnel/e2e/spark/SparkContainer.java
@@ -17,141 +17,36 @@
 
 package org.apache.seatunnel.e2e.spark;
 
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.Container;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.Network;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.lifecycle.Startables;
-import org.testcontainers.utility.MountableFile;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Objects;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
+import org.apache.seatunnel.e2e.common.AbstractSparkContainer;
 
 /**
 * This class is the base class of the SparkEnvironment tests. The before method creates a Spark master, and the after method closes it.
 * You can use {@link SparkContainer#executeSeaTunnelSparkJob} to submit a SeaTunnel config and run a SeaTunnel Spark job.
  */
-public abstract class SparkContainer {
-
-    private static final Logger LOG = LoggerFactory.getLogger(SparkContainer.class);
-
-    private static final String SPARK_DOCKER_IMAGE = "bitnami/spark:2.4.3";
-    public static final Network NETWORK = Network.newNetwork();
-
-    protected GenericContainer<?> master;
-    private static final Path PROJECT_ROOT_PATH = Paths.get(System.getProperty("user.dir")).getParent().getParent().getParent();
-    private static final String SEATUNNEL_SPARK_BIN = "start-seatunnel-spark.sh";
-    private static final String SEATUNNEL_SPARK_JAR = "seatunnel-core-spark.jar";
-    private static final String PLUGIN_MAPPING_FILE = "plugin-mapping.properties";
-    private static final String SEATUNNEL_HOME = "/tmp/spark/seatunnel";
-    private static final String SEATUNNEL_BIN = Paths.get(SEATUNNEL_HOME, "bin").toString();
-    private static final String SPARK_JAR_PATH = Paths.get(SEATUNNEL_HOME, "lib", SEATUNNEL_SPARK_JAR).toString();
-    private static final String CONNECTORS_PATH = Paths.get(SEATUNNEL_HOME, "connectors").toString();
-
-    private static final int WAIT_SPARK_JOB_SUBMIT = 5000;
-
-    @BeforeEach
-    public void before() {
-        master = new GenericContainer<>(SPARK_DOCKER_IMAGE)
-            .withNetwork(NETWORK)
-            .withNetworkAliases("spark-master")
-            .withExposedPorts()
-            .withEnv("SPARK_MODE", "master")
-            .withLogConsumer(new Slf4jLogConsumer(LOG));
-        // In most case we can just use standalone mode to execute a spark job, if we want to use cluster mode, we need to
-        // start a worker.
-
-        Startables.deepStart(Stream.of(master)).join();
-        copySeaTunnelSparkFile();
-        LOG.info("Spark container started");
-    }
+public abstract class SparkContainer extends AbstractSparkContainer {
 
-    @AfterEach
-    public void close() {
-        if (master != null) {
-            master.stop();
-        }
+    @Override
+    protected String getStartModulePath() {
+        return "seatunnel-core-spark";
     }
 
-    public Container.ExecResult executeSeaTunnelSparkJob(String confFile) throws IOException, InterruptedException {
-        final String confPath = getResource(confFile);
-        if (!new File(confPath).exists()) {
-            throw new IllegalArgumentException(confFile + " doesn't exist");
-        }
-        final String targetConfInContainer = Paths.get("/tmp", confFile).toString();
-        master.copyFileToContainer(MountableFile.forHostPath(confPath), targetConfInContainer);
-
-        // TODO: use start-seatunnel-spark.sh to run the spark job. Need to modified the SparkStarter can find the seatunnel-core-spark.jar.
-        // Running IT use cases under Windows requires replacing \ with /
-        String conf = targetConfInContainer.replaceAll("\\\\", "/");
-        final List<String> command = new ArrayList<>();
-        command.add(Paths.get(SEATUNNEL_HOME, "bin/start-seatunnel-spark.sh").toString());
-        command.add("--master");
-        command.add("local");
-        command.add("--deploy-mode");
-        command.add("client");
-        command.add("--config " + conf);
-
-        Container.ExecResult execResult = master.execInContainer("bash", "-c", String.join(" ", command));
-        LOG.info(execResult.getStdout());
-        LOG.error(execResult.getStderr());
-        // wait job start
-        Thread.sleep(WAIT_SPARK_JOB_SUBMIT);
-        return execResult;
-    }
-
-    protected void copySeaTunnelSparkFile() {
-        // copy lib
-        String seatunnelCoreSparkJarPath = PROJECT_ROOT_PATH
-            + "/seatunnel-core/seatunnel-core-spark/target/seatunnel-core-spark.jar";
-        master.copyFileToContainer(MountableFile.forHostPath(seatunnelCoreSparkJarPath), SPARK_JAR_PATH);
-
-        // copy bin
-        String seatunnelFlinkBinPath = PROJECT_ROOT_PATH + "/seatunnel-core/seatunnel-core-spark/src/main/bin/start-seatunnel-spark.sh";
-        master.copyFileToContainer(
-            MountableFile.forHostPath(seatunnelFlinkBinPath),
-            Paths.get(SEATUNNEL_BIN, SEATUNNEL_SPARK_BIN).toString());
-
-        // copy connectors
-        getConnectorJarFiles()
-            .forEach(jar ->
-                master.copyFileToContainer(
-                    MountableFile.forHostPath(jar.getAbsolutePath()),
-                    getConnectorPath(jar.getName())));
-
-        // copy plugin-mapping.properties
-        master.copyFileToContainer(
-            MountableFile.forHostPath(PROJECT_ROOT_PATH + "/plugin-mapping.properties"),
-            Paths.get(CONNECTORS_PATH, PLUGIN_MAPPING_FILE).toString());
+    @Override
+    protected String getStartShellName() {
+        return "start-seatunnel-spark.sh";
     }
 
-    private String getResource(String confFile) {
-        return System.getProperty("user.dir") + "/src/test/resources" + confFile;
+    @Override
+    protected String getConnectorType() {
+        return "spark";
     }
 
-    private String getConnectorPath(String fileName) {
-        return Paths.get(CONNECTORS_PATH, "spark", fileName).toString();
+    @Override
+    protected String getConnectorModulePath() {
+        return "seatunnel-connectors/seatunnel-connectors-spark";
     }
 
-    private List<File> getConnectorJarFiles() {
-        File jars = new File(PROJECT_ROOT_PATH +
-            "/seatunnel-connectors/seatunnel-connectors-spark-dist/target/lib");
-        return Arrays.stream(
-                Objects.requireNonNull(
-                    jars.listFiles(
-                        f -> f.getName().contains("seatunnel-connector-spark"))))
-            .collect(Collectors.toList());
+    @Override
+    protected String getConnectorNamePrefix() {
+        return "seatunnel-connector-spark-";
     }
 }
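
The remaining two overrides, getStartModulePath() and getStartShellName(), tell the shared base where to find the engine starter. A hedged sketch of one plausible way they could be consumed, generalizing the copySeaTunnelSparkFile code removed above (the master field, SEATUNNEL_HOME, and the starter jar name passed in are assumptions, not the base class added by this commit):

    // Hedged sketch: staging the starter jar and bin script from the start module.
    import java.nio.file.Path;
    import java.nio.file.Paths;

    import org.testcontainers.containers.GenericContainer;
    import org.testcontainers.utility.MountableFile;

    abstract class StarterCopySketch {

        private static final Path PROJECT_ROOT_PATH =
            Paths.get(System.getProperty("user.dir")).getParent().getParent().getParent();
        private static final String SEATUNNEL_HOME = "/tmp/spark/seatunnel"; // assumed

        protected GenericContainer<?> master; // assumed container field

        protected abstract String getStartModulePath();
        protected abstract String getStartShellName();

        protected void copyStarterFiles(String starterJarName) {
            // The starter jar is built under seatunnel-core/<start module>/target.
            Path starterJar = PROJECT_ROOT_PATH.resolve(
                Paths.get("seatunnel-core", getStartModulePath(), "target", starterJarName));
            master.copyFileToContainer(
                MountableFile.forHostPath(starterJar.toString()),
                Paths.get(SEATUNNEL_HOME, "lib", starterJarName).toString());

            // The start script ships under src/main/bin of the same module.
            Path startShell = PROJECT_ROOT_PATH.resolve(
                Paths.get("seatunnel-core", getStartModulePath(), "src", "main", "bin", getStartShellName()));
            master.copyFileToContainer(
                MountableFile.forHostPath(startShell.toString()),
                Paths.get(SEATUNNEL_HOME, "bin", getStartShellName()).toString());
        }
    }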