You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/08/15 04:10:46 UTC
[incubator-seatunnel] branch dev updated: Add jdbc connector e2e test (#2321)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 5fbcb811c Add jdbc connector e2e test (#2321)
5fbcb811c is described below
commit 5fbcb811c677e51969cd4e9ecb44dbdefb9c0922
Author: zhangyuge1 <49...@users.noreply.github.com>
AuthorDate: Mon Aug 15 12:10:41 2022 +0800
Add jdbc connector e2e test (#2321)
* add jdbc e2e test
---
pom.xml | 7 +-
seatunnel-connectors-v2/connector-jdbc/pom.xml | 1 -
.../seatunnel-connector-spark-jdbc/pom.xml | 4 +
seatunnel-dist/release-docs/LICENSE | 2 +-
.../release-docs/licenses/LICENSE-pgjdbc.txt | 23 +++++
.../seatunnel-flink-connector-v2-e2e/pom.xml | 13 +++
.../e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java | 97 +++++++++++++++++++
.../e2e/flink/v2/jdbc/JdbcSourceToConsoleIT.java | 102 ++++++++++++++++++++
.../test/resources/jdbc/fakesource_to_jdbc.conf | 61 ++++++++++++
.../test/resources/jdbc/jdbcsource_to_console.conf | 53 +++++++++++
seatunnel-e2e/seatunnel-spark-e2e/pom.xml | 11 +++
.../e2e/spark/jdbc/FakeSourceToJdbcIT.java | 96 +++++++++++++++++++
.../e2e/spark/jdbc/JdbcSourceToConsoleIT.java | 106 +++++++++++++++++++++
.../test/resources/jdbc/fakesource_to_jdbc.conf | 60 ++++++++++++
.../test/resources/jdbc/jdbcsource_to_console.conf | 53 +++++++++++
tools/dependencies/known-dependencies.txt | 4 +-
16 files changed, 688 insertions(+), 5 deletions(-)
diff --git a/pom.xml b/pom.xml
index e9e5ff448..120c5a73d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -224,6 +224,7 @@
<jsoup.version>1.14.3</jsoup.version>
<javax.annotation-api.version>1.3.2</javax.annotation-api.version>
<elasticsearch-rest-client.version>7.5.1</elasticsearch-rest-client.version>
+ <checker.qual.version>3.10.0</checker.qual.version>
</properties>
<dependencyManagement>
@@ -349,7 +350,6 @@
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>${postgresql.version}</version>
- <scope>test</scope>
</dependency>
<dependency>
@@ -935,6 +935,11 @@
<version>${jsoup.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.checkerframework</groupId>
+ <artifactId>checker-qual</artifactId>
+ <version>${checker.qual.version}</version>
+ </dependency>
</dependencies>
</dependencyManagement>
diff --git a/seatunnel-connectors-v2/connector-jdbc/pom.xml b/seatunnel-connectors-v2/connector-jdbc/pom.xml
index 39839eba7..1bb972ac9 100644
--- a/seatunnel-connectors-v2/connector-jdbc/pom.xml
+++ b/seatunnel-connectors-v2/connector-jdbc/pom.xml
@@ -46,7 +46,6 @@
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
- <scope>provided</scope>
</dependency>
</dependencies>
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-jdbc/pom.xml b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-jdbc/pom.xml
index 5bcc57ac4..485381fb8 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-jdbc/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-jdbc/pom.xml
@@ -56,6 +56,10 @@
<artifactId>spark-streaming_${scala.binary.version}</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.postgresql</groupId>
+ <artifactId>postgresql</artifactId>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/seatunnel-dist/release-docs/LICENSE b/seatunnel-dist/release-docs/LICENSE
index 34c6bcc20..090806a09 100644
--- a/seatunnel-dist/release-docs/LICENSE
+++ b/seatunnel-dist/release-docs/LICENSE
@@ -973,7 +973,6 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
(MIT-License) spoiwo (com.norbitltd:spoiwo_2.11:1.8.0 - https://github.com/norbert-radyk/spoiwo/)
(The MIT License (MIT)) influxdb java bindings (org.influxdb:influxdb-java:2.22 - http://www.influxdb.org)
(The MIT License) Checker Qual (org.checkerframework:checker-qual:3.10.0 - https://checkerframework.org)
- (The MIT License) Checker Qual (org.checkerframework:checker-qual:3.4.0 - https://checkerframework.org)
(The MIT License) JOpt Simple (net.sf.jopt-simple:jopt-simple:5.0.2 - http://pholser.github.io/jopt-simple)
@@ -1045,6 +1044,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
(The BSD License) xmlenc Library (xmlenc:xmlenc:0.52 - http://xmlenc.sourceforge.net)
(The BSD License) ASM Core (asm:asm:3.1 - http://asm.objectweb.org/asm/)
(New BSD License) Janino (org.codehaus.janino:janino:3.1.6 - http://docs.codehaus.org/display/JANINO/Home/janino)
+ (BSD 2-Clause License) pgjdbc (org.postgresql:postgresql:42.3.3 - https://jdbc.postgresql.org)
========================================================================
diff --git a/seatunnel-dist/release-docs/licenses/LICENSE-pgjdbc.txt b/seatunnel-dist/release-docs/licenses/LICENSE-pgjdbc.txt
new file mode 100644
index 000000000..98dff7b6e
--- /dev/null
+++ b/seatunnel-dist/release-docs/licenses/LICENSE-pgjdbc.txt
@@ -0,0 +1,23 @@
+Copyright (c) 1997, PostgreSQL Global Development Group
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+1. Redistributions of source code must retain the above copyright notice,
+ this list of conditions and the following disclaimer.
+2. Redistributions in binary form must reproduce the above copyright notice,
+ this list of conditions and the following disclaimer in the documentation
+ and/or other materials provided with the distribution.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+POSSIBILITY OF SUCH DAMAGE.
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
index c494f80f8..454a41c95 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
@@ -37,6 +37,19 @@
<artifactId>testcontainers</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>postgresql</artifactId>
+ <version>1.17.3</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.postgresql</groupId>
+ <artifactId>postgresql</artifactId>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
</project>
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java
new file mode 100644
index 000000000..784bd9a06
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.jdbc;
+
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import com.google.common.collect.Lists;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.PostgreSQLContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.stream.Stream;
+
+public class FakeSourceToJdbcIT extends FlinkContainer {
+ private static final Logger LOGGER = LoggerFactory.getLogger(FakeSourceToJdbcIT.class);
+ private PostgreSQLContainer<?> psl;
+
+ @SuppressWarnings("checkstyle:MagicNumber")
+ @BeforeEach
+ public void startPostgreSqlContainer() throws InterruptedException, ClassNotFoundException, SQLException {
+ psl = new PostgreSQLContainer<>(DockerImageName.parse("postgres:alpine3.16"))
+ .withNetwork(NETWORK)
+ .withNetworkAliases("postgresql")
+ .withLogConsumer(new Slf4jLogConsumer(LOGGER));
+ Startables.deepStart(Stream.of(psl)).join();
+ LOGGER.info("PostgreSql container started");
+ Thread.sleep(5000L);
+ Class.forName(psl.getDriverClassName());
+ initializeJdbcTable();
+ }
+
+ private void initializeJdbcTable() {
+ try (Connection connection = DriverManager.getConnection(psl.getJdbcUrl(), psl.getUsername(), psl.getPassword())) {
+ Statement statement = connection.createStatement();
+ String sql = "CREATE TABLE test (\n" +
+ " name varchar(255) NOT NULL\n" +
+ ")";
+ statement.execute(sql);
+ } catch (SQLException e) {
+ throw new RuntimeException("Initializing PostgreSql table failed!", e);
+ }
+ }
+
+ @Test
+ public void testFakeSourceToJdbcSink() throws SQLException, IOException, InterruptedException {
+ Container.ExecResult execResult = executeSeaTunnelFlinkJob("/jdbc/fakesource_to_jdbc.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ // query result
+ String sql = "select * from test";
+ try (Connection connection = DriverManager.getConnection(psl.getJdbcUrl(), psl.getUsername(), psl.getPassword())) {
+ Statement statement = connection.createStatement();
+ ResultSet resultSet = statement.executeQuery(sql);
+ List<String> result = Lists.newArrayList();
+ while (resultSet.next()) {
+ result.add(resultSet.getString("name"));
+ }
+ Assertions.assertFalse(result.isEmpty());
+ }
+ }
+
+ @AfterEach
+ public void closeClickHouseContainer() {
+ if (psl != null) {
+ psl.stop();
+ }
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSourceToConsoleIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSourceToConsoleIT.java
new file mode 100644
index 000000000..14eba119d
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSourceToConsoleIT.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.jdbc;
+
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.PostgreSQLContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.stream.Stream;
+
+public class JdbcSourceToConsoleIT extends FlinkContainer {
+ private static final Logger LOGGER = LoggerFactory.getLogger(JdbcSourceToConsoleIT.class);
+ private PostgreSQLContainer<?> psl;
+
+ @SuppressWarnings("checkstyle:MagicNumber")
+ @BeforeEach
+ public void startPostgreSqlContainer() throws InterruptedException, ClassNotFoundException, SQLException {
+ psl = new PostgreSQLContainer<>(DockerImageName.parse("postgres:alpine3.16"))
+ .withNetwork(NETWORK)
+ .withNetworkAliases("postgresql")
+ .withLogConsumer(new Slf4jLogConsumer(LOGGER));
+ Startables.deepStart(Stream.of(psl)).join();
+ LOGGER.info("PostgreSql container started");
+ Thread.sleep(5000L);
+ Class.forName(psl.getDriverClassName());
+ initializeJdbcTable();
+ batchInsertData();
+ }
+
+ private void initializeJdbcTable() {
+ try (Connection connection = DriverManager.getConnection(psl.getJdbcUrl(), psl.getUsername(), psl.getPassword())) {
+ Statement statement = connection.createStatement();
+ String sql = "CREATE TABLE test (\n" +
+ " name varchar(255) NOT NULL\n" +
+ ")";
+ statement.execute(sql);
+ } catch (SQLException e) {
+ throw new RuntimeException("Initializing PostgreSql table failed!", e);
+ }
+ }
+
+ @SuppressWarnings("checkstyle:MagicNumber")
+ private void batchInsertData() throws SQLException {
+ try (Connection connection = DriverManager.getConnection(psl.getJdbcUrl(), psl.getUsername(), psl.getPassword())) {
+ String sql = "insert into test(name) values(?)";
+ connection.setAutoCommit(false);
+ PreparedStatement preparedStatement = connection.prepareStatement(sql);
+ for (int i = 0; i < 10; i++) {
+ preparedStatement.setString(1, "Mike");
+ preparedStatement.addBatch();
+ }
+ preparedStatement.executeBatch();
+ connection.commit();
+ } catch (SQLException e) {
+ throw new RuntimeException("Batch insert data failed!", e);
+ }
+ }
+
+ @Test
+ public void testFakeSourceToJdbcSink() throws SQLException, IOException, InterruptedException {
+ Container.ExecResult execResult = executeSeaTunnelFlinkJob("/jdbc/jdbcsource_to_console.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ }
+
+ @AfterEach
+ public void closePostgreSqlContainer() {
+ if (psl != null) {
+ psl.stop();
+ }
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/fakesource_to_jdbc.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/fakesource_to_jdbc.conf
new file mode 100644
index 000000000..9640e19c2
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/fakesource_to_jdbc.conf
@@ -0,0 +1,61 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+ # You can set flink configuration here
+ execution.parallelism = 1
+ job.mode = "BATCH"
+ #execution.checkpoint.interval = 10000
+ #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+ # This is an example source plugin **only for testing and demonstrating the source plugin feature**
+ FakeSource {
+ result_table_name = "fake"
+ field_name = "name"
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+}
+
+transform {
+ sql {
+ sql = "select name from fake"
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
+}
+
+sink {
+ Jdbc {
+ source_table_name = fake
+ driver = org.postgresql.Driver
+ url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF"
+ user = test
+ password = test
+ query = "insert into test(name) values(?)"
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/jdbcsource_to_console.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/jdbcsource_to_console.conf
new file mode 100644
index 000000000..6862abc04
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/jdbcsource_to_console.conf
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+ # You can set flink configuration here
+ execution.parallelism = 1
+ job.mode = "BATCH"
+ #execution.checkpoint.interval = 10000
+ #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+ # This is an example source plugin **only for testing and demonstrating the source plugin feature**
+ Jdbc {
+ driver = org.postgresql.Driver
+ url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF"
+ user = test
+ password = "test"
+ query = "select * from test"
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+}
+
+transform {
+
+ # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
+}
+
+sink {
+ Console {}
+ # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-e2e/pom.xml b/seatunnel-e2e/seatunnel-spark-e2e/pom.xml
index e6a579729..85a516165 100644
--- a/seatunnel-e2e/seatunnel-spark-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-spark-e2e/pom.xml
@@ -41,6 +41,17 @@
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>postgresql</artifactId>
+ <version>1.17.3</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.postgresql</groupId>
+ <artifactId>postgresql</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/FakeSourceToJdbcIT.java b/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/FakeSourceToJdbcIT.java
new file mode 100644
index 000000000..061de4e1a
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/FakeSourceToJdbcIT.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.spark.jdbc;
+
+import org.apache.seatunnel.e2e.spark.SparkContainer;
+
+import com.google.common.collect.Lists;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.PostgreSQLContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.stream.Stream;
+
+public class FakeSourceToJdbcIT extends SparkContainer {
+ private static final Logger LOGGER = LoggerFactory.getLogger(FakeSourceToJdbcIT.class);
+ private PostgreSQLContainer<?> psl;
+
+ @SuppressWarnings("checkstyle:MagicNumber")
+ @BeforeEach
+ public void startPostgreSqlContainer() throws InterruptedException, ClassNotFoundException, SQLException {
+ psl = new PostgreSQLContainer<>(DockerImageName.parse("postgres:alpine3.16"))
+ .withNetwork(NETWORK)
+ .withNetworkAliases("postgresql")
+ .withLogConsumer(new Slf4jLogConsumer(LOGGER));
+ Startables.deepStart(Stream.of(psl)).join();
+ LOGGER.info("PostgreSql container started");
+ Thread.sleep(5000L);
+ Class.forName(psl.getDriverClassName());
+ initializeJdbcTable();
+ }
+
+ private void initializeJdbcTable() {
+ try (Connection connection = DriverManager.getConnection(psl.getJdbcUrl(), psl.getUsername(), psl.getPassword())) {
+ Statement statement = connection.createStatement();
+ String sql = "CREATE TABLE test (\n" +
+ " name varchar(255) NOT NULL\n" +
+ ")";
+ statement.execute(sql);
+ } catch (SQLException e) {
+ throw new RuntimeException("Initializing PostgreSql table failed!", e);
+ }
+ }
+
+ @Test
+ public void testFakeSourceToJdbcSink() throws SQLException, IOException, InterruptedException {
+ Container.ExecResult execResult = executeSeaTunnelSparkJob("/jdbc/fakesource_to_jdbc.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ String sql = "select * from test";
+ try (Connection connection = DriverManager.getConnection(psl.getJdbcUrl(), psl.getUsername(), psl.getPassword())) {
+ Statement statement = connection.createStatement();
+ ResultSet resultSet = statement.executeQuery(sql);
+ List<String> result = Lists.newArrayList();
+ while (resultSet.next()) {
+ result.add(resultSet.getString("name"));
+ }
+ Assertions.assertFalse(result.isEmpty());
+ }
+ }
+
+ @AfterEach
+ public void closePostgreSqlContainer() {
+ if (psl != null) {
+ psl.stop();
+ }
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/JdbcSourceToConsoleIT.java b/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/JdbcSourceToConsoleIT.java
new file mode 100644
index 000000000..f43007019
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/JdbcSourceToConsoleIT.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.spark.jdbc;
+
+import org.apache.seatunnel.e2e.spark.SparkContainer;
+
+import com.google.common.collect.Lists;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.PostgreSQLContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.stream.Stream;
+
+public class JdbcSourceToConsoleIT extends SparkContainer {
+ private static final Logger LOGGER = LoggerFactory.getLogger(JdbcSourceToConsoleIT.class);
+ private PostgreSQLContainer<?> psl;
+
+ @SuppressWarnings("checkstyle:MagicNumber")
+ @BeforeEach
+ public void startPostgreSqlContainer() throws InterruptedException, ClassNotFoundException, SQLException {
+ psl = new PostgreSQLContainer<>(DockerImageName.parse("postgres:alpine3.16"))
+ .withNetwork(NETWORK)
+ .withNetworkAliases("postgresql")
+ .withLogConsumer(new Slf4jLogConsumer(LOGGER));
+ psl.setPortBindings(Lists.newArrayList("33306:3306"));
+ Startables.deepStart(Stream.of(psl)).join();
+ LOGGER.info("PostgreSql container started");
+ Thread.sleep(5000L);
+ Class.forName(psl.getDriverClassName());
+ initializeJdbcTable();
+ batchInsertData();
+ }
+
+ private void initializeJdbcTable() {
+ try (Connection connection = DriverManager.getConnection(psl.getJdbcUrl(), psl.getUsername(), psl.getPassword())) {
+ Statement statement = connection.createStatement();
+ String sql = "CREATE TABLE test (\n" +
+ " name varchar(255) NOT NULL,\n" +
+ " age int NOT NULL\n" +
+ ")";
+ statement.executeUpdate(sql);
+ } catch (SQLException e) {
+ throw new RuntimeException("Initializing PostgreSql table failed!", e);
+ }
+ }
+
+ @SuppressWarnings("checkstyle:MagicNumber")
+ private void batchInsertData() throws SQLException {
+ try (Connection connection = DriverManager.getConnection(psl.getJdbcUrl(), psl.getUsername(), psl.getPassword())) {
+ String sql = "insert into test(name,age) values(?,?)";
+ connection.setAutoCommit(false);
+ PreparedStatement preparedStatement = connection.prepareStatement(sql);
+ for (int i = 0; i < 10; i++) {
+ preparedStatement.setString(1, "Mike");
+ preparedStatement.setInt(2, 20);
+ preparedStatement.addBatch();
+ }
+ preparedStatement.executeBatch();
+ connection.commit();
+ } catch (SQLException e) {
+ throw new RuntimeException("Batch insert data failed!", e);
+ }
+ }
+
+ @Test
+ public void testFakeSourceToJdbcSink() throws SQLException, IOException, InterruptedException {
+ Container.ExecResult execResult = executeSeaTunnelSparkJob("/jdbc/jdbcsource_to_console.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ }
+
+ @AfterEach
+ public void closePostgreSqlContainer() {
+ if (psl != null) {
+ psl.stop();
+ }
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-spark-e2e/src/test/resources/jdbc/fakesource_to_jdbc.conf b/seatunnel-e2e/seatunnel-spark-e2e/src/test/resources/jdbc/fakesource_to_jdbc.conf
new file mode 100644
index 000000000..8aae4fe31
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-e2e/src/test/resources/jdbc/fakesource_to_jdbc.conf
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ # This is an example source plugin **only for testing and demonstrating the source plugin feature**
+ Fake {
+ result_table_name = "fake"
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+}
+
+transform {
+ # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
+}
+
+sink {
+ jdbc {
+ driver = org.postgresql.Driver
+ saveMode = "update",
+ truncate = "true",
+ url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF",
+ user = "test",
+ password = "test",
+ dbTable = "fake",
+ customUpdateStmt = "insert into test(name) values(?)"
+ jdbc.connect_timeout = 10000
+ jdbc.socket_timeout = 10000
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-e2e/src/test/resources/jdbc/jdbcsource_to_console.conf b/seatunnel-e2e/seatunnel-spark-e2e/src/test/resources/jdbc/jdbcsource_to_console.conf
new file mode 100644
index 000000000..d9555f20b
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-e2e/src/test/resources/jdbc/jdbcsource_to_console.conf
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ # This is an example source plugin **only for testing and demonstrating the source plugin feature**
+ jdbc {
+ driver = org.postgresql.Driver
+ url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF",
+ table = "test"
+ result_table_name = "test_log"
+ user = "test"
+ password = "test"
+ }
+ # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+}
+
+transform {
+ # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
+}
+
+sink {
+ Console {}
+
+ # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
+}
\ No newline at end of file
diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt
index 3aeb12dba..4db58ce61 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -54,7 +54,6 @@ calcite-core-1.29.0.jar
calcite-druid-1.29.0.jar
calcite-linq4j-1.29.0.jar
checker-qual-3.10.0.jar
-checker-qual-3.4.0.jar
chill-java-0.9.3.jar
chill_2.11-0.9.3.jar
classmate-1.1.0.jar
@@ -731,4 +730,5 @@ zookeeper-jute-3.5.9.jar
zstd-jni-1.3.3-1.jar
zstd-jni-1.4.3-1.jar
jakarta.activation-api-1.2.1.jar
-jakarta.xml.bind-api-2.3.2.jar
\ No newline at end of file
+jakarta.xml.bind-api-2.3.2.jar
+postgresql-42.3.3.jar
\ No newline at end of file