Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/05/30 09:38:33 UTC

[incubator-seatunnel] branch api-draft updated: Add e2e for new connector (#1973)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/api-draft by this push:
     new a9d54fdd Add e2e for new connector (#1973)
a9d54fdd is described below

commit a9d54fdd0a46ae788412d895f62f6d93aa5b7ec5
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Mon May 30 17:38:26 2022 +0800

    Add e2e for new connector (#1973)
    
    * Add E2E to new Connector
    * Use seatunnel script to submit job in e2e (#1937)
    * Change ci timeout to 60mins
    * fix cannot set bounded stream
---
 .github/workflows/backend.yml                      |  2 +-
 .../api/source/SeaTunnelRuntimeEnvironment.java    |  6 +-
 .../seatunnel/console/sink/ConsoleSink.java        | 12 ++++
 .../seatunnel/fake/source/FakeSource.java          | 12 ++++
 .../seatunnel/fake/source/FakeSourceReader.java    |  1 +
 .../connectors/seatunnel/kafka/sink/KafkaSink.java | 12 ++++
 .../seatunnel/kafka/source/KafkaSource.java        | 12 ++++
 .../starter/flink/execution/FlinkExecution.java    |  2 +-
 .../flink/execution/SinkExecuteProcessor.java      |  2 +
 .../flink/execution/SourceExecuteProcessor.java    |  2 +
 .../spark/execution/SinkExecuteProcessor.java      |  2 +
 .../spark/execution/SourceExecuteProcessor.java    |  2 +
 seatunnel-e2e/pom.xml                              |  4 +-
 seatunnel-e2e/seatunnel-flink-e2e/pom.xml          |  7 --
 .../apache/seatunnel/e2e/flink/FlinkContainer.java | 28 +++++---
 .../pom.xml                                        | 13 ++--
 .../apache/seatunnel/e2e/flink/FlinkContainer.java | 79 ++++++++++------------
 .../e2e/flink/fake/FakeSourceToConsoleIT.java      | 27 ++++----
 .../test/resources/fake/fakesource_to_console.conf | 54 +++++++++++++++
 .../src/test/resources/log4j.properties            | 23 +++++++
 seatunnel-e2e/seatunnel-spark-e2e/pom.xml          |  6 --
 .../apache/seatunnel/e2e/spark/SparkContainer.java | 35 ++++------
 .../pom.xml                                        | 12 +---
 .../apache/seatunnel/e2e/spark/SparkContainer.java | 48 ++++++-------
 .../e2e/spark/fake/FakeSourceToConsoleIT.java      | 27 +++++---
 .../test/resources/fake/fakesource_to_console.conf | 73 ++++++++++++++++++++
 .../src/test/resources/log4j.properties            | 22 ++++++
 27 files changed, 360 insertions(+), 165 deletions(-)

diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml
index 7e196a90..5e6dfdff 100644
--- a/.github/workflows/backend.yml
+++ b/.github/workflows/backend.yml
@@ -35,7 +35,7 @@ jobs:
     strategy:
       matrix:
         java: ['8', '11']
-    timeout-minutes: 50
+    timeout-minutes: 60
     env:
       MAVEN_OPTS: -Xmx2G -Xms2G
     steps:
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelRuntimeEnvironment.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelRuntimeEnvironment.java
index 11f69e5a..16630636 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelRuntimeEnvironment.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelRuntimeEnvironment.java
@@ -29,7 +29,7 @@ public interface SeaTunnelRuntimeEnvironment {
      *
      * @return seaTunnelContext
      */
-    default SeaTunnelContext getSeaTunnelContext() {
-        return SeaTunnelContext.getContext();
-    }
+    SeaTunnelContext getSeaTunnelContext();
+
+    void setSeaTunnelContext(SeaTunnelContext seaTunnelContext);
 }
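
The API change above removes the default implementation that returned SeaTunnelContext.getContext() and replaces it with an explicit getter/setter pair, so each connector now stores whatever context the engine starter injects. A minimal sketch of the resulting implementation pattern, which the connector diffs below all follow (the class name is made up; the imports assume the packages shown in this patch):

import org.apache.seatunnel.api.common.SeaTunnelContext;
import org.apache.seatunnel.api.source.SeaTunnelRuntimeEnvironment;

// Minimal sketch (not part of this patch): a connector now keeps the injected
// context instead of relying on the removed default that read the global singleton.
public class MyFakeConnector implements SeaTunnelRuntimeEnvironment {

    private SeaTunnelContext seaTunnelContext;

    @Override
    public SeaTunnelContext getSeaTunnelContext() {
        return seaTunnelContext;
    }

    @Override
    public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
        this.seaTunnelContext = seaTunnelContext;
    }
}
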
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
index f850c746..6620d738 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.console.sink;
 
+import org.apache.seatunnel.api.common.SeaTunnelContext;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -33,6 +34,7 @@ import java.util.List;
 public class ConsoleSink implements SeaTunnelSink<SeaTunnelRow, ConsoleState, ConsoleCommitInfo, ConsoleAggregatedCommitInfo> {
 
     private Config pluginConfig;
+    private SeaTunnelContext seaTunnelContext;
     private SeaTunnelRowTypeInfo seaTunnelRowTypeInfo;
 
     @Override
@@ -60,4 +62,14 @@ public class ConsoleSink implements SeaTunnelSink<SeaTunnelRow, ConsoleState, Co
     public void prepare(Config pluginConfig) {
         this.pluginConfig = pluginConfig;
     }
+
+    @Override
+    public SeaTunnelContext getSeaTunnelContext() {
+        return seaTunnelContext;
+    }
+
+    @Override
+    public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
+        this.seaTunnelContext = seaTunnelContext;
+    }
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
index 3c41ff45..22a654a5 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.fake.source;
 
+import org.apache.seatunnel.api.common.SeaTunnelContext;
 import org.apache.seatunnel.api.serialization.DefaultSerializer;
 import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
@@ -36,6 +37,7 @@ import com.google.auto.service.AutoService;
 public class FakeSource implements SeaTunnelSource<SeaTunnelRow, FakeSourceSplit, FakeState> {
 
     private Config pluginConfig;
+    private SeaTunnelContext seaTunnelContext;
 
     @Override
     public SeaTunnelRowTypeInfo getRowTypeInfo() {
@@ -76,4 +78,14 @@ public class FakeSource implements SeaTunnelSource<SeaTunnelRow, FakeSourceSplit
     public void prepare(Config pluginConfig) {
         this.pluginConfig = pluginConfig;
     }
+
+    @Override
+    public SeaTunnelContext getSeaTunnelContext() {
+        return seaTunnelContext;
+    }
+
+    @Override
+    public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
+        this.seaTunnelContext = seaTunnelContext;
+    }
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
index 5352235e..edec0ccb 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
@@ -71,6 +71,7 @@ public class FakeSourceReader implements SourceReader<SeaTunnelRow, FakeSourceSp
         }
         if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
             // signal to the source that we have reached the end of the data.
+            LOGGER.info("Closed the bounded fake source");
             context.signalNoMoreElement();
         }
         Thread.sleep(1000L);
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
index 70cfacdb..8bd13dd0 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelContext;
 import org.apache.seatunnel.api.serialization.DefaultSerializer;
 import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
@@ -46,6 +47,7 @@ public class KafkaSink implements SeaTunnelSink<SeaTunnelRow, KafkaSinkState, Ka
 
     private Config pluginConfig;
     private SeaTunnelRowTypeInfo seaTunnelRowTypeInfo;
+    private SeaTunnelContext seaTunnelContext;
 
     @Override
     public void prepare(Config pluginConfig) throws PrepareFailException {
@@ -86,4 +88,14 @@ public class KafkaSink implements SeaTunnelSink<SeaTunnelRow, KafkaSinkState, Ka
     public String getPluginName() {
         return "Kafka";
     }
+
+    @Override
+    public SeaTunnelContext getSeaTunnelContext() {
+        return seaTunnelContext;
+    }
+
+    @Override
+    public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
+        this.seaTunnelContext = seaTunnelContext;
+    }
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
index 1c129b08..bbba086d 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
@@ -24,6 +24,7 @@ import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PATT
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TOPIC;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelContext;
 import org.apache.seatunnel.api.serialization.DefaultSerializer;
 import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
@@ -52,6 +53,7 @@ public class KafkaSource implements SeaTunnelSource<SeaTunnelRow, KafkaSourceSpl
 
     private final ConsumerMetadata metadata = new ConsumerMetadata();
     private SeaTunnelRowTypeInfo typeInfo;
+    private SeaTunnelContext seaTunnelContext;
 
     @Override
     public String getPluginName() {
@@ -115,4 +117,14 @@ public class KafkaSource implements SeaTunnelSource<SeaTunnelRow, KafkaSourceSpl
     public Serializer<KafkaSourceState> getEnumeratorStateSerializer() {
         return new DefaultSerializer<>();
     }
+
+    @Override
+    public SeaTunnelContext getSeaTunnelContext() {
+        return seaTunnelContext;
+    }
+
+    @Override
+    public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
+        this.seaTunnelContext = seaTunnelContext;
+    }
 }
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
index 8d433c27..c24aae13 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
+++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
@@ -49,7 +49,7 @@ public class FlinkExecution implements TaskExecution {
 
     public FlinkExecution(Config config) {
         this.config = config;
-        this.flinkEnvironment = (FlinkEnvironment) new EnvironmentFactory<>(config, EngineType.FLINK).getEnvironment();
+        this.flinkEnvironment = new EnvironmentFactory<FlinkEnvironment>(config, EngineType.FLINK).getEnvironment();
         SeaTunnelContext.getContext().setJobMode(flinkEnvironment.getJobMode());
         this.sourcePluginExecuteProcessor = new SourceExecuteProcessor(flinkEnvironment, config.getConfigList("source"));
         this.transformPluginExecuteProcessor = new TransformExecuteProcessor(flinkEnvironment, config.getConfigList("transform"));
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index d207b7eb..89f599b2 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.core.starter.flink.execution;
 
+import org.apache.seatunnel.api.common.SeaTunnelContext;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -63,6 +64,7 @@ public class SinkExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTunn
             SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable> seaTunnelSink =
                 sinkPluginDiscovery.getPluginInstance(pluginIdentifier);
             seaTunnelSink.prepare(sinkConfig);
+            seaTunnelSink.setSeaTunnelContext(SeaTunnelContext.getContext());
             return seaTunnelSink;
         }).collect(Collectors.toList());
         flinkEnvironment.registerPlugin(pluginJars);
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
index 689b03f0..5f0aa663 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.core.starter.flink.execution;
 
+import org.apache.seatunnel.api.common.SeaTunnelContext;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.flink.FlinkEnvironment;
 import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
@@ -68,6 +69,7 @@ public class SourceExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTu
             jars.addAll(sourcePluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
             SeaTunnelSource seaTunnelSource = sourcePluginDiscovery.getPluginInstance(pluginIdentifier);
             seaTunnelSource.prepare(sourceConfig);
+            seaTunnelSource.setSeaTunnelContext(SeaTunnelContext.getContext());
             sources.add(new SeaTunnelParallelSource(seaTunnelSource));
         }
         flinkEnvironment.registerPlugin(jars);
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index d9016a6e..3dfe9268 100644
--- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.core.starter.spark.execution;
 
+import org.apache.seatunnel.api.common.SeaTunnelContext;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
@@ -55,6 +56,7 @@ public class SinkExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTunn
             pluginJars.addAll(sinkPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
             SeaTunnelSink<?, ?, ?, ?> seaTunnelSink = sinkPluginDiscovery.getPluginInstance(pluginIdentifier);
             seaTunnelSink.prepare(sinkConfig);
+            seaTunnelSink.setSeaTunnelContext(SeaTunnelContext.getContext());
             return seaTunnelSink;
         }).collect(Collectors.toList());
         sparkEnvironment.registerPlugin(pluginJars);
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
index dd7cec03..b483d02c 100644
--- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.core.starter.spark.execution;
 
+import org.apache.seatunnel.api.common.SeaTunnelContext;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.common.utils.SerializationUtils;
 import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
@@ -70,6 +71,7 @@ public class SourceExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTu
             jars.addAll(sourcePluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
             SeaTunnelSource<?, ?, ?> seaTunnelSource = sourcePluginDiscovery.getPluginInstance(pluginIdentifier);
             seaTunnelSource.prepare(sourceConfig);
+            seaTunnelSource.setSeaTunnelContext(SeaTunnelContext.getContext());
             sources.add(seaTunnelSource);
         }
         sparkEnvironment.registerPlugin(jars);
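
After this group of changes, all four execute processors (Flink and Spark, source and sink) share the same sequence once a plugin is discovered: prepare the plugin with its config, then push the shared context into it. A condensed sketch of that injection step, with the discovery and config plumbing omitted and the import paths taken from the diffs above:

import org.apache.seatunnel.api.common.SeaTunnelContext;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.source.SeaTunnelSource;

// Condensed sketch (not part of the patch) of the injection performed right after
// prepare(): the engine-wide context is pushed into each plugin instance rather
// than read from the global singleton inside the plugin.
final class ContextInjectionSketch {
    static void inject(SeaTunnelSource<?, ?, ?> source, SeaTunnelSink<?, ?, ?, ?> sink) {
        SeaTunnelContext context = SeaTunnelContext.getContext();
        source.setSeaTunnelContext(context);
        sink.setSeaTunnelContext(context);
    }
}
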
diff --git a/seatunnel-e2e/pom.xml b/seatunnel-e2e/pom.xml
index 5c37f80a..69461907 100644
--- a/seatunnel-e2e/pom.xml
+++ b/seatunnel-e2e/pom.xml
@@ -17,8 +17,8 @@
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
-        <artifactId>seatunnel</artifactId>
         <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel</artifactId>
         <version>${revision}</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
@@ -29,6 +29,8 @@
     <modules>
         <module>seatunnel-flink-e2e</module>
         <module>seatunnel-spark-e2e</module>
+        <module>seatunnel-flink-new-connector-e2e</module>
+        <module>seatunnel-spark-new-connector-e2e</module>
     </modules>
 
 </project>
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-flink-e2e/pom.xml b/seatunnel-e2e/seatunnel-flink-e2e/pom.xml
index 5f44ffb7..ff71391f 100644
--- a/seatunnel-e2e/seatunnel-flink-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-flink-e2e/pom.xml
@@ -33,13 +33,6 @@
             <version>${project.version}</version>
         </dependency>
 
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-log4j12</artifactId>
-            <version>1.7.25</version>
-            <scope>test</scope>
-        </dependency>
-
         <dependency>
             <groupId>org.testcontainers</groupId>
             <artifactId>testcontainers</artifactId>
diff --git a/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java b/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java
index ffa1c991..50bd498d 100644
--- a/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java
+++ b/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java
@@ -53,11 +53,13 @@ public abstract class FlinkContainer {
     protected GenericContainer<?> jobManager;
     protected GenericContainer<?> taskManager;
     private static final Path PROJECT_ROOT_PATH = Paths.get(System.getProperty("user.dir")).getParent().getParent();
+    private static final String SEATUNNEL_FLINK_BIN = "start-seatunnel-flink.sh";
     private static final String SEATUNNEL_FLINK_JAR = "seatunnel-core-flink.jar";
     private static final String PLUGIN_MAPPING_FILE = "plugin-mapping.properties";
     private static final String SEATUNNEL_HOME = "/tmp/flink/seatunnel";
-    private static final String FLINK_JAR_PATH = Paths.get(SEATUNNEL_HOME, "lib", SEATUNNEL_FLINK_JAR).toString();
-    private static final String CONNECTORS_PATH = Paths.get(SEATUNNEL_HOME, "connectors").toString();
+    private static final String SEATUNNEL_BIN = Paths.get(SEATUNNEL_HOME, "bin").toString();
+    private static final String SEATUNNEL_LIB = Paths.get(SEATUNNEL_HOME, "lib").toString();
+    private static final String SEATUNNEL_CONNECTORS = Paths.get(SEATUNNEL_HOME, "connectors").toString();
 
     private static final int WAIT_FLINK_JOB_SUBMIT = 5000;
 
@@ -113,12 +115,9 @@ public abstract class FlinkContainer {
         jobManager.copyFileToContainer(MountableFile.forHostPath(confPath), targetConfInContainer);
 
         // Running IT use cases under Windows requires replacing \ with /
-        String jar = FLINK_JAR_PATH.replaceAll("\\\\", "/");
         String conf = targetConfInContainer.replaceAll("\\\\", "/");
         final List<String> command = new ArrayList<>();
-        command.add("flink");
-        command.add("run");
-        command.add("-c org.apache.seatunnel.core.flink.SeatunnelFlink " + jar);
+        command.add(Paths.get(SEATUNNEL_HOME, "bin/start-seatunnel-flink.sh").toString());
         command.add("--config " + conf);
 
         Container.ExecResult execResult = jobManager.execInContainer("bash", "-c", String.join(" ", command));
@@ -130,11 +129,20 @@ public abstract class FlinkContainer {
     }
 
     protected void copySeaTunnelFlinkFile() {
+        // copy lib
         String seatunnelCoreFlinkJarPath = PROJECT_ROOT_PATH
             + "/seatunnel-core/seatunnel-core-flink/target/seatunnel-core-flink.jar";
-        jobManager.copyFileToContainer(MountableFile.forHostPath(seatunnelCoreFlinkJarPath), FLINK_JAR_PATH);
+        jobManager.copyFileToContainer(
+            MountableFile.forHostPath(seatunnelCoreFlinkJarPath),
+            Paths.get(SEATUNNEL_LIB, SEATUNNEL_FLINK_JAR).toString());
+
+        // copy bin
+        String seatunnelFlinkBinPath = PROJECT_ROOT_PATH + "/seatunnel-core/seatunnel-core-flink/src/main/bin/start-seatunnel-flink.sh";
+        jobManager.copyFileToContainer(
+            MountableFile.forHostPath(seatunnelFlinkBinPath),
+            Paths.get(SEATUNNEL_BIN, SEATUNNEL_FLINK_BIN).toString());
 
-        // copy connectors jar
+        // copy connectors
         File jars = new File(PROJECT_ROOT_PATH +
             "/seatunnel-connectors/seatunnel-connectors-flink-dist/target/lib");
         Arrays.stream(Objects.requireNonNull(jars.listFiles(f -> f.getName().startsWith("seatunnel-connector-flink"))))
@@ -146,7 +154,7 @@ public abstract class FlinkContainer {
         // copy plugin-mapping.properties
         jobManager.copyFileToContainer(
             MountableFile.forHostPath(PROJECT_ROOT_PATH + "/seatunnel-connectors/plugin-mapping.properties"),
-            Paths.get(CONNECTORS_PATH, PLUGIN_MAPPING_FILE).toString());
+            Paths.get(SEATUNNEL_CONNECTORS, PLUGIN_MAPPING_FILE).toString());
     }
 
     private String getResource(String confFile) {
@@ -154,7 +162,7 @@ public abstract class FlinkContainer {
     }
 
     private String getConnectorPath(String fileName) {
-        return Paths.get(CONNECTORS_PATH.toString(), "flink", fileName).toString();
+        return Paths.get(SEATUNNEL_CONNECTORS, "flink", fileName).toString();
     }
 
 }
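
With the container changes above, the Flink e2e tests no longer call "flink run" against seatunnel-core-flink.jar; they copy start-seatunnel-flink.sh into SEATUNNEL_HOME/bin and execute it inside the jobmanager container. A rough sketch of that submit path (not part of the patch; the config path is whatever was copied into the container beforehand):

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;

// Rough sketch (not part of the patch): run the launcher script copied to
// SEATUNNEL_HOME/bin inside the jobmanager container, pointing it at a config
// file that was copied into the container earlier.
final class FlinkSubmitSketch {
    static Container.ExecResult submit(GenericContainer<?> jobManager, String confInContainer)
            throws IOException, InterruptedException {
        final List<String> command = new ArrayList<>();
        command.add("/tmp/flink/seatunnel/bin/start-seatunnel-flink.sh");
        command.add("--config " + confInContainer);
        return jobManager.execInContainer("bash", "-c", String.join(" ", command));
    }
}
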
diff --git a/seatunnel-e2e/seatunnel-spark-e2e/pom.xml b/seatunnel-e2e/seatunnel-flink-new-connector-e2e/pom.xml
similarity index 81%
copy from seatunnel-e2e/seatunnel-spark-e2e/pom.xml
copy to seatunnel-e2e/seatunnel-flink-new-connector-e2e/pom.xml
index 66da0231..5c59d849 100644
--- a/seatunnel-e2e/seatunnel-spark-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-flink-new-connector-e2e/pom.xml
@@ -23,25 +23,20 @@
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
-    <artifactId>seatunnel-spark-e2e</artifactId>
-    <packaging>jar</packaging>
+    <artifactId>seatunnel-flink-new-connector-e2e</artifactId>
 
     <dependencies>
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-log4j12</artifactId>
-            <version>1.7.25</version>
-            <scope>test</scope>
-        </dependency>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-core-spark</artifactId>
+            <artifactId>seatunnel-core-flink</artifactId>
             <version>${project.version}</version>
         </dependency>
+
         <dependency>
             <groupId>org.testcontainers</groupId>
             <artifactId>testcontainers</artifactId>
         </dependency>
+
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java b/seatunnel-e2e/seatunnel-flink-new-connector-e2e/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java
similarity index 65%
copy from seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java
copy to seatunnel-e2e/seatunnel-flink-new-connector-e2e/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java
index ffa1c991..1a277be5 100644
--- a/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java
+++ b/seatunnel-e2e/seatunnel-flink-new-connector-e2e/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java
@@ -1,20 +1,3 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
 package org.apache.seatunnel.e2e.flink;
 
 import org.junit.After;
@@ -39,7 +22,7 @@ import java.util.Objects;
 import java.util.stream.Stream;
 
 /**
- * This class is the base class of FlinkEnvironment test.
+ * This class is the base class of FlinkEnvironment test for new seatunnel connector API.
  * The before method will create a Flink cluster, and after method will close the Flink cluster.
  * You can use {@link FlinkContainer#executeSeaTunnelFlinkJob} to submit a seatunnel config and run a seatunnel job.
  */
@@ -53,11 +36,13 @@ public abstract class FlinkContainer {
     protected GenericContainer<?> jobManager;
     protected GenericContainer<?> taskManager;
     private static final Path PROJECT_ROOT_PATH = Paths.get(System.getProperty("user.dir")).getParent().getParent();
-    private static final String SEATUNNEL_FLINK_JAR = "seatunnel-core-flink.jar";
+    private static final String SEATUNNEL_FLINK_BIN = "start-seatunnel-flink-new-connector.sh";
+    private static final String SEATUNNEL_FLINK_JAR = "seatunnel-flink-starter.jar";
     private static final String PLUGIN_MAPPING_FILE = "plugin-mapping.properties";
     private static final String SEATUNNEL_HOME = "/tmp/flink/seatunnel";
-    private static final String FLINK_JAR_PATH = Paths.get(SEATUNNEL_HOME, "lib", SEATUNNEL_FLINK_JAR).toString();
-    private static final String CONNECTORS_PATH = Paths.get(SEATUNNEL_HOME, "connectors").toString();
+    private static final String SEATUNNEL_BIN = Paths.get(SEATUNNEL_HOME, "bin").toString();
+    private static final String SEATUNNEL_LIB = Paths.get(SEATUNNEL_HOME, "lib").toString();
+    private static final String SEATUNNEL_CONNECTORS = Paths.get(SEATUNNEL_HOME, "connectors").toString();
 
     private static final int WAIT_FLINK_JOB_SUBMIT = 5000;
 
@@ -72,22 +57,22 @@ public abstract class FlinkContainer {
     @Before
     public void before() {
         jobManager = new GenericContainer<>(FLINK_DOCKER_IMAGE)
-                .withCommand("jobmanager")
+            .withCommand("jobmanager")
+            .withNetwork(NETWORK)
+            .withNetworkAliases("jobmanager")
+            .withExposedPorts()
+            .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
+            .withLogConsumer(new Slf4jLogConsumer(LOG));
+
+        taskManager =
+            new GenericContainer<>(FLINK_DOCKER_IMAGE)
+                .withCommand("taskmanager")
                 .withNetwork(NETWORK)
-                .withNetworkAliases("jobmanager")
-                .withExposedPorts()
+                .withNetworkAliases("taskmanager")
                 .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
+                .dependsOn(jobManager)
                 .withLogConsumer(new Slf4jLogConsumer(LOG));
 
-        taskManager =
-                new GenericContainer<>(FLINK_DOCKER_IMAGE)
-                        .withCommand("taskmanager")
-                        .withNetwork(NETWORK)
-                        .withNetworkAliases("taskmanager")
-                        .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
-                        .dependsOn(jobManager)
-                        .withLogConsumer(new Slf4jLogConsumer(LOG));
-
         Startables.deepStart(Stream.of(jobManager)).join();
         Startables.deepStart(Stream.of(taskManager)).join();
         copySeaTunnelFlinkFile();
@@ -113,12 +98,9 @@ public abstract class FlinkContainer {
         jobManager.copyFileToContainer(MountableFile.forHostPath(confPath), targetConfInContainer);
 
         // Running IT use cases under Windows requires replacing \ with /
-        String jar = FLINK_JAR_PATH.replaceAll("\\\\", "/");
         String conf = targetConfInContainer.replaceAll("\\\\", "/");
         final List<String> command = new ArrayList<>();
-        command.add("flink");
-        command.add("run");
-        command.add("-c org.apache.seatunnel.core.flink.SeatunnelFlink " + jar);
+        command.add(Paths.get(SEATUNNEL_HOME, "bin", SEATUNNEL_FLINK_BIN).toString());
         command.add("--config " + conf);
 
         Container.ExecResult execResult = jobManager.execInContainer("bash", "-c", String.join(" ", command));
@@ -130,14 +112,23 @@ public abstract class FlinkContainer {
     }
 
     protected void copySeaTunnelFlinkFile() {
+        // copy lib
         String seatunnelCoreFlinkJarPath = PROJECT_ROOT_PATH
-            + "/seatunnel-core/seatunnel-core-flink/target/seatunnel-core-flink.jar";
-        jobManager.copyFileToContainer(MountableFile.forHostPath(seatunnelCoreFlinkJarPath), FLINK_JAR_PATH);
+            + "/seatunnel-core/seatunnel-flink-starter/target/" + SEATUNNEL_FLINK_JAR;
+        jobManager.copyFileToContainer(
+            MountableFile.forHostPath(seatunnelCoreFlinkJarPath),
+            Paths.get(SEATUNNEL_LIB, SEATUNNEL_FLINK_JAR).toString());
+
+        // copy bin
+        String seatunnelFlinkBinPath = PROJECT_ROOT_PATH + "/seatunnel-core/seatunnel-flink-starter/src/main/bin/" + SEATUNNEL_FLINK_BIN;
+        jobManager.copyFileToContainer(
+            MountableFile.forHostPath(seatunnelFlinkBinPath),
+            Paths.get(SEATUNNEL_BIN, SEATUNNEL_FLINK_BIN).toString());
 
-        // copy connectors jar
+        // copy connectors
         File jars = new File(PROJECT_ROOT_PATH +
-            "/seatunnel-connectors/seatunnel-connectors-flink-dist/target/lib");
-        Arrays.stream(Objects.requireNonNull(jars.listFiles(f -> f.getName().startsWith("seatunnel-connector-flink"))))
+            "/seatunnel-connectors/seatunnel-connectors-seatunnel-dist/target/lib");
+        Arrays.stream(Objects.requireNonNull(jars.listFiles(f -> f.getName().startsWith("seatunnel-connector-seatunnel"))))
             .forEach(jar ->
                 jobManager.copyFileToContainer(
                     MountableFile.forHostPath(jar.getAbsolutePath()),
@@ -146,7 +137,7 @@ public abstract class FlinkContainer {
         // copy plugin-mapping.properties
         jobManager.copyFileToContainer(
             MountableFile.forHostPath(PROJECT_ROOT_PATH + "/seatunnel-connectors/plugin-mapping.properties"),
-            Paths.get(CONNECTORS_PATH, PLUGIN_MAPPING_FILE).toString());
+            Paths.get(SEATUNNEL_CONNECTORS, PLUGIN_MAPPING_FILE).toString());
     }
 
     private String getResource(String confFile) {
@@ -154,7 +145,7 @@ public abstract class FlinkContainer {
     }
 
     private String getConnectorPath(String fileName) {
-        return Paths.get(CONNECTORS_PATH.toString(), "flink", fileName).toString();
+        return Paths.get(SEATUNNEL_CONNECTORS, "seatunnel", fileName).toString();
     }
 
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelRuntimeEnvironment.java b/seatunnel-e2e/seatunnel-flink-new-connector-e2e/src/test/java/org/apache/seatunnel/e2e/flink/fake/FakeSourceToConsoleIT.java
similarity index 58%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelRuntimeEnvironment.java
copy to seatunnel-e2e/seatunnel-flink-new-connector-e2e/src/test/java/org/apache/seatunnel/e2e/flink/fake/FakeSourceToConsoleIT.java
index 11f69e5a..2663eb80 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelRuntimeEnvironment.java
+++ b/seatunnel-e2e/seatunnel-flink-new-connector-e2e/src/test/java/org/apache/seatunnel/e2e/flink/fake/FakeSourceToConsoleIT.java
@@ -15,21 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.api.source;
+package org.apache.seatunnel.e2e.flink.fake;
 
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
 
-/**
- * This interface defines the runtime environment of the SeaTunnel application.
- */
-public interface SeaTunnelRuntimeEnvironment {
+import org.junit.Assert;
+import org.junit.Test;
+import org.testcontainers.containers.Container;
+
+import java.io.IOException;
+
+public class FakeSourceToConsoleIT extends FlinkContainer {
 
-    /**
-     * Returns the SeaTunnel runtime context.
-     *
-     * @return seaTunnelContext
-     */
-    default SeaTunnelContext getSeaTunnelContext() {
-        return SeaTunnelContext.getContext();
+    @Test
+    @SuppressWarnings("magicnumber")
+    public void testFakeSourceToConsoleSink() throws IOException, InterruptedException {
+        Container.ExecResult execResult = executeSeaTunnelFlinkJob("/fake/fakesource_to_console.conf");
+        Assert.assertEquals(0, execResult.getExitCode());
     }
 }
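
The IT case above just submits the bundled config through the container and asserts a zero exit code. A hypothetical further e2e case for another connector would follow the same shape; the class name and config path below are invented for illustration:

import java.io.IOException;

import org.apache.seatunnel.e2e.flink.FlinkContainer;

import org.junit.Assert;
import org.junit.Test;
import org.testcontainers.containers.Container;

// Hypothetical sketch (not part of the patch): an additional e2e case extends
// FlinkContainer the same way and points at its own conf under src/test/resources.
public class AnotherSourceToConsoleIT extends FlinkContainer {

    @Test
    public void testAnotherSourceToConsoleSink() throws IOException, InterruptedException {
        Container.ExecResult execResult = executeSeaTunnelFlinkJob("/another/another_source_to_console.conf");
        Assert.assertEquals(0, execResult.getExitCode());
    }
}
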
diff --git a/seatunnel-e2e/seatunnel-flink-new-connector-e2e/src/test/resources/fake/fakesource_to_console.conf b/seatunnel-e2e/seatunnel-flink-new-connector-e2e/src/test/resources/fake/fakesource_to_console.conf
new file mode 100644
index 00000000..49be0920
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-new-connector-e2e/src/test/resources/fake/fakesource_to_console.conf
@@ -0,0 +1,54 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+  # You can set flink configuration here
+  execution.parallelism = 1
+  job.mode = "BATCH"
+  #execution.checkpoint.interval = 10000
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+  # This is a example source plugin **only for test and demonstrate the feature source plugin**
+    FakeSource {
+      result_table_name = "fake"
+      field_name = "name,age"
+    }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+  # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+}
+
+transform {
+    sql {
+      sql = "select name,age from fake"
+    }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+  # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
+}
+
+sink {
+  Console {}
+
+  # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+  # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-flink-new-connector-e2e/src/test/resources/log4j.properties b/seatunnel-e2e/seatunnel-flink-new-connector-e2e/src/test/resources/log4j.properties
new file mode 100644
index 00000000..57b61a3c
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-new-connector-e2e/src/test/resources/log4j.properties
@@ -0,0 +1,23 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the console
+log4j.rootCategory=ERROR, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
diff --git a/seatunnel-e2e/seatunnel-spark-e2e/pom.xml b/seatunnel-e2e/seatunnel-spark-e2e/pom.xml
index 66da0231..31b140df 100644
--- a/seatunnel-e2e/seatunnel-spark-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-spark-e2e/pom.xml
@@ -27,12 +27,6 @@
     <packaging>jar</packaging>
 
     <dependencies>
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-log4j12</artifactId>
-            <version>1.7.25</version>
-            <scope>test</scope>
-        </dependency>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
             <artifactId>seatunnel-core-spark</artifactId>
diff --git a/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/SparkContainer.java b/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/SparkContainer.java
index b3dd5d16..552522d4 100644
--- a/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/SparkContainer.java
+++ b/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/SparkContainer.java
@@ -52,16 +52,18 @@ public abstract class SparkContainer {
 
     protected GenericContainer<?> master;
     private static final Path PROJECT_ROOT_PATH = Paths.get(System.getProperty("user.dir")).getParent().getParent();
+    private static final String SEATUNNEL_SPARK_BIN = "start-seatunnel-spark.sh";
     private static final String SEATUNNEL_SPARK_JAR = "seatunnel-core-spark.jar";
     private static final String PLUGIN_MAPPING_FILE = "plugin-mapping.properties";
     private static final String SEATUNNEL_HOME = "/tmp/spark/seatunnel";
+    private static final String SEATUNNEL_BIN = Paths.get(SEATUNNEL_HOME, "bin").toString();
     private static final String SPARK_JAR_PATH = Paths.get(SEATUNNEL_HOME, "lib", SEATUNNEL_SPARK_JAR).toString();
     private static final String CONNECTORS_PATH = Paths.get(SEATUNNEL_HOME, "connectors").toString();
 
     private static final int WAIT_SPARK_JOB_SUBMIT = 5000;
 
     @Before
-    public void before() throws InterruptedException {
+    public void before() {
         master = new GenericContainer<>(SPARK_DOCKER_IMAGE)
             .withNetwork(NETWORK)
             .withNetworkAliases("spark-master")
@@ -93,31 +95,14 @@ public abstract class SparkContainer {
 
         // TODO: use start-seatunnel-spark.sh to run the spark job. Need to modified the SparkStarter can find the seatunnel-core-spark.jar.
         // Running IT use cases under Windows requires replacing \ with /
-        String jar = SPARK_JAR_PATH.replaceAll("\\\\", "/");
         String conf = targetConfInContainer.replaceAll("\\\\", "/");
         final List<String> command = new ArrayList<>();
-        command.add("spark-submit");
-        command.add("--class");
-        command.add("org.apache.seatunnel.core.spark.SeatunnelSpark");
-        command.add("--name");
-        command.add("SeaTunnel");
-        command.add("--master");
-        command.add("local");
-        command.add("--jars");
-        command.add(
-            getConnectorJarFiles()
-                .stream()
-                .map(j -> getConnectorPath(j.getName()))
-                .collect(Collectors.joining(",")));
-        command.add("--deploy-mode");
-        command.add("client");
-        command.add(jar);
-        command.add("-c");
-        command.add(conf);
+        command.add(Paths.get(SEATUNNEL_HOME, "bin/start-seatunnel-spark.sh").toString());
         command.add("--master");
         command.add("local");
         command.add("--deploy-mode");
         command.add("client");
+        command.add("--config " + conf);
 
         Container.ExecResult execResult = master.execInContainer("bash", "-c", String.join(" ", command));
         LOG.info(execResult.getStdout());
@@ -128,12 +113,18 @@ public abstract class SparkContainer {
     }
 
     protected void copySeaTunnelSparkFile() {
-        // copy jar to container
+        // copy lib
         String seatunnelCoreSparkJarPath = PROJECT_ROOT_PATH
             + "/seatunnel-core/seatunnel-core-spark/target/seatunnel-core-spark.jar";
         master.copyFileToContainer(MountableFile.forHostPath(seatunnelCoreSparkJarPath), SPARK_JAR_PATH);
 
-        // copy connectors jar
+        // copy bin
+        String seatunnelFlinkBinPath = PROJECT_ROOT_PATH + "/seatunnel-core/seatunnel-core-spark/src/main/bin/start-seatunnel-spark.sh";
+        master.copyFileToContainer(
+            MountableFile.forHostPath(seatunnelFlinkBinPath),
+            Paths.get(SEATUNNEL_BIN, SEATUNNEL_SPARK_BIN).toString());
+
+        // copy connectors
         getConnectorJarFiles()
             .forEach(jar ->
                 master.copyFileToContainer(
diff --git a/seatunnel-e2e/seatunnel-spark-e2e/pom.xml b/seatunnel-e2e/seatunnel-spark-new-connector-e2e/pom.xml
similarity index 86%
copy from seatunnel-e2e/seatunnel-spark-e2e/pom.xml
copy to seatunnel-e2e/seatunnel-spark-new-connector-e2e/pom.xml
index 66da0231..925075cb 100644
--- a/seatunnel-e2e/seatunnel-spark-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-spark-new-connector-e2e/pom.xml
@@ -17,22 +17,16 @@
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
-        <artifactId>seatunnel-e2e</artifactId>
         <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-e2e</artifactId>
         <version>${revision}</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
-
-    <artifactId>seatunnel-spark-e2e</artifactId>
     <packaging>jar</packaging>
 
+    <artifactId>seatunnel-spark-new-connector-e2e</artifactId>
+
     <dependencies>
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-log4j12</artifactId>
-            <version>1.7.25</version>
-            <scope>test</scope>
-        </dependency>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
             <artifactId>seatunnel-core-spark</artifactId>
diff --git a/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/SparkContainer.java b/seatunnel-e2e/seatunnel-spark-new-connector-e2e/src/test/java/org/apache/seatunnel/e2e/spark/SparkContainer.java
similarity index 83%
copy from seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/SparkContainer.java
copy to seatunnel-e2e/seatunnel-spark-new-connector-e2e/src/test/java/org/apache/seatunnel/e2e/spark/SparkContainer.java
index b3dd5d16..ec3b300b 100644
--- a/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/SparkContainer.java
+++ b/seatunnel-e2e/seatunnel-spark-new-connector-e2e/src/test/java/org/apache/seatunnel/e2e/spark/SparkContainer.java
@@ -52,16 +52,18 @@ public abstract class SparkContainer {
 
     protected GenericContainer<?> master;
     private static final Path PROJECT_ROOT_PATH = Paths.get(System.getProperty("user.dir")).getParent().getParent();
-    private static final String SEATUNNEL_SPARK_JAR = "seatunnel-core-spark.jar";
+    private static final String SEATUNNEL_SPARK_BIN = "start-seatunnel-spark-new-connector.sh";
+    private static final String SEATUNNEL_SPARK_JAR = "seatunnel-spark-starter.jar";
     private static final String PLUGIN_MAPPING_FILE = "plugin-mapping.properties";
     private static final String SEATUNNEL_HOME = "/tmp/spark/seatunnel";
+    private static final String SEATUNNEL_BIN = Paths.get(SEATUNNEL_HOME, "bin").toString();
     private static final String SPARK_JAR_PATH = Paths.get(SEATUNNEL_HOME, "lib", SEATUNNEL_SPARK_JAR).toString();
     private static final String CONNECTORS_PATH = Paths.get(SEATUNNEL_HOME, "connectors").toString();
 
     private static final int WAIT_SPARK_JOB_SUBMIT = 5000;
 
     @Before
-    public void before() throws InterruptedException {
+    public void before() {
         master = new GenericContainer<>(SPARK_DOCKER_IMAGE)
             .withNetwork(NETWORK)
             .withNetworkAliases("spark-master")
@@ -93,31 +95,14 @@ public abstract class SparkContainer {
 
         // TODO: use start-seatunnel-spark.sh to run the spark job. Need to modified the SparkStarter can find the seatunnel-core-spark.jar.
         // Running IT use cases under Windows requires replacing \ with /
-        String jar = SPARK_JAR_PATH.replaceAll("\\\\", "/");
         String conf = targetConfInContainer.replaceAll("\\\\", "/");
         final List<String> command = new ArrayList<>();
-        command.add("spark-submit");
-        command.add("--class");
-        command.add("org.apache.seatunnel.core.spark.SeatunnelSpark");
-        command.add("--name");
-        command.add("SeaTunnel");
-        command.add("--master");
-        command.add("local");
-        command.add("--jars");
-        command.add(
-            getConnectorJarFiles()
-                .stream()
-                .map(j -> getConnectorPath(j.getName()))
-                .collect(Collectors.joining(",")));
-        command.add("--deploy-mode");
-        command.add("client");
-        command.add(jar);
-        command.add("-c");
-        command.add(conf);
+        command.add(Paths.get(SEATUNNEL_HOME, "bin", SEATUNNEL_SPARK_BIN).toString());
         command.add("--master");
         command.add("local");
         command.add("--deploy-mode");
         command.add("client");
+        command.add("--config " + conf);
 
         Container.ExecResult execResult = master.execInContainer("bash", "-c", String.join(" ", command));
         LOG.info(execResult.getStdout());
@@ -128,12 +113,19 @@ public abstract class SparkContainer {
     }
 
     protected void copySeaTunnelSparkFile() {
-        // copy jar to container
-        String seatunnelCoreSparkJarPath = PROJECT_ROOT_PATH
-            + "/seatunnel-core/seatunnel-core-spark/target/seatunnel-core-spark.jar";
+        // copy lib
+        String seatunnelCoreSparkJarPath = Paths.get(PROJECT_ROOT_PATH.toString(),
+            "seatunnel-core", "seatunnel-spark-starter", "target", SEATUNNEL_SPARK_JAR).toString();
         master.copyFileToContainer(MountableFile.forHostPath(seatunnelCoreSparkJarPath), SPARK_JAR_PATH);
 
-        // copy connectors jar
+        // copy bin
+        String seatunnelFlinkBinPath = Paths.get(PROJECT_ROOT_PATH.toString(),
+            "seatunnel-core", "seatunnel-spark-starter", "src", "main", "bin", SEATUNNEL_SPARK_BIN).toString();
+        master.copyFileToContainer(
+            MountableFile.forHostPath(seatunnelFlinkBinPath),
+            Paths.get(SEATUNNEL_BIN, SEATUNNEL_SPARK_BIN).toString());
+
+        // copy connectors
         getConnectorJarFiles()
             .forEach(jar ->
                 master.copyFileToContainer(
@@ -151,16 +143,16 @@ public abstract class SparkContainer {
     }
 
     private String getConnectorPath(String fileName) {
-        return Paths.get(CONNECTORS_PATH, "spark", fileName).toString();
+        return Paths.get(CONNECTORS_PATH, "seatunnel", fileName).toString();
     }
 
     private List<File> getConnectorJarFiles() {
         File jars = new File(PROJECT_ROOT_PATH +
-            "/seatunnel-connectors/seatunnel-connectors-spark-dist/target/lib");
+            "/seatunnel-connectors/seatunnel-connectors-seatunnel-dist/target/lib");
         return Arrays.stream(
                 Objects.requireNonNull(
                     jars.listFiles(
-                        f -> f.getName().contains("seatunnel-connector-spark"))))
+                        f -> f.getName().contains("seatunnel-connector-seatunnel"))))
             .collect(Collectors.toList());
     }
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelRuntimeEnvironment.java b/seatunnel-e2e/seatunnel-spark-new-connector-e2e/src/test/java/org/apache/seatunnel/e2e/spark/fake/FakeSourceToConsoleIT.java
similarity index 52%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelRuntimeEnvironment.java
copy to seatunnel-e2e/seatunnel-spark-new-connector-e2e/src/test/java/org/apache/seatunnel/e2e/spark/fake/FakeSourceToConsoleIT.java
index 11f69e5a..2f9f1d1d 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelRuntimeEnvironment.java
+++ b/seatunnel-e2e/seatunnel-spark-new-connector-e2e/src/test/java/org/apache/seatunnel/e2e/spark/fake/FakeSourceToConsoleIT.java
@@ -15,21 +15,26 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.api.source;
+package org.apache.seatunnel.e2e.spark.fake;
 
-import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.e2e.spark.SparkContainer;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.testcontainers.containers.Container;
+
+import java.io.IOException;
 
 /**
- * This interface defines the runtime environment of the SeaTunnel application.
+ * This test case is used to verify that the fake source is able to send data to the console.
+ * Make sure the SeaTunnel job can submit successfully on spark engine.
  */
-public interface SeaTunnelRuntimeEnvironment {
+public class FakeSourceToConsoleIT extends SparkContainer {
 
-    /**
-     * Returns the SeaTunnel runtime context.
-     *
-     * @return seaTunnelContext
-     */
-    default SeaTunnelContext getSeaTunnelContext() {
-        return SeaTunnelContext.getContext();
+    @Test
+    @SuppressWarnings("magicnumber")
+    public void testFakeSourceToConsoleSine() throws IOException, InterruptedException {
+        Container.ExecResult execResult = executeSeaTunnelSparkJob("/fake/fakesource_to_console.conf");
+        Assert.assertEquals(0, execResult.getExitCode());
     }
 }
diff --git a/seatunnel-e2e/seatunnel-spark-new-connector-e2e/src/test/resources/fake/fakesource_to_console.conf b/seatunnel-e2e/seatunnel-spark-new-connector-e2e/src/test/resources/fake/fakesource_to_console.conf
new file mode 100644
index 00000000..7b812762
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-new-connector-e2e/src/test/resources/fake/fakesource_to_console.conf
@@ -0,0 +1,73 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+  # You can set spark configuration here
+  # see available properties defined by spark: https://spark.apache.org/docs/latest/configuration.html#available-properties
+  job.mode = "BATCH"
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  # This is a example input plugin **only for test and demonstrate the feature input plugin**
+  FakeSource {
+    result_table_name = "my_dataset"
+  }
+
+  # You can also use other input plugins, such as hdfs
+  # hdfs {
+  #   result_table_name = "accesslog"
+  #   path = "hdfs://hadoop-cluster-01/nginx/accesslog"
+  #   format = "json"
+  # }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of input plugins,
+  # please go to https://seatunnel.apache.org/docs/spark/configuration/source-plugins/Fake
+}
+
+transform {
+  # split data by specific delimiter
+
+  # you can also use other transform plugins, such as sql
+  # sql {
+  #   sql = "select * from accesslog where request_time > 1000"
+  # }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+  # please go to https://seatunnel.apache.org/docs/spark/configuration/transform-plugins/Split
+}
+
+sink {
+  # choose stdout output plugin to output data to console
+  Console {}
+
+  # you can also you other output plugins, such as sql
+  # hdfs {
+  #   path = "hdfs://hadoop-cluster-01/nginx/accesslog_processed"
+  #   save_mode = "append"
+  # }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of output plugins,
+  # please go to https://seatunnel.apache.org/docs/spark/configuration/sink-plugins/Console
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-new-connector-e2e/src/test/resources/log4j.properties b/seatunnel-e2e/seatunnel-spark-new-connector-e2e/src/test/resources/log4j.properties
new file mode 100644
index 00000000..89547981
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-new-connector-e2e/src/test/resources/log4j.properties
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Set everything to be logged to the console
+log4j.rootCategory=ERROR, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n