Posted to commits@seatunnel.apache.org by ty...@apache.org on 2023/01/16 13:41:36 UTC

[incubator-seatunnel] branch dev updated: [Improve][Connector-V2][MongoDB] Refactor MongoDB connector e2e test (#3819)

This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new df89459fc [Improve][Connector-V2][MongoDB] Refactor MongoDB connector e2e test (#3819)
df89459fc is described below

commit df89459fc495f0fa2128cfbae8ac47311a7692f4
Author: monster <60...@users.noreply.github.com>
AuthorDate: Mon Jan 16 21:41:30 2023 +0800

    [Improve][Connector-V2][MongoDB] Refactor MongoDB connector e2e test (#3819)
    
    * [Improve][Connector-V2][MongoDB] Refactor MongoDB connector e2e test cases
    
    * fix
    
    * fix
    
    * fix
    
    * fix
    
    * fix
    
    * fix
    
    * fix
    
    * fix
    
    * fix
    
    * fix
    
    * fix
    
    * fix
    
    * fix
    
    * fix
    
    * fix
    
    * fix
    
    * fix
    
    * fix
    
    * fix
    
    * fix
    
    * fix
    
    * fix
    
    * fix
    
    * fix
---
 .../connector-mongodb-e2e}/pom.xml                 |  17 +-
 .../e2e/connector}/v2/mongodb/MongodbIT.java       | 140 +++++-----
 .../test/resources}/mongodb_source_and_sink.conf   |  16 +-
 .../mongodb_source_matchQuery_and_sink.conf        |  15 +-
 .../test/resources/mongodb_source_to_assert.conf}  |  29 ++-
 seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml   |   1 +
 .../seatunnel/e2e/flink/v2/mongodb/MongodbIT.java  | 229 -----------------
 .../e2e/flink/v2/mongodb/MongodbMatchQueryIT.java  | 285 ---------------------
 .../seatunnel-flink-connector-v2-e2e/pom.xml       |   1 -
 .../connector-mongodb-spark-e2e/pom.xml            |  49 ----
 .../test/resources/mongodb/fake_to_mongodb.conf    |  70 -----
 .../seatunnel-spark-connector-v2-e2e/pom.xml       |   1 -
 12 files changed, 106 insertions(+), 747 deletions(-)
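
For readers skimming the diff: the refactor replaces the two per-engine MongoDB e2e modules (connector-mongodb-flink-e2e and connector-mongodb-spark-e2e, each built on FlinkContainer/SparkContainer with @BeforeEach/@Test) with a single engine-agnostic connector-mongodb-e2e module built on the shared TestSuiteBase/TestContainer framework, so one test class now runs against every supported engine container. A minimal sketch of the pattern, assuming the TestSuiteBase and TestContainer APIs exactly as they appear in the MongodbIT changes below (class name, config path, and method bodies here are illustrative, not part of this commit):

    import org.apache.seatunnel.e2e.common.TestResource;
    import org.apache.seatunnel.e2e.common.TestSuiteBase;
    import org.apache.seatunnel.e2e.common.container.TestContainer;

    import org.junit.jupiter.api.AfterAll;
    import org.junit.jupiter.api.Assertions;
    import org.junit.jupiter.api.BeforeAll;
    import org.junit.jupiter.api.TestTemplate;
    import org.testcontainers.containers.Container;

    import java.io.IOException;

    public class ExampleConnectorIT extends TestSuiteBase implements TestResource {

        @BeforeAll
        @Override
        public void startUp() {
            // Start external services (e.g. a Testcontainers GenericContainer) once
            // per class and seed test data, as MongodbIT.startUp() does below.
        }

        @TestTemplate
        public void testJob(TestContainer container) throws IOException, InterruptedException {
            // The framework invokes this template once per engine container; the
            // job config is a classpath resource inside the submitted job package.
            Container.ExecResult execResult = container.executeJob("/example_job.conf");
            Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
        }

        @AfterAll
        @Override
        public void tearDown() {
            // Close clients and stop containers started in startUp().
        }
    }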

diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/pom.xml
similarity index 85%
rename from seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/pom.xml
rename to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/pom.xml
index ac8499b09..7f4f6c6e8 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/pom.xml
@@ -17,31 +17,28 @@
         xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
     <parent>
         <groupId>org.apache.seatunnel</groupId>
-        <artifactId>seatunnel-flink-connector-v2-e2e</artifactId>
+        <artifactId>seatunnel-connector-v2-e2e</artifactId>
         <version>${revision}</version>
     </parent>
+    <modelVersion>4.0.0</modelVersion>
 
-    <artifactId>connector-mongodb-flink-e2e</artifactId>
+    <artifactId>connector-mongodb-e2e</artifactId>
 
     <dependencies>
+
+        <!-- SeaTunnel connectors -->
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>connector-flink-e2e-base</artifactId>
+            <artifactId>connector-mongodb</artifactId>
             <version>${project.version}</version>
-            <classifier>tests</classifier>
-            <type>test-jar</type>
             <scope>test</scope>
         </dependency>
-
-        <!-- SeaTunnel connectors -->
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>connector-mongodb</artifactId>
+            <artifactId>connector-assert</artifactId>
             <version>${project.version}</version>
-            <scope>test</scope>
         </dependency>
     </dependencies>
 </project>
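
With the module rename, the pom swaps the engine-specific connector-flink-e2e-base test jar for a dependency on connector-assert, which the new Assert-based check needs at job runtime; the shared e2e framework classes (TestSuiteBase, TestContainer) are presumably provided through the seatunnel-connector-v2-e2e parent rather than declared here.
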
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-mongodb-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/mongodb/MongodbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
similarity index 72%
rename from seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-mongodb-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/mongodb/MongodbIT.java
rename to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
index dcabbd5d3..717be9c08 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-mongodb-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/mongodb/MongodbIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.e2e.spark.v2.mongodb;
+package org.apache.seatunnel.e2e.connector.v2.mongodb;
 
 import static java.net.HttpURLConnection.HTTP_OK;
 import static java.net.HttpURLConnection.HTTP_UNAUTHORIZED;
@@ -31,20 +31,20 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.seatunnel.mongodb.data.DefaultSerializer;
 import org.apache.seatunnel.connectors.seatunnel.mongodb.data.Serializer;
-import org.apache.seatunnel.e2e.spark.SparkContainer;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
 
 import com.mongodb.client.MongoClient;
 import com.mongodb.client.MongoClients;
 import com.mongodb.client.MongoCollection;
-import com.mongodb.client.MongoCursor;
-import com.mongodb.client.model.Sorts;
 import lombok.extern.slf4j.Slf4j;
 import org.awaitility.Awaitility;
 import org.bson.Document;
-import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
 import org.testcontainers.containers.Container;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
@@ -57,103 +57,61 @@ import java.io.IOException;
 import java.math.BigDecimal;
 import java.time.Duration;
 import java.time.LocalDate;
-import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 @Slf4j
-public class MongodbIT extends SparkContainer {
+public class MongodbIT extends TestSuiteBase implements TestResource {
 
     private static final String MONGODB_IMAGE = "mongo:latest";
-    private static final String MONGODB_CONTAINER_HOST = "spark_e2e_mongodb";
+    private static final String MONGODB_CONTAINER_HOST = "e2e_mongodb";
     private static final int MONGODB_PORT = 27017;
     private static final String MONGODB_DATABASE = "test_db";
     private static final String MONGODB_SOURCE_TABLE = "source_table";
-    private static final String MONGODB_SINK_TABLE = "sink_table";
 
-    private static final List<Document> TEST_DATASET = generateTestDataSet();
+    private static final List<Document> TEST_DATASET = generateTestDataSet(0, 10);
 
-    private MongoClient client;
     private GenericContainer<?> mongodbContainer;
+    private MongoClient client;
 
-    @BeforeEach
-    public void startMongoContainer() {
-        DockerImageName imageName = DockerImageName.parse(MONGODB_IMAGE);
-        mongodbContainer = new GenericContainer<>(imageName)
-            .withNetwork(NETWORK)
-            .withNetworkAliases(MONGODB_CONTAINER_HOST)
-            .withExposedPorts(MONGODB_PORT)
-            .waitingFor(new HttpWaitStrategy()
-                .forPort(MONGODB_PORT)
-                .forStatusCodeMatching(response -> response == HTTP_OK || response == HTTP_UNAUTHORIZED)
-                .withStartupTimeout(Duration.ofMinutes(2)))
-            .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(MONGODB_IMAGE)));
-        Startables.deepStart(Stream.of(mongodbContainer)).join();
-        log.info("Mongodb container started");
+    @TestTemplate
+    public void testMongodbSourceToAssertSink(TestContainer container) throws IOException, InterruptedException {
+        Container.ExecResult execResult = container.executeJob("/mongodb_source_to_assert.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+    }
 
-        Awaitility.given().ignoreExceptions()
-            .atLeast(100, TimeUnit.MILLISECONDS)
-            .pollInterval(500, TimeUnit.MILLISECONDS)
-            .atMost(180, TimeUnit.SECONDS)
-            .untilAsserted(this::initConnection);
-        this.initSourceData();
+    @TestTemplate
+    public void testMongodb(TestContainer container) throws IOException, InterruptedException {
+        Container.ExecResult execResult = container.executeJob("/mongodb_source_and_sink.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
     }
 
-    @Test
-    public void testMongodb() throws IOException, InterruptedException {
-        Container.ExecResult execResult = executeSeaTunnelSparkJob("/mongodb/mongodb_source_and_sink.conf");
-        Assertions.assertEquals(0, execResult.getExitCode());
-        Assertions.assertIterableEquals(
-            TEST_DATASET.stream()
-                .map(e -> {
-                    e.remove("_id");
-                    return e;
-                })
-                .collect(Collectors.toList()),
-            readSinkData().stream()
-                .map(e -> {
-                    e.remove("_id");
-                    return e;
-                })
-                .collect(Collectors.toList()));
+    @TestTemplate
+    public void testMongodbMatchQuery(TestContainer container) throws IOException, InterruptedException {
+        Container.ExecResult execResult = container.executeJob("/mongodb_source_matchQuery_and_sink.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
     }
 
     public void initConnection() {
         String host = mongodbContainer.getContainerIpAddress();
         int port = mongodbContainer.getFirstMappedPort();
         String url = String.format("mongodb://%s:%d/%s", host, port, MONGODB_DATABASE);
-
         client = MongoClients.create(url);
     }
 
-    private void initSourceData() {
+    private void initSourceData(String database, String table, List<Document> documents) {
         MongoCollection<Document> sourceTable = client
-            .getDatabase(MONGODB_DATABASE)
-            .getCollection(MONGODB_SOURCE_TABLE);
+            .getDatabase(database)
+            .getCollection(table);
 
         sourceTable.deleteMany(new Document());
-        sourceTable.insertMany(TEST_DATASET);
-    }
-
-    private List<Document> readSinkData() {
-        MongoCollection<Document> sinkTable = client
-            .getDatabase(MONGODB_DATABASE)
-            .getCollection(MONGODB_SINK_TABLE);
-        MongoCursor<Document> cursor = sinkTable.find()
-            .sort(Sorts.ascending("id"))
-            .cursor();
-        List<Document> documents = new ArrayList<>();
-        while (cursor.hasNext()) {
-            documents.add(cursor.next());
-        }
-        return documents;
+        sourceTable.insertMany(documents);
     }
 
-    private static List<Document> generateTestDataSet() {
+    private static List<Document> generateTestDataSet(int start, int end) {
         SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType(
             new String[]{
                 "id",
@@ -169,8 +127,7 @@ public class MongodbIT extends SparkContainer {
                 "c_double",
                 "c_decimal",
                 "c_bytes",
-                "c_date",
-                "c_timestamp"
+                "c_date"
             },
             new SeaTunnelDataType[]{
                 BasicType.LONG_TYPE,
@@ -186,14 +143,13 @@ public class MongodbIT extends SparkContainer {
                 BasicType.DOUBLE_TYPE,
                 new DecimalType(2, 1),
                 PrimitiveByteArrayType.INSTANCE,
-                LocalTimeType.LOCAL_DATE_TYPE,
-                LocalTimeType.LOCAL_DATE_TIME_TYPE
+                LocalTimeType.LOCAL_DATE_TYPE
             }
         );
         Serializer serializer = new DefaultSerializer(seatunnelRowType);
 
         List<Document> documents = new ArrayList<>();
-        for (int i = 0; i < 100; i++) {
+        for (int i = start; i < end; i++) {
             SeaTunnelRow row = new SeaTunnelRow(
                 new Object[]{
                     Long.valueOf(i),
@@ -209,16 +165,40 @@ public class MongodbIT extends SparkContainer {
                     Double.parseDouble("1.1"),
                     BigDecimal.valueOf(11, 1),
                     "test".getBytes(),
-                    LocalDate.now(),
-                    LocalDateTime.now()
+                    LocalDate.now()
                 });
             documents.add(serializer.serialize(row));
         }
         return documents;
     }
 
-    @AfterEach
-    public void closeMongoContainer() {
+    @BeforeAll
+    @Override
+    public void startUp() {
+        DockerImageName imageName = DockerImageName.parse(MONGODB_IMAGE);
+        mongodbContainer = new GenericContainer<>(imageName)
+            .withNetwork(NETWORK)
+            .withNetworkAliases(MONGODB_CONTAINER_HOST)
+            .withExposedPorts(MONGODB_PORT)
+            .waitingFor(new HttpWaitStrategy()
+                .forPort(MONGODB_PORT)
+                .forStatusCodeMatching(response -> response == HTTP_OK || response == HTTP_UNAUTHORIZED)
+                .withStartupTimeout(Duration.ofMinutes(2)))
+            .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(MONGODB_IMAGE)));
+        Startables.deepStart(Stream.of(mongodbContainer)).join();
+        log.info("Mongodb container started");
+
+        Awaitility.given().ignoreExceptions()
+            .atLeast(100, TimeUnit.MILLISECONDS)
+            .pollInterval(500, TimeUnit.MILLISECONDS)
+            .atMost(180, TimeUnit.SECONDS)
+            .untilAsserted(this::initConnection);
+        this.initSourceData(MONGODB_DATABASE, MONGODB_SOURCE_TABLE, TEST_DATASET);
+    }
+
+    @AfterAll
+    @Override
+    public void tearDown() {
         if (client != null) {
             client.close();
         }
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-mongodb-spark-e2e/src/test/resources/mongodb/mongodb_source_and_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb_source_and_sink.conf
similarity index 77%
rename from seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-mongodb-spark-e2e/src/test/resources/mongodb/mongodb_source_and_sink.conf
rename to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb_source_and_sink.conf
index 3411a1fd8..d3c60494c 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-mongodb-spark-e2e/src/test/resources/mongodb/mongodb_source_and_sink.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb_source_and_sink.conf
@@ -16,20 +16,25 @@
 #
 
 ######
-###### This config file is a demonstration of streaming processing in seatunnel config
+###### This config file is a demonstration of batch processing in seatunnel config
 ######
 
 env {
   # You can set flink configuration here
   execution.parallelism = 1
   job.mode = "BATCH"
-  #execution.checkpoint.interval = 10000
-  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+
+  # You can set spark configuration here
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 1
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
 }
 
 source {
   MongoDB {
-    uri = "mongodb://spark_e2e_mongodb:27017/test_db?retryWrites=true&writeConcern=majority"
+    uri = "mongodb://e2e_mongodb:27017/test_db?retryWrites=true&writeConcern=majority"
     database = "test_db"
     collection = "source_table"
     schema = {
@@ -48,7 +53,6 @@ source {
         c_decimal = "decimal(2, 1)"
         c_bytes = bytes
         c_date = date
-        c_timestamp = timestamp
       }
     }
   }
@@ -59,7 +63,7 @@ transform {
 
 sink {
   MongoDB {
-    uri = "mongodb://spark_e2e_mongodb:27017/test_db?retryWrites=true&writeConcern=majority"
+    uri = "mongodb://e2e_mongodb:27017/test_db?retryWrites=true&writeConcern=majority"
     database = "test_db"
     collection = "sink_table"
   }
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/resources/mongodb/mongodb_source_matchQuery_and_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb_source_matchQuery_and_sink.conf
similarity index 77%
rename from seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/resources/mongodb/mongodb_source_matchQuery_and_sink.conf
rename to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb_source_matchQuery_and_sink.conf
index 2a67c1ea6..06657ac62 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/resources/mongodb/mongodb_source_matchQuery_and_sink.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb_source_matchQuery_and_sink.conf
@@ -16,20 +16,25 @@
 #
 
 ######
-###### This config file is a demonstration of streaming processing in seatunnel config
+###### This config file is a demonstration of batch processing in seatunnel config
 ######
 
 env {
   # You can set flink configuration here
   execution.parallelism = 1
   job.mode = "BATCH"
-  #execution.checkpoint.interval = 10000
-  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+
+  # You can set spark configuration here
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 1
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
 }
 
 source {
   MongoDB {
-    uri = "mongodb://flink_e2e_mongodb:27017/test_db?retryWrites=true&writeConcern=majority"
+    uri = "mongodb://e2e_mongodb:27017/test_db?retryWrites=true&writeConcern=majority"
     database = "test_db"
     collection = "source_matchQuery_table"
     matchQuery = "{"id":3}"
@@ -59,7 +64,7 @@ transform {
 
 sink {
   MongoDB {
-    uri = "mongodb://flink_e2e_mongodb:27017/test_db?retryWrites=true&writeConcern=majority"
+    uri = "mongodb://e2e_mongodb:27017/test_db?retryWrites=true&writeConcern=majority"
     database = "test_db"
     collection = "sink_matchQuery_table"
   }
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/resources/mongodb/mongodb_source_and_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb_source_to_assert.conf
similarity index 75%
rename from seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/resources/mongodb/mongodb_source_and_sink.conf
rename to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb_source_to_assert.conf
index d013995d5..ad5c1a771 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/resources/mongodb/mongodb_source_and_sink.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb_source_to_assert.conf
@@ -14,7 +14,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-
 ######
 ###### This config file is a demonstration of streaming processing in seatunnel config
 ######
@@ -29,7 +28,7 @@ env {
 
 source {
   MongoDB {
-    uri = "mongodb://flink_e2e_mongodb:27017/test_db?retryWrites=true&writeConcern=majority"
+    uri = "mongodb://e2e_mongodb:27017/test_db?retryWrites=true&writeConcern=majority"
     database = "test_db"
     collection = "source_table"
     schema = {
@@ -48,19 +47,27 @@ source {
         c_decimal = "decimal(2, 1)"
         c_bytes = bytes
         c_date = date
-        c_timestamp = timestamp
       }
     }
   }
 }
 
-transform {
-}
-
 sink {
-  MongoDB {
-    uri = "mongodb://flink_e2e_mongodb:27017/test_db?retryWrites=true&writeConcern=majority"
-    database = "test_db"
-    collection = "sink_table"
+  Assert {
+    rules =
+      {
+        row_rules = [
+          {
+            rule_type = MAX_ROW
+            rule_value = 10
+          },
+          {
+            rule_type = MIN_ROW
+            rule_value = 10
+          }
+        ]
+      }
   }
-}
\ No newline at end of file
+  # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+  # please go to https://seatunnel.apache.org/docs/connector-v2/sink/Assert
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
index 457217735..68afc709f 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
@@ -47,6 +47,7 @@
         <module>connector-iceberg-hadoop3-e2e</module>
         <module>connector-tdengine-e2e</module>
         <module>connector-datahub-e2e</module>
+        <module>connector-mongodb-e2e</module>
     </modules>
 
     <artifactId>seatunnel-connector-v2-e2e</artifactId>
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbIT.java
deleted file mode 100644
index ecf8b2c21..000000000
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbIT.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.e2e.flink.v2.mongodb;
-
-import static java.net.HttpURLConnection.HTTP_OK;
-import static java.net.HttpURLConnection.HTTP_UNAUTHORIZED;
-
-import org.apache.seatunnel.api.table.type.ArrayType;
-import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.DecimalType;
-import org.apache.seatunnel.api.table.type.LocalTimeType;
-import org.apache.seatunnel.api.table.type.MapType;
-import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.connectors.seatunnel.mongodb.data.DefaultSerializer;
-import org.apache.seatunnel.connectors.seatunnel.mongodb.data.Serializer;
-import org.apache.seatunnel.e2e.flink.FlinkContainer;
-
-import com.mongodb.client.MongoClient;
-import com.mongodb.client.MongoClients;
-import com.mongodb.client.MongoCollection;
-import com.mongodb.client.MongoCursor;
-import com.mongodb.client.model.Sorts;
-import lombok.extern.slf4j.Slf4j;
-import org.awaitility.Awaitility;
-import org.bson.Document;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.testcontainers.containers.Container;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
-import org.testcontainers.lifecycle.Startables;
-import org.testcontainers.utility.DockerImageName;
-import org.testcontainers.utility.DockerLoggerFactory;
-
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.time.Duration;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-@Slf4j
-public class MongodbIT extends FlinkContainer {
-
-    private static final String MONGODB_IMAGE = "mongo:latest";
-    private static final String MONGODB_CONTAINER_HOST = "flink_e2e_mongodb";
-    private static final int MONGODB_PORT = 27017;
-    private static final String MONGODB_DATABASE = "test_db";
-    private static final String MONGODB_SOURCE_TABLE = "source_table";
-    private static final String MONGODB_SINK_TABLE = "sink_table";
-
-    private static final List<Document> TEST_DATASET = generateTestDataSet();
-
-    private GenericContainer<?> mongodbContainer;
-    private MongoClient client;
-
-    @BeforeEach
-    public void startMongoContainer() {
-        DockerImageName imageName = DockerImageName.parse(MONGODB_IMAGE);
-        mongodbContainer = new GenericContainer<>(imageName)
-            .withNetwork(NETWORK)
-            .withNetworkAliases(MONGODB_CONTAINER_HOST)
-            .withExposedPorts(MONGODB_PORT)
-            .waitingFor(new HttpWaitStrategy()
-                .forPort(MONGODB_PORT)
-                .forStatusCodeMatching(response -> response == HTTP_OK || response == HTTP_UNAUTHORIZED)
-                .withStartupTimeout(Duration.ofMinutes(2)))
-            .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(MONGODB_IMAGE)));
-        Startables.deepStart(Stream.of(mongodbContainer)).join();
-        log.info("Mongodb container started");
-
-        Awaitility.given().ignoreExceptions()
-            .atLeast(100, TimeUnit.MILLISECONDS)
-            .pollInterval(500, TimeUnit.MILLISECONDS)
-            .atMost(180, TimeUnit.SECONDS)
-            .untilAsserted(this::initConnection);
-        this.initSourceData();
-    }
-
-    @Test
-    public void testMongodb() throws IOException, InterruptedException {
-        Container.ExecResult execResult = executeSeaTunnelFlinkJob("/mongodb/mongodb_source_and_sink.conf");
-        Assertions.assertEquals(0, execResult.getExitCode());
-        Assertions.assertIterableEquals(
-            TEST_DATASET.stream()
-                .map(e -> {
-                    e.remove("_id");
-                    return e;
-                })
-                .collect(Collectors.toList()),
-            readSinkData().stream()
-                .map(e -> {
-                    e.remove("_id");
-                    return e;
-                })
-                .collect(Collectors.toList()));
-    }
-
-    public void initConnection() {
-        String host = mongodbContainer.getContainerIpAddress();
-        int port = mongodbContainer.getFirstMappedPort();
-        String url = String.format("mongodb://%s:%d/%s", host, port, MONGODB_DATABASE);
-
-        client = MongoClients.create(url);
-    }
-
-    private void initSourceData() {
-        MongoCollection<Document> sourceTable = client
-            .getDatabase(MONGODB_DATABASE)
-            .getCollection(MONGODB_SOURCE_TABLE);
-
-        sourceTable.deleteMany(new Document());
-        sourceTable.insertMany(TEST_DATASET);
-    }
-
-    private List<Document> readSinkData() {
-        MongoCollection<Document> sinkTable = client
-            .getDatabase(MONGODB_DATABASE)
-            .getCollection(MONGODB_SINK_TABLE);
-        MongoCursor<Document> cursor = sinkTable.find()
-            .sort(Sorts.ascending("id"))
-            .cursor();
-        List<Document> documents = new ArrayList<>();
-        while (cursor.hasNext()) {
-            documents.add(cursor.next());
-        }
-        return documents;
-    }
-
-    private static List<Document> generateTestDataSet() {
-        SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType(
-            new String[]{
-                "id",
-                "c_map",
-                "c_array",
-                "c_string",
-                "c_boolean",
-                "c_tinyint",
-                "c_smallint",
-                "c_int",
-                "c_bigint",
-                "c_float",
-                "c_double",
-                "c_decimal",
-                "c_bytes",
-                "c_date",
-                "c_timestamp"
-            },
-            new SeaTunnelDataType[]{
-                BasicType.LONG_TYPE,
-                new MapType(BasicType.STRING_TYPE, BasicType.SHORT_TYPE),
-                ArrayType.BYTE_ARRAY_TYPE,
-                BasicType.STRING_TYPE,
-                BasicType.BOOLEAN_TYPE,
-                BasicType.BYTE_TYPE,
-                BasicType.SHORT_TYPE,
-                BasicType.INT_TYPE,
-                BasicType.LONG_TYPE,
-                BasicType.FLOAT_TYPE,
-                BasicType.DOUBLE_TYPE,
-                new DecimalType(2, 1),
-                PrimitiveByteArrayType.INSTANCE,
-                LocalTimeType.LOCAL_DATE_TYPE,
-                LocalTimeType.LOCAL_DATE_TIME_TYPE
-            }
-        );
-        Serializer serializer = new DefaultSerializer(seatunnelRowType);
-
-        List<Document> documents = new ArrayList<>();
-        for (int i = 0; i < 100; i++) {
-            SeaTunnelRow row = new SeaTunnelRow(
-                new Object[]{
-                    Long.valueOf(i),
-                    Collections.singletonMap("key", Short.parseShort("1")),
-                    new Byte[]{Byte.parseByte("1")},
-                    "string",
-                    Boolean.FALSE,
-                    Byte.parseByte("1"),
-                    Short.parseShort("1"),
-                    Integer.parseInt("1"),
-                    Long.parseLong("1"),
-                    Float.parseFloat("1.1"),
-                    Double.parseDouble("1.1"),
-                    BigDecimal.valueOf(11, 1),
-                    "test".getBytes(),
-                    LocalDate.now(),
-                    LocalDateTime.now()
-                });
-            documents.add(serializer.serialize(row));
-        }
-        return documents;
-    }
-
-    @AfterEach
-    public void closeMongoContainer() {
-        if (client != null) {
-            client.close();
-        }
-        if (mongodbContainer != null) {
-            mongodbContainer.close();
-        }
-    }
-}
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbMatchQueryIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbMatchQueryIT.java
deleted file mode 100644
index 099efae75..000000000
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbMatchQueryIT.java
+++ /dev/null
@@ -1,285 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.e2e.flink.v2.mongodb;
-
-import static java.net.HttpURLConnection.HTTP_OK;
-import static java.net.HttpURLConnection.HTTP_UNAUTHORIZED;
-
-import org.apache.seatunnel.api.table.type.ArrayType;
-import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.DecimalType;
-import org.apache.seatunnel.api.table.type.LocalTimeType;
-import org.apache.seatunnel.api.table.type.MapType;
-import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.connectors.seatunnel.mongodb.data.DefaultSerializer;
-import org.apache.seatunnel.connectors.seatunnel.mongodb.data.Serializer;
-import org.apache.seatunnel.e2e.flink.FlinkContainer;
-
-import com.mongodb.client.MongoClient;
-import com.mongodb.client.MongoClients;
-import com.mongodb.client.MongoCollection;
-import com.mongodb.client.MongoCursor;
-import com.mongodb.client.model.Sorts;
-import lombok.extern.slf4j.Slf4j;
-import org.awaitility.Awaitility;
-import org.bson.Document;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.testcontainers.containers.Container;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
-import org.testcontainers.lifecycle.Startables;
-import org.testcontainers.utility.DockerImageName;
-import org.testcontainers.utility.DockerLoggerFactory;
-
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.time.Duration;
-import java.time.LocalDate;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-@Slf4j
-public class MongodbMatchQueryIT extends FlinkContainer {
-
-    private static final String MONGODB_IMAGE = "mongo:latest";
-    private static final String MONGODB_CONTAINER_HOST = "flink_e2e_mongodb";
-    private static final int MONGODB_PORT = 27017;
-    private static final String MONGODB_DATABASE = "test_db";
-    private static final String MONGODB_SOURCE_TABLE = "source_matchQuery_table";
-    private static final String MONGODB_SINK_TABLE = "sink_matchQuery_table";
-
-    private static final List<Document> TEST_DATASET = generateTestDataSet();
-    private static final List<Document> RESULT_DATASET = generateResultDataSet();
-
-    private GenericContainer<?> mongodbContainer;
-    private MongoClient client;
-
-    @BeforeEach
-    public void startMongoContainer() {
-        DockerImageName imageName = DockerImageName.parse(MONGODB_IMAGE);
-        mongodbContainer = new GenericContainer<>(imageName)
-            .withNetwork(NETWORK)
-            .withNetworkAliases(MONGODB_CONTAINER_HOST)
-            .withExposedPorts(MONGODB_PORT)
-            .waitingFor(new HttpWaitStrategy()
-                .forPort(MONGODB_PORT)
-                .forStatusCodeMatching(response -> response == HTTP_OK || response == HTTP_UNAUTHORIZED)
-                .withStartupTimeout(Duration.ofMinutes(2)))
-            .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(MONGODB_IMAGE)));
-        Startables.deepStart(Stream.of(mongodbContainer)).join();
-        log.info("Mongodb container started");
-
-        Awaitility.given().ignoreExceptions()
-            .atLeast(100, TimeUnit.MILLISECONDS)
-            .pollInterval(500, TimeUnit.MILLISECONDS)
-            .atMost(180, TimeUnit.SECONDS)
-            .untilAsserted(this::initConnection);
-        this.initSourceData();
-    }
-
-    @Test
-    public void testMongodb() throws IOException, InterruptedException {
-        Container.ExecResult execResult = executeSeaTunnelFlinkJob("/mongodb/mongodb_source_matchQuery_and_sink.conf");
-        Assertions.assertEquals(0, execResult.getExitCode());
-        Assertions.assertIterableEquals(
-            RESULT_DATASET.stream()
-                .map(e -> {
-                    e.remove("_id");
-                    return e;
-                })
-                .collect(Collectors.toList()),
-            readSinkData().stream()
-                .map(e -> {
-                    e.remove("_id");
-                    return e;
-                })
-                .collect(Collectors.toList()));
-    }
-
-    public void initConnection() {
-        String host = mongodbContainer.getContainerIpAddress();
-        int port = mongodbContainer.getFirstMappedPort();
-        String url = String.format("mongodb://%s:%d/%s", host, port, MONGODB_DATABASE);
-
-        client = MongoClients.create(url);
-    }
-
-    private void initSourceData() {
-        MongoCollection<Document> sourceTable = client
-            .getDatabase(MONGODB_DATABASE)
-            .getCollection(MONGODB_SOURCE_TABLE);
-
-        sourceTable.deleteMany(new Document());
-        sourceTable.insertMany(TEST_DATASET);
-    }
-
-    private List<Document> readSinkData() {
-        MongoCollection<Document> sinkTable = client
-            .getDatabase(MONGODB_DATABASE)
-            .getCollection(MONGODB_SINK_TABLE);
-        MongoCursor<Document> cursor = sinkTable.find()
-            .sort(Sorts.ascending("id"))
-            .cursor();
-        List<Document> documents = new ArrayList<>();
-        while (cursor.hasNext()) {
-            documents.add(cursor.next());
-        }
-        return documents;
-    }
-
-    private static List<Document> generateTestDataSet() {
-        SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType(
-            new String[]{
-                "id",
-                "c_map",
-                "c_array",
-                "c_string",
-                "c_boolean",
-                "c_tinyint",
-                "c_smallint",
-                "c_int",
-                "c_bigint",
-                "c_float",
-                "c_double",
-                "c_decimal",
-                "c_bytes",
-                "c_date"
-            },
-            new SeaTunnelDataType[]{
-                BasicType.LONG_TYPE,
-                new MapType(BasicType.STRING_TYPE, BasicType.SHORT_TYPE),
-                ArrayType.BYTE_ARRAY_TYPE,
-                BasicType.STRING_TYPE,
-                BasicType.BOOLEAN_TYPE,
-                BasicType.BYTE_TYPE,
-                BasicType.SHORT_TYPE,
-                BasicType.INT_TYPE,
-                BasicType.LONG_TYPE,
-                BasicType.FLOAT_TYPE,
-                BasicType.DOUBLE_TYPE,
-                new DecimalType(2, 1),
-                PrimitiveByteArrayType.INSTANCE,
-                LocalTimeType.LOCAL_DATE_TYPE,
-            }
-        );
-        Serializer serializer = new DefaultSerializer(seatunnelRowType);
-
-        List<Document> documents = new ArrayList<>();
-        for (int i = 0; i < 100; i++) {
-            SeaTunnelRow row = new SeaTunnelRow(
-                new Object[]{
-                    Long.valueOf(i),
-                    Collections.singletonMap("key", Short.parseShort("1")),
-                    new Byte[]{Byte.parseByte("1")},
-                    "string",
-                    Boolean.FALSE,
-                    Byte.parseByte("1"),
-                    Short.parseShort("1"),
-                    Integer.parseInt("1"),
-                    Long.parseLong("1"),
-                    Float.parseFloat("1.1"),
-                    Double.parseDouble("1.1"),
-                    BigDecimal.valueOf(11, 1),
-                    "test".getBytes(),
-                    LocalDate.now(),
-                });
-            documents.add(serializer.serialize(row));
-        }
-        return documents;
-    }
-
-    private static List<Document> generateResultDataSet() {
-        SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType(
-            new String[]{
-                "id",
-                "c_map",
-                "c_array",
-                "c_string",
-                "c_boolean",
-                "c_tinyint",
-                "c_smallint",
-                "c_int",
-                "c_bigint",
-                "c_float",
-                "c_double",
-                "c_decimal",
-                "c_bytes",
-                "c_date"
-            },
-            new SeaTunnelDataType[]{
-                BasicType.LONG_TYPE,
-                new MapType(BasicType.STRING_TYPE, BasicType.SHORT_TYPE),
-                ArrayType.BYTE_ARRAY_TYPE,
-                BasicType.STRING_TYPE,
-                BasicType.BOOLEAN_TYPE,
-                BasicType.BYTE_TYPE,
-                BasicType.SHORT_TYPE,
-                BasicType.INT_TYPE,
-                BasicType.LONG_TYPE,
-                BasicType.FLOAT_TYPE,
-                BasicType.DOUBLE_TYPE,
-                new DecimalType(2, 1),
-                PrimitiveByteArrayType.INSTANCE,
-                LocalTimeType.LOCAL_DATE_TYPE
-            }
-        );
-        Serializer serializer = new DefaultSerializer(seatunnelRowType);
-
-        List<Document> documents = new ArrayList<>();
-        SeaTunnelRow row = new SeaTunnelRow(
-            new Object[]{
-                Long.valueOf(3),
-                Collections.singletonMap("key", Short.parseShort("1")),
-                new Byte[]{Byte.parseByte("1")},
-                "string",
-                Boolean.FALSE,
-                Byte.parseByte("1"),
-                Short.parseShort("1"),
-                Integer.parseInt("1"),
-                Long.parseLong("1"),
-                Float.parseFloat("1.1"),
-                Double.parseDouble("1.1"),
-                BigDecimal.valueOf(11, 1),
-                "test".getBytes(),
-                LocalDate.now(),
-            });
-        documents.add(serializer.serialize(row));
-        return documents;
-    }
-
-    @AfterEach
-    public void closeMongoContainer() {
-        if (client != null) {
-            client.close();
-        }
-        if (mongodbContainer != null) {
-            mongodbContainer.close();
-        }
-    }
-}
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
index c5e966bca..78ac1bc32 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
@@ -29,7 +29,6 @@
     <modules>
         <module>connector-flink-e2e-base</module>
         <module>connector-jdbc-flink-e2e</module>
-        <module>connector-mongodb-flink-e2e</module>
     </modules>
 
     <dependencies>
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-mongodb-spark-e2e/pom.xml b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-mongodb-spark-e2e/pom.xml
deleted file mode 100644
index f74682cd2..000000000
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-mongodb-spark-e2e/pom.xml
+++ /dev/null
@@ -1,49 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one or more
-    contributor license agreements.  See the NOTICE file distributed with
-    this work for additional information regarding copyright ownership.
-    The ASF licenses this file to You under the Apache License, Version 2.0
-    (the "License"); you may not use this file except in compliance with
-    the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing, software
-    distributed under the License is distributed on an "AS IS" BASIS,
-    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-    See the License for the specific language governing permissions and
-    limitations under the License.
-
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-    <parent>
-        <groupId>org.apache.seatunnel</groupId>
-        <artifactId>seatunnel-spark-connector-v2-e2e</artifactId>
-        <version>${revision}</version>
-    </parent>
-
-    <artifactId>connector-mongodb-spark-e2e</artifactId>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.seatunnel</groupId>
-            <artifactId>connector-spark-e2e-base</artifactId>
-            <version>${project.version}</version>
-            <classifier>tests</classifier>
-            <type>test-jar</type>
-            <scope>test</scope>
-        </dependency>
-
-        <!-- SeaTunnel connectors -->
-        <dependency>
-            <groupId>org.apache.seatunnel</groupId>
-            <artifactId>connector-mongodb</artifactId>
-            <version>${project.version}</version>
-            <scope>test</scope>
-        </dependency>
-    </dependencies>
-</project>
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-mongodb-spark-e2e/src/test/resources/mongodb/fake_to_mongodb.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-mongodb-spark-e2e/src/test/resources/mongodb/fake_to_mongodb.conf
deleted file mode 100644
index e025b97df..000000000
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-mongodb-spark-e2e/src/test/resources/mongodb/fake_to_mongodb.conf
+++ /dev/null
@@ -1,70 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-######
-###### This config file is a demonstration of streaming processing in seatunnel config
-######
-
-env {
-  # You can set spark configuration here
-  job.name = "SeaTunnel"
-  spark.executor.instances = 2
-  spark.executor.cores = 1
-  spark.executor.memory = "1g"
-  spark.master = local
-  job.mode = "BATCH"
-}
-
-source {
-  # This is a example source plugin **only for test and demonstrate the feature source plugin**
-  FakeSource {
-    result_table_name = "fake"
-    schema = {
-      fields {
-        c_string = string
-        c_boolean = boolean
-        c_tinyint = tinyint
-        c_smallint = smallint
-        c_int = int
-        c_bigint = bigint
-        c_float = float
-        c_double = double
-        c_decimal = "decimal(30, 8)"
-        c_bytes = bytes
-      }
-    }
-  }
-
-  # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
-  # please go to https://seatunnel.apache.org/docs/connector-v2/source/Fake
-}
-
-transform {
-  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
-  # please go to https://seatunnel.apache.org/docs/connector-v2/transform/Sql
-}
-
-sink {
-  MongoDB {
-    uri = "mongodb://spark_e2e_mongodb_sink:27017/test_db?retryWrites=true&writeConcern=majority"
-    database = "test_db"
-    collection = "test_table"
-  }
-
-  # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
-  # please go to https://seatunnel.apache.org/docs/connector-v2/sink/MongoDB
-}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
index 65b432d90..db53fc8ac 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
@@ -29,7 +29,6 @@
     <modules>
         <module>connector-spark-e2e-base</module>
         <module>connector-jdbc-spark-e2e</module>
-        <module>connector-mongodb-spark-e2e</module>
     </modules>
 
     <dependencies>