Posted to commits@seatunnel.apache.org by ty...@apache.org on 2022/09/30 13:24:04 UTC
[incubator-seatunnel] branch dev updated: [Feature][Connector-V2] add sqlserver connector (#2646)
This is an automated email from the ASF dual-hosted git repository.
tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 05d105dea [Feature][Connector-V2] add sqlserver connector (#2646)
05d105dea is described below
commit 05d105dea3ee53b9d001adbfcf2ec87ea06f8aba
Author: liugddx <80...@qq.com>
AuthorDate: Fri Sep 30 21:23:56 2022 +0800
[Feature][Connector-V2] add sqlserver connector (#2646)
* refactor sqlserver connector
* merge dev and fix some conflict
* add sqlserver doc
* [Feature][Connector-V2] add sqlserver jdbc driver into container.
* [Feature][Connector-V2] fix some bug
* Update docs/en/connector-v2/sink/Jdbc.md
Co-authored-by: Hisoka <fa...@qq.com>
* [Feature][Connector-V2]jdbc-sqlserver change seatunnel home path
* fix some merge error
* fix some merge error
* fix `log has private access`
* revert
* fix some review problem
Co-authored-by: Hisoka <fa...@qq.com>
---
docs/en/connector-v2/sink/Jdbc.md | 18 +--
docs/en/connector-v2/source/Jdbc.md | 11 +-
seatunnel-connectors-v2/connector-jdbc/pom.xml | 13 +-
.../dialect/sqlserver/SqlServerDialect.java | 39 +++++
.../dialect/sqlserver/SqlServerDialectFactory.java | 40 +++++
.../sqlserver/SqlserverJdbcRowConverter.java | 39 +++++
.../dialect/sqlserver/SqlserverTypeMapper.java | 132 +++++++++++++++
.../connector-jdbc-flink-e2e/pom.xml | 11 ++
.../e2e/flink/v2/jdbc/JdbcSqlserverIT.java | 178 +++++++++++++++++++++
.../resources/container-license-acceptance.txt | 1 +
.../jdbc/jdbc_sqlserver_source_to_sink.conf | 60 +++++++
.../connector-jdbc-spark-e2e/pom.xml | 10 ++
.../e2e/spark/v2/jdbc/JdbcSqlserverIT.java | 178 +++++++++++++++++++++
.../resources/container-license-acceptance.txt | 1 +
.../jdbc/jdbc_sqlserver_source_to_sink.conf | 62 +++++++
15 files changed, 778 insertions(+), 15 deletions(-)
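
The new `SqlServerDialectFactory` in this commit claims a connection by checking that the JDBC URL starts with `jdbc:sqlserver:`. How SeaTunnel discovers and iterates its factories is not shown in this diff, so the following is only a minimal, self-contained sketch of prefix-based dialect selection. `DialectLookupSketch`, `DialectFactory`, `MySqlFactory`, the `jdbc:mysql:` prefix and `resolve` are hypothetical names and values introduced for illustration; only the `jdbc:sqlserver:` prefix and the `Sqlserver` dialect name come from the commit.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class DialectLookupSketch {

    /** Hypothetical stand-in for the JdbcDialectFactory interface referenced in the diff. */
    interface DialectFactory {
        boolean acceptsURL(String url);

        String dialectName();
    }

    /** Mirrors the check in SqlServerDialectFactory: a plain prefix match on the URL. */
    static final class SqlServerFactory implements DialectFactory {
        @Override
        public boolean acceptsURL(String url) {
            return url.startsWith("jdbc:sqlserver:");
        }

        @Override
        public String dialectName() {
            return "Sqlserver";
        }
    }

    /** Assumed second factory, present only to show that the first match wins. */
    static final class MySqlFactory implements DialectFactory {
        @Override
        public boolean acceptsURL(String url) {
            return url.startsWith("jdbc:mysql:");
        }

        @Override
        public String dialectName() {
            return "MySQL";
        }
    }

    // Assumption: a fixed list replaces whatever discovery mechanism the project uses.
    private static final List<DialectFactory> FACTORIES =
        Arrays.asList(new MySqlFactory(), new SqlServerFactory());

    /** Returns the name of the first dialect whose factory accepts the URL, if any. */
    static Optional<String> resolve(String url) {
        for (DialectFactory factory : FACTORIES) {
            if (factory.acceptsURL(url)) {
                return Optional.of(factory.dialectName());
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // URL form used by the e2e config in this commit.
        System.out.println(resolve("jdbc:sqlserver://sqlserver;encrypt=false;"));
        // A URL with a different prefix is not claimed by the SQL Server factory.
        System.out.println(resolve("jdbc:microsoft:sqlserver://localhost:1433"));
    }
}
```

Under this scheme a job whose `url` does not begin with `jdbc:sqlserver:` would not be matched to the SQL Server dialect, so the URL prefix in a job config matters as much as the driver class name.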
diff --git a/docs/en/connector-v2/sink/Jdbc.md b/docs/en/connector-v2/sink/Jdbc.md
index 6ae2b7a6a..5d8385f21 100644
--- a/docs/en/connector-v2/sink/Jdbc.md
+++ b/docs/en/connector-v2/sink/Jdbc.md
@@ -4,7 +4,7 @@
## Description
-Write data through jdb c. Support Batch mode and Streaming mode, support concurrent writing, support exactly-once
+Write data through jdbc. Support Batch mode and Streaming mode, support concurrent writing, support exactly-once
semantics (using XA transaction guarantee).
## Key features
@@ -37,7 +37,7 @@ support `Xa transactions`. You can set `is_exactly_once=true` to enable it.
### driver [string]
The jdbc class name used to connect to the remote data source, if you use MySQL the value is com.mysql.cj.jdbc.Driver.
-Warn: for license compliance, you have to provide MySQL JDBC driver yourself, e.g. copy mysql-connector-java-xxx.jar to
+Warn: for license compliance, you have to provide the JDBC driver yourself. For example, for MySQL, copy mysql-connector-java-xxx.jar to
$SEATUNNEL_HOME/lib for Standalone.
### user [string]
@@ -102,13 +102,13 @@ In the case of is_exactly_once = "true", Xa transactions are used. This requires
## appendix
Here are some reference values for the parameters above.
-
-| datasource | driver | url | xa_data_source_class_name | maven |
-|------------|----------------------------------------------|-----------------------------------------------------------------------|-------------------------------------|---------------------------------------------------------------------------------------|
-| mysql | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | com.mysql.cj.jdbc.MysqlXADataSource | https://mvnrepository.com/artifact/mysql/mysql-connector-java |
-| postgresql | org.postgresql.Driver | jdbc:postgresql://localhost:5432/postgres | org.postgresql.xa.PGXADataSource | https://mvnrepository.com/artifact/org.postgresql/postgresql | |
-| dm | dm.jdbc.driver.DmDriver | jdbc:dm://localhost:5236 | dm.jdbc.driver.DmdbXADataSource | https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18 |
-| phoenix | org.apache.phoenix.queryserver.client.Driver | jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF | / | https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client |
+| datasource | driver | url | xa_data_source_class_name | maven |
+|------------|----------------------------------------------|-----------------------------------------------------------------------|--------------------------------------------------------|---------------------------------------------------------------------------------------|
+| mysql | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | com.mysql.cj.jdbc.MysqlXADataSource | https://mvnrepository.com/artifact/mysql/mysql-connector-java |
+| postgresql | org.postgresql.Driver | jdbc:postgresql://localhost:5432/postgres | org.postgresql.xa.PGXADataSource | https://mvnrepository.com/artifact/org.postgresql/postgresql |
+| dm | dm.jdbc.driver.DmDriver | jdbc:dm://localhost:5236 | dm.jdbc.driver.DmdbXADataSource | https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18 |
+| phoenix | org.apache.phoenix.queryserver.client.Driver | jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF | / | https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client |
+| sqlserver | com.microsoft.sqlserver.jdbc.SQLServerDriver | jdbc:sqlserver://localhost:1433 | com.microsoft.sqlserver.jdbc.SQLServerXADataSource | https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc |
## Example
diff --git a/docs/en/connector-v2/source/Jdbc.md b/docs/en/connector-v2/source/Jdbc.md
index 9e798e96c..a026e378f 100644
--- a/docs/en/connector-v2/source/Jdbc.md
+++ b/docs/en/connector-v2/source/Jdbc.md
@@ -79,12 +79,13 @@ in parallel according to the concurrency of tasks.
Here are some reference values for the parameters above.
-| datasource | driver | url | maven |
-|------------|----------------------------------------------|--------------------------------------------------------------------|---------------------------------------------------------------------------------------|
-| mysql | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | https://mvnrepository.com/artifact/mysql/mysql-connector-java |
-| postgresql | org.postgresql.Driver | jdbc:postgresql://localhost:5432/postgres | https://mvnrepository.com/artifact/org.postgresql/postgresql | |
-| dm | dm.jdbc.driver.DmDriver | jdbc:dm://localhost:5236 | https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18 |
+| datasource | driver | url | maven |
+|------------|--------------------------|----------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------|
+| mysql | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | https://mvnrepository.com/artifact/mysql/mysql-connector-java |
+| postgresql | org.postgresql.Driver | jdbc:postgresql://localhost:5432/postgres | https://mvnrepository.com/artifact/org.postgresql/postgresql |
+| dm | dm.jdbc.driver.DmDriver | jdbc:dm://localhost:5236 | https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18 |
| phoenix | org.apache.phoenix.queryserver.client.Driver | jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF | https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client |
+| sqlserver | com.microsoft.sqlserver.jdbc.SQLServerDriver | jdbc:sqlserver://localhost:1433 | https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc |
## Example
diff --git a/seatunnel-connectors-v2/connector-jdbc/pom.xml b/seatunnel-connectors-v2/connector-jdbc/pom.xml
index d61865a19..25a945117 100644
--- a/seatunnel-connectors-v2/connector-jdbc/pom.xml
+++ b/seatunnel-connectors-v2/connector-jdbc/pom.xml
@@ -33,6 +33,7 @@
<mysql.version>8.0.16</mysql.version>
<postgresql.version>42.3.3</postgresql.version>
<dm-jdbc.version>8.1.2.141</dm-jdbc.version>
+ <sqlserver.version>9.2.1.jre8</sqlserver.version>
<phoenix.version>5.2.5-HBase-2.x</phoenix.version>
</properties>
@@ -62,6 +63,12 @@
<version>${dm-jdbc.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>com.microsoft.sqlserver</groupId>
+ <artifactId>mssql-jdbc</artifactId>
+ <version>${sqlserver.version}</version>
+ <scope>provided</scope>
+ </dependency>
</dependencies>
</dependencyManagement>
@@ -70,7 +77,6 @@
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
- <version>${mysql.version}</version>
</dependency>
<dependency>
@@ -85,6 +91,11 @@
<groupId>com.aliyun.phoenix</groupId>
<artifactId>ali-phoenix-shaded-thin-client</artifactId>
</dependency>
+
+ <dependency>
+ <groupId>com.microsoft.sqlserver</groupId>
+ <artifactId>mssql-jdbc</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
new file mode 100644
index 000000000..81c265939
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlserver;
+
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+
+public class SqlServerDialect implements JdbcDialect {
+ @Override
+ public String dialectName() {
+ return "Sqlserver";
+ }
+
+ @Override
+ public JdbcRowConverter getRowConverter() {
+ return new SqlserverJdbcRowConverter();
+ }
+
+ @Override
+ public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {
+ return new SqlserverTypeMapper();
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialectFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialectFactory.java
new file mode 100644
index 000000000..050082b7c
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialectFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlserver;
+
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
+
+import com.google.auto.service.AutoService;
+
+/**
+ * Factory for {@link SqlServerDialect}.
+ */
+
+@AutoService(JdbcDialectFactory.class)
+public class SqlServerDialectFactory implements JdbcDialectFactory {
+ @Override
+ public boolean acceptsURL(String url) {
+ return url.startsWith("jdbc:sqlserver:");
+ }
+
+ @Override
+ public JdbcDialect create() {
+ return new SqlServerDialect();
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlserverJdbcRowConverter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlserverJdbcRowConverter.java
new file mode 100644
index 000000000..ed8d9db71
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlserverJdbcRowConverter.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlserver;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;
+
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+
+public class SqlserverJdbcRowConverter extends AbstractJdbcRowConverter {
+
+ @Override
+ public String converterName() {
+ return "Sqlserver";
+ }
+
+ @Override
+ public SeaTunnelRow toInternal(ResultSet rs, ResultSetMetaData metaData, SeaTunnelRowType typeInfo) throws SQLException {
+ return super.toInternal(rs, metaData, typeInfo);
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlserverTypeMapper.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlserverTypeMapper.java
new file mode 100644
index 000000000..d7a11563f
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlserverTypeMapper.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlserver;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+
+@Slf4j
+public class SqlserverTypeMapper implements JdbcDialectTypeMapper {
+
+
+ // ============================data types=====================
+
+ private static final String SQLSERVER_UNKNOWN = "UNKNOWN";
+
+ // -------------------------number----------------------------
+ private static final String SQLSERVER_BIT = "BIT";
+ private static final String SQLSERVER_TINYINT = "TINYINT";
+ private static final String SQLSERVER_SMALLINT = "SMALLINT";
+ private static final String SQLSERVER_INTEGER = "INTEGER";
+ private static final String SQLSERVER_INT = "INT";
+ private static final String SQLSERVER_BIGINT = "BIGINT";
+ private static final String SQLSERVER_DECIMAL = "DECIMAL";
+ private static final String SQLSERVER_FLOAT = "FLOAT";
+ private static final String SQLSERVER_REAL = "REAL";
+ private static final String SQLSERVER_NUMERIC = "NUMERIC";
+ private static final String SQLSERVER_MONEY = "MONEY";
+ private static final String SQLSERVER_SMALLMONEY = "SMALLMONEY";
+ // -------------------------string----------------------------
+ private static final String SQLSERVER_CHAR = "CHAR";
+ private static final String SQLSERVER_VARCHAR = "VARCHAR";
+ private static final String SQLSERVER_NTEXT = "NTEXT";
+ private static final String SQLSERVER_NCHAR = "NCHAR";
+ private static final String SQLSERVER_NVARCHAR = "NVARCHAR";
+ private static final String SQLSERVER_TEXT = "TEXT";
+
+ // ------------------------------time-------------------------
+ private static final String SQLSERVER_DATE = "DATE";
+ private static final String SQLSERVER_TIME = "TIME";
+ private static final String SQLSERVER_DATETIME = "DATETIME";
+ private static final String SQLSERVER_DATETIME2 = "DATETIME2";
+ private static final String SQLSERVER_SMALLDATETIME = "SMALLDATETIME";
+ private static final String SQLSERVER_DATETIMEOFFSET = "DATETIMEOFFSET";
+ private static final String SQLSERVER_TIMESTAMP = "TIMESTAMP";
+
+ // ------------------------------blob-------------------------
+ private static final String SQLSERVER_BINARY = "BINARY";
+ private static final String SQLSERVER_VARBINARY = "VARBINARY";
+ private static final String SQLSERVER_IMAGE = "IMAGE";
+
+ @SuppressWarnings("checkstyle:MagicNumber")
+ @Override
+ public SeaTunnelDataType<?> mapping(ResultSetMetaData metadata, int colIndex) throws SQLException {
+ String sqlServerType = metadata.getColumnTypeName(colIndex).toUpperCase();
+ int precision = metadata.getPrecision(colIndex);
+ int scale = metadata.getScale(colIndex);
+ switch (sqlServerType) {
+ case SQLSERVER_BIT:
+ return BasicType.BOOLEAN_TYPE;
+ case SQLSERVER_TINYINT:
+ case SQLSERVER_SMALLINT:
+ return BasicType.SHORT_TYPE;
+ case SQLSERVER_INTEGER:
+ case SQLSERVER_INT:
+ return BasicType.INT_TYPE;
+ case SQLSERVER_BIGINT:
+ return BasicType.LONG_TYPE;
+ case SQLSERVER_DECIMAL:
+ case SQLSERVER_NUMERIC:
+ case SQLSERVER_MONEY:
+ case SQLSERVER_SMALLMONEY:
+ return new DecimalType(precision, scale);
+ case SQLSERVER_REAL:
+ return BasicType.FLOAT_TYPE;
+ case SQLSERVER_FLOAT:
+ return BasicType.DOUBLE_TYPE;
+ case SQLSERVER_CHAR:
+ case SQLSERVER_NCHAR:
+ case SQLSERVER_VARCHAR:
+ case SQLSERVER_NTEXT:
+ case SQLSERVER_NVARCHAR:
+ case SQLSERVER_TEXT:
+ return BasicType.STRING_TYPE;
+ case SQLSERVER_DATE:
+ return LocalTimeType.LOCAL_DATE_TYPE;
+ case SQLSERVER_TIME:
+ return LocalTimeType.LOCAL_TIME_TYPE;
+ case SQLSERVER_DATETIME:
+ case SQLSERVER_DATETIME2:
+ case SQLSERVER_TIMESTAMP:
+ case SQLSERVER_SMALLDATETIME:
+ case SQLSERVER_DATETIMEOFFSET:
+ return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+ case SQLSERVER_BINARY:
+ case SQLSERVER_VARBINARY:
+ case SQLSERVER_IMAGE:
+ return PrimitiveByteArrayType.INSTANCE;
+ //Doesn't support yet
+ case SQLSERVER_UNKNOWN:
+ default:
+ final String jdbcColumnName = metadata.getColumnName(colIndex);
+ throw new UnsupportedOperationException(
+ String.format(
+ "Doesn't support SQLSERVER type '%s' on column '%s' yet.",
+ sqlServerType, jdbcColumnName));
+ }
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml
index d48da1f48..f6220d9b0 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml
@@ -97,6 +97,17 @@
<artifactId>ali-phoenix-shaded-thin-client</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>mssqlserver</artifactId>
+ <version>${testcontainer.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.microsoft.sqlserver</groupId>
+ <artifactId>mssql-jdbc</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSqlserverIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSqlserverIT.java
new file mode 100644
index 000000000..85360a275
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSqlserverIT.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.jdbc;
+
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.MSSQLServerContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.com.google.common.collect.Lists;
+import org.testcontainers.shaded.org.apache.commons.io.IOUtils;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Stream;
+
+@Slf4j
+public class JdbcSqlserverIT extends FlinkContainer {
+
+ private MSSQLServerContainer<?> mssqlServerContainer;
+ private static final String THIRD_PARTY_PLUGINS_URL = "https://repo1.maven.org/maven2/com/microsoft/sqlserver/mssql-jdbc/9.4.1.jre8/mssql-jdbc-9.4.1.jre8.jar";
+
+ @SuppressWarnings("checkstyle:MagicNumber")
+ @BeforeEach
+ public void startSqlserverContainer() throws ClassNotFoundException, SQLException {
+ mssqlServerContainer = new MSSQLServerContainer<>(DockerImageName.parse("mcr.microsoft.com/mssql/server:2022-latest"))
+ .withNetwork(NETWORK)
+ .withNetworkAliases("sqlserver")
+ .withLogConsumer(new Slf4jLogConsumer(log));
+ Startables.deepStart(Stream.of(mssqlServerContainer)).join();
+ log.info("Sqlserver container started");
+ Class.forName(mssqlServerContainer.getDriverClassName());
+ Awaitility.given().ignoreExceptions()
+ .await()
+ .atMost(Duration.ofMinutes(1))
+ .untilAsserted(this::initializeJdbcTable);
+ batchInsertData();
+ }
+
+ private void initializeJdbcTable() {
+ try (Connection connection = DriverManager.getConnection(mssqlServerContainer.getJdbcUrl(), mssqlServerContainer.getUsername(), mssqlServerContainer.getPassword())) {
+ Statement statement = connection.createStatement();
+ String sourceSql = "CREATE TABLE [source] (\n" +
+ " [ids] bigint NOT NULL,\n" +
+ " [name] text COLLATE Chinese_PRC_CI_AS NULL,\n" +
+ " [sfzh] varchar(255) COLLATE Chinese_PRC_CI_AS NULL,\n" +
+ " [sort] int NULL,\n" +
+ " [dz] nvarchar(255) COLLATE Chinese_PRC_CI_AS NULL,\n" +
+ " [xchar] char(255) COLLATE Chinese_PRC_CI_AS NULL,\n" +
+ " [xdecimal] decimal(18) NULL,\n" +
+ " [xfloat] float(53) NULL,\n" +
+ " [xnumeric] numeric(18) NULL,\n" +
+ " [xsmall] smallint NULL,\n" +
+ " [xbit] bit NULL,\n" +
+ " [rq] datetime DEFAULT NULL NULL,\n" +
+ " [xrq] smalldatetime NULL,\n" +
+ " [xreal] real NULL,\n" +
+ " [ximage] image NULL\n" +
+ ")";
+ String sinkSql = "CREATE TABLE [sink] (\n" +
+ " [ids] bigint NOT NULL,\n" +
+ " [name] text COLLATE Chinese_PRC_CI_AS NULL,\n" +
+ " [sfzh] varchar(255) COLLATE Chinese_PRC_CI_AS NULL,\n" +
+ " [sort] int NULL,\n" +
+ " [dz] nvarchar(255) COLLATE Chinese_PRC_CI_AS NULL,\n" +
+ " [xchar] char(255) COLLATE Chinese_PRC_CI_AS NULL,\n" +
+ " [xdecimal] decimal(18) NULL,\n" +
+ " [xfloat] float(53) NULL,\n" +
+ " [xnumeric] numeric(18) NULL,\n" +
+ " [xsmall] smallint NULL,\n" +
+ " [xbit] bit NULL,\n" +
+ " [rq] datetime DEFAULT NULL NULL,\n" +
+ " [xrq] smalldatetime NULL,\n" +
+ " [xreal] real NULL,\n" +
+ " [ximage] image NULL\n" +
+ ")";
+ statement.execute(sourceSql);
+ statement.execute(sinkSql);
+ } catch (SQLException e) {
+ throw new RuntimeException("Initializing Sqlserver table failed!", e);
+ }
+ }
+
+ @SuppressWarnings("checkstyle:RegexpSingleline")
+ private void batchInsertData() {
+ try (Connection connection = DriverManager.getConnection(mssqlServerContainer.getJdbcUrl(), mssqlServerContainer.getUsername(), mssqlServerContainer.getPassword())) {
+ String sql =
+ "INSERT INTO [source] ([ids], [name], [sfzh], [sort], [dz], [xchar], [xdecimal], [xfloat], [xnumeric], [xsmall], [xbit], [rq], [xrq], [xreal], [ximage]) " +
+ "VALUES (1504057, '张三', '3ee98c990e2011eda8fd00ff27b3340d', 1, N'3232', 'qwq', 1, 19.1, 2, 1, '0', '2022-07-26 11:58:46.000', '2022-07-26 13:49:00', 2, 0x)";
+ Statement statement = connection.createStatement();
+ statement.execute(sql);
+ } catch (SQLException e) {
+ throw new RuntimeException("Batch insert data failed!", e);
+ }
+ }
+
+ @Test
+ public void testSqlserverSourceAndSink() throws SQLException, IOException, InterruptedException {
+ Container.ExecResult execResult = executeSeaTunnelFlinkJob("/jdbc/jdbc_sqlserver_source_to_sink.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ // query result
+ String sourceSql = "select * from source";
+ String sinkSql = "select * from sink";
+ List<String> columns = Lists.newArrayList("ids", "name", "sfzh", "sort", "dz", "xchar", "xdecimal", "xfloat", "xnumeric", "xsmall", "xbit", "rq", "xrq", "xreal", "ximage");
+
+ try (Connection connection = DriverManager.getConnection(mssqlServerContainer.getJdbcUrl(), mssqlServerContainer.getUsername(), mssqlServerContainer.getPassword())) {
+ Statement sourceStatement = connection.createStatement();
+ Statement sinkStatement = connection.createStatement();
+ ResultSet sourceResultSet = sourceStatement.executeQuery(sourceSql);
+ ResultSet sinkResultSet = sinkStatement.executeQuery(sinkSql);
+ while (sourceResultSet.next()) {
+ if (sinkResultSet.next()) {
+ for (String column : columns) {
+ Object source = sourceResultSet.getObject(column);
+ int sourceIndex = sourceResultSet.findColumn(column);
+ int sinkIndex = sinkResultSet.findColumn(column);
+ Object sink = sinkResultSet.getObject(column);
+ if (!Objects.deepEquals(source, sink)) {
+ InputStream sourceAsciiStream = sourceResultSet.getBinaryStream(sourceIndex);
+ InputStream sinkAsciiStream = sinkResultSet.getBinaryStream(sinkIndex);
+ String sourceValue = IOUtils.toString(sourceAsciiStream, StandardCharsets.UTF_8);
+ String sinkValue = IOUtils.toString(sinkAsciiStream, StandardCharsets.UTF_8);
+ Assertions.assertEquals(sourceValue, sinkValue);
+ }
+ Assertions.assertTrue(true);
+ }
+ }
+ }
+ }
+ }
+
+ @AfterEach
+ public void closeSqlserverContainer() {
+ if (mssqlServerContainer != null) {
+ mssqlServerContainer.stop();
+ }
+ }
+
+ @Override
+ protected void executeExtraCommands(GenericContainer<?> container) throws IOException, InterruptedException {
+ Container.ExecResult extraCommands = container.execInContainer("bash", "-c", "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd /tmp/seatunnel/plugins/Jdbc/lib && curl -O " + THIRD_PARTY_PLUGINS_URL);
+ Assertions.assertEquals(0, extraCommands.getExitCode());
+ }
+
+}
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/container-license-acceptance.txt b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/container-license-acceptance.txt
new file mode 100644
index 000000000..7f099b0aa
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/container-license-acceptance.txt
@@ -0,0 +1 @@
+mcr.microsoft.com/mssql/server:2022-latest
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_sqlserver_source_to_sink.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_sqlserver_source_to_sink.conf
new file mode 100644
index 000000000..52f18f1e0
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_sqlserver_source_to_sink.conf
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in a SeaTunnel config
+######
+
+env {
+ # You can set flink configuration here
+ execution.parallelism = 1
+ job.mode = "BATCH"
+ #execution.checkpoint.interval = 10000
+ #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+ # This is an example source plugin **only for testing and demonstrating the source plugin feature**
+ Jdbc {
+ driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
+ url = "jdbc:sqlserver://sqlserver;encrypt=false;"
+ user = SA
+ password = "A_Str0ng_Required_Password"
+ query = "select name,ids,sfzh,sort,dz,xchar,xdecimal,xfloat,xnumeric,xsmall,xbit,rq,xrq,xreal,ximage from source"
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+ # please go to https://seatunnel.apache.org/docs/connector-v2/source/Jdbc
+}
+
+transform {
+
+ # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+ # please go to https://seatunnel.apache.org/docs/transform/sql
+}
+
+sink {
+ Jdbc {
+ driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
+ url = "jdbc:sqlserver://sqlserver;encrypt=false;"
+ user = SA
+ password = "A_Str0ng_Required_Password"
+ query = "insert into sink(name,ids,sfzh,sort,dz,xchar,xdecimal,xfloat,xnumeric,xsmall,xbit,rq,xrq,xreal,ximage) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+ # please go to https://seatunnel.apache.org/docs/connector-v2/sink/Jdbc
+}
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml
index 7d16b9182..ee7629f14 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml
@@ -86,6 +86,16 @@
<artifactId>ali-phoenix-shaded-thin-client</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>mssqlserver</artifactId>
+ <version>${testcontainer.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.microsoft.sqlserver</groupId>
+ <artifactId>mssql-jdbc</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcSqlserverIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcSqlserverIT.java
new file mode 100644
index 000000000..053ea0b37
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcSqlserverIT.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.spark.v2.jdbc;
+
+import org.apache.seatunnel.e2e.spark.SparkContainer;
+
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.MSSQLServerContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.com.google.common.collect.Lists;
+import org.testcontainers.shaded.org.apache.commons.io.IOUtils;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Stream;
+
+@Slf4j
+public class JdbcSqlserverIT extends SparkContainer {
+
+ private MSSQLServerContainer<?> mssqlServerContainer;
+ private static final String THIRD_PARTY_PLUGINS_URL = "https://repo1.maven.org/maven2/com/microsoft/sqlserver/mssql-jdbc/9.4.1.jre8/mssql-jdbc-9.4.1.jre8.jar";
+
+ @SuppressWarnings("checkstyle:MagicNumber")
+ @BeforeEach
+ public void startSqlServerContainer() throws ClassNotFoundException, SQLException {
+ mssqlServerContainer = new MSSQLServerContainer<>(DockerImageName.parse("mcr.microsoft.com/mssql/server:2022-latest"))
+ .withNetwork(NETWORK)
+ .withNetworkAliases("sqlserver")
+ .withLogConsumer(new Slf4jLogConsumer(log));
+ Startables.deepStart(Stream.of(mssqlServerContainer)).join();
+ log.info("Sqlserver container started");
+ Class.forName(mssqlServerContainer.getDriverClassName());
+ Awaitility.given().ignoreExceptions()
+ .await()
+ .atMost(Duration.ofMinutes(3))
+ .untilAsserted(this::initializeJdbcTable);
+ batchInsertData();
+ }
+
+ private void initializeJdbcTable() {
+ try (Connection connection = DriverManager.getConnection(mssqlServerContainer.getJdbcUrl(), mssqlServerContainer.getUsername(), mssqlServerContainer.getPassword())) {
+ Statement statement = connection.createStatement();
+ String sourceSql = "CREATE TABLE [source] (\n" +
+ " [ids] bigint NOT NULL,\n" +
+ " [name] text COLLATE Chinese_PRC_CI_AS NULL,\n" +
+ " [sfzh] varchar(255) COLLATE Chinese_PRC_CI_AS NULL,\n" +
+ " [sort] int NULL,\n" +
+ " [dz] nvarchar(255) COLLATE Chinese_PRC_CI_AS NULL,\n" +
+ " [xchar] char(255) COLLATE Chinese_PRC_CI_AS NULL,\n" +
+ " [xdecimal] decimal(18) NULL,\n" +
+ " [xfloat] float(53) NULL,\n" +
+ " [xnumeric] numeric(18) NULL,\n" +
+ " [xsmall] smallint NULL,\n" +
+ " [xbit] bit NULL,\n" +
+ " [rq] datetime DEFAULT NULL NULL,\n" +
+ " [xrq] smalldatetime NULL,\n" +
+ " [xreal] real NULL,\n" +
+ " [ximage] image NULL\n" +
+ ")";
+ String sinkSql = "CREATE TABLE [sink] (\n" +
+ " [ids] bigint NOT NULL,\n" +
+ " [name] text COLLATE Chinese_PRC_CI_AS NULL,\n" +
+ " [sfzh] varchar(255) COLLATE Chinese_PRC_CI_AS NULL,\n" +
+ " [sort] int NULL,\n" +
+ " [dz] nvarchar(255) COLLATE Chinese_PRC_CI_AS NULL,\n" +
+ " [xchar] char(255) COLLATE Chinese_PRC_CI_AS NULL,\n" +
+ " [xdecimal] decimal(18) NULL,\n" +
+ " [xfloat] float(53) NULL,\n" +
+ " [xnumeric] numeric(18) NULL,\n" +
+ " [xsmall] smallint NULL,\n" +
+ " [xbit] bit NULL,\n" +
+ " [rq] datetime DEFAULT NULL NULL,\n" +
+ " [xrq] smalldatetime NULL,\n" +
+ " [xreal] real NULL,\n" +
+ " [ximage] image NULL\n" +
+ ")";
+ statement.execute(sourceSql);
+ statement.execute(sinkSql);
+ } catch (SQLException e) {
+ throw new RuntimeException("Initializing Sqlserver table failed!", e);
+ }
+ }
+
+ @SuppressWarnings("checkstyle:RegexpSingleline")
+ private void batchInsertData() {
+ try (Connection connection = DriverManager.getConnection(mssqlServerContainer.getJdbcUrl(), mssqlServerContainer.getUsername(), mssqlServerContainer.getPassword())) {
+ String sql =
+ "INSERT INTO [source] ([ids], [name], [sfzh], [sort], [dz], [xchar], [xdecimal], [xfloat], [xnumeric], [xsmall], [xbit], [rq], [xrq], [xreal], [ximage]) " +
+ "VALUES (1504057, '张三', '3ee98c990e2011eda8fd00ff27b3340d', 1, N'3232', 'qwq', 1, 19.1, 2, 1, '0', '2022-07-26 11:58:46.000', '2022-07-26 13:49:00', 2, 0x)";
+ Statement statement = connection.createStatement();
+ statement.execute(sql);
+ } catch (SQLException e) {
+ throw new RuntimeException("Batch insert data failed!", e);
+ }
+ }
+
+ @Test
+ public void testSqlserverSourceAndSink() throws SQLException, IOException, InterruptedException {
+ Container.ExecResult execResult = executeSeaTunnelSparkJob("/jdbc/jdbc_sqlserver_source_to_sink.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ // query result
+ String sourceSql = "select * from source";
+ String sinkSql = "select * from sink";
+ List<String> columns = Lists.newArrayList("ids", "name", "sfzh", "sort", "dz", "xchar", "xdecimal", "xfloat", "xnumeric", "xsmall", "xbit", "rq", "xrq", "xreal", "ximage");
+
+ try (Connection connection = DriverManager.getConnection(mssqlServerContainer.getJdbcUrl(), mssqlServerContainer.getUsername(), mssqlServerContainer.getPassword())) {
+ Statement sourceStatement = connection.createStatement();
+ Statement sinkStatement = connection.createStatement();
+ ResultSet sourceResultSet = sourceStatement.executeQuery(sourceSql);
+ ResultSet sinkResultSet = sinkStatement.executeQuery(sinkSql);
+ while (sourceResultSet.next()) {
+ if (sinkResultSet.next()) {
+ for (String column : columns) {
+ Object source = sourceResultSet.getObject(column);
+ int sourceIndex = sourceResultSet.findColumn(column);
+ int sinkIndex = sinkResultSet.findColumn(column);
+ Object sink = sinkResultSet.getObject(column);
+ if (!Objects.deepEquals(source, sink)) {
+ InputStream sourceAsciiStream = sourceResultSet.getBinaryStream(sourceIndex);
+ InputStream sinkAsciiStream = sinkResultSet.getBinaryStream(sinkIndex);
+ String sourceValue = IOUtils.toString(sourceAsciiStream, StandardCharsets.UTF_8);
+ String sinkValue = IOUtils.toString(sinkAsciiStream, StandardCharsets.UTF_8);
+ Assertions.assertEquals(sourceValue, sinkValue);
+ }
+ Assertions.assertTrue(true);
+ }
+ }
+ }
+ }
+ }
+
+ @AfterEach
+ public void closeSqlserverContainer() {
+ if (mssqlServerContainer != null) {
+ mssqlServerContainer.stop();
+ }
+ }
+
+ @Override
+ protected void executeExtraCommands(GenericContainer<?> container) throws IOException, InterruptedException {
+ Container.ExecResult extraCommands = container.execInContainer("bash", "-c", "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd /tmp/seatunnel/plugins/Jdbc/lib && curl -O " + THIRD_PARTY_PLUGINS_URL);
+ Assertions.assertEquals(0, extraCommands.getExitCode());
+ }
+
+}
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/container-license-acceptance.txt b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/container-license-acceptance.txt
new file mode 100644
index 000000000..7f099b0aa
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/container-license-acceptance.txt
@@ -0,0 +1 @@
+mcr.microsoft.com/mssql/server:2022-latest
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_sqlserver_source_to_sink.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_sqlserver_source_to_sink.conf
new file mode 100644
index 000000000..98d8451de
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_sqlserver_source_to_sink.conf
@@ -0,0 +1,62 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in a SeaTunnel config
+######
+
+env {
+ # You can set spark configuration here
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+ job.mode = "BATCH"
+}
+
+source {
+ # This is an example source plugin **only for testing and demonstrating the source plugin feature**
+ Jdbc {
+ driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
+ url = "jdbc:sqlserver://sqlserver;encrypt=false;"
+ user = SA
+ password = "A_Str0ng_Required_Password"
+ query = "select name,ids,sfzh,sort,dz,xchar,xdecimal,xfloat,xnumeric,xsmall,xbit,rq,xrq,xreal,ximage from source"
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+ # please go to https://seatunnel.apache.org/docs/connector-v2/source/Jdbc
+}
+
+transform {
+
+ # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+ # please go to https://seatunnel.apache.org/docs/transform/sql
+}
+
+sink {
+ Jdbc {
+ driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
+ url = "jdbc:sqlserver://sqlserver;encrypt=false;"
+ user = SA
+ password = "A_Str0ng_Required_Password"
+ query = "insert into sink(name,ids,sfzh,sort,dz,xchar,xdecimal,xfloat,xnumeric,xsmall,xbit,rq,xrq,xreal,ximage) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+ # please go to https://seatunnel.apache.org/docs/connector-v2/sink/Jdbc
+}
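
Note on the source/sink comparison in the Spark JdbcSqlserverIT test: the loop only compares a sink row when sinkResultSet.next() returns true, so an empty sink table would still pass. The following is a minimal sketch, not part of this commit, of a stricter comparison that also checks row counts. The class name RowComparison and the method rowsMatch are hypothetical, and rows are modeled as in-memory Object[] arrays (an assumption made so the example runs without a database).

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not part of the SeaTunnel code base.
class RowComparison {

    // Returns true only when both tables have the same number of rows
    // and every row is deep-equal, column by column.
    static boolean rowsMatch(List<Object[]> source, List<Object[]> sink) {
        if (source.size() != sink.size()) {
            return false; // a missing or extra sink row is a failure
        }
        for (int i = 0; i < source.size(); i++) {
            // Arrays.deepEquals compares nested arrays such as byte[] by content,
            // which covers binary columns like the image column in the test table.
            if (!Arrays.deepEquals(source.get(i), sink.get(i))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<Object[]> source = Collections.singletonList(new Object[]{1504057L, "3232", new byte[]{1, 2}});
        List<Object[]> sink = Collections.singletonList(new Object[]{1504057L, "3232", new byte[]{1, 2}});
        System.out.println(rowsMatch(source, sink));
        System.out.println(rowsMatch(source, Collections.<Object[]>emptyList()));
    }
}
```

In the integration test itself, the same idea could be expressed by asserting that sinkResultSet.next() is true for each source row and false once the source is exhausted.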