Posted to commits@seatunnel.apache.org by ty...@apache.org on 2022/09/30 13:24:04 UTC

[incubator-seatunnel] branch dev updated: [Feature][Connector-V2] add sqlserver connector (#2646)

This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 05d105dea [Feature][Connector-V2] add sqlserver connector (#2646)
05d105dea is described below

commit 05d105dea3ee53b9d001adbfcf2ec87ea06f8aba
Author: liugddx <80...@qq.com>
AuthorDate: Fri Sep 30 21:23:56 2022 +0800

    [Feature][Connector-V2] add sqlserver connector (#2646)
    
    * refactor sqlserver connector
    
    * merge dev and fix some conflict
    
    * add sqlserver doc
    
    * [Feature][Connector-V2] add sqlserver jdbc driver into container.
    
    * [Feature][Connector-V2] fix some bug
    
    * Update docs/en/connector-v2/sink/Jdbc.md
    
    Co-authored-by: Hisoka <fa...@qq.com>
    
    * [Feature][Connector-V2]jdbc-sqlserver change seatunnel home path
    
    * fix some merge error
    
    * fix some merge error
    
    * fix `log has private access`
    
    * revert
    
    * fix some review problem
    
    Co-authored-by: Hisoka <fa...@qq.com>
---
 docs/en/connector-v2/sink/Jdbc.md                  |  18 +--
 docs/en/connector-v2/source/Jdbc.md                |  11 +-
 seatunnel-connectors-v2/connector-jdbc/pom.xml     |  13 +-
 .../dialect/sqlserver/SqlServerDialect.java        |  39 +++++
 .../dialect/sqlserver/SqlServerDialectFactory.java |  40 +++++
 .../sqlserver/SqlserverJdbcRowConverter.java       |  39 +++++
 .../dialect/sqlserver/SqlserverTypeMapper.java     | 132 +++++++++++++++
 .../connector-jdbc-flink-e2e/pom.xml               |  11 ++
 .../e2e/flink/v2/jdbc/JdbcSqlserverIT.java         | 178 +++++++++++++++++++++
 .../resources/container-license-acceptance.txt     |   1 +
 .../jdbc/jdbc_sqlserver_source_to_sink.conf        |  60 +++++++
 .../connector-jdbc-spark-e2e/pom.xml               |  10 ++
 .../e2e/spark/v2/jdbc/JdbcSqlserverIT.java         | 178 +++++++++++++++++++++
 .../resources/container-license-acceptance.txt     |   1 +
 .../jdbc/jdbc_sqlserver_source_to_sink.conf        |  62 +++++++
 15 files changed, 778 insertions(+), 15 deletions(-)

diff --git a/docs/en/connector-v2/sink/Jdbc.md b/docs/en/connector-v2/sink/Jdbc.md
index 6ae2b7a6a..5d8385f21 100644
--- a/docs/en/connector-v2/sink/Jdbc.md
+++ b/docs/en/connector-v2/sink/Jdbc.md
@@ -4,7 +4,7 @@
 
 ## Description
 
-Write data through jdb c. Support Batch mode and Streaming mode, support concurrent writing, support exactly-once
+Write data through jdbc. Support Batch mode and Streaming mode, support concurrent writing, support exactly-once
 semantics (using XA transaction guarantee).
 
 ## Key features
@@ -37,7 +37,7 @@ support `Xa transactions`. You can set `is_exactly_once=true` to enable it.
 ### driver [string]
 
 The jdbc class name used to connect to the remote data source, if you use MySQL the value is com.mysql.cj.jdbc.Driver.
-Warn: for license compliance, you have to provide MySQL JDBC driver yourself, e.g. copy mysql-connector-java-xxx.jar to
+Warn: for license compliance, you have to provide the JDBC driver yourself (for example, the MySQL JDBC driver), e.g. copy mysql-connector-java-xxx.jar to
 $SEATNUNNEL_HOME/lib for Standalone.
 
 ### user [string]
@@ -102,13 +102,13 @@ In the case of is_exactly_once = "true", Xa transactions are used. This requires
 ## appendix
 
 there are some reference value for params above.
-
-| datasource | driver                                       | url                                                                   | xa_data_source_class_name           | maven                                                                                 |
-|------------|----------------------------------------------|-----------------------------------------------------------------------|-------------------------------------|---------------------------------------------------------------------------------------|
-| mysql      | com.mysql.cj.jdbc.Driver                     | jdbc:mysql://localhost:3306/test                                      | com.mysql.cj.jdbc.MysqlXADataSource | https://mvnrepository.com/artifact/mysql/mysql-connector-java                         |
-| postgresql | org.postgresql.Driver                        | jdbc:postgresql://localhost:5432/postgres                             | org.postgresql.xa.PGXADataSource    | https://mvnrepository.com/artifact/org.postgresql/postgresql                          |                                                             |
-| dm         | dm.jdbc.driver.DmDriver                      | jdbc:dm://localhost:5236                                              | dm.jdbc.driver.DmdbXADataSource     | https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18                          |
-| phoenix    | org.apache.phoenix.queryserver.client.Driver | jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF    | /                                   | https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client  |
+| datasource | driver                                       | url                                                                   | xa_data_source_class_name                              | maven                                                                                 |
+|------------|----------------------------------------------|-----------------------------------------------------------------------|--------------------------------------------------------|---------------------------------------------------------------------------------------|
+| mysql      | com.mysql.cj.jdbc.Driver                     | jdbc:mysql://localhost:3306/test                                      | com.mysql.cj.jdbc.MysqlXADataSource                    | https://mvnrepository.com/artifact/mysql/mysql-connector-java                         |
+| postgresql | org.postgresql.Driver                        | jdbc:postgresql://localhost:5432/postgres                             | org.postgresql.xa.PGXADataSource                       | https://mvnrepository.com/artifact/org.postgresql/postgresql                          |
+| dm         | dm.jdbc.driver.DmDriver                      | jdbc:dm://localhost:5236                                              | dm.jdbc.driver.DmdbXADataSource                        | https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18                          |
+| phoenix    | org.apache.phoenix.queryserver.client.Driver | jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF    | /                                                      | https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client  |
+| sqlserver  | com.microsoft.sqlserver.jdbc.SQLServerDriver | jdbc:sqlserver://localhost:1433                                       | com.microsoft.sqlserver.jdbc.SQLServerXADataSource     | https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc                 |
 
 ## Example
 
diff --git a/docs/en/connector-v2/source/Jdbc.md b/docs/en/connector-v2/source/Jdbc.md
index 9e798e96c..a026e378f 100644
--- a/docs/en/connector-v2/source/Jdbc.md
+++ b/docs/en/connector-v2/source/Jdbc.md
@@ -79,12 +79,13 @@ in parallel according to the concurrency of tasks.
 
 there are some reference value for params above.
 
-| datasource | driver                                       | url                                                                | maven                                                                                 |
-|------------|----------------------------------------------|--------------------------------------------------------------------|---------------------------------------------------------------------------------------|
-| mysql      | com.mysql.cj.jdbc.Driver                     | jdbc:mysql://localhost:3306/test                                   | https://mvnrepository.com/artifact/mysql/mysql-connector-java                         |
-| postgresql | org.postgresql.Driver                        | jdbc:postgresql://localhost:5432/postgres                          | https://mvnrepository.com/artifact/org.postgresql/postgresql                          |                                                             |
-| dm         | dm.jdbc.driver.DmDriver                      | jdbc:dm://localhost:5236                                           | https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18                          |
+| datasource | driver                   | url                                                                                    | maven                                                                                 |
+|------------|--------------------------|----------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------|
+| mysql      | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test                                                       | https://mvnrepository.com/artifact/mysql/mysql-connector-java                         |
+| postgresql | org.postgresql.Driver    | jdbc:postgresql://localhost:5432/postgres                                              | https://mvnrepository.com/artifact/org.postgresql/postgresql                          |
+| dm         | dm.jdbc.driver.DmDriver  | jdbc:dm://localhost:5236                                                               | https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18                          |
 | phoenix    | org.apache.phoenix.queryserver.client.Driver | jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF | https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client  |
+| sqlserver  | com.microsoft.sqlserver.jdbc.SQLServerDriver  | jdbc:sqlserver://localhost:1433                                   | https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc                 |
 
 ## Example
 
diff --git a/seatunnel-connectors-v2/connector-jdbc/pom.xml b/seatunnel-connectors-v2/connector-jdbc/pom.xml
index d61865a19..25a945117 100644
--- a/seatunnel-connectors-v2/connector-jdbc/pom.xml
+++ b/seatunnel-connectors-v2/connector-jdbc/pom.xml
@@ -33,6 +33,7 @@
         <mysql.version>8.0.16</mysql.version>
         <postgresql.version>42.3.3</postgresql.version>
         <dm-jdbc.version>8.1.2.141</dm-jdbc.version>
+        <sqlserver.version>9.2.1.jre8</sqlserver.version>
         <phoenix.version>5.2.5-HBase-2.x</phoenix.version>
     </properties>
 
@@ -62,6 +63,12 @@
                 <version>${dm-jdbc.version}</version>
                 <scope>provided</scope>
             </dependency>
+            <dependency>
+                <groupId>com.microsoft.sqlserver</groupId>
+                <artifactId>mssql-jdbc</artifactId>
+                <version>${sqlserver.version}</version>
+                <scope>provided</scope>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 
@@ -70,7 +77,6 @@
         <dependency>
             <groupId>mysql</groupId>
             <artifactId>mysql-connector-java</artifactId>
-            <version>${mysql.version}</version>
         </dependency>
 
         <dependency>
@@ -85,6 +91,11 @@
             <groupId>com.aliyun.phoenix</groupId>
             <artifactId>ali-phoenix-shaded-thin-client</artifactId>
         </dependency>
+
+        <dependency>
+            <groupId>com.microsoft.sqlserver</groupId>
+            <artifactId>mssql-jdbc</artifactId>
+        </dependency>
     </dependencies>
 
 </project>
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
new file mode 100644
index 000000000..81c265939
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlserver;
+
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+
+public class SqlServerDialect implements JdbcDialect {
+    @Override
+    public String dialectName() {
+        return "Sqlserver";
+    }
+
+    @Override
+    public JdbcRowConverter getRowConverter() {
+        return new SqlserverJdbcRowConverter();
+    }
+
+    @Override
+    public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {
+        return new SqlserverTypeMapper();
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialectFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialectFactory.java
new file mode 100644
index 000000000..050082b7c
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialectFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlserver;
+
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
+
+import com.google.auto.service.AutoService;
+
+/**
+ * Factory for {@link SqlServerDialect}.
+ */
+
+@AutoService(JdbcDialectFactory.class)
+public class SqlServerDialectFactory implements JdbcDialectFactory {
+    @Override
+    public boolean acceptsURL(String url) {
+        return url.startsWith("jdbc:sqlserver:");
+    }
+
+    @Override
+    public JdbcDialect create() {
+        return new SqlServerDialect();
+    }
+}
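
The factory above picks the dialect purely by JDBC URL prefix. The following is a minimal standalone sketch of that check, not project code: the class name `UrlPrefixSketch` is hypothetical, and the only behaviour taken from the diff is the `startsWith("jdbc:sqlserver:")` test. It suggests that a URL written with a `jdbc:microsoft:sqlserver:` prefix would not be accepted by this factory.

```java
public class UrlPrefixSketch {

    // Same prefix test as SqlServerDialectFactory.acceptsURL in the diff above.
    // The class and method placement here are illustrative assumptions.
    static boolean acceptsURL(String url) {
        return url.startsWith("jdbc:sqlserver:");
    }

    public static void main(String[] args) {
        // URL form that starts with the prefix the factory checks for.
        System.out.println(acceptsURL("jdbc:sqlserver://localhost:1433"));
        // URL with an extra "microsoft:" segment; the prefix test fails.
        System.out.println(acceptsURL("jdbc:microsoft:sqlserver://localhost:1433"));
    }
}
```

Running `main` prints `true` for the first URL and `false` for the second.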
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlserverJdbcRowConverter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlserverJdbcRowConverter.java
new file mode 100644
index 000000000..ed8d9db71
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlserverJdbcRowConverter.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlserver;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;
+
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+
+public class SqlserverJdbcRowConverter extends AbstractJdbcRowConverter {
+
+    @Override
+    public String converterName() {
+        return "Sqlserver";
+    }
+
+    @Override
+    public SeaTunnelRow toInternal(ResultSet rs, ResultSetMetaData metaData, SeaTunnelRowType typeInfo) throws SQLException {
+        return super.toInternal(rs, metaData, typeInfo);
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlserverTypeMapper.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlserverTypeMapper.java
new file mode 100644
index 000000000..d7a11563f
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlserverTypeMapper.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlserver;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+
+@Slf4j
+public class SqlserverTypeMapper implements JdbcDialectTypeMapper {
+
+
+    // ============================data types=====================
+
+    private static final String SQLSERVER_UNKNOWN = "UNKNOWN";
+
+    // -------------------------number----------------------------
+    private static final String SQLSERVER_BIT = "BIT";
+    private static final String SQLSERVER_TINYINT = "TINYINT";
+    private static final String SQLSERVER_SMALLINT = "SMALLINT";
+    private static final String SQLSERVER_INTEGER = "INTEGER";
+    private static final String SQLSERVER_INT = "INT";
+    private static final String SQLSERVER_BIGINT = "BIGINT";
+    private static final String SQLSERVER_DECIMAL = "DECIMAL";
+    private static final String SQLSERVER_FLOAT = "FLOAT";
+    private static final String SQLSERVER_REAL = "REAL";
+    private static final String SQLSERVER_NUMERIC = "NUMERIC";
+    private static final String SQLSERVER_MONEY = "MONEY";
+    private static final String SQLSERVER_SMALLMONEY = "SMALLMONEY";
+    // -------------------------string----------------------------
+    private static final String SQLSERVER_CHAR = "CHAR";
+    private static final String SQLSERVER_VARCHAR = "VARCHAR";
+    private static final String SQLSERVER_NTEXT = "NTEXT";
+    private static final String SQLSERVER_NCHAR = "NCHAR";
+    private static final String SQLSERVER_NVARCHAR = "NVARCHAR";
+    private static final String SQLSERVER_TEXT = "TEXT";
+
+    // ------------------------------time-------------------------
+    private static final String SQLSERVER_DATE = "DATE";
+    private static final String SQLSERVER_TIME = "TIME";
+    private static final String SQLSERVER_DATETIME = "DATETIME";
+    private static final String SQLSERVER_DATETIME2 = "DATETIME2";
+    private static final String SQLSERVER_SMALLDATETIME = "SMALLDATETIME";
+    private static final String SQLSERVER_DATETIMEOFFSET = "DATETIMEOFFSET";
+    private static final String SQLSERVER_TIMESTAMP = "TIMESTAMP";
+
+    // ------------------------------blob-------------------------
+    private static final String SQLSERVER_BINARY = "BINARY";
+    private static final String SQLSERVER_VARBINARY = "VARBINARY";
+    private static final String SQLSERVER_IMAGE = "IMAGE";
+
+    @SuppressWarnings("checkstyle:MagicNumber")
+    @Override
+    public SeaTunnelDataType<?> mapping(ResultSetMetaData metadata, int colIndex) throws SQLException {
+        String sqlServerType = metadata.getColumnTypeName(colIndex).toUpperCase();
+        int precision = metadata.getPrecision(colIndex);
+        int scale = metadata.getScale(colIndex);
+        switch (sqlServerType) {
+            case SQLSERVER_BIT:
+                return BasicType.BOOLEAN_TYPE;
+            case SQLSERVER_TINYINT:
+            case SQLSERVER_SMALLINT:
+                return BasicType.SHORT_TYPE;
+            case SQLSERVER_INTEGER:
+            case SQLSERVER_INT:
+                return BasicType.INT_TYPE;
+            case SQLSERVER_BIGINT:
+                return BasicType.LONG_TYPE;
+            case SQLSERVER_DECIMAL:
+            case SQLSERVER_NUMERIC:
+            case SQLSERVER_MONEY:
+            case SQLSERVER_SMALLMONEY:
+                return new DecimalType(precision, scale);
+            case SQLSERVER_REAL:
+                return BasicType.FLOAT_TYPE;
+            case SQLSERVER_FLOAT:
+                return BasicType.DOUBLE_TYPE;
+            case SQLSERVER_CHAR:
+            case SQLSERVER_NCHAR:
+            case SQLSERVER_VARCHAR:
+            case SQLSERVER_NTEXT:
+            case SQLSERVER_NVARCHAR:
+            case SQLSERVER_TEXT:
+                return BasicType.STRING_TYPE;
+            case SQLSERVER_DATE:
+                return LocalTimeType.LOCAL_DATE_TYPE;
+            case SQLSERVER_TIME:
+                return LocalTimeType.LOCAL_TIME_TYPE;
+            case SQLSERVER_DATETIME:
+            case SQLSERVER_DATETIME2:
+            case SQLSERVER_TIMESTAMP:
+            case SQLSERVER_SMALLDATETIME:
+            case SQLSERVER_DATETIMEOFFSET:
+                return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+            case SQLSERVER_BINARY:
+            case SQLSERVER_VARBINARY:
+            case SQLSERVER_IMAGE:
+                return PrimitiveByteArrayType.INSTANCE;
+            //Doesn't support yet
+            case SQLSERVER_UNKNOWN:
+            default:
+                final String jdbcColumnName = metadata.getColumnName(colIndex);
+                throw new UnsupportedOperationException(
+                    String.format(
+                        "Doesn't support SQLSERVER type '%s' on column '%s'  yet.",
+                        sqlServerType, jdbcColumnName));
+        }
+    }
+}
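
The mapper above upper-cases the column type name reported by the driver and switches on it. The sketch below mirrors a few of those branches with plain strings standing in for SeaTunnel's type objects, so that it runs without the SeaTunnel API. `TypeNameSketch` and its string labels are illustrative assumptions; the use of `Locale.ROOT` is also an addition of this sketch and is not in the commit.

```java
import java.util.Locale;

public class TypeNameSketch {

    // Returns a label for the SeaTunnel type that the mapper in the diff
    // selects for the given SQL Server type name (numeric branches only).
    static String mapping(String columnTypeName) {
        // Locale.ROOT keeps the upper-casing independent of the JVM locale.
        String sqlServerType = columnTypeName.toUpperCase(Locale.ROOT);
        switch (sqlServerType) {
            case "BIT":
                return "BOOLEAN";
            case "TINYINT":
            case "SMALLINT":
                return "SHORT";
            case "INTEGER":
            case "INT":
                return "INT";
            case "BIGINT":
                return "LONG";
            case "REAL":
                return "FLOAT";
            case "FLOAT":
                return "DOUBLE";
            default:
                // Like the mapper in the diff, reject anything not listed.
                throw new UnsupportedOperationException(
                    String.format("Doesn't support SQLSERVER type '%s' yet.", sqlServerType));
        }
    }

    public static void main(String[] args) {
        System.out.println(mapping("smallint")); // SHORT
        System.out.println(mapping("float"));    // DOUBLE
    }
}
```

Note that SQL Server `REAL` maps to the single-precision type and `FLOAT` to the double-precision one, which is easy to read the wrong way round.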
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml
index d48da1f48..f6220d9b0 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml
@@ -97,6 +97,17 @@
             <artifactId>ali-phoenix-shaded-thin-client</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>mssqlserver</artifactId>
+            <version>${testcontainer.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.microsoft.sqlserver</groupId>
+            <artifactId>mssql-jdbc</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
 </project>
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSqlserverIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSqlserverIT.java
new file mode 100644
index 000000000..85360a275
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSqlserverIT.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.jdbc;
+
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.MSSQLServerContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.com.google.common.collect.Lists;
+import org.testcontainers.shaded.org.apache.commons.io.IOUtils;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Stream;
+
+@Slf4j
+public class JdbcSqlserverIT extends FlinkContainer {
+
+    private MSSQLServerContainer<?> mssqlServerContainer;
+    private static final String THIRD_PARTY_PLUGINS_URL = "https://repo1.maven.org/maven2/com/microsoft/sqlserver/mssql-jdbc/9.4.1.jre8/mssql-jdbc-9.4.1.jre8.jar";
+
+    @SuppressWarnings("checkstyle:MagicNumber")
+    @BeforeEach
+    public void startSqlserverContainer() throws ClassNotFoundException, SQLException {
+        mssqlServerContainer = new MSSQLServerContainer<>(DockerImageName.parse("mcr.microsoft.com/mssql/server:2022-latest"))
+            .withNetwork(NETWORK)
+            .withNetworkAliases("sqlserver")
+            .withLogConsumer(new Slf4jLogConsumer(log));
+        Startables.deepStart(Stream.of(mssqlServerContainer)).join();
+        log.info("Sqlserver container started");
+        Class.forName(mssqlServerContainer.getDriverClassName());
+        Awaitility.given().ignoreExceptions()
+            .await()
+            .atMost(Duration.ofMinutes(1))
+            .untilAsserted(this::initializeJdbcTable);
+        batchInsertData();
+    }
+
+    private void initializeJdbcTable() {
+        try (Connection connection = DriverManager.getConnection(mssqlServerContainer.getJdbcUrl(), mssqlServerContainer.getUsername(), mssqlServerContainer.getPassword())) {
+            Statement statement = connection.createStatement();
+            String sourceSql = "CREATE TABLE [source] (\n" +
+                "  [ids] bigint  NOT NULL,\n" +
+                "  [name] text COLLATE Chinese_PRC_CI_AS  NULL,\n" +
+                "  [sfzh] varchar(255) COLLATE Chinese_PRC_CI_AS  NULL,\n" +
+                "  [sort] int  NULL,\n" +
+                "  [dz] nvarchar(255) COLLATE Chinese_PRC_CI_AS  NULL,\n" +
+                "  [xchar] char(255) COLLATE Chinese_PRC_CI_AS  NULL,\n" +
+                "  [xdecimal] decimal(18)  NULL,\n" +
+                "  [xfloat] float(53)  NULL,\n" +
+                "  [xnumeric] numeric(18)  NULL,\n" +
+                "  [xsmall] smallint  NULL,\n" +
+                "  [xbit] bit  NULL,\n" +
+                "  [rq] datetime DEFAULT NULL NULL,\n" +
+                "  [xrq] smalldatetime  NULL,\n" +
+                "  [xreal] real  NULL,\n" +
+                "  [ximage] image  NULL\n" +
+                ")";
+            String sinkSql = "CREATE TABLE [sink] (\n" +
+                "  [ids] bigint  NOT NULL,\n" +
+                "  [name] text COLLATE Chinese_PRC_CI_AS  NULL,\n" +
+                "  [sfzh] varchar(255) COLLATE Chinese_PRC_CI_AS  NULL,\n" +
+                "  [sort] int  NULL,\n" +
+                "  [dz] nvarchar(255) COLLATE Chinese_PRC_CI_AS  NULL,\n" +
+                "  [xchar] char(255) COLLATE Chinese_PRC_CI_AS  NULL,\n" +
+                "  [xdecimal] decimal(18)  NULL,\n" +
+                "  [xfloat] float(53)  NULL,\n" +
+                "  [xnumeric] numeric(18)  NULL,\n" +
+                "  [xsmall] smallint  NULL,\n" +
+                "  [xbit] bit  NULL,\n" +
+                "  [rq] datetime DEFAULT NULL NULL,\n" +
+                "  [xrq] smalldatetime  NULL,\n" +
+                "  [xreal] real  NULL,\n" +
+                "  [ximage] image  NULL\n" +
+                ")";
+            statement.execute(sourceSql);
+            statement.execute(sinkSql);
+        } catch (SQLException e) {
+            throw new RuntimeException("Initializing Sqlserver table failed!", e);
+        }
+    }
+
+    @SuppressWarnings("checkstyle:RegexpSingleline")
+    private void batchInsertData() {
+        try (Connection connection = DriverManager.getConnection(mssqlServerContainer.getJdbcUrl(), mssqlServerContainer.getUsername(), mssqlServerContainer.getPassword())) {
+            String sql =
+                "INSERT INTO [source] ([ids], [name], [sfzh], [sort], [dz], [xchar], [xdecimal], [xfloat], [xnumeric], [xsmall], [xbit], [rq], [xrq], [xreal], [ximage]) " +
+                    "VALUES (1504057, '张三', '3ee98c990e2011eda8fd00ff27b3340d', 1, N'3232', 'qwq', 1, 19.1, 2, 1, '0', '2022-07-26 11:58:46.000', '2022-07-26 13:49:00', 2, 0x)";
+            Statement statement = connection.createStatement();
+            statement.execute(sql);
+        } catch (SQLException e) {
+            throw new RuntimeException("Batch insert data failed!", e);
+        }
+    }
+
+    @Test
+    public void testSqlserverSourceAndSink() throws SQLException, IOException, InterruptedException {
+        Container.ExecResult execResult = executeSeaTunnelFlinkJob("/jdbc/jdbc_sqlserver_source_to_sink.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        // query result
+        String sourceSql = "select * from source";
+        String sinkSql = "select * from sink";
+        List<String> columns = Lists.newArrayList("ids", "name", "sfzh", "sort", "dz", "xchar", "xdecimal", "xfloat", "xnumeric", "xsmall", "xbit", "rq", "xrq", "xreal", "ximage");
+
+        try (Connection connection = DriverManager.getConnection(mssqlServerContainer.getJdbcUrl(), mssqlServerContainer.getUsername(), mssqlServerContainer.getPassword())) {
+            Statement sourceStatement = connection.createStatement();
+            Statement sinkStatement = connection.createStatement();
+            ResultSet sourceResultSet = sourceStatement.executeQuery(sourceSql);
+            ResultSet sinkResultSet = sinkStatement.executeQuery(sinkSql);
+            while (sourceResultSet.next()) {
+                // Fail when the sink holds fewer rows than the source.
+                Assertions.assertTrue(sinkResultSet.next(), "sink table has fewer rows than source table");
+                for (String column : columns) {
+                    Object source = sourceResultSet.getObject(column);
+                    int sourceIndex = sourceResultSet.findColumn(column);
+                    int sinkIndex = sinkResultSet.findColumn(column);
+                    Object sink = sinkResultSet.getObject(column);
+                    if (!Objects.deepEquals(source, sink)) {
+                        // Fall back to comparing the raw stream content of both result sets.
+                        InputStream sourceStream = sourceResultSet.getBinaryStream(sourceIndex);
+                        InputStream sinkStream = sinkResultSet.getBinaryStream(sinkIndex);
+                        String sourceValue = IOUtils.toString(sourceStream, StandardCharsets.UTF_8);
+                        String sinkValue = IOUtils.toString(sinkStream, StandardCharsets.UTF_8);
+                        Assertions.assertEquals(sourceValue, sinkValue);
+                    }
+                }
+            }
+        }
+    }
+
+    @AfterEach
+    public void closeSqlserverContainer() {
+        if (mssqlServerContainer != null) {
+            mssqlServerContainer.stop();
+        }
+    }
+
+    @Override
+    protected void executeExtraCommands(GenericContainer<?> container) throws IOException, InterruptedException {
+        Container.ExecResult extraCommands = container.execInContainer("bash", "-c", "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd /tmp/seatunnel/plugins/Jdbc/lib && curl -O " + THIRD_PARTY_PLUGINS_URL);
+        Assertions.assertEquals(0, extraCommands.getExitCode());
+    }
+
+}
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/container-license-acceptance.txt b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/container-license-acceptance.txt
new file mode 100644
index 000000000..7f099b0aa
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/container-license-acceptance.txt
@@ -0,0 +1 @@
+mcr.microsoft.com/mssql/server:2022-latest
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_sqlserver_source_to_sink.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_sqlserver_source_to_sink.conf
new file mode 100644
index 000000000..52f18f1e0
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_sqlserver_source_to_sink.conf
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+  # You can set flink configuration here
+  execution.parallelism = 1
+  job.mode = "BATCH"
+  #execution.checkpoint.interval = 10000
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+  # This is an example source plugin **only for testing and demonstrating the source plugin feature**
+  Jdbc {
+    driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
+    url = "jdbc:sqlserver://sqlserver;encrypt=false;"
+    user = SA
+    password = "A_Str0ng_Required_Password"
+    query = "select name,ids,sfzh,sort,dz,xchar,xdecimal,xfloat,xnumeric,xsmall,xbit,rq,xrq,xreal,ximage from source"
+  }
+
+  # If you would like more information about how to configure seatunnel and to see the full list of source plugins,
+  # please go to https://seatunnel.apache.org/docs/connector-v2/source/Jdbc
+}
+
+transform {
+
+  # If you would like more information about how to configure seatunnel and to see the full list of transform plugins,
+  # please go to https://seatunnel.apache.org/docs/transform/sql
+}
+
+sink {
+  Jdbc {
+    driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
+    url = "jdbc:sqlserver://sqlserver;encrypt=false;"
+    user = SA
+    password = "A_Str0ng_Required_Password"
+    query = "insert into sink(name,ids,sfzh,sort,dz,xchar,xdecimal,xfloat,xnumeric,xsmall,xbit,rq,xrq,xreal,ximage) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
+  }
+
+  # If you would like more information about how to configure seatunnel and to see the full list of sink plugins,
+  # please go to https://seatunnel.apache.org/docs/connector-v2/sink/Jdbc
+}
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml
index 7d16b9182..ee7629f14 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml
@@ -86,6 +86,16 @@
             <artifactId>ali-phoenix-shaded-thin-client</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>mssqlserver</artifactId>
+            <version>${testcontainer.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.microsoft.sqlserver</groupId>
+            <artifactId>mssql-jdbc</artifactId>
+        </dependency>
     </dependencies>
 
 </project>
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcSqlserverIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcSqlserverIT.java
new file mode 100644
index 000000000..053ea0b37
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcSqlserverIT.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.spark.v2.jdbc;
+
+import org.apache.seatunnel.e2e.spark.SparkContainer;
+
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.MSSQLServerContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.com.google.common.collect.Lists;
+import org.testcontainers.shaded.org.apache.commons.io.IOUtils;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Stream;
+
+@Slf4j
+public class JdbcSqlserverIT extends SparkContainer {
+
+    private MSSQLServerContainer<?> mssqlServerContainer;
+    private static final String THIRD_PARTY_PLUGINS_URL = "https://repo1.maven.org/maven2/com/microsoft/sqlserver/mssql-jdbc/9.4.1.jre8/mssql-jdbc-9.4.1.jre8.jar";
+
+    @SuppressWarnings("checkstyle:MagicNumber")
+    @BeforeEach
+    public void startSqlServerContainer() throws ClassNotFoundException, SQLException {
+        mssqlServerContainer = new MSSQLServerContainer<>(DockerImageName.parse("mcr.microsoft.com/mssql/server:2022-latest"))
+            .withNetwork(NETWORK)
+            .withNetworkAliases("sqlserver")
+            .withLogConsumer(new Slf4jLogConsumer(log));
+        Startables.deepStart(Stream.of(mssqlServerContainer)).join();
+        log.info("Sqlserver container started");
+        Class.forName(mssqlServerContainer.getDriverClassName());
+        Awaitility.given().ignoreExceptions()
+            .await()
+            .atMost(Duration.ofMinutes(3))
+            .untilAsserted(this::initializeJdbcTable);
+        batchInsertData();
+    }
+
+    private void initializeJdbcTable() {
+        try (Connection connection = DriverManager.getConnection(mssqlServerContainer.getJdbcUrl(), mssqlServerContainer.getUsername(), mssqlServerContainer.getPassword())) {
+            Statement statement = connection.createStatement();
+            String sourceSql = "CREATE TABLE [source] (\n" +
+                "  [ids] bigint  NOT NULL,\n" +
+                "  [name] text COLLATE Chinese_PRC_CI_AS  NULL,\n" +
+                "  [sfzh] varchar(255) COLLATE Chinese_PRC_CI_AS  NULL,\n" +
+                "  [sort] int  NULL,\n" +
+                "  [dz] nvarchar(255) COLLATE Chinese_PRC_CI_AS  NULL,\n" +
+                "  [xchar] char(255) COLLATE Chinese_PRC_CI_AS  NULL,\n" +
+                "  [xdecimal] decimal(18)  NULL,\n" +
+                "  [xfloat] float(53)  NULL,\n" +
+                "  [xnumeric] numeric(18)  NULL,\n" +
+                "  [xsmall] smallint  NULL,\n" +
+                "  [xbit] bit  NULL,\n" +
+                "  [rq] datetime DEFAULT NULL NULL,\n" +
+                "  [xrq] smalldatetime  NULL,\n" +
+                "  [xreal] real  NULL,\n" +
+                "  [ximage] image  NULL\n" +
+                ")";
+            String sinkSql = "CREATE TABLE [sink] (\n" +
+                "  [ids] bigint  NOT NULL,\n" +
+                "  [name] text COLLATE Chinese_PRC_CI_AS  NULL,\n" +
+                "  [sfzh] varchar(255) COLLATE Chinese_PRC_CI_AS  NULL,\n" +
+                "  [sort] int  NULL,\n" +
+                "  [dz] nvarchar(255) COLLATE Chinese_PRC_CI_AS  NULL,\n" +
+                "  [xchar] char(255) COLLATE Chinese_PRC_CI_AS  NULL,\n" +
+                "  [xdecimal] decimal(18)  NULL,\n" +
+                "  [xfloat] float(53)  NULL,\n" +
+                "  [xnumeric] numeric(18)  NULL,\n" +
+                "  [xsmall] smallint  NULL,\n" +
+                "  [xbit] bit  NULL,\n" +
+                "  [rq] datetime DEFAULT NULL NULL,\n" +
+                "  [xrq] smalldatetime  NULL,\n" +
+                "  [xreal] real  NULL,\n" +
+                "  [ximage] image  NULL\n" +
+                ")";
+            statement.execute(sourceSql);
+            statement.execute(sinkSql);
+        } catch (SQLException e) {
+            throw new RuntimeException("Initializing Sqlserver table failed!", e);
+        }
+    }
+
+    @SuppressWarnings("checkstyle:RegexpSingleline")
+    private void batchInsertData() {
+        try (Connection connection = DriverManager.getConnection(mssqlServerContainer.getJdbcUrl(), mssqlServerContainer.getUsername(), mssqlServerContainer.getPassword())) {
+            String sql =
+                "INSERT INTO [source] ([ids], [name], [sfzh], [sort], [dz], [xchar], [xdecimal], [xfloat], [xnumeric], [xsmall], [xbit], [rq], [xrq], [xreal], [ximage]) " +
+                    "VALUES (1504057, '张三', '3ee98c990e2011eda8fd00ff27b3340d', 1, N'3232', 'qwq', 1, 19.1, 2, 1, '0', '2022-07-26 11:58:46.000', '2022-07-26 13:49:00', 2, 0x)";
+            Statement statement = connection.createStatement();
+            statement.execute(sql);
+        } catch (SQLException e) {
+            throw new RuntimeException("Batch insert data failed!", e);
+        }
+    }
+
+    @Test
+    public void testSqlserverSourceAndSink() throws SQLException, IOException, InterruptedException {
+        Container.ExecResult execResult = executeSeaTunnelSparkJob("/jdbc/jdbc_sqlserver_source_to_sink.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        // query result
+        String sourceSql = "select * from source";
+        String sinkSql = "select * from sink";
+        List<String> columns = Lists.newArrayList("ids", "name", "sfzh", "sort", "dz", "xchar", "xdecimal", "xfloat", "xnumeric", "xsmall", "xbit", "rq", "xrq", "xreal", "ximage");
+
+        try (Connection connection = DriverManager.getConnection(mssqlServerContainer.getJdbcUrl(), mssqlServerContainer.getUsername(), mssqlServerContainer.getPassword())) {
+            Statement sourceStatement = connection.createStatement();
+            Statement sinkStatement = connection.createStatement();
+            ResultSet sourceResultSet = sourceStatement.executeQuery(sourceSql);
+            ResultSet sinkResultSet = sinkStatement.executeQuery(sinkSql);
+            while (sourceResultSet.next()) {
+                // Fail when the sink holds fewer rows than the source.
+                Assertions.assertTrue(sinkResultSet.next(), "sink table has fewer rows than source table");
+                for (String column : columns) {
+                    Object source = sourceResultSet.getObject(column);
+                    int sourceIndex = sourceResultSet.findColumn(column);
+                    int sinkIndex = sinkResultSet.findColumn(column);
+                    Object sink = sinkResultSet.getObject(column);
+                    if (!Objects.deepEquals(source, sink)) {
+                        // Fall back to comparing the raw stream content of both result sets.
+                        InputStream sourceStream = sourceResultSet.getBinaryStream(sourceIndex);
+                        InputStream sinkStream = sinkResultSet.getBinaryStream(sinkIndex);
+                        String sourceValue = IOUtils.toString(sourceStream, StandardCharsets.UTF_8);
+                        String sinkValue = IOUtils.toString(sinkStream, StandardCharsets.UTF_8);
+                        Assertions.assertEquals(sourceValue, sinkValue);
+                    }
+                }
+            }
+        }
+    }
+
+    @AfterEach
+    public void closeSqlserverContainer() {
+        if (mssqlServerContainer != null) {
+            mssqlServerContainer.stop();
+        }
+    }
+
+    @Override
+    protected void executeExtraCommands(GenericContainer<?> container) throws IOException, InterruptedException {
+        Container.ExecResult extraCommands = container.execInContainer("bash", "-c", "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd /tmp/seatunnel/plugins/Jdbc/lib && curl -O " + THIRD_PARTY_PLUGINS_URL);
+        Assertions.assertEquals(0, extraCommands.getExitCode());
+    }
+
+}
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/container-license-acceptance.txt b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/container-license-acceptance.txt
new file mode 100644
index 000000000..7f099b0aa
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/container-license-acceptance.txt
@@ -0,0 +1 @@
+mcr.microsoft.com/mssql/server:2022-latest
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_sqlserver_source_to_sink.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_sqlserver_source_to_sink.conf
new file mode 100644
index 000000000..98d8451de
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_sqlserver_source_to_sink.conf
@@ -0,0 +1,62 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+  # You can set spark configuration here
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+  job.mode = "BATCH"
+}
+
+source {
+  # This is an example source plugin **only for testing and demonstrating the source plugin feature**
+  Jdbc {
+    driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
+    url = "jdbc:sqlserver://sqlserver;encrypt=false;"
+    user = SA
+    password = "A_Str0ng_Required_Password"
+    query = "select name,ids,sfzh,sort,dz,xchar,xdecimal,xfloat,xnumeric,xsmall,xbit,rq,xrq,xreal,ximage from source"
+  }
+
+  # If you would like more information about how to configure seatunnel and to see the full list of source plugins,
+  # please go to https://seatunnel.apache.org/docs/connector-v2/source/Jdbc
+}
+
+transform {
+
+  # If you would like more information about how to configure seatunnel and to see the full list of transform plugins,
+  # please go to https://seatunnel.apache.org/docs/transform/sql
+}
+
+sink {
+  Jdbc {
+    driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
+    url = "jdbc:sqlserver://sqlserver;encrypt=false;"
+    user = SA
+    password = "A_Str0ng_Required_Password"
+    query = "insert into sink(name,ids,sfzh,sort,dz,xchar,xdecimal,xfloat,xnumeric,xsmall,xbit,rq,xrq,xreal,ximage) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
+  }
+
+  # If you would like more information about how to configure seatunnel and to see the full list of sink plugins,
+  # please go to https://seatunnel.apache.org/docs/connector-v2/sink/Jdbc
+}