You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/10/09 06:10:15 UTC

[incubator-seatunnel] branch dev updated: [Feature][Connector-V2] oracle connector (#2550)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 384ece191 [Feature][Connector-V2] oracle connector (#2550)
384ece191 is described below

commit 384ece19130faafcfdbc52d715871bdef686c739
Author: liugddx <80...@qq.com>
AuthorDate: Sun Oct 9 14:10:09 2022 +0800

    [Feature][Connector-V2] oracle connector (#2550)
    
    * [Feature][Connector-V2] support oracle connector
    
    * [Feature][Connector-V2] modify oracle connector
---
 docs/en/connector-v2/sink/Jdbc.md                  |  15 +-
 docs/en/connector-v2/source/Jdbc.md                |  15 +-
 seatunnel-connectors-v2/connector-jdbc/pom.xml     |  12 ++
 .../internal/dialect/oracle/OracleDialect.java     |  39 +++++
 .../dialect/oracle/OracleDialectFactory.java       |  40 +++++
 .../dialect/oracle/OracleJdbcRowConverter.java     |  39 +++++
 .../internal/dialect/oracle/OracleTypeMapper.java  | 117 +++++++++++++++
 .../connector-jdbc-e2e/pom.xml                     |   5 +
 .../connectors/seatunnel/jdbc/AbstractJdbcIT.java  | 146 ++++++++++++++++++
 .../connectors/seatunnel/jdbc/JdbcCase.java        |  48 ++++++
 .../connectors/seatunnel/jdbc/JdbcOracledbIT.java  | 165 +++++++++++++++++++++
 .../test/resources/jdbc_oracle_source_to_sink.conf |  60 ++++++++
 12 files changed, 687 insertions(+), 14 deletions(-)

diff --git a/docs/en/connector-v2/sink/Jdbc.md b/docs/en/connector-v2/sink/Jdbc.md
index ee8588957..3c583ff89 100644
--- a/docs/en/connector-v2/sink/Jdbc.md
+++ b/docs/en/connector-v2/sink/Jdbc.md
@@ -107,13 +107,14 @@ In the case of is_exactly_once = "true", Xa transactions are used. This requires
 ## appendix
 
 there are some reference value for params above.
-| datasource | driver                                       | url                                                                   | xa_data_source_class_name                              | maven                                                                                 |
-|------------|----------------------------------------------|-----------------------------------------------------------------------|--------------------------------------------------------|---------------------------------------------------------------------------------------|
-| mysql      | com.mysql.cj.jdbc.Driver                     | jdbc:mysql://localhost:3306/test                                      | com.mysql.cj.jdbc.MysqlXADataSource                    | https://mvnrepository.com/artifact/mysql/mysql-connector-java                         |
-| postgresql | org.postgresql.Driver                        | jdbc:postgresql://localhost:5432/postgres                             | org.postgresql.xa.PGXADataSource                       | https://mvnrepository.com/artifact/org.postgresql/postgresql                          |                                                             |
-| dm         | dm.jdbc.driver.DmDriver                      | jdbc:dm://localhost:5236                                              | dm.jdbc.driver.DmdbXADataSource                        | https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18                          |
-| phoenix    | org.apache.phoenix.queryserver.client.Driver | jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF    | /                                                      | https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client  |
-| sqlserver  | com.microsoft.sqlserver.jdbc.SQLServerDriver | jdbc:microsoft:sqlserver://localhost:1433                             | com.microsoft.sqlserver.jdbc.SQLServerXADataSource     | https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc                 |
+| datasource | driver                                       | url                                                          | xa_data_source_class_name                          | maven                                                        |
+| ---------- | -------------------------------------------- | ------------------------------------------------------------ | -------------------------------------------------- | ------------------------------------------------------------ |
+| mysql      | com.mysql.cj.jdbc.Driver                     | jdbc:mysql://localhost:3306/test                             | com.mysql.cj.jdbc.MysqlXADataSource                | https://mvnrepository.com/artifact/mysql/mysql-connector-java |
+| postgresql | org.postgresql.Driver                        | jdbc:postgresql://localhost:5432/postgres                    | org.postgresql.xa.PGXADataSource                   | https://mvnrepository.com/artifact/org.postgresql/postgresql |
+| dm         | dm.jdbc.driver.DmDriver                      | jdbc:dm://localhost:5236                                     | dm.jdbc.driver.DmdbXADataSource                    | https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18 |
+| phoenix    | org.apache.phoenix.queryserver.client.Driver | jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF | /                                                  | https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client |
+| sqlserver  | com.microsoft.sqlserver.jdbc.SQLServerDriver | jdbc:microsoft:sqlserver://localhost:1433                    | com.microsoft.sqlserver.jdbc.SQLServerXADataSource | https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc |
+| oracle     | oracle.jdbc.OracleDriver                     | jdbc:oracle:thin:@localhost:1521/xepdb1                      | oracle.jdbc.xa.OracleXADataSource                  | https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8 |
 
 ## Example
 
diff --git a/docs/en/connector-v2/source/Jdbc.md b/docs/en/connector-v2/source/Jdbc.md
index 8e1ca4f93..cd5746201 100644
--- a/docs/en/connector-v2/source/Jdbc.md
+++ b/docs/en/connector-v2/source/Jdbc.md
@@ -90,13 +90,14 @@ in parallel according to the concurrency of tasks.
 
 there are some reference value for params above.
 
-| datasource | driver                   | url                                                                                    | maven                                                                                 |
-|------------|--------------------------|----------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------|
-| mysql      | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test                                                       | https://mvnrepository.com/artifact/mysql/mysql-connector-java                         |
-| postgresql | org.postgresql.Driver    | jdbc:postgresql://localhost:5432/postgres                                              | https://mvnrepository.com/artifact/org.postgresql/postgresql                          |
-| dm         | dm.jdbc.driver.DmDriver  | jdbc:dm://localhost:5236                                                               | https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18                          |
-| phoenix    | org.apache.phoenix.queryserver.client.Driver | jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF | https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client  |
-| sqlserver  | com.microsoft.sqlserver.jdbc.SQLServerDriver  | jdbc:microsoft:sqlserver://localhost:1433                         | https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc                 |
+| datasource | driver                                       | url                                                          | maven                                                        |
+| ---------- | -------------------------------------------- | ------------------------------------------------------------ | ------------------------------------------------------------ |
+| mysql      | com.mysql.cj.jdbc.Driver                     | jdbc:mysql://localhost:3306/test                             | https://mvnrepository.com/artifact/mysql/mysql-connector-java |
+| postgresql | org.postgresql.Driver                        | jdbc:postgresql://localhost:5432/postgres                    | https://mvnrepository.com/artifact/org.postgresql/postgresql |
+| dm         | dm.jdbc.driver.DmDriver                      | jdbc:dm://localhost:5236                                     | https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18 |
+| phoenix    | org.apache.phoenix.queryserver.client.Driver | jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF | https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client |
+| sqlserver  | com.microsoft.sqlserver.jdbc.SQLServerDriver | jdbc:microsoft:sqlserver://localhost:1433                    | https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc |
+| oracle     | oracle.jdbc.OracleDriver                     | jdbc:oracle:thin:@localhost:1521/xepdb1                      | https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8 |
 
 ## Example
 
diff --git a/seatunnel-connectors-v2/connector-jdbc/pom.xml b/seatunnel-connectors-v2/connector-jdbc/pom.xml
index 25a945117..7e49bcce0 100644
--- a/seatunnel-connectors-v2/connector-jdbc/pom.xml
+++ b/seatunnel-connectors-v2/connector-jdbc/pom.xml
@@ -35,6 +35,7 @@
         <dm-jdbc.version>8.1.2.141</dm-jdbc.version>
         <sqlserver.version>9.2.1.jre8</sqlserver.version>
         <phoenix.version>5.2.5-HBase-2.x</phoenix.version>
+        <oracle.version>12.2.0.1</oracle.version>
     </properties>
 
     <dependencyManagement>
@@ -69,6 +70,13 @@
                 <version>${sqlserver.version}</version>
                 <scope>provided</scope>
             </dependency>
+            <dependency>
+                <groupId>com.oracle.database.jdbc</groupId>
+                <artifactId>ojdbc8</artifactId>
+                <version>${oracle.version}</version>
+                <scope>provided</scope>
+            </dependency>
+
         </dependencies>
     </dependencyManagement>
 
@@ -96,6 +104,10 @@
             <groupId>com.microsoft.sqlserver</groupId>
             <artifactId>mssql-jdbc</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.oracle.database.jdbc</groupId>
+            <artifactId>ojdbc8</artifactId>
+        </dependency>
     </dependencies>
 
 </project>
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java
new file mode 100644
index 000000000..f56b02338
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle;
+
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+
+public class OracleDialect implements JdbcDialect {
+    @Override
+    public String dialectName() {
+        return "Oracle";
+    }
+
+    @Override
+    public JdbcRowConverter getRowConverter() {
+        return new OracleJdbcRowConverter();
+    }
+
+    @Override
+    public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {
+        return new OracleTypeMapper();
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialectFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialectFactory.java
new file mode 100644
index 000000000..67bef095e
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialectFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle;
+
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
+
+import com.google.auto.service.AutoService;
+
+/**
+ * Factory for {@link OracleDialect}.
+ */
+
+@AutoService(JdbcDialectFactory.class)
+public class OracleDialectFactory implements JdbcDialectFactory {
+    @Override
+    public boolean acceptsURL(String url) {
+        return url.startsWith("jdbc:oracle:thin:");
+    }
+
+    @Override
+    public JdbcDialect create() {
+        return new OracleDialect();
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleJdbcRowConverter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleJdbcRowConverter.java
new file mode 100644
index 000000000..35e3e3742
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleJdbcRowConverter.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;
+
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+
+public class OracleJdbcRowConverter extends AbstractJdbcRowConverter {
+
+    @Override
+    public String converterName() {
+        return "Oracle";
+    }
+
+    @Override
+    public SeaTunnelRow toInternal(ResultSet rs, ResultSetMetaData metaData, SeaTunnelRowType typeInfo) throws SQLException {
+        return super.toInternal(rs, metaData, typeInfo);
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleTypeMapper.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleTypeMapper.java
new file mode 100644
index 000000000..a11a8c9c7
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleTypeMapper.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+
+@Slf4j
+public class OracleTypeMapper implements JdbcDialectTypeMapper {
+
+    // ============================data types=====================
+
+    private static final String ORACLE_UNKNOWN = "UNKNOWN";
+
+    // -------------------------number----------------------------
+    private static final String ORACLE_BINARY_DOUBLE = "BINARY_DOUBLE";
+    private static final String ORACLE_BINARY_FLOAT = "BINARY_FLOAT";
+    private static final String ORACLE_NUMBER = "NUMBER";
+    private static final String ORACLE_FLOAT = "FLOAT";
+    private static final String ORACLE_REAL = "REAL";
+    private static final String ORACLE_INTEGER = "INTEGER";
+
+    // -------------------------string----------------------------
+    private static final String ORACLE_CHAR = "CHAR";
+    private static final String ORACLE_VARCHAR2 = "VARCHAR2";
+    private static final String ORACLE_NCHAR = "NCHAR";
+    private static final String ORACLE_NVARCHAR2 = "NVARCHAR2";
+    private static final String ORACLE_LONG = "LONG";
+    private static final String ORACLE_ROWID = "ROWID";
+    private static final String ORACLE_CLOB = "CLOB";
+    private static final String ORACLE_NCLOB = "NCLOB";
+
+    // ------------------------------time-------------------------
+    private static final String ORACLE_DATE = "DATE";
+    private static final String ORACLE_TIMESTAMP = "TIMESTAMP";
+    private static final String ORACLE_TIMESTAMP_WITH_LOCAL_TIME_ZONE = "TIMESTAMP WITH LOCAL TIME ZONE";
+
+    // ------------------------------blob-------------------------
+    private static final String ORACLE_BLOB = "BLOB";
+    private static final String ORACLE_BFILE = "BFILE";
+    private static final String ORACLE_RAW = "RAW";
+    private static final String ORACLE_LONG_RAW = "LONG RAW";
+
+    @SuppressWarnings("checkstyle:MagicNumber")
+    @Override
+    public SeaTunnelDataType<?> mapping(ResultSetMetaData metadata, int colIndex) throws SQLException {
+        String oracleType = metadata.getColumnTypeName(colIndex).toUpperCase();
+        String columnName = metadata.getColumnName(colIndex);
+        int precision = metadata.getPrecision(colIndex);
+        int scale = metadata.getScale(colIndex);
+        switch (oracleType) {
+            case ORACLE_INTEGER:
+                return BasicType.INT_TYPE;
+            case ORACLE_NUMBER:
+                if (precision < 38) {
+                    return new DecimalType(precision, scale);
+                }
+                return new DecimalType(38, 18);
+            case ORACLE_FLOAT:
+            case ORACLE_BINARY_DOUBLE:
+                return BasicType.DOUBLE_TYPE;
+            case ORACLE_BINARY_FLOAT:
+            case ORACLE_REAL:
+                return BasicType.FLOAT_TYPE;
+            case ORACLE_CHAR:
+            case ORACLE_NCHAR:
+            case ORACLE_NVARCHAR2:
+            case ORACLE_VARCHAR2:
+            case ORACLE_LONG:
+            case ORACLE_ROWID:
+            case ORACLE_NCLOB:
+            case ORACLE_CLOB:
+                return BasicType.STRING_TYPE;
+            case ORACLE_DATE:
+            case ORACLE_TIMESTAMP:
+            case ORACLE_TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+            case ORACLE_BLOB:
+            case ORACLE_RAW:
+            case ORACLE_LONG_RAW:
+            case ORACLE_BFILE:
+                return PrimitiveByteArrayType.INSTANCE;
+            //Doesn't support yet
+            case ORACLE_UNKNOWN:
+            default:
+                final String jdbcColumnName = metadata.getColumnName(colIndex);
+                throw new UnsupportedOperationException(
+                    String.format(
+                        "Doesn't support ORACLE type '%s' on column '%s'  yet.",
+                        oracleType, jdbcColumnName));
+        }
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/pom.xml
index 4baadbefd..000ce2c49 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/pom.xml
@@ -65,6 +65,11 @@
             <artifactId>DmJdbcDriver18</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>com.oracle.database.jdbc</groupId>
+            <artifactId>ojdbc8</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
 </project>
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java
new file mode 100644
index 000000000..3082881da
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc;
+
+import static org.awaitility.Awaitility.given;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+@Slf4j
+public abstract class AbstractJdbcIT extends TestSuiteBase implements TestResource {
+
+    protected static final String HOST = "HOST";
+    protected Connection jdbcConnection;
+    protected GenericContainer<?> dbServer;
+    private JdbcCase jdbcCase;
+    private String jdbcUrl;
+
+    abstract JdbcCase getJdbcCase();
+
+    abstract void compareResult() throws SQLException, IOException;
+
+    abstract void clearSinkTable();
+
+    abstract SeaTunnelRow initTestData();
+
+    @TestContainerExtension
+    private final ContainerExtendedFactory extendedFactory = container -> {
+        Container.ExecResult extraCommands = container.execInContainer("bash", "-c", "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd /tmp/seatunnel/plugins/Jdbc/lib && curl -O " + jdbcCase.getDriverJar());
+        Assertions.assertEquals(0, extraCommands.getExitCode());
+    };
+
+    private void getContainer() throws ClassNotFoundException, SQLException {
+        jdbcCase = this.getJdbcCase();
+        dbServer = new GenericContainer<>(jdbcCase.getDockerImage())
+            .withNetwork(NETWORK)
+            .withNetworkAliases(jdbcCase.getNetworkAliases())
+            .withEnv(jdbcCase.getContainerEnv())
+            .withLogConsumer(new Slf4jLogConsumer(log));
+        dbServer.setPortBindings(Lists.newArrayList(
+            String.format("%s:%s", jdbcCase.getPort(), jdbcCase.getPort())));
+        Startables.deepStart(Stream.of(dbServer)).join();
+        Class.forName(jdbcCase.getDriverClass());
+        given().ignoreExceptions()
+            .await()
+            .atMost(180, TimeUnit.SECONDS)
+            .untilAsserted(this::initializeJdbcConnection);
+        initializeJdbcTable();
+    }
+
+    protected void initializeJdbcConnection() throws SQLException {
+        jdbcUrl = jdbcCase.getJdbcUrl().replace(HOST, dbServer.getHost());
+        jdbcConnection = DriverManager.getConnection(jdbcUrl, jdbcCase.getUserName(), jdbcCase.getPassword());
+    }
+
+    private void batchInsertData() {
+        try (Connection connection = DriverManager.getConnection(jdbcUrl, jdbcCase.getUserName(), jdbcCase.getPassword())) {
+            connection.setAutoCommit(false);
+            try (PreparedStatement preparedStatement = connection.prepareStatement(jdbcCase.getInitDataSql())) {
+
+                for (int index = 0; index < jdbcCase.getSeaTunnelRow().getFields().length; index++) {
+                    preparedStatement.setObject(index + 1, jdbcCase.getSeaTunnelRow().getFields()[index]);
+                }
+                preparedStatement.execute();
+            }
+            connection.commit();
+        } catch (SQLException exception) {
+            exception.printStackTrace();
+        }
+    }
+
+    private void initializeJdbcTable() throws SQLException {
+        try (Connection connection = DriverManager.getConnection(jdbcUrl, jdbcCase.getUserName(), jdbcCase.getPassword())) {
+            Statement statement = connection.createStatement();
+            String createSource = jdbcCase.getDdlSource();
+            String createSink = jdbcCase.getDdlSink();
+            statement.execute(createSource);
+            statement.execute(createSink);
+        } catch (SQLException exception) {
+            exception.printStackTrace();
+        }
+        this.batchInsertData();
+    }
+
+    @BeforeAll
+    @Override
+    public void startUp() throws Exception {
+        this.getContainer();
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        if (jdbcConnection != null) {
+            jdbcConnection.close();
+        }
+        if (dbServer != null) {
+            dbServer.close();
+        }
+    }
+
+    @TestTemplate
+    public void testJdbcDb(TestContainer container) throws IOException, InterruptedException, SQLException {
+        Container.ExecResult execResult = container.executeJob(jdbcCase.getConfigFile());
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+        this.compareResult();
+    }
+
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCase.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCase.java
new file mode 100644
index 000000000..fd182ce3b
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCase.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import lombok.Builder;
+import lombok.Data;
+
+import java.util.Map;
+
+@Data
+@Builder
+public class JdbcCase {
+    private String dockerImage;
+    private String networkAliases;
+    private String driverClass;
+    private String host;
+    private String userName;
+    private String password;
+    private int port;
+    private String dataBase;
+    private String sourceTable;
+    private String sinkTable;
+    private String driverJar;
+    private String jdbcUrl;
+    private String ddlSource;
+    private String ddlSink;
+    private String initDataSql;
+    private String configFile;
+    private SeaTunnelRow seaTunnelRow;
+    private Map<String, String> containerEnv;
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracledbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracledbIT.java
new file mode 100644
index 000000000..48e34c0c7
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracledbIT.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import org.junit.jupiter.api.Assertions;
+import org.testcontainers.shaded.com.google.common.collect.Lists;
+import org.testcontainers.shaded.org.apache.commons.io.IOUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+public class JdbcOracledbIT extends AbstractJdbcIT {
+
+    private static final String DOCKER_IMAGE = "gvenzl/oracle-xe:18.4.0-slim";
+    private static final String NETWORK_ALIASES = "e2e_oracleDb";
+    private static final String DRIVER_CLASS = "oracle.jdbc.OracleDriver";
+    private static final int PORT = 1521;
+    private static final String URL = "jdbc:oracle:thin:@" + HOST + ":%s/%s";
+    private static final String USERNAME = "test";
+    private static final String PASSWORD = "test";
+    private static final String DATABASE = "xepdb1";
+    private static final String SOURCE_TABLE = "e2e_table_source";
+    private static final String SINK_TABLE = "e2e_table_sink";
+    private static final String DRIVER_JAR = "https://repo1.maven.org/maven2/com/oracle/database/jdbc/ojdbc8/12.2.0.1/ojdbc8-12.2.0.1.jar";
+    private static final String CONFIG_FILE = "/jdbc_oracle_source_to_sink.conf";
+    private static final String DDL_SOURCE = "create table " + SOURCE_TABLE + " (\n" +
+        "  varchar_10_col   varchar2(10),\n" +
+        "  char_10_col      char(10),\n" +
+        "  clob_col         clob,\n" +
+        "  number_3_sf_2_dp  number(3, 2),\n" +
+        "  integer_col       integer,\n" +
+        "  float_col         float(10),\n" +
+        "  real_col          real,\n" +
+        "  binary_float_col  binary_float,\n" +
+        "  binary_double_col binary_double,\n" +
+        "  date_col                      date,\n" +
+        "  timestamp_with_3_frac_sec_col timestamp(3),\n" +
+        "  timestamp_with_local_tz       timestamp with local time zone,\n" +
+        "  raw_col  raw(1000),\n" +
+        "  blob_col blob\n" +
+        ")";
+    private static final String DDL_SINK = "create table " + SINK_TABLE + "(\n" +
+        "  varchar_10_col   varchar2(10),\n" +
+        "  char_10_col      char(10),\n" +
+        "  clob_col         clob,\n" +
+        "  number_3_sf_2_dp  number(3, 2),\n" +
+        "  integer_col       integer,\n" +
+        "  float_col         float(10),\n" +
+        "  real_col          real,\n" +
+        "  binary_float_col  binary_float,\n" +
+        "  binary_double_col binary_double,\n" +
+        "  date_col                      date,\n" +
+        "  timestamp_with_3_frac_sec_col timestamp(3),\n" +
+        "  timestamp_with_local_tz       timestamp with local time zone,\n" +
+        "  raw_col  raw(1000),\n" +
+        "  blob_col blob\n" +
+        ")";
+    private static final String INIT_DATA_SQL = "insert into " + SOURCE_TABLE + " (\n" +
+        "  varchar_10_col,\n" +
+        "  char_10_col,\n" +
+        "  clob_col,\n" +
+        "  number_3_sf_2_dp,\n" +
+        "  integer_col,\n" +
+        "  float_col,\n" +
+        "  real_col,\n" +
+        "  binary_float_col,\n" +
+        "  binary_double_col,\n" +
+        "  date_col,\n" +
+        "  timestamp_with_3_frac_sec_col,\n" +
+        "  timestamp_with_local_tz,\n" +
+        "  raw_col,\n" +
+        "  blob_col\n" +
+        ")values(\n" +
+        "\t?,?,?,?,?,?,?,?,?,?,?,?,?,rawtohex(?),rawtohex(?)\n" +
+        ")";
+
+    @Override
+    JdbcCase getJdbcCase() {
+        Map<String, String> containerEnv = new HashMap<>();
+        containerEnv.put("ORACLE_PASSWORD", PASSWORD);
+        containerEnv.put("APP_USER", USERNAME);
+        containerEnv.put("APP_USER_PASSWORD", PASSWORD);
+        String jdbcUrl = String.format(URL, PORT, DATABASE);
+        return JdbcCase.builder().dockerImage(DOCKER_IMAGE).networkAliases(NETWORK_ALIASES).containerEnv(containerEnv).driverClass(DRIVER_CLASS)
+            .host(HOST).port(PORT).jdbcUrl(jdbcUrl).userName(USERNAME).password(PASSWORD).dataBase(DATABASE)
+            .sourceTable(SOURCE_TABLE).sinkTable(SINK_TABLE).driverJar(DRIVER_JAR)
+            .ddlSource(DDL_SOURCE).ddlSink(DDL_SINK).initDataSql(INIT_DATA_SQL).configFile(CONFIG_FILE).seaTunnelRow(initTestData()).build();
+    }
+
+    @Override
+    void compareResult() throws SQLException, IOException {
+        String sourceSql = "select * from " + SOURCE_TABLE;
+        String sinkSql = "select * from " + SINK_TABLE;
+        List<String> columns = Lists.newArrayList("varchar_10_col", "char_10_col", "clob_col", "number_3_sf_2_dp", "integer_col", "float_col", "real_col", "binary_float_col", "binary_double_col", "date_col", "timestamp_with_3_frac_sec_col", "timestamp_with_local_tz", "raw_col", "blob_col");
+        Statement sourceStatement = jdbcConnection.createStatement();
+        Statement sinkStatement = jdbcConnection.createStatement();
+        ResultSet sourceResultSet = sourceStatement.executeQuery(sourceSql);
+        ResultSet sinkResultSet = sinkStatement.executeQuery(sinkSql);
+        while (sourceResultSet.next()) {
+            if (sinkResultSet.next()) {
+                for (String column : columns) {
+                    Object source = sourceResultSet.getObject(column);
+                    Object sink = sinkResultSet.getObject(column);
+                    if (!Objects.deepEquals(source, sink)) {
+
+                        InputStream sourceAsciiStream = sourceResultSet.getBinaryStream(column);
+                        InputStream sinkAsciiStream = sinkResultSet.getBinaryStream(column);
+                        String sourceValue = IOUtils.toString(sourceAsciiStream, StandardCharsets.UTF_8);
+                        String sinkValue = IOUtils.toString(sinkAsciiStream, StandardCharsets.UTF_8);
+                        Assertions.assertEquals(sourceValue, sinkValue);
+                    }
+                    Assertions.assertTrue(true);
+                }
+            }
+        }
+        clearSinkTable();
+    }
+
+    @Override
+    void clearSinkTable() {
+        try (Statement statement = jdbcConnection.createStatement()) {
+            statement.execute(String.format("TRUNCATE TABLE %s", SINK_TABLE));
+        } catch (SQLException e) {
+            throw new RuntimeException("test oracle server image error", e);
+        }
+    }
+
+    @Override
+    SeaTunnelRow initTestData() {
+        return new SeaTunnelRow(
+            new Object[]{"varchar", "char10col1", "clobS", 1.12, 2022, 1.2222, 1.22222, 1.22222, 1.22222,
+                LocalDate.now(),
+                LocalDateTime.now(),
+                LocalDateTime.now(),
+                "raw", "blob"
+            });
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_oracle_source_to_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_oracle_source_to_sink.conf
new file mode 100644
index 000000000..11cc07e90
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_oracle_source_to_sink.conf
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+  # You can set flink configuration here
+  execution.parallelism = 1
+  job.mode = "BATCH"
+  #execution.checkpoint.interval = 10000
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+  # This is a example source plugin **only for test and demonstrate the feature source plugin**
+  Jdbc {
+    driver = oracle.jdbc.driver.OracleDriver
+    url = "jdbc:oracle:thin:@e2e_oracleDb:1521/xepdb1"
+    user = test
+    password = test
+    query = "select varchar_10_col,char_10_col,clob_col,number_3_sf_2_dp,integer_col,float_col,real_col,binary_float_col,binary_double_col,date_col,timestamp_with_3_frac_sec_col,timestamp_with_local_tz,raw_col,blob_col from e2e_table_source"
+  }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+  # please go to https://seatunnel.apache.org/docs/connector-v2/source/FakeSource
+}
+
+transform {
+
+  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+  # please go to https://seatunnel.apache.org/docs/transform/sql
+}
+
+sink {
+  Jdbc {
+    driver = oracle.jdbc.driver.OracleDriver
+    url = "jdbc:oracle:thin:@e2e_oracleDb:1521/xepdb1"
+    user = test
+    password = test
+    query = "INSERT INTO e2e_table_sink (varchar_10_col,char_10_col,clob_col,number_3_sf_2_dp,integer_col,float_col,real_col,binary_float_col,binary_double_col,date_col,timestamp_with_3_frac_sec_col,timestamp_with_local_tz,raw_col,blob_col) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
+  }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+  # please go to https://seatunnel.apache.org/docs/connector-v2/sink/Jdbc
+}