You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/08/15 04:10:46 UTC

[incubator-seatunnel] branch dev updated: Add jdbc connector e2e test (#2321)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 5fbcb811c Add jdbc connector e2e test (#2321)
5fbcb811c is described below

commit 5fbcb811c677e51969cd4e9ecb44dbdefb9c0922
Author: zhangyuge1 <49...@users.noreply.github.com>
AuthorDate: Mon Aug 15 12:10:41 2022 +0800

    Add jdbc connector e2e test (#2321)
    
    * add jdbc e2e test
---
 pom.xml                                            |   7 +-
 seatunnel-connectors-v2/connector-jdbc/pom.xml     |   1 -
 .../seatunnel-connector-spark-jdbc/pom.xml         |   4 +
 seatunnel-dist/release-docs/LICENSE                |   2 +-
 .../release-docs/licenses/LICENSE-pgjdbc.txt       |  23 +++++
 .../seatunnel-flink-connector-v2-e2e/pom.xml       |  13 +++
 .../e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java      |  97 +++++++++++++++++++
 .../e2e/flink/v2/jdbc/JdbcSourceToConsoleIT.java   | 102 ++++++++++++++++++++
 .../test/resources/jdbc/fakesource_to_jdbc.conf    |  61 ++++++++++++
 .../test/resources/jdbc/jdbcsource_to_console.conf |  53 +++++++++++
 seatunnel-e2e/seatunnel-spark-e2e/pom.xml          |  11 +++
 .../e2e/spark/jdbc/FakeSourceToJdbcIT.java         |  96 +++++++++++++++++++
 .../e2e/spark/jdbc/JdbcSourceToConsoleIT.java      | 106 +++++++++++++++++++++
 .../test/resources/jdbc/fakesource_to_jdbc.conf    |  60 ++++++++++++
 .../test/resources/jdbc/jdbcsource_to_console.conf |  53 +++++++++++
 tools/dependencies/known-dependencies.txt          |   4 +-
 16 files changed, 688 insertions(+), 5 deletions(-)

diff --git a/pom.xml b/pom.xml
index e9e5ff448..120c5a73d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -224,6 +224,7 @@
         <jsoup.version>1.14.3</jsoup.version>
         <javax.annotation-api.version>1.3.2</javax.annotation-api.version>
         <elasticsearch-rest-client.version>7.5.1</elasticsearch-rest-client.version>
+        <checker.qual.version>3.10.0</checker.qual.version>
     </properties>
 
     <dependencyManagement>
@@ -349,7 +350,6 @@
                 <groupId>org.postgresql</groupId>
                 <artifactId>postgresql</artifactId>
                 <version>${postgresql.version}</version>
-                <scope>test</scope>
             </dependency>
 
             <dependency>
@@ -935,6 +935,11 @@
                 <version>${jsoup.version}</version>
             </dependency>
 
+            <dependency>
+                <groupId>org.checkerframework</groupId>
+                <artifactId>checker-qual</artifactId>
+                <version>${checker.qual.version}</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 
diff --git a/seatunnel-connectors-v2/connector-jdbc/pom.xml b/seatunnel-connectors-v2/connector-jdbc/pom.xml
index 39839eba7..1bb972ac9 100644
--- a/seatunnel-connectors-v2/connector-jdbc/pom.xml
+++ b/seatunnel-connectors-v2/connector-jdbc/pom.xml
@@ -46,7 +46,6 @@
         <dependency>
             <groupId>org.postgresql</groupId>
             <artifactId>postgresql</artifactId>
-            <scope>provided</scope>
         </dependency>
 
     </dependencies>
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-jdbc/pom.xml b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-jdbc/pom.xml
index 5bcc57ac4..485381fb8 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-jdbc/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-jdbc/pom.xml
@@ -56,6 +56,10 @@
             <artifactId>spark-streaming_${scala.binary.version}</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.postgresql</groupId>
+            <artifactId>postgresql</artifactId>
+        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git a/seatunnel-dist/release-docs/LICENSE b/seatunnel-dist/release-docs/LICENSE
index 34c6bcc20..090806a09 100644
--- a/seatunnel-dist/release-docs/LICENSE
+++ b/seatunnel-dist/release-docs/LICENSE
@@ -973,7 +973,6 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
      (MIT-License) spoiwo (com.norbitltd:spoiwo_2.11:1.8.0 - https://github.com/norbert-radyk/spoiwo/)
      (The MIT License (MIT)) influxdb java bindings (org.influxdb:influxdb-java:2.22 - http://www.influxdb.org)
      (The MIT License) Checker Qual (org.checkerframework:checker-qual:3.10.0 - https://checkerframework.org)
-     (The MIT License) Checker Qual (org.checkerframework:checker-qual:3.4.0 - https://checkerframework.org)
      (The MIT License) JOpt Simple (net.sf.jopt-simple:jopt-simple:5.0.2 - http://pholser.github.io/jopt-simple)
 
 
@@ -1045,6 +1044,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
      (The BSD License) xmlenc Library (xmlenc:xmlenc:0.52 - http://xmlenc.sourceforge.net)
      (The BSD License) ASM Core (asm:asm:3.1 - http://asm.objectweb.org/asm/)
      (New BSD License) Janino (org.codehaus.janino:janino:3.1.6 - http://docs.codehaus.org/display/JANINO/Home/janino)
+     (BSD 2-Clause License) pgjdbc (org.postgresql:postgresql:42.3.3 - https://jdbc.postgresql.org)
 
 
 ========================================================================
diff --git a/seatunnel-dist/release-docs/licenses/LICENSE-pgjdbc.txt b/seatunnel-dist/release-docs/licenses/LICENSE-pgjdbc.txt
new file mode 100644
index 000000000..98dff7b6e
--- /dev/null
+++ b/seatunnel-dist/release-docs/licenses/LICENSE-pgjdbc.txt
@@ -0,0 +1,23 @@
+Copyright (c) 1997, PostgreSQL Global Development Group
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+1. Redistributions of source code must retain the above copyright notice,
+   this list of conditions and the following disclaimer.
+2. Redistributions in binary form must reproduce the above copyright notice,
+   this list of conditions and the following disclaimer in the documentation
+   and/or other materials provided with the distribution.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+POSSIBILITY OF SUCH DAMAGE.
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
index c494f80f8..454a41c95 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
@@ -37,6 +37,19 @@
             <artifactId>testcontainers</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>postgresql</artifactId>
+            <version>1.17.3</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.postgresql</groupId>
+            <artifactId>postgresql</artifactId>
+            <scope>test</scope>
+        </dependency>
+
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java
new file mode 100644
index 000000000..784bd9a06
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.jdbc;
+
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import com.google.common.collect.Lists;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.PostgreSQLContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.stream.Stream;
+
+public class FakeSourceToJdbcIT extends FlinkContainer {
+    private static final Logger LOGGER = LoggerFactory.getLogger(FakeSourceToJdbcIT.class);
+    private PostgreSQLContainer<?> psl;
+
+    @SuppressWarnings("checkstyle:MagicNumber")
+    @BeforeEach
+    public void startPostgreSqlContainer() throws InterruptedException, ClassNotFoundException, SQLException {
+        psl = new PostgreSQLContainer<>(DockerImageName.parse("postgres:alpine3.16"))
+                .withNetwork(NETWORK)
+                .withNetworkAliases("postgresql")
+                .withLogConsumer(new Slf4jLogConsumer(LOGGER));
+        Startables.deepStart(Stream.of(psl)).join();
+        LOGGER.info("PostgreSql container started");
+        Thread.sleep(5000L);
+        Class.forName(psl.getDriverClassName());
+        initializeJdbcTable();
+    }
+
+    private void initializeJdbcTable() {
+        try (Connection connection = DriverManager.getConnection(psl.getJdbcUrl(), psl.getUsername(), psl.getPassword())) {
+            Statement statement = connection.createStatement();
+            String sql = "CREATE TABLE test (\n" +
+                    "  name varchar(255) NOT NULL\n" +
+                    ")";
+            statement.execute(sql);
+        } catch (SQLException e) {
+            throw new RuntimeException("Initializing PostgreSql table failed!", e);
+        }
+    }
+
+    @Test
+    public void testFakeSourceToJdbcSink() throws SQLException, IOException, InterruptedException {
+        Container.ExecResult execResult = executeSeaTunnelFlinkJob("/jdbc/fakesource_to_jdbc.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        // query result
+        String sql = "select * from test";
+        try (Connection connection = DriverManager.getConnection(psl.getJdbcUrl(), psl.getUsername(), psl.getPassword())) {
+            Statement statement = connection.createStatement();
+            ResultSet resultSet = statement.executeQuery(sql);
+            List<String> result = Lists.newArrayList();
+            while (resultSet.next()) {
+                result.add(resultSet.getString("name"));
+            }
+            Assertions.assertFalse(result.isEmpty());
+        }
+    }
+
+    @AfterEach
+    public void closeClickHouseContainer() {
+        if (psl != null) {
+            psl.stop();
+        }
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSourceToConsoleIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSourceToConsoleIT.java
new file mode 100644
index 000000000..14eba119d
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSourceToConsoleIT.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.jdbc;
+
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.PostgreSQLContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.stream.Stream;
+
+public class JdbcSourceToConsoleIT extends FlinkContainer {
+    private static final Logger LOGGER = LoggerFactory.getLogger(JdbcSourceToConsoleIT.class);
+    private PostgreSQLContainer<?> psl;
+
+    @SuppressWarnings("checkstyle:MagicNumber")
+    @BeforeEach
+    public void startPostgreSqlContainer() throws InterruptedException, ClassNotFoundException, SQLException {
+        psl = new PostgreSQLContainer<>(DockerImageName.parse("postgres:alpine3.16"))
+                .withNetwork(NETWORK)
+                .withNetworkAliases("postgresql")
+                .withLogConsumer(new Slf4jLogConsumer(LOGGER));
+        Startables.deepStart(Stream.of(psl)).join();
+        LOGGER.info("PostgreSql container started");
+        Thread.sleep(5000L);
+        Class.forName(psl.getDriverClassName());
+        initializeJdbcTable();
+        batchInsertData();
+    }
+
+    private void initializeJdbcTable() {
+        try (Connection connection = DriverManager.getConnection(psl.getJdbcUrl(), psl.getUsername(), psl.getPassword())) {
+            Statement statement = connection.createStatement();
+            String sql = "CREATE TABLE test (\n" +
+                    "  name varchar(255) NOT NULL\n" +
+                    ")";
+            statement.execute(sql);
+        } catch (SQLException e) {
+            throw new RuntimeException("Initializing PostgreSql table failed!", e);
+        }
+    }
+
+    @SuppressWarnings("checkstyle:MagicNumber")
+    private void batchInsertData() throws SQLException {
+        try (Connection connection = DriverManager.getConnection(psl.getJdbcUrl(), psl.getUsername(), psl.getPassword())) {
+            String sql = "insert into test(name) values(?)";
+            connection.setAutoCommit(false);
+            PreparedStatement preparedStatement = connection.prepareStatement(sql);
+            for (int i = 0; i < 10; i++) {
+                preparedStatement.setString(1, "Mike");
+                preparedStatement.addBatch();
+            }
+            preparedStatement.executeBatch();
+            connection.commit();
+        } catch (SQLException e) {
+            throw new RuntimeException("Batch insert data failed!", e);
+        }
+    }
+
+    @Test
+    public void testFakeSourceToJdbcSink() throws SQLException, IOException, InterruptedException {
+        Container.ExecResult execResult = executeSeaTunnelFlinkJob("/jdbc/jdbcsource_to_console.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+    }
+
+    @AfterEach
+    public void closePostgreSqlContainer() {
+        if (psl != null) {
+            psl.stop();
+        }
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/fakesource_to_jdbc.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/fakesource_to_jdbc.conf
new file mode 100644
index 000000000..9640e19c2
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/fakesource_to_jdbc.conf
@@ -0,0 +1,61 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+  # You can set flink configuration here
+  execution.parallelism = 1
+  job.mode = "BATCH"
+  #execution.checkpoint.interval = 10000
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+  # This is an example source plugin, **intended only for testing and demonstrating the source plugin feature**
+    FakeSource {
+      result_table_name = "fake"
+      field_name = "name"
+    }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+  # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+}
+
+transform {
+    sql {
+      sql = "select name from fake"
+    }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+  # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
+}
+
+sink {
+  Jdbc {
+      source_table_name = fake
+      driver = org.postgresql.Driver
+      url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF"
+      user = test
+      password = test
+      query = "insert into test(name) values(?)"
+  }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+  # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/jdbcsource_to_console.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/jdbcsource_to_console.conf
new file mode 100644
index 000000000..6862abc04
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/jdbcsource_to_console.conf
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+  # You can set flink configuration here
+  execution.parallelism = 1
+  job.mode = "BATCH"
+  #execution.checkpoint.interval = 10000
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+  # This is an example source plugin, **intended only for testing and demonstrating the source plugin feature**
+    Jdbc {
+        driver = org.postgresql.Driver
+        url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF"
+        user = test
+        password = "test"
+        query = "select * from test"
+    }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+  # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+}
+
+transform {
+
+  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+  # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
+}
+
+sink {
+  Console {}
+  # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+  # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-e2e/pom.xml b/seatunnel-e2e/seatunnel-spark-e2e/pom.xml
index e6a579729..85a516165 100644
--- a/seatunnel-e2e/seatunnel-spark-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-spark-e2e/pom.xml
@@ -41,6 +41,17 @@
             <groupId>org.testcontainers</groupId>
             <artifactId>testcontainers</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>postgresql</artifactId>
+            <version>1.17.3</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.postgresql</groupId>
+            <artifactId>postgresql</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/FakeSourceToJdbcIT.java b/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/FakeSourceToJdbcIT.java
new file mode 100644
index 000000000..061de4e1a
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/FakeSourceToJdbcIT.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.spark.jdbc;
+
+import org.apache.seatunnel.e2e.spark.SparkContainer;
+
+import com.google.common.collect.Lists;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.PostgreSQLContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.stream.Stream;
+
+public class FakeSourceToJdbcIT extends SparkContainer {
+    private static final Logger LOGGER = LoggerFactory.getLogger(FakeSourceToJdbcIT.class);
+    private PostgreSQLContainer<?> psl;
+
+    @SuppressWarnings("checkstyle:MagicNumber")
+    @BeforeEach
+    public void startPostgreSqlContainer() throws InterruptedException, ClassNotFoundException, SQLException {
+        psl = new PostgreSQLContainer<>(DockerImageName.parse("postgres:alpine3.16"))
+                .withNetwork(NETWORK)
+                .withNetworkAliases("postgresql")
+                .withLogConsumer(new Slf4jLogConsumer(LOGGER));
+        Startables.deepStart(Stream.of(psl)).join();
+        LOGGER.info("PostgreSql container started");
+        Thread.sleep(5000L);
+        Class.forName(psl.getDriverClassName());
+        initializeJdbcTable();
+    }
+
+    private void initializeJdbcTable() {
+        try (Connection connection = DriverManager.getConnection(psl.getJdbcUrl(), psl.getUsername(), psl.getPassword())) {
+            Statement statement = connection.createStatement();
+            String sql = "CREATE TABLE test (\n" +
+                    "  name varchar(255) NOT NULL\n" +
+                    ")";
+            statement.execute(sql);
+        } catch (SQLException e) {
+            throw new RuntimeException("Initializing PostgreSql table failed!", e);
+        }
+    }
+
+    @Test
+    public void testFakeSourceToJdbcSink() throws SQLException, IOException, InterruptedException {
+        Container.ExecResult execResult = executeSeaTunnelSparkJob("/jdbc/fakesource_to_jdbc.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        String sql = "select * from test";
+        try (Connection connection = DriverManager.getConnection(psl.getJdbcUrl(), psl.getUsername(), psl.getPassword())) {
+            Statement statement = connection.createStatement();
+            ResultSet resultSet = statement.executeQuery(sql);
+            List<String> result = Lists.newArrayList();
+            while (resultSet.next()) {
+                result.add(resultSet.getString("name"));
+            }
+            Assertions.assertFalse(result.isEmpty());
+        }
+    }
+
+    @AfterEach
+    public void closePostgreSqlContainer() {
+        if (psl != null) {
+            psl.stop();
+        }
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/JdbcSourceToConsoleIT.java b/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/JdbcSourceToConsoleIT.java
new file mode 100644
index 000000000..f43007019
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/JdbcSourceToConsoleIT.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.spark.jdbc;
+
+import org.apache.seatunnel.e2e.spark.SparkContainer;
+
+import com.google.common.collect.Lists;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.PostgreSQLContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.stream.Stream;
+
+public class JdbcSourceToConsoleIT extends SparkContainer {
+    private static final Logger LOGGER = LoggerFactory.getLogger(JdbcSourceToConsoleIT.class);
+    private PostgreSQLContainer<?> psl;
+
+    @SuppressWarnings("checkstyle:MagicNumber")
+    @BeforeEach
+    public void startPostgreSqlContainer() throws InterruptedException, ClassNotFoundException, SQLException {
+        psl = new PostgreSQLContainer<>(DockerImageName.parse("postgres:alpine3.16"))
+                .withNetwork(NETWORK)
+                .withNetworkAliases("postgresql")
+                .withLogConsumer(new Slf4jLogConsumer(LOGGER));
+        psl.setPortBindings(Lists.newArrayList("33306:3306"));
+        Startables.deepStart(Stream.of(psl)).join();
+        LOGGER.info("PostgreSql container started");
+        Thread.sleep(5000L);
+        Class.forName(psl.getDriverClassName());
+        initializeJdbcTable();
+        batchInsertData();
+    }
+
+    private void initializeJdbcTable() {
+        try (Connection connection = DriverManager.getConnection(psl.getJdbcUrl(), psl.getUsername(), psl.getPassword())) {
+            Statement statement = connection.createStatement();
+            String sql = "CREATE TABLE test (\n" +
+                    "  name varchar(255) NOT NULL,\n" +
+                    "  age int NOT NULL\n" +
+                    ")";
+            statement.executeUpdate(sql);
+        } catch (SQLException e) {
+            throw new RuntimeException("Initializing PostgreSql table failed!", e);
+        }
+    }
+
+    @SuppressWarnings("checkstyle:MagicNumber")
+    private void batchInsertData() throws SQLException {
+        try (Connection connection = DriverManager.getConnection(psl.getJdbcUrl(), psl.getUsername(), psl.getPassword())) {
+            String sql = "insert into test(name,age) values(?,?)";
+            connection.setAutoCommit(false);
+            PreparedStatement preparedStatement = connection.prepareStatement(sql);
+            for (int i = 0; i < 10; i++) {
+                preparedStatement.setString(1, "Mike");
+                preparedStatement.setInt(2, 20);
+                preparedStatement.addBatch();
+            }
+            preparedStatement.executeBatch();
+            connection.commit();
+        } catch (SQLException e) {
+            throw new RuntimeException("Batch insert data failed!", e);
+        }
+    }
+
+    @Test
+    public void testFakeSourceToJdbcSink() throws SQLException, IOException, InterruptedException {
+        Container.ExecResult execResult = executeSeaTunnelSparkJob("/jdbc/jdbcsource_to_console.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+    }
+
+    @AfterEach
+    public void closePostgreSqlContainer() {
+        if (psl != null) {
+            psl.stop();
+        }
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-spark-e2e/src/test/resources/jdbc/fakesource_to_jdbc.conf b/seatunnel-e2e/seatunnel-spark-e2e/src/test/resources/jdbc/fakesource_to_jdbc.conf
new file mode 100644
index 000000000..8aae4fe31
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-e2e/src/test/resources/jdbc/fakesource_to_jdbc.conf
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  # This is an example source plugin **only for testing and demonstrating the source plugin feature**
+    Fake {
+      result_table_name = "fake"
+    }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+  # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+}
+
+transform {
+  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+  # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
+}
+
+sink {
+  jdbc {
+      driver = org.postgresql.Driver
+      saveMode = "update",
+      truncate = "true",
+      url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF",
+      user = "test",
+      password = "test",
+      dbTable = "fake",
+      customUpdateStmt = "insert into test(name) values(?)"
+      jdbc.connect_timeout = 10000
+      jdbc.socket_timeout = 10000
+  }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+  # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-e2e/src/test/resources/jdbc/jdbcsource_to_console.conf b/seatunnel-e2e/seatunnel-spark-e2e/src/test/resources/jdbc/jdbcsource_to_console.conf
new file mode 100644
index 000000000..d9555f20b
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-e2e/src/test/resources/jdbc/jdbcsource_to_console.conf
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  # This is an example source plugin **only for testing and demonstrating the source plugin feature**
+  jdbc {
+      driver = org.postgresql.Driver
+      url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF",
+      table = "test"
+      result_table_name = "test_log"
+      user = "test"
+      password = "test"
+  }
+  # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+  # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+}
+
+transform {
+  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+  # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
+}
+
+sink {
+  Console {}
+
+  # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+  # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
+}
\ No newline at end of file
diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt
index 3aeb12dba..4db58ce61 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -54,7 +54,6 @@ calcite-core-1.29.0.jar
 calcite-druid-1.29.0.jar
 calcite-linq4j-1.29.0.jar
 checker-qual-3.10.0.jar
-checker-qual-3.4.0.jar
 chill-java-0.9.3.jar
 chill_2.11-0.9.3.jar
 classmate-1.1.0.jar
@@ -731,4 +730,5 @@ zookeeper-jute-3.5.9.jar
 zstd-jni-1.3.3-1.jar
 zstd-jni-1.4.3-1.jar
 jakarta.activation-api-1.2.1.jar
-jakarta.xml.bind-api-2.3.2.jar
\ No newline at end of file
+jakarta.xml.bind-api-2.3.2.jar
+postgresql-42.3.3.jar
\ No newline at end of file