You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2022/10/19 09:15:29 UTC

[incubator-seatunnel] branch dev updated: [Feature][Connector-V2][Jdbc] support gbase 8a (#3026)

This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new dc6e85d06 [Feature][Connector-V2][Jdbc] support gbase 8a  (#3026)
dc6e85d06 is described below

commit dc6e85d06f08e6b0bdcbb93d1cc5ed0e5c213385
Author: liugddx <80...@qq.com>
AuthorDate: Wed Oct 19 17:15:24 2022 +0800

    [Feature][Connector-V2][Jdbc] support gbase 8a  (#3026)
    
    * gbase 8a connector
    
    * add gbase8a e2e test
---
 docs/en/connector-v2/sink/Jdbc.md                  |   1 +
 docs/en/connector-v2/source/Jdbc.md                |   1 +
 .../internal/dialect/gbase8a/Gbase8aDialect.java   |  43 +++---
 .../dialect/gbase8a/Gbase8aDialectFactory.java     |  40 ++---
 .../dialect/gbase8a/Gbase8aJdbcRowConverter.java   |  42 +++--
 .../Gbase8aTypeMapper.java}                        |  97 ++++++------
 .../internal/dialect/oracle/OracleTypeMapper.java  |   7 +-
 .../connector-jdbc-e2e/pom.xml                     |   6 +
 .../connectors/seatunnel/jdbc/AbstractJdbcIT.java  |  64 +++++---
 .../connectors/seatunnel/jdbc/JdbcCase.java        |   1 +
 .../connectors/seatunnel/jdbc/JdbcGbse8adbIT.java  | 117 ++++++++++++++
 .../connectors/seatunnel/jdbc/JdbcOracledbIT.java  |  45 +++---
 .../seatunnel/jdbc/JdbcStarRocksdbIT.java          | 169 +++++++++++----------
 .../resources/jdbc_gbase8a_source_to_assert.conf   |  67 ++++++++
 14 files changed, 441 insertions(+), 259 deletions(-)

diff --git a/docs/en/connector-v2/sink/Jdbc.md b/docs/en/connector-v2/sink/Jdbc.md
index 989e69b2c..79056ee4d 100644
--- a/docs/en/connector-v2/sink/Jdbc.md
+++ b/docs/en/connector-v2/sink/Jdbc.md
@@ -116,6 +116,7 @@ there are some reference value for params above.
 | phoenix    | org.apache.phoenix.queryserver.client.Driver | jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF | /                                                  | https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client |
 | sqlserver  | com.microsoft.sqlserver.jdbc.SQLServerDriver | jdbc:microsoft:sqlserver://localhost:1433                    | com.microsoft.sqlserver.jdbc.SQLServerXADataSource | https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc |
 | oracle     | oracle.jdbc.OracleDriver                     | jdbc:oracle:thin:@localhost:1521/xepdb1                      | oracle.jdbc.xa.OracleXADataSource                  | https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8 |
+| gbase8a    | com.gbase.jdbc.Driver                        | jdbc:gbase://localhost:5258/test                             | /                                                  | https://www.gbase8.cn/wp-content/uploads/2020/10/gbase-connector-java-8.3.81.53-build55.5.7-bin_min_mix.jar |
 | starrocks  | com.mysql.cj.jdbc.Driver                     | jdbc:mysql://localhost:3306/test                             | /                                                  | https://mvnrepository.com/artifact/mysql/mysql-connector-java |
 
 ## Example
diff --git a/docs/en/connector-v2/source/Jdbc.md b/docs/en/connector-v2/source/Jdbc.md
index 0d3ca3981..546ecd26e 100644
--- a/docs/en/connector-v2/source/Jdbc.md
+++ b/docs/en/connector-v2/source/Jdbc.md
@@ -98,6 +98,7 @@ there are some reference value for params above.
 | phoenix    | org.apache.phoenix.queryserver.client.Driver | jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF | https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client |
 | sqlserver  | com.microsoft.sqlserver.jdbc.SQLServerDriver | jdbc:microsoft:sqlserver://localhost:1433                    | https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc |
 | oracle     | oracle.jdbc.OracleDriver                     | jdbc:oracle:thin:@localhost:1521/xepdb1                      | https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8 |
+| gbase8a    | com.gbase.jdbc.Driver                        | jdbc:gbase://localhost:5258/test                             | https://www.gbase8.cn/wp-content/uploads/2020/10/gbase-connector-java-8.3.81.53-build55.5.7-bin_min_mix.jar |
 | starrocks  | com.mysql.cj.jdbc.Driver                     | jdbc:mysql://localhost:3306/test                             | https://mvnrepository.com/artifact/mysql/mysql-connector-java |
 
 ## Example
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCase.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/gbase8a/Gbase8aDialect.java
similarity index 51%
copy from seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCase.java
copy to seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/gbase8a/Gbase8aDialect.java
index fd182ce3b..b90097445 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCase.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/gbase8a/Gbase8aDialect.java
@@ -15,34 +15,25 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.jdbc;
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.gbase8a;
 
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
 
-import lombok.Builder;
-import lombok.Data;
+public class Gbase8aDialect implements JdbcDialect {
+    @Override
+    public String dialectName() {
+        return "Gbase8a";
+    }
 
-import java.util.Map;
+    @Override
+    public JdbcRowConverter getRowConverter() {
+        return new Gbase8aJdbcRowConverter();
+    }
 
-@Data
-@Builder
-public class JdbcCase {
-    private String dockerImage;
-    private String networkAliases;
-    private String driverClass;
-    private String host;
-    private String userName;
-    private String password;
-    private int port;
-    private String dataBase;
-    private String sourceTable;
-    private String sinkTable;
-    private String driverJar;
-    private String jdbcUrl;
-    private String ddlSource;
-    private String ddlSink;
-    private String initDataSql;
-    private String configFile;
-    private SeaTunnelRow seaTunnelRow;
-    private Map<String, String> containerEnv;
+    @Override
+    public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {
+        return new Gbase8aTypeMapper();
+    }
 }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCase.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/gbase8a/Gbase8aDialectFactory.java
similarity index 51%
copy from seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCase.java
copy to seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/gbase8a/Gbase8aDialectFactory.java
index fd182ce3b..31787f497 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCase.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/gbase8a/Gbase8aDialectFactory.java
@@ -15,34 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.jdbc;
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.gbase8a;
 
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
 
-import lombok.Builder;
-import lombok.Data;
+import com.google.auto.service.AutoService;
 
-import java.util.Map;
+@AutoService(JdbcDialectFactory.class)
+public class Gbase8aDialectFactory implements JdbcDialectFactory {
+    @Override
+    public boolean acceptsURL(String url) {
+        return url.startsWith("jdbc:gbase:");
+    }
 
-@Data
-@Builder
-public class JdbcCase {
-    private String dockerImage;
-    private String networkAliases;
-    private String driverClass;
-    private String host;
-    private String userName;
-    private String password;
-    private int port;
-    private String dataBase;
-    private String sourceTable;
-    private String sinkTable;
-    private String driverJar;
-    private String jdbcUrl;
-    private String ddlSource;
-    private String ddlSink;
-    private String initDataSql;
-    private String configFile;
-    private SeaTunnelRow seaTunnelRow;
-    private Map<String, String> containerEnv;
+    @Override
+    public JdbcDialect create() {
+        return new Gbase8aDialect();
+    }
 }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCase.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/gbase8a/Gbase8aJdbcRowConverter.java
similarity index 53%
copy from seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCase.java
copy to seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/gbase8a/Gbase8aJdbcRowConverter.java
index fd182ce3b..6ada1d39e 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCase.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/gbase8a/Gbase8aJdbcRowConverter.java
@@ -15,34 +15,26 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.jdbc;
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.gbase8a;
 
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;
 
-import lombok.Builder;
-import lombok.Data;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
 
-import java.util.Map;
+public class Gbase8aJdbcRowConverter extends AbstractJdbcRowConverter {
+    @Override
+    public String converterName() {
+        return "Gbase8a";
+    }
+
+    @SuppressWarnings("checkstyle:MagicNumber")
+    @Override
+    public SeaTunnelRow toInternal(ResultSet rs, ResultSetMetaData metaData, SeaTunnelRowType typeInfo) throws SQLException {
+        return super.toInternal(rs, metaData, typeInfo);
+    }
 
-@Data
-@Builder
-public class JdbcCase {
-    private String dockerImage;
-    private String networkAliases;
-    private String driverClass;
-    private String host;
-    private String userName;
-    private String password;
-    private int port;
-    private String dataBase;
-    private String sourceTable;
-    private String sinkTable;
-    private String driverJar;
-    private String jdbcUrl;
-    private String ddlSource;
-    private String ddlSink;
-    private String initDataSql;
-    private String configFile;
-    private SeaTunnelRow seaTunnelRow;
-    private Map<String, String> containerEnv;
 }
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleTypeMapper.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/gbase8a/Gbase8aTypeMapper.java
similarity index 52%
copy from seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleTypeMapper.java
copy to seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/gbase8a/Gbase8aTypeMapper.java
index a11a8c9c7..65677ea22 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleTypeMapper.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/gbase8a/Gbase8aTypeMapper.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle;
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.gbase8a;
 
 import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.DecimalType;
@@ -30,88 +30,81 @@ import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 
 @Slf4j
-public class OracleTypeMapper implements JdbcDialectTypeMapper {
+public class Gbase8aTypeMapper implements JdbcDialectTypeMapper {
 
+    // Reference: http://www.gbase.cn/down/4419.html
     // ============================data types=====================
-
-    private static final String ORACLE_UNKNOWN = "UNKNOWN";
+    private static final String GBASE8A_UNKNOWN = "UNKNOWN";
 
     // -------------------------number----------------------------
-    private static final String ORACLE_BINARY_DOUBLE = "BINARY_DOUBLE";
-    private static final String ORACLE_BINARY_FLOAT = "BINARY_FLOAT";
-    private static final String ORACLE_NUMBER = "NUMBER";
-    private static final String ORACLE_FLOAT = "FLOAT";
-    private static final String ORACLE_REAL = "REAL";
-    private static final String ORACLE_INTEGER = "INTEGER";
+    private static final String GBASE8A_INT = "INT";
+    private static final String GBASE8A_TINYINT = "TINYINT";
+    private static final String GBASE8A_SMALLINT = "SMALLINT";
+    private static final String GBASE8A_BIGINT = "BIGINT";
+    private static final String GBASE8A_DECIMAL = "DECIMAL";
+    private static final String GBASE8A_FLOAT = "FLOAT";
+    private static final String GBASE8A_DOUBLE = "DOUBLE";
 
     // -------------------------string----------------------------
-    private static final String ORACLE_CHAR = "CHAR";
-    private static final String ORACLE_VARCHAR2 = "VARCHAR2";
-    private static final String ORACLE_NCHAR = "NCHAR";
-    private static final String ORACLE_NVARCHAR2 = "NVARCHAR2";
-    private static final String ORACLE_LONG = "LONG";
-    private static final String ORACLE_ROWID = "ROWID";
-    private static final String ORACLE_CLOB = "CLOB";
-    private static final String ORACLE_NCLOB = "NCLOB";
+    private static final String GBASE8A_CHAR = "CHAR";
+    private static final String GBASE8A_VARCHAR = "VARCHAR";
+
 
     // ------------------------------time-------------------------
-    private static final String ORACLE_DATE = "DATE";
-    private static final String ORACLE_TIMESTAMP = "TIMESTAMP";
-    private static final String ORACLE_TIMESTAMP_WITH_LOCAL_TIME_ZONE = "TIMESTAMP WITH LOCAL TIME ZONE";
+    private static final String GBASE8A_DATE = "DATE";
+    private static final String GBASE8A_TIME = "TIME";
+    private static final String GBASE8A_TIMESTAMP = "TIMESTAMP";
+    private static final String GBASE8A_DATETIME = "DATETIME";
 
     // ------------------------------blob-------------------------
-    private static final String ORACLE_BLOB = "BLOB";
-    private static final String ORACLE_BFILE = "BFILE";
-    private static final String ORACLE_RAW = "RAW";
-    private static final String ORACLE_LONG_RAW = "LONG RAW";
+    private static final String GBASE8A_BLOB = "BLOB";
+    private static final String GBASE8A_TEXT = "TEXT";
 
     @SuppressWarnings("checkstyle:MagicNumber")
     @Override
     public SeaTunnelDataType<?> mapping(ResultSetMetaData metadata, int colIndex) throws SQLException {
-        String oracleType = metadata.getColumnTypeName(colIndex).toUpperCase();
-        String columnName = metadata.getColumnName(colIndex);
+        String gbase8aType = metadata.getColumnTypeName(colIndex).toUpperCase();
         int precision = metadata.getPrecision(colIndex);
         int scale = metadata.getScale(colIndex);
-        switch (oracleType) {
-            case ORACLE_INTEGER:
+        switch (gbase8aType) {
+            case GBASE8A_TINYINT:
+                return BasicType.BYTE_TYPE;
+            case GBASE8A_SMALLINT:
+                return BasicType.SHORT_TYPE;
+            case GBASE8A_INT:
                 return BasicType.INT_TYPE;
-            case ORACLE_NUMBER:
+            case GBASE8A_BIGINT:
+                return BasicType.LONG_TYPE;
+            case GBASE8A_DECIMAL:
                 if (precision < 38) {
                     return new DecimalType(precision, scale);
                 }
                 return new DecimalType(38, 18);
-            case ORACLE_FLOAT:
-            case ORACLE_BINARY_DOUBLE:
+            case GBASE8A_DOUBLE:
                 return BasicType.DOUBLE_TYPE;
-            case ORACLE_BINARY_FLOAT:
-            case ORACLE_REAL:
+            case GBASE8A_FLOAT:
                 return BasicType.FLOAT_TYPE;
-            case ORACLE_CHAR:
-            case ORACLE_NCHAR:
-            case ORACLE_NVARCHAR2:
-            case ORACLE_VARCHAR2:
-            case ORACLE_LONG:
-            case ORACLE_ROWID:
-            case ORACLE_NCLOB:
-            case ORACLE_CLOB:
+            case GBASE8A_CHAR:
+            case GBASE8A_VARCHAR:
                 return BasicType.STRING_TYPE;
-            case ORACLE_DATE:
-            case ORACLE_TIMESTAMP:
-            case ORACLE_TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+            case GBASE8A_DATE:
+                return LocalTimeType.LOCAL_DATE_TYPE;
+            case GBASE8A_TIME:
+                return LocalTimeType.LOCAL_TIME_TYPE;
+            case GBASE8A_TIMESTAMP:
+            case GBASE8A_DATETIME:
                 return LocalTimeType.LOCAL_DATE_TIME_TYPE;
-            case ORACLE_BLOB:
-            case ORACLE_RAW:
-            case ORACLE_LONG_RAW:
-            case ORACLE_BFILE:
+            case GBASE8A_BLOB:
+            case GBASE8A_TEXT:
                 return PrimitiveByteArrayType.INSTANCE;
             //Doesn't support yet
-            case ORACLE_UNKNOWN:
+            case GBASE8A_UNKNOWN:
             default:
                 final String jdbcColumnName = metadata.getColumnName(colIndex);
                 throw new UnsupportedOperationException(
                     String.format(
-                        "Doesn't support ORACLE type '%s' on column '%s'  yet.",
-                        oracleType, jdbcColumnName));
+                        "Doesn't support GBASE8A type '%s' on column '%s'  yet.",
+                        gbase8aType, jdbcColumnName));
         }
     }
 }
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleTypeMapper.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleTypeMapper.java
index a11a8c9c7..eb1f9097f 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleTypeMapper.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleTypeMapper.java
@@ -75,12 +75,11 @@ public class OracleTypeMapper implements JdbcDialectTypeMapper {
         switch (oracleType) {
             case ORACLE_INTEGER:
                 return BasicType.INT_TYPE;
+            case ORACLE_FLOAT:
             case ORACLE_NUMBER:
-                if (precision < 38) {
-                    return new DecimalType(precision, scale);
-                }
+            // FLOAT would otherwise be converted to DecimalType(10, -127), which loses
+            // precision in the Spark engine, so FLOAT and NUMBER use a fixed DecimalType(38, 18).
                 return new DecimalType(38, 18);
-            case ORACLE_FLOAT:
             case ORACLE_BINARY_DOUBLE:
                 return BasicType.DOUBLE_TYPE;
             case ORACLE_BINARY_FLOAT:
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/pom.xml
index 000ce2c49..2a6516772 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/pom.xml
@@ -45,6 +45,12 @@
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-assert</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
 
         <!-- jdbc containers -->
         <dependency>
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java
index 3082881da..4e9088003 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc;
 import static org.awaitility.Awaitility.given;
 
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.utils.ExceptionUtils;
 import org.apache.seatunnel.e2e.common.TestResource;
 import org.apache.seatunnel.e2e.common.TestSuiteBase;
 import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
@@ -28,6 +29,7 @@ import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
 
 import com.google.common.collect.Lists;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.TestTemplate;
@@ -37,11 +39,15 @@ import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.lifecycle.Startables;
 
 import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
 import java.sql.Connection;
-import java.sql.DriverManager;
+import java.sql.Driver;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
 
@@ -49,10 +55,8 @@ import java.util.stream.Stream;
 public abstract class AbstractJdbcIT extends TestSuiteBase implements TestResource {
 
     protected static final String HOST = "HOST";
-    protected Connection jdbcConnection;
     protected GenericContainer<?> dbServer;
-    private JdbcCase jdbcCase;
-    private String jdbcUrl;
+    protected JdbcCase jdbcCase;
 
     abstract JdbcCase getJdbcCase();
 
@@ -62,13 +66,18 @@ public abstract class AbstractJdbcIT extends TestSuiteBase implements TestResour
 
     abstract SeaTunnelRow initTestData();
 
+    protected Connection createAndChangeDatabase(Connection connection) {
+        // Default implementation: return the given connection unchanged.
+        return connection;
+    }
+
     @TestContainerExtension
     private final ContainerExtendedFactory extendedFactory = container -> {
         Container.ExecResult extraCommands = container.execInContainer("bash", "-c", "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd /tmp/seatunnel/plugins/Jdbc/lib && curl -O " + jdbcCase.getDriverJar());
         Assertions.assertEquals(0, extraCommands.getExitCode());
     };
 
-    private void getContainer() throws ClassNotFoundException, SQLException {
+    private void getContainer() throws SQLException, MalformedURLException, ClassNotFoundException, InstantiationException, IllegalAccessException {
         jdbcCase = this.getJdbcCase();
         dbServer = new GenericContainer<>(jdbcCase.getDockerImage())
             .withNetwork(NETWORK)
@@ -78,21 +87,28 @@ public abstract class AbstractJdbcIT extends TestSuiteBase implements TestResour
         dbServer.setPortBindings(Lists.newArrayList(
             String.format("%s:%s", jdbcCase.getPort(), jdbcCase.getPort())));
         Startables.deepStart(Stream.of(dbServer)).join();
-        Class.forName(jdbcCase.getDriverClass());
+
         given().ignoreExceptions()
             .await()
             .atMost(180, TimeUnit.SECONDS)
-            .untilAsserted(this::initializeJdbcConnection);
-        initializeJdbcTable();
+            .untilAsserted(() -> {
+                this.initializeJdbcConnection(jdbcCase.getJdbcUrl());
+            });
+        this.initializeJdbcTable();
     }
 
-    protected void initializeJdbcConnection() throws SQLException {
-        jdbcUrl = jdbcCase.getJdbcUrl().replace(HOST, dbServer.getHost());
-        jdbcConnection = DriverManager.getConnection(jdbcUrl, jdbcCase.getUserName(), jdbcCase.getPassword());
+    protected Connection initializeJdbcConnection(String jdbcUrl) throws SQLException, ClassNotFoundException, MalformedURLException, InstantiationException, IllegalAccessException {
+        URLClassLoader urlClassLoader = new URLClassLoader(new URL[]{new URL(jdbcCase.getDriverJar())}, AbstractJdbcIT.class.getClassLoader());
+        Thread.currentThread().setContextClassLoader(urlClassLoader);
+        Driver driver = (Driver) urlClassLoader.loadClass(jdbcCase.getDriverClass()).newInstance();
+        Properties props = new Properties();
+        props.put("user", jdbcCase.getUserName());
+        props.put("password", jdbcCase.getPassword());
+        return driver.connect(jdbcUrl.replace(HOST, dbServer.getHost()), props);
     }
 
     private void batchInsertData() {
-        try (Connection connection = DriverManager.getConnection(jdbcUrl, jdbcCase.getUserName(), jdbcCase.getPassword())) {
+        try (Connection connection = initializeJdbcConnection(String.format(jdbcCase.getJdbcTemplate(), jdbcCase.getPort(), jdbcCase.getDataBase()))) {
             connection.setAutoCommit(false);
             try (PreparedStatement preparedStatement = connection.prepareStatement(jdbcCase.getInitDataSql())) {
 
@@ -102,20 +118,25 @@ public abstract class AbstractJdbcIT extends TestSuiteBase implements TestResour
                 preparedStatement.execute();
             }
             connection.commit();
-        } catch (SQLException exception) {
-            exception.printStackTrace();
+        } catch (Exception exception) {
+            log.error(ExceptionUtils.getMessage(exception));
+            throw new RuntimeException("get connection error", exception);
         }
     }
 
-    private void initializeJdbcTable() throws SQLException {
-        try (Connection connection = DriverManager.getConnection(jdbcUrl, jdbcCase.getUserName(), jdbcCase.getPassword())) {
-            Statement statement = connection.createStatement();
+    private void initializeJdbcTable() {
+        try (Connection connection = initializeJdbcConnection(jdbcCase.getJdbcUrl())) {
+            Connection newConnection = createAndChangeDatabase(connection);
+            Statement statement = newConnection.createStatement();
             String createSource = jdbcCase.getDdlSource();
             String createSink = jdbcCase.getDdlSink();
             statement.execute(createSource);
-            statement.execute(createSink);
-        } catch (SQLException exception) {
-            exception.printStackTrace();
+            if (StringUtils.isNotEmpty(createSink)) {
+                statement.execute(createSink);
+            }
+        } catch (Exception exception) {
+            log.error(ExceptionUtils.getMessage(exception));
+            throw new RuntimeException("get connection error", exception);
         }
         this.batchInsertData();
     }
@@ -128,9 +149,6 @@ public abstract class AbstractJdbcIT extends TestSuiteBase implements TestResour
 
     @Override
     public void tearDown() throws Exception {
-        if (jdbcConnection != null) {
-            jdbcConnection.close();
-        }
         if (dbServer != null) {
             dbServer.close();
         }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCase.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCase.java
index fd182ce3b..d4659a9b3 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCase.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCase.java
@@ -38,6 +38,7 @@ public class JdbcCase {
     private String sourceTable;
     private String sinkTable;
     private String driverJar;
+    private String jdbcTemplate;
     private String jdbcUrl;
     private String ddlSource;
     private String ddlSink;
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcGbse8adbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcGbse8adbIT.java
new file mode 100644
index 000000000..00efd4f1b
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcGbse8adbIT.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.HashMap;
+import java.util.Map;
+
+/** E2E case: reads the GBase 8a source table through the Jdbc source and lets the Assert sink validate the row count. */
+public class JdbcGbse8adbIT extends AbstractJdbcIT {
+    private static final String DOCKER_IMAGE = "shihd/gbase8a:1.0";
+    private static final String NETWORK_ALIASES = "e2e_gbase8aDb";
+    private static final String DRIVER_CLASS = "com.gbase.jdbc.Driver";
+    private static final int PORT = 5258;
+    private static final String URL = "jdbc:gbase://" + HOST + ":%s/%s?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true";
+    private static final String USERNAME = "root";
+    private static final String PASSWORD = "root";
+    private static final String DATABASE = "gbase";
+    private static final String SOURCE_TABLE = "e2e_table_source";
+    private static final String DRIVER_JAR = "https://www.gbase8.cn/wp-content/uploads/2020/10/gbase-connector-java-8.3.81.53-build55.5.7-bin_min_mix.jar";
+    private static final String CONFIG_FILE = "/jdbc_gbase8a_source_to_assert.conf";
+    private static final String DDL_SOURCE = "CREATE TABLE " + SOURCE_TABLE + "(\n" +
+        "  \"varchar_10_col\" varchar(10) DEFAULT NULL,\n" +
+        "  \"char_10_col\" char(10) DEFAULT NULL,\n" +
+        "  \"text_col\" text,\n" +
+        "  \"decimal_col\" decimal(10,0) DEFAULT NULL,\n" +
+        "  \"float_col\" float(12,0) DEFAULT NULL,\n" +
+        "  \"int_col\" int(11) DEFAULT NULL,\n" +
+        "  \"tinyint_col\" tinyint(4) DEFAULT NULL,\n" +
+        "  \"smallint_col\" smallint(6) DEFAULT NULL,\n" +
+        "  \"double_col\" double(22,0) DEFAULT NULL,\n" +
+        "  \"bigint_col\" bigint(20) DEFAULT NULL,\n" +
+        "  \"date_col\" date DEFAULT NULL,\n" +
+        "  \"timestamp_col\" timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,\n" +
+        "  \"datetime_col\" datetime DEFAULT NULL,\n" +
+        "  \"blob_col\" blob\n" +
+        ")";
+    private static final String INIT_DATA_SQL = "insert into " + SOURCE_TABLE + " (\n" +
+        "  varchar_10_col,\n" +
+        "  char_10_col,\n" +
+        "  text_col,\n" +
+        "  decimal_col,\n" +
+        "  float_col,\n" +
+        "  int_col,\n" +
+        "  tinyint_col,\n" +
+        "  smallint_col,\n" +
+        "  double_col,\n" +
+        "  bigint_col,\n" +
+        "  date_col,\n" +
+        "  timestamp_col,\n" +
+        "  datetime_col,\n" +
+        "  blob_col\n" +
+        ")values(\n" +
+        "\t?,?,?,?,?,?,?,?,?,?,?,?,?,?\n" +
+        ")";
+
+    @Override
+    JdbcCase getJdbcCase() {
+        Map<String, String> containerEnv = new HashMap<>();
+        String jdbcUrl = String.format(URL, PORT, DATABASE);
+        return JdbcCase.builder().dockerImage(DOCKER_IMAGE).networkAliases(NETWORK_ALIASES).containerEnv(containerEnv).driverClass(DRIVER_CLASS)
+            .host(HOST).port(PORT).jdbcTemplate(URL).jdbcUrl(jdbcUrl).userName(USERNAME).password(PASSWORD).dataBase(DATABASE)
+            .sourceTable(SOURCE_TABLE).driverJar(DRIVER_JAR)
+            .ddlSource(DDL_SOURCE).initDataSql(INIT_DATA_SQL).configFile(CONFIG_FILE).seaTunnelRow(initTestData()).build();
+    }
+
+    @Override
+    void compareResult() {
+        // Nothing to compare here: the Assert sink in the job config checks the row count.
+    }
+
+    @Override
+    void clearSinkTable() {
+        // This case defines no sink table, so there is nothing to clear.
+    }
+
+    @Override
+    SeaTunnelRow initTestData() {
+        // java.sql.Date/Timestamp constructors expect epoch MILLISECONDS, so build the values via java.time instead of epoch days/seconds.
+        Timestamp now = Timestamp.from(LocalDateTime.now(ZoneOffset.UTC).toInstant(ZoneOffset.UTC));
+        return new SeaTunnelRow(new Object[]{"varchar", "char10col1", "text_col".getBytes(StandardCharsets.UTF_8), 122, 122.0, 122, 100, 1212, 122.0,
+            3112121, java.sql.Date.valueOf(LocalDate.now()), now, now, "blob".getBytes(StandardCharsets.UTF_8)});
+    }
+
+    protected Connection createAndChangeDatabase(Connection connection) {
+        // Release the bootstrap connection and its statement on every path, not only when the DDL succeeds.
+        try (Connection bootstrap = connection; java.sql.Statement statement = bootstrap.createStatement()) {
+            statement.executeUpdate("CREATE DATABASE test");
+            jdbcCase.setDataBase("test");
+            return initializeJdbcConnection(String.format(URL, PORT, jdbcCase.getDataBase()));
+        } catch (Exception e) {
+            throw new RuntimeException("create database error", e);
+        }
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracledbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracledbIT.java
index 48e34c0c7..036415f69 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracledbIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracledbIT.java
@@ -26,6 +26,7 @@ import org.testcontainers.shaded.org.apache.commons.io.IOUtils;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
@@ -98,7 +99,7 @@ public class JdbcOracledbIT extends AbstractJdbcIT {
         "  raw_col,\n" +
         "  blob_col\n" +
         ")values(\n" +
-        "\t?,?,?,?,?,?,?,?,?,?,?,?,?,rawtohex(?),rawtohex(?)\n" +
+        "\t?,?,?,?,?,?,?,?,?,?,?,?,rawtohex(?),rawtohex(?)\n" +
         ")";
 
     @Override
@@ -109,7 +110,7 @@ public class JdbcOracledbIT extends AbstractJdbcIT {
         containerEnv.put("APP_USER_PASSWORD", PASSWORD);
         String jdbcUrl = String.format(URL, PORT, DATABASE);
         return JdbcCase.builder().dockerImage(DOCKER_IMAGE).networkAliases(NETWORK_ALIASES).containerEnv(containerEnv).driverClass(DRIVER_CLASS)
-            .host(HOST).port(PORT).jdbcUrl(jdbcUrl).userName(USERNAME).password(PASSWORD).dataBase(DATABASE)
+            .host(HOST).port(PORT).jdbcTemplate(URL).jdbcUrl(jdbcUrl).userName(USERNAME).password(PASSWORD).dataBase(DATABASE)
             .sourceTable(SOURCE_TABLE).sinkTable(SINK_TABLE).driverJar(DRIVER_JAR)
             .ddlSource(DDL_SOURCE).ddlSink(DDL_SINK).initDataSql(INIT_DATA_SQL).configFile(CONFIG_FILE).seaTunnelRow(initTestData()).build();
     }
@@ -119,35 +120,39 @@ public class JdbcOracledbIT extends AbstractJdbcIT {
         String sourceSql = "select * from " + SOURCE_TABLE;
         String sinkSql = "select * from " + SINK_TABLE;
         List<String> columns = Lists.newArrayList("varchar_10_col", "char_10_col", "clob_col", "number_3_sf_2_dp", "integer_col", "float_col", "real_col", "binary_float_col", "binary_double_col", "date_col", "timestamp_with_3_frac_sec_col", "timestamp_with_local_tz", "raw_col", "blob_col");
-        Statement sourceStatement = jdbcConnection.createStatement();
-        Statement sinkStatement = jdbcConnection.createStatement();
-        ResultSet sourceResultSet = sourceStatement.executeQuery(sourceSql);
-        ResultSet sinkResultSet = sinkStatement.executeQuery(sinkSql);
-        while (sourceResultSet.next()) {
-            if (sinkResultSet.next()) {
-                for (String column : columns) {
-                    Object source = sourceResultSet.getObject(column);
-                    Object sink = sinkResultSet.getObject(column);
-                    if (!Objects.deepEquals(source, sink)) {
+        try (Connection connection = initializeJdbcConnection(jdbcCase.getJdbcUrl())) {
+            Statement sourceStatement = connection.createStatement();
+            Statement sinkStatement = connection.createStatement();
+            ResultSet sourceResultSet = sourceStatement.executeQuery(sourceSql);
+            ResultSet sinkResultSet = sinkStatement.executeQuery(sinkSql);
+            while (sourceResultSet.next()) {
+                if (sinkResultSet.next()) {
+                    for (String column : columns) {
+                        Object source = sourceResultSet.getObject(column);
+                        Object sink = sinkResultSet.getObject(column);
+                        if (!Objects.deepEquals(source, sink)) {
 
-                        InputStream sourceAsciiStream = sourceResultSet.getBinaryStream(column);
-                        InputStream sinkAsciiStream = sinkResultSet.getBinaryStream(column);
-                        String sourceValue = IOUtils.toString(sourceAsciiStream, StandardCharsets.UTF_8);
-                        String sinkValue = IOUtils.toString(sinkAsciiStream, StandardCharsets.UTF_8);
-                        Assertions.assertEquals(sourceValue, sinkValue);
+                            InputStream sourceAsciiStream = sourceResultSet.getBinaryStream(column);
+                            InputStream sinkAsciiStream = sinkResultSet.getBinaryStream(column);
+                            String sourceValue = IOUtils.toString(sourceAsciiStream, StandardCharsets.UTF_8);
+                            String sinkValue = IOUtils.toString(sinkAsciiStream, StandardCharsets.UTF_8);
+                            Assertions.assertEquals(sourceValue, sinkValue);
+                        }
+                        Assertions.assertTrue(true);
                     }
-                    Assertions.assertTrue(true);
                 }
             }
+        } catch (Exception e) {
+            throw new RuntimeException("get oracle connection error", e);
         }
         clearSinkTable();
     }
 
     @Override
     void clearSinkTable() {
-        try (Statement statement = jdbcConnection.createStatement()) {
+        try (Connection connection = initializeJdbcConnection(jdbcCase.getJdbcUrl()); Statement statement = connection.createStatement()) { // manage the connection too, otherwise each call leaks one
             statement.execute(String.format("TRUNCATE TABLE %s", SINK_TABLE));
-        } catch (SQLException e) {
+        } catch (Exception e) {
             throw new RuntimeException("test oracle server image error", e);
         }
     }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcStarRocksdbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcStarRocksdbIT.java
index 90b062985..20c5da159 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcStarRocksdbIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcStarRocksdbIT.java
@@ -22,9 +22,8 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.util.JdbcCompareUtil;
 
 import org.junit.jupiter.api.Assertions;
 
-import java.io.IOException;
+import java.sql.Connection;
 import java.sql.ResultSet;
-import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.HashMap;
 import java.util.Map;
@@ -46,100 +45,104 @@ public class JdbcStarRocksdbIT extends AbstractJdbcIT {
     private static final String COLUMN_STRING = "BIGINT_COL, LARGEINT_COL, SMALLINT_COL, TINYINT_COL, BOOLEAN_COL, DECIMAL_COL, DOUBLE_COL, FLOAT_COL, INT_COL, CHAR_COL, VARCHAR_11_COL, STRING_COL, DATETIME_COL, DATE_COL";
     private static final String CONFIG_FILE = "/jdbc_starrocks_source_to_sink.conf";
 
-    private static final String DDL_SOURCE = "create table "  + DATABASE + "." + SOURCE_TABLE + " (\n" +
-            "  BIGINT_COL     BIGINT,\n" +
-            "  LARGEINT_COL   LARGEINT,\n" +
-            "  SMALLINT_COL   SMALLINT,\n" +
-            "  TINYINT_COL    TINYINT,\n" +
-            "  BOOLEAN_COL    BOOLEAN,\n" +
-            "  DECIMAL_COL    DECIMAL,\n" +
-            "  DOUBLE_COL     DOUBLE,\n" +
-            "  FLOAT_COL      FLOAT,\n" +
-            "  INT_COL        INT,\n" +
-            "  CHAR_COL       CHAR,\n" +
-            "  VARCHAR_11_COL VARCHAR(11),\n" +
-            "  STRING_COL     STRING,\n" +
-            "  DATETIME_COL   DATETIME,\n" +
-            "  DATE_COL       DATE\n" +
-            ")ENGINE=OLAP\n"  +
-            "DUPLICATE KEY(`BIGINT_COL`)\n" +
-            "DISTRIBUTED BY HASH(`BIGINT_COL`) BUCKETS 1\n" +
-            "PROPERTIES (\n" +
-            "\"replication_num\" = \"1\",\n" +
-            "\"in_memory\" = \"false\"," +
-            "\"in_memory\" = \"false\"," +
-            "\"storage_format\" = \"DEFAULT\"" +
-            ")";
-
-
-    private static final String DDL_SINK = "create table "   + DATABASE + "." + SINK_TABLE + " (\n" +
-            "  BIGINT_COL     BIGINT,\n" +
-            "  LARGEINT_COL   LARGEINT,\n" +
-            "  SMALLINT_COL   SMALLINT,\n" +
-            "  TINYINT_COL    TINYINT,\n" +
-            "  BOOLEAN_COL    BOOLEAN,\n" +
-            "  DECIMAL_COL    DECIMAL,\n" +
-            "  DOUBLE_COL     DOUBLE,\n" +
-            "  FLOAT_COL      FLOAT,\n" +
-            "  INT_COL        INT,\n" +
-            "  CHAR_COL       CHAR,\n" +
-            "  VARCHAR_11_COL VARCHAR(11),\n" +
-            "  STRING_COL     STRING,\n" +
-            "  DATETIME_COL   DATETIME,\n" +
-            "  DATE_COL       DATE\n" +
-            ")ENGINE=OLAP\n"  +
-            "DUPLICATE KEY(`BIGINT_COL`)\n" +
-            "DISTRIBUTED BY HASH(`BIGINT_COL`) BUCKETS 1\n" +
-            "PROPERTIES (\n" +
-            "\"replication_num\" = \"1\",\n" +
-            "\"in_memory\" = \"false\"," +
-            "\"in_memory\" = \"false\"," +
-            "\"storage_format\" = \"DEFAULT\"" +
-            ")";
-
-    private static final String INIT_DATA_SQL = "insert into "   + DATABASE + "." + SOURCE_TABLE + " (\n" +
-            "  BIGINT_COL,\n" +
-            "  LARGEINT_COL,\n" +
-            "  SMALLINT_COL,\n" +
-            "  TINYINT_COL,\n" +
-            "  BOOLEAN_COL,\n" +
-            "  DECIMAL_COL,\n" +
-            "  DOUBLE_COL,\n" +
-            "  FLOAT_COL,\n" +
-            "  INT_COL,\n" +
-            "  CHAR_COL,\n" +
-            "  VARCHAR_11_COL,\n" +
-            "  STRING_COL,\n" +
-            "  DATETIME_COL,\n" +
-            "  DATE_COL\n" +
-            ")values(\n" +
-            "\t?,?,?,?,?,?,?,?,?,?,?,?,?,?\n" +
-            ")";
+    private static final String DDL_SOURCE = "create table " + DATABASE + "." + SOURCE_TABLE + " (\n" +
+        "  BIGINT_COL     BIGINT,\n" +
+        "  LARGEINT_COL   LARGEINT,\n" +
+        "  SMALLINT_COL   SMALLINT,\n" +
+        "  TINYINT_COL    TINYINT,\n" +
+        "  BOOLEAN_COL    BOOLEAN,\n" +
+        "  DECIMAL_COL    DECIMAL,\n" +
+        "  DOUBLE_COL     DOUBLE,\n" +
+        "  FLOAT_COL      FLOAT,\n" +
+        "  INT_COL        INT,\n" +
+        "  CHAR_COL       CHAR,\n" +
+        "  VARCHAR_11_COL VARCHAR(11),\n" +
+        "  STRING_COL     STRING,\n" +
+        "  DATETIME_COL   DATETIME,\n" +
+        "  DATE_COL       DATE\n" +
+        ")ENGINE=OLAP\n" +
+        "DUPLICATE KEY(`BIGINT_COL`)\n" +
+        "DISTRIBUTED BY HASH(`BIGINT_COL`) BUCKETS 1\n" +
+        "PROPERTIES (\n" +
+        "\"replication_num\" = \"1\",\n" +
+        "\"in_memory\" = \"false\"," +
+        "\"in_memory\" = \"false\"," +
+        "\"storage_format\" = \"DEFAULT\"" +
+        ")";
+
+
+    private static final String DDL_SINK = "create table " + DATABASE + "." + SINK_TABLE + " (\n" +
+        "  BIGINT_COL     BIGINT,\n" +
+        "  LARGEINT_COL   LARGEINT,\n" +
+        "  SMALLINT_COL   SMALLINT,\n" +
+        "  TINYINT_COL    TINYINT,\n" +
+        "  BOOLEAN_COL    BOOLEAN,\n" +
+        "  DECIMAL_COL    DECIMAL,\n" +
+        "  DOUBLE_COL     DOUBLE,\n" +
+        "  FLOAT_COL      FLOAT,\n" +
+        "  INT_COL        INT,\n" +
+        "  CHAR_COL       CHAR,\n" +
+        "  VARCHAR_11_COL VARCHAR(11),\n" +
+        "  STRING_COL     STRING,\n" +
+        "  DATETIME_COL   DATETIME,\n" +
+        "  DATE_COL       DATE\n" +
+        ")ENGINE=OLAP\n" +
+        "DUPLICATE KEY(`BIGINT_COL`)\n" +
+        "DISTRIBUTED BY HASH(`BIGINT_COL`) BUCKETS 1\n" +
+        "PROPERTIES (\n" +
+        "\"replication_num\" = \"1\",\n" +
+        "\"in_memory\" = \"false\"," +
+        "\"in_memory\" = \"false\"," +
+        "\"storage_format\" = \"DEFAULT\"" +
+        ")";
+
+    private static final String INIT_DATA_SQL = "insert into " + DATABASE + "." + SOURCE_TABLE + " (\n" +
+        "  BIGINT_COL,\n" +
+        "  LARGEINT_COL,\n" +
+        "  SMALLINT_COL,\n" +
+        "  TINYINT_COL,\n" +
+        "  BOOLEAN_COL,\n" +
+        "  DECIMAL_COL,\n" +
+        "  DOUBLE_COL,\n" +
+        "  FLOAT_COL,\n" +
+        "  INT_COL,\n" +
+        "  CHAR_COL,\n" +
+        "  VARCHAR_11_COL,\n" +
+        "  STRING_COL,\n" +
+        "  DATETIME_COL,\n" +
+        "  DATE_COL\n" +
+        ")values(\n" +
+        "\t?,?,?,?,?,?,?,?,?,?,?,?,?,?\n" +
+        ")";
 
     @Override
     JdbcCase getJdbcCase() {
         Map<String, String> containerEnv = new HashMap<>();
         String jdbcUrl = String.format(URL, SR_PORT);
         return JdbcCase.builder().dockerImage(DOCKER_IMAGE).networkAliases(NETWORK_ALIASES).containerEnv(containerEnv).driverClass(DRIVER_CLASS)
-                .host(HOST).port(SR_PORT).jdbcUrl(jdbcUrl).userName(USERNAME).password(PASSWORD).dataBase(DATABASE)
-                .sourceTable(SOURCE_TABLE).sinkTable(SINK_TABLE).driverJar(SR_DRIVER_JAR)
-                .ddlSource(DDL_SOURCE).ddlSink(DDL_SINK).initDataSql(INIT_DATA_SQL).configFile(CONFIG_FILE).seaTunnelRow(initTestData()).build();
+            .host(HOST).port(SR_PORT).jdbcTemplate(URL).jdbcUrl(jdbcUrl).userName(USERNAME).password(PASSWORD).dataBase(DATABASE)
+            .sourceTable(SOURCE_TABLE).sinkTable(SINK_TABLE).driverJar(SR_DRIVER_JAR)
+            .ddlSource(DDL_SOURCE).ddlSink(DDL_SINK).initDataSql(INIT_DATA_SQL).configFile(CONFIG_FILE).seaTunnelRow(initTestData()).build();
     }
 
     @Override
-    void compareResult() throws SQLException, IOException {
-        assertHasData(SOURCE_TABLE);
-        assertHasData(SINK_TABLE);
-        JdbcCompareUtil.compare(jdbcConnection, String.format("select * from %s.%s limit 1", DATABASE, SOURCE_TABLE),
+    void compareResult() {
+        try (Connection connection = initializeJdbcConnection(jdbcCase.getJdbcUrl())) { // formatted URL from the case, not the raw URL template
+            assertHasData(SOURCE_TABLE);
+            assertHasData(SINK_TABLE);
+            JdbcCompareUtil.compare(connection, String.format("select * from %s.%s limit 1", DATABASE, SOURCE_TABLE),
                 String.format("select * from %s.%s limit 1", DATABASE, SINK_TABLE), COLUMN_STRING);
+        } catch (Exception e) {
+            throw new RuntimeException("get starRocks connection error", e);
+        }
         clearSinkTable();
     }
 
     @Override
     void clearSinkTable() {
-        try (Statement statement = jdbcConnection.createStatement()) {
+        try (Connection connection = initializeJdbcConnection(jdbcCase.getJdbcUrl()); Statement statement = connection.createStatement()) { // close the connection too; use the formatted URL
             statement.execute(String.format("TRUNCATE TABLE %s", DATABASE + "." + SINK_TABLE));
-        } catch (SQLException e) {
+        } catch (Exception e) {
             throw new RuntimeException("test starrocks server image error", e);
         }
     }
@@ -147,15 +150,15 @@ public class JdbcStarRocksdbIT extends AbstractJdbcIT {
     @Override
     SeaTunnelRow initTestData() {
         return new SeaTunnelRow(
-                new Object[]{1234, 1123456, 12, 1, 0, 2222243.2222243, 2222243.22222, 1.22222, 12, "a", "VARCHAR_COL", "STRING_COL", "2022-08-13 17:35:59", "2022-08-13"});
+            new Object[]{1234, 1123456, 12, 1, 0, 2222243.2222243, 2222243.22222, 1.22222, 12, "a", "VARCHAR_COL", "STRING_COL", "2022-08-13 17:35:59", "2022-08-13"});
     }
 
     private void assertHasData(String table) {
-        try (Statement statement = jdbcConnection.createStatement()) {
+        try (Connection connection = initializeJdbcConnection(jdbcCase.getJdbcUrl()); Statement statement = connection.createStatement()) { // close the connection too; use the formatted URL
             String sql = String.format("select * from %s.%s limit 1", DATABASE, table);
             ResultSet source = statement.executeQuery(sql);
             Assertions.assertTrue(source.next());
-        } catch (SQLException e) {
+        } catch (Exception e) {
             throw new RuntimeException("test starrocks server image error", e);
         }
     }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_gbase8a_source_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_gbase8a_source_to_assert.conf
new file mode 100644
index 000000000..4b4c5ad62
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_gbase8a_source_to_assert.conf
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+  # You can set flink configuration here
+  execution.parallelism = 1
+  job.mode = "BATCH"
+  #execution.checkpoint.interval = 10000
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+  # This is an example source plugin **only for testing and demonstrating the source plugin feature**
+  Jdbc {
+    driver = com.gbase.jdbc.Driver
+    url = "jdbc:gbase://e2e_gbase8aDb:5258/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
+    user = root
+    password = root
+    query = "select varchar_10_col, char_10_col, text_col, decimal_col, float_col, int_col, tinyint_col, smallint_col, double_col, bigint_col, date_col, timestamp_col, datetime_col, blob_col from e2e_table_source"
+  }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+  # please go to https://seatunnel.apache.org/docs/connector-v2/source/Jdbc
+}
+
+transform {
+
+  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+  # please go to https://seatunnel.apache.org/docs/transform/sql
+}
+
+sink {
+  Assert {
+    rules =
+      {
+        row_rules = [
+          {
+            rule_type = MAX_ROW
+            rule_value = 1
+          },
+          {
+            rule_type = MIN_ROW
+            rule_value = 1
+          }
+        ]
+      }
+  }
+  # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+  # please go to https://seatunnel.apache.org/docs/connector-v2/sink/Assert
+}