You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2022/09/09 07:24:53 UTC

[incubator-seatunnel] branch dev updated: [Connector-V2][JDBC-connector] support Jdbc dm (#2377)

This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 7278209ca [Connector-V2][JDBC-connector] support Jdbc dm (#2377)
7278209ca is described below

commit 7278209ca2d345d436351f055010b9a8864b20e0
Author: Laglangyue <35...@users.noreply.github.com>
AuthorDate: Fri Sep 9 15:24:48 2022 +0800

    [Connector-V2][JDBC-connector] support Jdbc dm (#2377)
    
    * [Connector-V2][JDBC-connector] Add DM source and sink connector
---
 docs/en/connector-v2/sink/Jdbc.md                  |  41 +++--
 docs/en/connector-v2/source/Jdbc.md                |  32 ++--
 pom.xml                                            |  16 +-
 seatunnel-connectors-v2/connector-jdbc/pom.xml     |  12 +-
 .../jdbc/internal/dialect/dm/DmdbDialect.java      |  40 +++++
 .../internal/dialect/dm/DmdbDialectFactory.java    |  40 +++++
 .../internal/dialect/dm/DmdbJdbcRowConverter.java  |  39 ++++
 .../jdbc/internal/dialect/dm/DmdbTypeMapper.java   | 198 +++++++++++++++++++++
 .../connector-jdbc-flink-e2e/pom.xml               |  16 ++
 .../seatunnel/e2e/flink/v2/jdbc/JdbcDmdbIT.java    | 148 +++++++++++++++
 .../src/test/resources/jdbc/init_sql/dm_init.conf  | 122 +++++++++++++
 .../resources/jdbc/jdbc_dm_source_and_sink.conf    |  57 ++++++
 .../connector-jdbc-spark-e2e/pom.xml               |  17 ++
 .../seatunnel/e2e/spark/v2/jdbc/JdbcDmdbIT.java    | 151 ++++++++++++++++
 .../src/test/resources/jdbc/init_sql/dm_init.conf  | 122 +++++++++++++
 .../resources/jdbc/jdbc_dm_source_and_sink.conf    |  57 ++++++
 .../seatunnel-spark-connector-v2-e2e/pom.xml       |   1 +
 .../seatunnel-connector-spark-jdbc-e2e/pom.xml     |   1 +
 seatunnel-server/seatunnel-app/pom.xml             |   1 +
 19 files changed, 1066 insertions(+), 45 deletions(-)

diff --git a/docs/en/connector-v2/sink/Jdbc.md b/docs/en/connector-v2/sink/Jdbc.md
index f8c883cec..ed6285eae 100644
--- a/docs/en/connector-v2/sink/Jdbc.md
+++ b/docs/en/connector-v2/sink/Jdbc.md
@@ -15,21 +15,21 @@ Use `Xa transactions` to ensure `exactly-once`. So only support `exactly-once` f
 
 ## Options
 
-| name | type | required | default value |
-| --- | --- | --- | --- |
-| url | String | Yes | - |
-| driver | String | Yes | - |
-| user | String | No | - |
-| password | String | No | - |
-| query | String | Yes | - |
-| connection_check_timeout_sec | Int | No | 30 |
-| max_retries | Int | No | 3 |
-| batch_size | Int | No | 300 |
-| batch_interval_ms | Int | No | 1000 |
-| is_exactly_once | Boolean | No | false |
-| xa_data_source_class_name | String | No | - |
-| max_commit_attempts | Int | No | 3 |
-| transaction_timeout_sec | Int | No | -1 |
+| name                         | type    | required | default value |
+|------------------------------|---------|----------|---------------|
+| url                          | String  | Yes      | -             |
+| driver                       | String  | Yes      | -             |
+| user                         | String  | No       | -             |
+| password                     | String  | No       | -             |
+| query                        | String  | Yes      | -             |
+| connection_check_timeout_sec | Int     | No       | 30            |
+| max_retries                  | Int     | No       | 3             |
+| batch_size                   | Int     | No       | 300           |
+| batch_interval_ms            | Int     | No       | 1000          |
+| is_exactly_once              | Boolean | No       | false         |
+| xa_data_source_class_name    | String  | No       | -             |
+| max_commit_attempts          | Int     | No       | 3             |
+| transaction_timeout_sec      | Int     | No       | -1            |
 
 ### driver [string]
 The jdbc class name used to connect to the remote data source, if you use MySQL the value is com.mysql.cj.jdbc.Driver.
@@ -64,7 +64,7 @@ For batch writing, when the number of buffers reaches the number of `batch_size`
 Whether to enable exactly-once semantics, which will use Xa transactions. If on, you need to set `xa_data_source_class_name`.
 
 ### xa_data_source_class_name[string]
-The xa data source class name of the database Driver, for example, mysql is `com.mysql.cj.jdbc.MysqlXADataSource` and postgresql is `org.postgresql.xa.PGXADataSource`
+The xa data source class name of the database Driver, for example, mysql is `com.mysql.cj.jdbc.MysqlXADataSource`, and please refer to appendix for other data sources
 
 ### max_commit_attempts[int]
 The number of retries for transaction commit failures
@@ -76,6 +76,15 @@ The timeout after the transaction is opened, the default is -1 (never timeout).
 In the case of is_exactly_once = "true", Xa transactions are used. This requires database support, and some databases require some setup. For example, postgres needs to set `max_prepared_transactions > 1`
 Such as `ALTER SYSTEM set max_prepared_transactions to 10`.
 
+## appendix
+there are some reference value for params above.
+
+| datasource | driver                   | url                                       | xa_data_source_class_name           | maven                                                         |
+|------------|--------------------------|-------------------------------------------|-------------------------------------|---------------------------------------------------------------|
+| mysql      | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test          | com.mysql.cj.jdbc.MysqlXADataSource | https://mvnrepository.com/artifact/mysql/mysql-connector-java |
+| postgresql | org.postgresql.Driver    | jdbc:postgresql://localhost:5432/postgres | org.postgresql.xa.PGXADataSource    | https://mvnrepository.com/artifact/org.postgresql/postgresql  |                                                             |
+| dm         | dm.jdbc.driver.DmDriver  | jdbc:dm://localhost:5236                  | dm.jdbc.driver.DmdbXADataSource     | https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18  |
+
 ## Example
 Simple
 ```
diff --git a/docs/en/connector-v2/source/Jdbc.md b/docs/en/connector-v2/source/Jdbc.md
index 5f1e47ac9..ca229ee82 100644
--- a/docs/en/connector-v2/source/Jdbc.md
+++ b/docs/en/connector-v2/source/Jdbc.md
@@ -20,17 +20,17 @@ supports query SQL and can achieve projection effect.
 
 ##  Options
 
-| name | type | required | default value |
-| --- | --- | --- | --- |
-| url | String | Yes | - |
-| driver | String | Yes | - |
-| user | String | No | - |
-| password | String | No | - |
-| query | String | Yes | - |
-| connection_check_timeout_sec | Int | No | 30 |
-| partition_column | String | No | - |
-| partition_upper_bound | Long | No | - |
-| partition_lower_bound | Long | No | - |
+| name                         | type   | required | default value |
+|------------------------------|--------|----------|---------------|
+| url                          | String | Yes      | -             |
+| driver                       | String | Yes      | -             |
+| user                         | String | No       | -             |
+| password                     | String | No       | -             |
+| query                        | String | Yes      | -             |
+| connection_check_timeout_sec | Int    | No       | 30            |
+| partition_column             | String | No       | -             |
+| partition_upper_bound        | Long   | No       | -             |
+| partition_lower_bound        | Long   | No       | -             |
 
 ### driver [string]
 The jdbc class name used to connect to the remote data source, if you use MySQL the value is com.mysql.cj.jdbc.Driver.
@@ -66,6 +66,16 @@ The partition_column min value for scan, if not set SeaTunnel will query databas
 ## tips
 If partition_column is not set, it will run in single concurrency, and if partition_column is set, it will be executed in parallel according to the concurrency of tasks.
 
+
+## appendix
+there are some reference value for params above.
+
+| datasource | driver                   | url                                       | maven                                                         |
+|------------|--------------------------|-------------------------------------------|---------------------------------------------------------------|
+| mysql      | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test          | https://mvnrepository.com/artifact/mysql/mysql-connector-java |
+| postgresql | org.postgresql.Driver    | jdbc:postgresql://localhost:5432/postgres | https://mvnrepository.com/artifact/org.postgresql/postgresql  |                                                             |
+| dm         | dm.jdbc.driver.DmDriver  | jdbc:dm://localhost:5236                  | https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18  |
+
 ## Example
 simple:
 ```Jdbc {
diff --git a/pom.xml b/pom.xml
index 9beb6a1fe..1a904df22 100644
--- a/pom.xml
+++ b/pom.xml
@@ -135,6 +135,7 @@
         <lombok.version>1.18.0</lombok.version>
         <mysql.version>8.0.16</mysql.version>
         <postgresql.version>42.3.3</postgresql.version>
+        <dm-jdbc.version>8.1.2.141</dm-jdbc.version>
         <skip.pmd.check>false</skip.pmd.check>
         <maven.deploy.skip>false</maven.deploy.skip>
         <maven.javadoc.skip>false</maven.javadoc.skip>
@@ -206,21 +207,6 @@
                 <artifactId>seatunnel-config-shade</artifactId>
                 <version>${seatunnel.config.shade.version}</version>
             </dependency>
-
-            <!--Because the license is not in compliance, if you need to use MySQL, you can add it yourself-->
-            <dependency>
-                <groupId>mysql</groupId>
-                <artifactId>mysql-connector-java</artifactId>
-                <version>${mysql.version}</version>
-                <scope>test</scope>
-            </dependency>
-
-            <dependency>
-                <groupId>org.postgresql</groupId>
-                <artifactId>postgresql</artifactId>
-                <version>${postgresql.version}</version>
-            </dependency>
-
             <dependency>
                 <groupId>commons-codec</groupId>
                 <artifactId>commons-codec</artifactId>
diff --git a/seatunnel-connectors-v2/connector-jdbc/pom.xml b/seatunnel-connectors-v2/connector-jdbc/pom.xml
index eb4e1b37d..68b6ad284 100644
--- a/seatunnel-connectors-v2/connector-jdbc/pom.xml
+++ b/seatunnel-connectors-v2/connector-jdbc/pom.xml
@@ -31,8 +31,9 @@
     
     <properties>
         <phoenix.version>5.2.5-HBase-2.x</phoenix.version>
-        <pg.version>42.3.3</pg.version>
         <mysql.version>8.0.16</mysql.version>
+        <postgresql.version>42.3.3</postgresql.version>
+        <dm-jdbc.version>8.1.2.141</dm-jdbc.version>
     </properties>
 
     <dependencies>
@@ -47,7 +48,12 @@
         <dependency>
             <groupId>org.postgresql</groupId>
             <artifactId>postgresql</artifactId>
-            <version>${pg.version}</version>
+            <version>${postgresql.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.dameng</groupId>
+            <artifactId>DmJdbcDriver18</artifactId>
+            <version>${dm-jdbc.version}</version>
         </dependency>
 
         <dependency>
@@ -57,4 +63,4 @@
         </dependency>
     </dependencies>
 
-</project>
\ No newline at end of file
+</project>
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dm/DmdbDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dm/DmdbDialect.java
new file mode 100644
index 000000000..f13bb1d7a
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dm/DmdbDialect.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dm;
+
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+
+public class DmdbDialect implements JdbcDialect {
+
+    @Override
+    public String dialectName() {
+        return "DM";
+    }
+
+    @Override
+    public JdbcRowConverter getRowConverter() {
+        return new DmdbJdbcRowConverter();
+    }
+
+    @Override
+    public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {
+        return new DmdbTypeMapper();
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dm/DmdbDialectFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dm/DmdbDialectFactory.java
new file mode 100644
index 000000000..0578bba3c
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dm/DmdbDialectFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dm;
+
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
+
+import com.google.auto.service.AutoService;
+
+/**
+ * Factory for {@link DmdbDialect}.
+ */
+@AutoService(JdbcDialectFactory.class)
+public class DmdbDialectFactory implements JdbcDialectFactory {
+
+    @Override
+    public boolean acceptsURL(String url) {
+        return url.startsWith("jdbc:dm:");
+    }
+
+    @Override
+    public JdbcDialect create() {
+        return new DmdbDialect();
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dm/DmdbJdbcRowConverter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dm/DmdbJdbcRowConverter.java
new file mode 100644
index 000000000..6aa666314
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dm/DmdbJdbcRowConverter.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dm;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;
+
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+
+public class DmdbJdbcRowConverter extends AbstractJdbcRowConverter {
+
+    @Override
+    public String converterName() {
+        return "DM";
+    }
+
+    @Override
+    public SeaTunnelRow toInternal(ResultSet rs, ResultSetMetaData metaData, SeaTunnelRowType typeInfo) throws SQLException {
+        return super.toInternal(rs, metaData, typeInfo);
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dm/DmdbTypeMapper.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dm/DmdbTypeMapper.java
new file mode 100644
index 000000000..a5aea5571
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dm/DmdbTypeMapper.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dm;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+
+public class DmdbTypeMapper implements JdbcDialectTypeMapper {
+
+    // ============================data types=====================
+    private static final String DM_BIT = "BIT";
+
+    // ----------------------------number-------------------------
+    private static final String DM_NUMERIC = "NUMERIC";
+    private static final String DM_NUMBER = "NUMBER";
+    private static final String DM_DECIMAL = "DECIMAL";
+    /**
+     * same to DECIMAL
+     */
+    private static final String DM_DEC = "DEC";
+
+    // ----------------------------int-----------------------------
+    private static final String DM_INTEGER = "INTEGER";
+    private static final String DM_INT = "INT";
+    public static final String DM_PLS_INTEGER = "PLS_INTEGER";
+    private static final String DM_BIGINT = "BIGINT";
+    private static final String DM_TINYINT = "TINYINT";
+    private static final String DM_BYTE = "BYTE";
+    private static final String DM_SMALLINT = "SMALLINT";
+
+    // dm float is double for Cpp.
+    private static final String DM_FLOAT = "FLOAT";
+    private static final String DM_DOUBLE = "DOUBLE";
+    private static final String DM_DOUBLE_PRECISION = "DOUBLE PRECISION";
+    private static final String DM_REAL = "REAL";
+
+    // DM_CHAR DM_CHARACTER DM_VARCHAR DM_VARCHAR2 max is 32767
+    private static final String DM_CHAR = "CHAR";
+    private static final String DM_CHARACTER = "CHARACTER";
+    private static final String DM_VARCHAR = "VARCHAR";
+    private static final String DM_VARCHAR2 = "VARCHAR2";
+    private static final String DM_LONGVARCHAR = "LONGVARCHAR";
+    private static final String DM_CLOB = "CLOB";
+    private static final String DM_TEXT = "TEXT";
+    private static final String DM_LONG = "LONG";
+
+    // ------------------------------time-------------------------
+    private static final String DM_DATE = "DATE";
+    private static final String DM_TIME = "TIME";
+    private static final String DM_TIMESTAMP = "TIMESTAMP";
+    private static final String DM_DATETIME = "DATETIME";
+
+    // ---------------------------binary---------------------------
+    private static final String DM_BINARY = "BINARY";
+    private static final String DM_VARBINARY = "VARBINARY";
+
+    // -------------------------time interval-----------------------
+    private static final String DM_INTERVAL_YEAR_TO_MONTH = "INTERVAL YEAR TO MONTH";
+    private static final String DM_INTERVAL_YEAR = "INTERVAL YEAR";
+    private static final String DM_INTERVAL_MONTH = "INTERVAL MONTH";
+    private static final String DM_INTERVAL_DAY = "INTERVAL DAY";
+    private static final String DM_INTERVAL_DAY_TO_HOUR = "INTERVAL DAY TO HOUR";
+    private static final String DM_INTERVAL_DAY_TO_MINUTE = "INTERVAL DAY TO MINUTE";
+    private static final String DM_INTERVAL_DAY_TO_SECOND = "INTERVAL DAY TO SECOND";
+    private static final String DM_INTERVAL_HOUR = "INTERVAL HOUR";
+    private static final String DM_INTERVAL_HOUR_TO_MINUTE = "INTERVAL HOUR TO MINUTE";
+    private static final String DM_INTERVAL_HOUR_TO_SECOND = "INTERVAL HOUR TO SECOND";
+    private static final String DM_INTERVAL_MINUTE = "INTERVAL MINUTE";
+    private static final String DM_INTERVAL_MINUTE_TO_SECOND = "INTERVAL MINUTE TO SECOND";
+    private static final String DM_INTERVAL_SECOND = "INTERVAL SECOND";
+    // time zone
+    private static final String DM_TIME_WITH_TIME_ZONE = "TIME WITH TIME ZONE";
+    private static final String DM_TIMESTAMP_WITH_TIME_ZONE = "TIMESTAMP WITH TIME ZONE";
+    private static final String TIMESTAMP_WITH_LOCAL_TIME_ZONE = "TIMESTAMP WITH LOCAL TIME ZONE";
+
+    // ------------------------------blob-------------------------
+    public static final String DM_BLOB = "BLOB";
+    public static final String DM_BFILE = "BFILE";
+    public static final String DM_IMAGE = "IMAGE";
+    public static final String DM_LONGVARBINARY = "LONGVARBINARY";
+
+    @Override
+    @SuppressWarnings("checkstyle:MagicNumber")
+    public SeaTunnelDataType<?> mapping(ResultSetMetaData metadata, int colIndex) throws SQLException {
+        String dmdbType = metadata.getColumnTypeName(colIndex).toUpperCase();
+        int precision = metadata.getPrecision(colIndex);
+        switch (dmdbType) {
+            case DM_BIT:
+                return BasicType.BOOLEAN_TYPE;
+
+            case DM_INT:
+            case DM_INTEGER:
+            case DM_PLS_INTEGER:
+                return BasicType.INT_TYPE;
+
+            case DM_TINYINT:
+            case DM_BYTE:
+                return BasicType.BYTE_TYPE;
+
+            case DM_SMALLINT:
+                return BasicType.SHORT_TYPE;
+
+            case DM_BIGINT:
+                return BasicType.LONG_TYPE;
+
+            case DM_NUMERIC:
+            case DM_NUMBER:
+            case DM_DECIMAL:
+            case DM_DEC:
+                if (precision > 0) {
+                    return new DecimalType(precision, metadata.getScale(colIndex));
+                }
+                return new DecimalType(38, 18);
+
+            case DM_REAL:
+                return BasicType.FLOAT_TYPE;
+
+            case DM_FLOAT:
+            case DM_DOUBLE_PRECISION:
+            case DM_DOUBLE:
+                return BasicType.DOUBLE_TYPE;
+
+            case DM_CHAR:
+            case DM_CHARACTER:
+            case DM_VARCHAR:
+            case DM_VARCHAR2:
+                // 100G-1 byte
+            case DM_TEXT:
+            case DM_LONG:
+            case DM_LONGVARCHAR:
+            case DM_CLOB:
+                return BasicType.STRING_TYPE;
+
+            case DM_TIMESTAMP:
+            case DM_DATETIME:
+                return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+
+            case DM_TIME:
+                return LocalTimeType.LOCAL_TIME_TYPE;
+
+            case DM_DATE:
+                return LocalTimeType.LOCAL_DATE_TYPE;
+
+            // 100G-1 byte
+            case DM_BLOB:
+            case DM_BINARY:
+            case DM_VARBINARY:
+            case DM_LONGVARBINARY:
+            case DM_IMAGE:
+            case DM_BFILE:
+                return PrimitiveByteArrayType.INSTANCE;
+
+            //Doesn't support yet
+            case DM_INTERVAL_YEAR_TO_MONTH:
+            case DM_INTERVAL_YEAR:
+            case DM_INTERVAL_MONTH:
+            case DM_INTERVAL_DAY:
+            case DM_INTERVAL_DAY_TO_HOUR:
+            case DM_INTERVAL_DAY_TO_MINUTE:
+            case DM_INTERVAL_DAY_TO_SECOND:
+            case DM_INTERVAL_HOUR:
+            case DM_INTERVAL_HOUR_TO_MINUTE:
+            case DM_INTERVAL_HOUR_TO_SECOND:
+            case DM_INTERVAL_MINUTE:
+            case DM_INTERVAL_MINUTE_TO_SECOND:
+            case DM_INTERVAL_SECOND:
+            case DM_TIME_WITH_TIME_ZONE:
+            case DM_TIMESTAMP_WITH_TIME_ZONE:
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+            default:
+                final String jdbcColumnName = metadata.getColumnName(colIndex);
+                throw new UnsupportedOperationException(
+                    String.format("Doesn't support Dmdb type '%s' on column '%s'  yet.", dmdbType, jdbcColumnName));
+        }
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml
index e4361c359..4b4d1d998 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml
@@ -46,6 +46,22 @@
             <version>1.17.3</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+            <version>${mysql.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.postgresql</groupId>
+            <artifactId>postgresql</artifactId>
+            <version>${postgresql.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.dameng</groupId>
+            <artifactId>DmJdbcDriver18</artifactId>
+            <version>${dm-jdbc.version}</version>
+        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcDmdbIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcDmdbIT.java
new file mode 100644
index 000000000..b88aab093
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcDmdbIT.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.jdbc;
+
+import static org.testcontainers.shaded.org.awaitility.Awaitility.given;
+
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+@Slf4j
+public class JdbcDmdbIT extends FlinkContainer {
+
+    private static final String DOCKER_IMAGE = "laglangyue/dmdb8";
+    private static final String DRIVER_CLASS = "dm.jdbc.driver.DmDriver";
+    private static final String HOST = "flink_e2e_dmdb";
+    private static final String LOCAL_HOST = "localhost";
+    private static final String URL = "jdbc:dm://" + LOCAL_HOST + ":5236";
+    private static final String USERNAME = "SYSDBA";
+    private static final String PASSWORD = "SYSDBA";
+    private static final String DATABASE = "SYSDBA";
+    private static final String SOURCE_TABLE = "e2e_table_source";
+    private static final String SINK_TABLE = "e2e_table_sink";
+    private Connection jdbcConnection;
+    private GenericContainer<?> dbServer;
+
+    @BeforeEach
+    public void startDmdbContainer() throws ClassNotFoundException, SQLException {
+        dbServer = new GenericContainer<>(DOCKER_IMAGE)
+            .withNetwork(NETWORK)
+            .withNetworkAliases(HOST)
+            .withLogConsumer(new Slf4jLogConsumer(log));
+        dbServer.setPortBindings(Lists.newArrayList(
+            String.format("%s:%s", 5236, 5236)));
+        Startables.deepStart(Stream.of(dbServer)).join();
+        log.info("Dmdb container started");
+        // wait for Dmdb fully start
+        Class.forName(DRIVER_CLASS);
+        given().ignoreExceptions()
+            .await()
+            .atMost(180, TimeUnit.SECONDS)
+            .untilAsserted(this::initializeJdbcConnection);
+        initializeJdbcTable();
+    }
+
+    private void initializeJdbcConnection() throws SQLException {
+        jdbcConnection = DriverManager.getConnection(URL, USERNAME, PASSWORD);
+    }
+
+    /**
+     * init the table for DM_SERVER, DDL and DML for source and sink
+     */
+    private void initializeJdbcTable() {
+        java.net.URL resource = FlinkContainer.class.getResource("/jdbc/init_sql/dm_init.conf");
+        if (resource == null) {
+            throw new IllegalArgumentException("can't find find file");
+        }
+        String file = resource.getFile();
+        Config config = ConfigFactory.parseFile(new File(file));
+        assert config.hasPath("dm_table_source") && config.hasPath("DML") && config.hasPath("dm_table_sink");
+        try (Statement statement = jdbcConnection.createStatement()) {
+            // source
+            String sourceTableDDL = config.getString("dm_table_source");
+            statement.execute(sourceTableDDL);
+            String insertSQL = config.getString("DML");
+            statement.execute(insertSQL);
+            // sink
+            String sinkTableDDL = config.getString("dm_table_sink");
+            statement.execute(sinkTableDDL);
+        } catch (SQLException e) {
+            throw new RuntimeException("Initializing table failed!", e);
+        }
+    }
+
+    private void assertHasData(String table) {
+        try (Connection connection = DriverManager.getConnection(URL, USERNAME, PASSWORD)) {
+            Statement statement = connection.createStatement();
+            String sql = String.format("select * from %s.%s limit 1", DATABASE, table);
+            ResultSet source = statement.executeQuery(sql);
+            Assertions.assertTrue(source.next());
+        } catch (SQLException e) {
+            throw new RuntimeException("test dm server image error", e);
+        }
+    }
+
+    @AfterEach
+    public void closeDmdbContainer() throws SQLException {
+        if (jdbcConnection != null) {
+            jdbcConnection.close();
+        }
+        if (dbServer != null) {
+            dbServer.close();
+        }
+    }
+
+    @Test
+    @DisplayName("JDBC-DM container can be pull")
+    public void testDMDBImage() {
+        assertHasData(SOURCE_TABLE);
+    }
+
+    @Test
+    @DisplayName("flink JDBC-DM test")
+    public void testJdbcDmdbSourceAndSink() throws IOException, InterruptedException, SQLException {
+        assertHasData(SOURCE_TABLE);
+        Container.ExecResult execResult = executeSeaTunnelFlinkJob("/jdbc/jdbc_dm_source_and_sink.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        assertHasData(SINK_TABLE);
+    }
+
+}
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/init_sql/dm_init.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/init_sql/dm_init.conf
new file mode 100644
index 000000000..056c252a1
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/init_sql/dm_init.conf
@@ -0,0 +1,122 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+dm_table_source = """
+create table if not exists "SYSDBA".e2e_table_source
+(
+    DM_BIT              BIT,
+    DM_INT              INT,
+    DM_INTEGER          INTEGER,
+    DM_PLS_INTEGER      PLS_INTEGER,
+    DM_TINYINT          TINYINT,
+
+    DM_BYTE             BYTE,
+    DM_SMALLINT         SMALLINT,
+    DM_BIGINT           BIGINT,
+
+    DM_NUMERIC          NUMERIC,
+    DM_NUMBER           NUMBER,
+    DM_DECIMAL          DECIMAL,
+    DM_DEC              DEC,
+
+    DM_REAL             REAL,
+    DM_FLOAT            FLOAT,
+    DM_DOUBLE_PRECISION DOUBLE PRECISION,
+    DM_DOUBLE           DOUBLE,
+
+    DM_CHAR             CHAR,
+    DM_CHARACTER        CHARACTER,
+    DM_VARCHAR          VARCHAR,
+    DM_VARCHAR2         VARCHAR2,
+    DM_TEXT             TEXT,
+    DM_LONG             LONG,
+    DM_LONGVARCHAR      LONGVARCHAR,
+    DM_CLOB             CLOB,
+
+    DM_TIMESTAMP        TIMESTAMP,
+    DM_DATETIME         DATETIME,
+    DM_TIME             TIME,
+    DM_DATE             DATE,
+
+    DM_BLOB             BLOB,
+    DM_BINARY           BINARY,
+    DM_VARBINARY        VARBINARY,
+    DM_LONGVARBINARY    LONGVARBINARY,
+    DM_IMAGE            IMAGE,
+    DM_BFILE            BFILE
+)
+"""
+
+dm_table_sink = """
+create table if not exists "SYSDBA".e2e_table_sink
+(
+    DM_BIT              BIT,
+    DM_INT              INT,
+    DM_INTEGER          INTEGER,
+    DM_PLS_INTEGER      PLS_INTEGER,
+    DM_TINYINT          TINYINT,
+
+    DM_BYTE             BYTE,
+    DM_SMALLINT         SMALLINT,
+    DM_BIGINT           BIGINT,
+
+    DM_NUMERIC          NUMERIC,
+    DM_NUMBER           NUMBER,
+    DM_DECIMAL          DECIMAL,
+    DM_DEC              DEC,
+
+    DM_REAL             REAL,
+    DM_FLOAT            FLOAT,
+    DM_DOUBLE_PRECISION DOUBLE PRECISION,
+    DM_DOUBLE           DOUBLE,
+
+    DM_CHAR             CHAR,
+    DM_CHARACTER        CHARACTER,
+    DM_VARCHAR          VARCHAR,
+    DM_VARCHAR2         VARCHAR2,
+    DM_TEXT             TEXT,
+    DM_LONG             LONG,
+    DM_LONGVARCHAR      LONGVARCHAR,
+    DM_CLOB             CLOB,
+
+    DM_TIMESTAMP        TIMESTAMP,
+    DM_DATETIME         DATETIME,
+    DM_TIME             TIME,
+    DM_DATE             DATE,
+
+    DM_BLOB             BLOB,
+    DM_BINARY           BINARY,
+    DM_VARBINARY        VARBINARY,
+    DM_LONGVARBINARY    LONGVARBINARY,
+    DM_IMAGE            IMAGE,
+    DM_BFILE            BFILE
+)
+"""
+// only need for source
+DML = """
+INSERT INTO "SYSDBA".e2e_table_source (
+DM_BIT, DM_INT, DM_INTEGER, DM_PLS_INTEGER, DM_TINYINT, DM_BYTE, DM_SMALLINT, DM_BIGINT,
+DM_NUMERIC, DM_NUMBER, DM_DECIMAL, DM_DEC, DM_REAL, DM_FLOAT, DM_DOUBLE_PRECISION, DM_DOUBLE,
+DM_CHAR, DM_CHARACTER, DM_VARCHAR, DM_VARCHAR2, DM_TEXT, DM_LONG, DM_LONGVARCHAR, DM_CLOB,
+DM_TIMESTAMP, DM_DATETIME, DM_TIME, DM_DATE,
+DM_BLOB, DM_BINARY, DM_VARBINARY, DM_LONGVARBINARY, DM_IMAGE, DM_BFILE)
+VALUES
+(0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, '1',
+ 'a', 'a', 'a', 'a', 'a', 'a', 'a',
+'2022-08-13 17:35:59.000000', '2022-08-13 17:36:11.000000', '15:45:00', '2022-08-13',
+null, null, null, null, null, null)
+"""
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_dm_source_and_sink.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_dm_source_and_sink.conf
new file mode 100644
index 000000000..ba2468ccf
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_dm_source_and_sink.conf
@@ -0,0 +1,57 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  # You can set flink configuration here
+  execution.parallelism = 1
+  job.mode = "BATCH"
+  #execution.checkpoint.interval = 10000
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+  Jdbc {
+    url = "jdbc:dm://flink_e2e_dmdb:5236"
+    driver = "dm.jdbc.driver.DmDriver"
+    connection_check_timeout_sec = 1000
+    user = "SYSDBA"
+    password = "SYSDBA"
+    query = """select * from "SYSDBA".e2e_table_source"""
+  }
+
+}
+
+transform {
+
+}
+
+sink {
+  Jdbc {
+    url = "jdbc:dm://flink_e2e_dmdb:5236"
+    driver = "dm.jdbc.driver.DmDriver"
+    connection_check_timeout_sec = 1000
+    user = "SYSDBA"
+    password = "SYSDBA"
+    query = """
+INSERT INTO SYSDBA.e2e_table_sink (DM_BIT, DM_INT, DM_INTEGER, DM_PLS_INTEGER, DM_TINYINT, DM_BYTE, DM_SMALLINT, DM_BIGINT, DM_NUMERIC, DM_NUMBER,
+ DM_DECIMAL, DM_DEC, DM_REAL, DM_FLOAT, DM_DOUBLE_PRECISION, DM_DOUBLE, DM_CHAR, DM_CHARACTER, DM_VARCHAR, DM_VARCHAR2, DM_TEXT, DM_LONG,
+ DM_LONGVARCHAR, DM_CLOB, DM_TIMESTAMP, DM_DATETIME, DM_TIME, DM_DATE, DM_BLOB, DM_BINARY, DM_VARBINARY, DM_LONGVARBINARY, DM_IMAGE, DM_BFILE)
+VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+"""
+  }
+}
+
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml
index 8d96fb66a..a5242ef72 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml
@@ -40,6 +40,23 @@
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+            <version>${mysql.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.postgresql</groupId>
+            <artifactId>postgresql</artifactId>
+            <version>${postgresql.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.dameng</groupId>
+            <artifactId>DmJdbcDriver18</artifactId>
+            <version>${dm-jdbc.version}</version>
+        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcDmdbIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcDmdbIT.java
new file mode 100644
index 000000000..d1eb5043d
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcDmdbIT.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.spark.v2.jdbc;
+
+import static org.testcontainers.shaded.org.awaitility.Awaitility.given;
+
+import org.apache.seatunnel.e2e.spark.SparkContainer;
+
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+@Slf4j
+public class JdbcDmdbIT extends SparkContainer {
+
+    private static final String DM_DOCKER_IMAGE = "laglangyue/dmdb8";
+    private static final String DRIVER_CLASS = "dm.jdbc.driver.DmDriver";
+    private static final String HOST = "spark_e2e_dmdb";
+    private static final String LOCAL_HOST = "localhost";
+    private static final String URL = "jdbc:dm://" + LOCAL_HOST + ":5236";
+    private static final String USERNAME = "SYSDBA";
+    private static final String PASSWORD = "SYSDBA";
+    private static final String DATABASE = "SYSDBA";
+    private static final String SOURCE_TABLE = "e2e_table_source";
+    private static final String SINK_TABLE = "e2e_table_sink";
+    private GenericContainer<?> dbServer;
+    private Connection jdbcConnection;
+
+    @BeforeEach
+    public void beforeAllForDM() {
+        try {
+            dbServer = new GenericContainer<>(DM_DOCKER_IMAGE)
+                .withNetwork(NETWORK)
+                .withNetworkAliases(HOST)
+                .withLogConsumer(new Slf4jLogConsumer(log));
+            dbServer.setPortBindings(Lists.newArrayList("5236:5236"));
+            Startables.deepStart(Stream.of(dbServer)).join();
+            log.info("dmdb container started");
+            Class.forName(DRIVER_CLASS);
+            given().ignoreExceptions()
+                .await()
+                .atMost(180, TimeUnit.SECONDS)
+                .untilAsserted(this::initializeJdbcConnection);
+            initializeJdbcTable();
+        } catch (Exception ex) {
+            log.error("dm container init failed", ex);
+            throw new RuntimeException(ex);
+        }
+    }
+
+    @AfterEach
+    public void closeDmdbContainer() throws SQLException {
+        if (jdbcConnection != null) {
+            jdbcConnection.close();
+        }
+        if (dbServer != null) {
+            dbServer.close();
+        }
+    }
+
+    private void initializeJdbcConnection() throws SQLException {
+        jdbcConnection = DriverManager.getConnection(URL, USERNAME, PASSWORD);
+    }
+
+    /**
+     * init the table for DM_SERVER, DDL and DML for source and sink
+     */
+    private void initializeJdbcTable() {
+        URL resource = JdbcDmdbIT.class.getResource("/jdbc/init_sql/dm_init.conf");
+        if (resource == null) {
+            throw new IllegalArgumentException("can't find find file");
+        }
+        String file = resource.getFile();
+        Config config = ConfigFactory.parseFile(new File(file));
+        assert config.hasPath("dm_table_source") && config.hasPath("DML") && config.hasPath("dm_table_sink");
+        try (Statement statement = jdbcConnection.createStatement()) {
+            // source
+            String sourceTableDDL = config.getString("dm_table_source");
+            statement.execute(sourceTableDDL);
+            String insertSQL = config.getString("DML");
+            statement.execute(insertSQL);
+            // sink
+            String sinkTableDDL = config.getString("dm_table_sink");
+            statement.execute(sinkTableDDL);
+        } catch (SQLException e) {
+            throw new RuntimeException("Initializing table failed!", e);
+        }
+    }
+
+    private void assertHasData(String table) {
+        try (Connection connection = DriverManager.getConnection(URL, USERNAME, PASSWORD)) {
+            Statement statement = connection.createStatement();
+            String sql = String.format("select * from %s.%s limit 1", DATABASE, table);
+            ResultSet source = statement.executeQuery(sql);
+            Assertions.assertTrue(source.next());
+        } catch (SQLException e) {
+            throw new RuntimeException("test dm server image error", e);
+        }
+    }
+
+    @Test
+    @DisplayName("JDBC-DM container can be pull")
+    public void testDMDBImage() {
+        assertHasData(SOURCE_TABLE);
+    }
+
+    @Test
+    @DisplayName("spark JDBC-DM test for all type mapper")
+    public void testDMDBSourceToJdbcSink() throws SQLException, IOException, InterruptedException {
+        Container.ExecResult execResult = executeSeaTunnelSparkJob("/jdbc/jdbc_dm_source_and_sink.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        assertHasData(SINK_TABLE);
+    }
+
+}
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/init_sql/dm_init.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/init_sql/dm_init.conf
new file mode 100644
index 000000000..056c252a1
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/init_sql/dm_init.conf
@@ -0,0 +1,122 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+dm_table_source = """
+create table if not exists "SYSDBA".e2e_table_source
+(
+    DM_BIT              BIT,
+    DM_INT              INT,
+    DM_INTEGER          INTEGER,
+    DM_PLS_INTEGER      PLS_INTEGER,
+    DM_TINYINT          TINYINT,
+
+    DM_BYTE             BYTE,
+    DM_SMALLINT         SMALLINT,
+    DM_BIGINT           BIGINT,
+
+    DM_NUMERIC          NUMERIC,
+    DM_NUMBER           NUMBER,
+    DM_DECIMAL          DECIMAL,
+    DM_DEC              DEC,
+
+    DM_REAL             REAL,
+    DM_FLOAT            FLOAT,
+    DM_DOUBLE_PRECISION DOUBLE PRECISION,
+    DM_DOUBLE           DOUBLE,
+
+    DM_CHAR             CHAR,
+    DM_CHARACTER        CHARACTER,
+    DM_VARCHAR          VARCHAR,
+    DM_VARCHAR2         VARCHAR2,
+    DM_TEXT             TEXT,
+    DM_LONG             LONG,
+    DM_LONGVARCHAR      LONGVARCHAR,
+    DM_CLOB             CLOB,
+
+    DM_TIMESTAMP        TIMESTAMP,
+    DM_DATETIME         DATETIME,
+    DM_TIME             TIME,
+    DM_DATE             DATE,
+
+    DM_BLOB             BLOB,
+    DM_BINARY           BINARY,
+    DM_VARBINARY        VARBINARY,
+    DM_LONGVARBINARY    LONGVARBINARY,
+    DM_IMAGE            IMAGE,
+    DM_BFILE            BFILE
+)
+"""
+
+dm_table_sink = """
+create table if not exists "SYSDBA".e2e_table_sink
+(
+    DM_BIT              BIT,
+    DM_INT              INT,
+    DM_INTEGER          INTEGER,
+    DM_PLS_INTEGER      PLS_INTEGER,
+    DM_TINYINT          TINYINT,
+
+    DM_BYTE             BYTE,
+    DM_SMALLINT         SMALLINT,
+    DM_BIGINT           BIGINT,
+
+    DM_NUMERIC          NUMERIC,
+    DM_NUMBER           NUMBER,
+    DM_DECIMAL          DECIMAL,
+    DM_DEC              DEC,
+
+    DM_REAL             REAL,
+    DM_FLOAT            FLOAT,
+    DM_DOUBLE_PRECISION DOUBLE PRECISION,
+    DM_DOUBLE           DOUBLE,
+
+    DM_CHAR             CHAR,
+    DM_CHARACTER        CHARACTER,
+    DM_VARCHAR          VARCHAR,
+    DM_VARCHAR2         VARCHAR2,
+    DM_TEXT             TEXT,
+    DM_LONG             LONG,
+    DM_LONGVARCHAR      LONGVARCHAR,
+    DM_CLOB             CLOB,
+
+    DM_TIMESTAMP        TIMESTAMP,
+    DM_DATETIME         DATETIME,
+    DM_TIME             TIME,
+    DM_DATE             DATE,
+
+    DM_BLOB             BLOB,
+    DM_BINARY           BINARY,
+    DM_VARBINARY        VARBINARY,
+    DM_LONGVARBINARY    LONGVARBINARY,
+    DM_IMAGE            IMAGE,
+    DM_BFILE            BFILE
+)
+"""
+// only need for source
+DML = """
+INSERT INTO "SYSDBA".e2e_table_source (
+DM_BIT, DM_INT, DM_INTEGER, DM_PLS_INTEGER, DM_TINYINT, DM_BYTE, DM_SMALLINT, DM_BIGINT,
+DM_NUMERIC, DM_NUMBER, DM_DECIMAL, DM_DEC, DM_REAL, DM_FLOAT, DM_DOUBLE_PRECISION, DM_DOUBLE,
+DM_CHAR, DM_CHARACTER, DM_VARCHAR, DM_VARCHAR2, DM_TEXT, DM_LONG, DM_LONGVARCHAR, DM_CLOB,
+DM_TIMESTAMP, DM_DATETIME, DM_TIME, DM_DATE,
+DM_BLOB, DM_BINARY, DM_VARBINARY, DM_LONGVARBINARY, DM_IMAGE, DM_BFILE)
+VALUES
+(0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, '1',
+ 'a', 'a', 'a', 'a', 'a', 'a', 'a',
+'2022-08-13 17:35:59.000000', '2022-08-13 17:36:11.000000', '15:45:00', '2022-08-13',
+null, null, null, null, null, null)
+"""
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_dm_source_and_sink.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_dm_source_and_sink.conf
new file mode 100644
index 000000000..466b5b5c0
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_dm_source_and_sink.conf
@@ -0,0 +1,57 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+  job.mode = "BATCH"
+}
+
+source {
+  Jdbc {
+    url = "jdbc:dm://spark_e2e_dmdb:5236"
+    driver = "dm.jdbc.driver.DmDriver"
+    connection_check_timeout_sec = 1000
+    user = "SYSDBA"
+    password = "SYSDBA"
+    query = """select DM_INT,DM_VARCHAR from "SYSDBA".e2e_table_source"""
+    partition_column = "DM_INT"
+  }
+
+}
+
+transform {
+
+}
+
+sink {
+  Jdbc {
+    url = "jdbc:dm://spark_e2e_dmdb:5236"
+    driver = "dm.jdbc.driver.DmDriver"
+    connection_check_timeout_sec = 1000
+    user = "SYSDBA"
+    password = "SYSDBA"
+    query = """
+          INSERT INTO "SYSDBA".e2e_table_sink(DM_INT,DM_VARCHAR)
+          values (?,?)
+"""
+  }
+}
+
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
index 96d8bbb04..a7a084bac 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
@@ -55,6 +55,7 @@
                 </exclusion>
             </exclusions>
         </dependency>
+
         <dependency>
             <groupId>org.awaitility</groupId>
             <artifactId>awaitility</artifactId>
diff --git a/seatunnel-e2e/seatunnel-spark-e2e/seatunnel-connector-spark-jdbc-e2e/pom.xml b/seatunnel-e2e/seatunnel-spark-e2e/seatunnel-connector-spark-jdbc-e2e/pom.xml
index aecd54f82..6e1b2aec5 100644
--- a/seatunnel-e2e/seatunnel-spark-e2e/seatunnel-connector-spark-jdbc-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-spark-e2e/seatunnel-connector-spark-jdbc-e2e/pom.xml
@@ -49,6 +49,7 @@
         <dependency>
             <groupId>org.postgresql</groupId>
             <artifactId>postgresql</artifactId>
+            <version>${postgresql.version}</version>
             <scope>test</scope>
         </dependency>
     </dependencies>
diff --git a/seatunnel-server/seatunnel-app/pom.xml b/seatunnel-server/seatunnel-app/pom.xml
index d485d9f10..6acf7ead6 100644
--- a/seatunnel-server/seatunnel-app/pom.xml
+++ b/seatunnel-server/seatunnel-app/pom.xml
@@ -137,6 +137,7 @@
         <dependency>
             <groupId>mysql</groupId>
             <artifactId>mysql-connector-java</artifactId>
+            <version>${mysql.version}</version>
             <scope>provided</scope>
         </dependency>