Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/10 08:37:39 UTC

[GitHub] [flink] qingwei91 opened a new pull request, #20235: Jdbc dialects

qingwei91 opened a new pull request, #20235:
URL: https://github.com/apache/flink/pull/20235

   <!--
   *Thank you very much for contributing to Apache Flink - we are happy that you want to help us improve Flink. To help the community review your contribution in the best possible way, please go through the checklist below, which will get the contribution into a shape in which it can be best reviewed.*
   
   *Please understand that we do not do this to make contributions to Flink a hassle. In order to uphold a high standard of quality for code contributions, while at the same time managing a large number of contributions, we need contributors to prepare the contributions well, and give reviewers enough contextual information for the review. Please also understand that contributions that do not follow this guide will take longer to review and thus typically be picked up with lower priority by the community.*
   
   ## Contribution Checklist
   
     - Make sure that the pull request corresponds to a [JIRA issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are made for typos in JavaDoc or documentation files, which need no JIRA issue.
     
     - Name the pull request in the form "[FLINK-XXXX] [component] Title of the pull request", where *FLINK-XXXX* should be replaced by the actual issue number. Skip *component* if you are unsure about which is the best component.
  Typo fixes that have no associated JIRA issue should be named following this pattern: `[hotfix] [docs] Fix typo in event time introduction` or `[hotfix] [javadocs] Expand JavaDoc for PunctuatedWatermarkGenerator`.
   
     - Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review.
     
     - Make sure that the change passes the automated tests, i.e., `mvn clean verify` passes. You can set up Azure Pipelines CI to do that following [this guide](https://cwiki.apache.org/confluence/display/FLINK/Azure+Pipelines#AzurePipelines-Tutorial:SettingupAzurePipelinesforaforkoftheFlinkrepository).
   
     - Each pull request should address only one issue, not mix up code from multiple issues.
     
     - Each commit in the pull request has a meaningful commit message (including the JIRA id)
   
     - Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below.
   
   
   **(The sections below can be removed for hotfixes of typos)**
   -->
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
     - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact*
     - *Deployments RPC transmits only the blob storage reference*
     - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
Please make sure both new and modified tests in this PR follow the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
     - *Added integration tests for end-to-end deployment with large payloads (100MB)*
     - *Extended integration test for recovery after master (JobManager) failure*
     - *Added test that validates that TaskInfo is transferred only once across recoveries*
     - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no)
     - The serializers: (yes / no / don't know)
     - The runtime per-record code paths (performance sensitive): (yes / no / don't know)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
     - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes / no)
     - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
   




[GitHub] [flink] qingwei91 commented on pull request #20235: [Flink-14101][Connectors][Jdbc] SQL Server dialect

Posted by GitBox <gi...@apache.org>.
qingwei91 commented on PR #20235:
URL: https://github.com/apache/flink/pull/20235#issuecomment-1226841548

   @JingGe sorry, I misunderstood; I've fixed it now.




[GitHub] [flink] gaoyunhaii commented on pull request #20235: [Flink-14101][Connectors][Jdbc] SQL Server dialect

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on PR #20235:
URL: https://github.com/apache/flink/pull/20235#issuecomment-1239055514

   Merged on master.




[GitHub] [flink] qingwei91 commented on pull request #20235: Jdbc dialects

Posted by GitBox <gi...@apache.org>.
qingwei91 commented on PR #20235:
URL: https://github.com/apache/flink/pull/20235#issuecomment-1179684209

   This PR is a bit too big for my taste; I've tried to split the commits by logical grouping. Hope that helps.




[GitHub] [flink] JingGe commented on pull request #20235: [Flink 14101][Connectors][Jdbc] SQL Server dialect

Posted by GitBox <gi...@apache.org>.
JingGe commented on PR #20235:
URL: https://github.com/apache/flink/pull/20235#issuecomment-1223783098

   Hi @qingwei91, it would be great if you could squash the three commits into one.
   
   The PR title is just missing a "-".




[GitHub] [flink] qingwei91 commented on a diff in pull request #20235: [Flink 14101][Connectors][Jdbc] SQL Server dialect

Posted by GitBox <gi...@apache.org>.
qingwei91 commented on code in PR #20235:
URL: https://github.com/apache/flink/pull/20235#discussion_r936919123


##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/sqlserver/SqlServerDialect.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.dialect.sqlserver;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.converter.JdbcRowConverter;
+import org.apache.flink.connector.jdbc.dialect.AbstractDialect;
+import org.apache.flink.connector.jdbc.internal.converter.SqlServerRowConverter;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/** JDBC dialect for SqlServer. */
+@Internal
+public class SqlServerDialect extends AbstractDialect {
+    @Override
+    public String dialectName() {
+        return "SqlServer";
+    }
+
+    /**
+     * The maximum timestamp precision supported by datetime2 is 7. See
+     * https://docs.microsoft.com/en-us/sql/t-sql/data-types/datetime2-transact-sql?view=sql-server-ver16
+     */
+    @Override
+    public Optional<Range> timestampPrecisionRange() {
+        return Optional.of(Range.of(0, 7));
+    }
+
+    /**
+     * The maximum decimal precision supported is 38. See
+     * https://docs.microsoft.com/en-us/sql/t-sql/data-types/precision-scale-and-length-transact-sql?view=sql-server-ver16
+     */
+    @Override
+    public Optional<Range> decimalPrecisionRange() {
+        return Optional.of(Range.of(0, 38));
+    }
+
+    @Override
+    public Optional<String> defaultDriverName() {
+        return Optional.of("com.microsoft.sqlserver.jdbc.SQLServerDriver");
+    }
+
+    @Override
+    public String quoteIdentifier(String identifier) {
+        return identifier;
+    }
+
+    @Override
+    public Optional<String> getUpsertStatement(
+            String tableName, String[] fieldNames, String[] uniqueKeyFields) {
+        List<String> nonUniqueKeyFields =
+                Arrays.stream(fieldNames)
+                        .filter(f -> !Arrays.asList(uniqueKeyFields).contains(f))
+                        .collect(Collectors.toList());
+        String fieldsProjection =
+                Arrays.stream(fieldNames)
+                        .map(this::quoteIdentifier)
+                        .collect(Collectors.joining(", "));
+
+        String valuesBinding =
+                Arrays.stream(fieldNames)
+                        .map(f -> ":" + f + " " + quoteIdentifier(f))
+                        .collect(Collectors.joining(", "));
+
+        String usingClause = String.format("SELECT %s", valuesBinding);
+        String onConditions =
+                Arrays.stream(uniqueKeyFields)
+                        .map(
+                                f ->
+                                        "[TARGET]."
+                                                + quoteIdentifier(f)
+                                                + "=[SOURCE]."
+                                                + quoteIdentifier(f))
+                        .collect(Collectors.joining(" AND "));
+        String updateSetClause =
+                nonUniqueKeyFields.stream()
+                        .map(
+                                f ->
+                                        "[TARGET]."
+                                                + quoteIdentifier(f)
+                                                + "=[SOURCE]."
+                                                + quoteIdentifier(f))
+                        .collect(Collectors.joining(", "));
+
+        String insertValues =
+                Arrays.stream(fieldNames)
+                        .map(f -> "[SOURCE]." + quoteIdentifier(f))
+                        .collect(Collectors.joining(", "));
+        return Optional.of(
+                String.format(
+                        "MERGE INTO %s AS [TARGET]"
+                                + " USING (%s) AS [SOURCE]"
+                                + " ON (%s)"
+                                + " WHEN MATCHED THEN"
+                                + " UPDATE SET %s"
+                                + " WHEN NOT MATCHED THEN"
+                                + " INSERT (%s) VALUES (%s);",
+                        quoteIdentifier(tableName),
+                        usingClause,
+                        onConditions,
+                        updateSetClause,
+                        fieldsProjection,
+                        insertValues));
+    }
+
+    @Override
+    public JdbcRowConverter getRowConverter(RowType rowType) {
+        return new SqlServerRowConverter(rowType);
+    }
+
+    @Override
+    public String getLimitClause(long limit) {
+        throw new IllegalArgumentException("SqlServerDialect does not support limit clause");

Review Comment:
   Hi @maver1ck, I believe the TOP syntax would need to be added like this:
   
   `SELECT TOP N * FROM table`. This does not play well with the current pattern, where LIMIT is appended to the end of the statement.
   
   Maybe we should rethink this method's API to make it more generic? Is it OK not to include it in this PR? I am happy to take this on as a separate issue.
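   
   To illustrate the mismatch, here is a minimal sketch with hypothetical query strings (not code from this PR):
   
   ```java
   class TopVsLimitSketch {
       // MySQL-style dialects can append a limit clause to the end of any query:
       static final String APPENDED = "SELECT id, name FROM t" + " LIMIT 5"; // valid on MySQL
   
       // SQL Server's TOP must sit between SELECT and the projection, so a
       // suffix appended to the statement cannot express it:
       static final String TOP = "SELECT TOP 5 id, name FROM t";    // valid on SQL Server
       static final String BROKEN = "SELECT id, name FROM t TOP 5"; // invalid SQL
   }
   ```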





[GitHub] [flink] qingwei91 commented on a diff in pull request #20235: Jdbc dialects

Posted by GitBox <gi...@apache.org>.
qingwei91 commented on code in PR #20235:
URL: https://github.com/apache/flink/pull/20235#discussion_r917362437


##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/sqlserver/SqlServerDialect.java:
##########
@@ -0,0 +1,162 @@
+    @Override
+    public String getLimitClause(long limit) {
+        throw new IllegalArgumentException("SqlServerDialect does not support limit clause");

Review Comment:
   Question to the maintainers:
   
   Is it OK not to support LIMIT for SQL Server?
   The reason is that SQL Server does not support LIMIT natively; the closest it offers is the OFFSET ... FETCH syntax, but that is only valid together with an ORDER BY clause, so with the current structure I don't think we can support it properly.
   
   This method has no contextual knowledge of whether an ORDER BY exists in the larger statement or not.
   
   If you think this is not acceptable, we might need to change code in `JdbcDynamicTableSource` to support this special case.
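   
   A minimal illustration of the constraint (hypothetical query strings):
   
   ```java
   class OffsetFetchSketch {
       // OFFSET ... FETCH is SQL Server's closest equivalent to LIMIT, but it
       // is only valid as part of an ORDER BY clause:
       static final String VALID =
               "SELECT id, name FROM t ORDER BY id"
                       + " OFFSET 0 ROWS FETCH NEXT 5 ROWS ONLY"; // accepted
   
       static final String INVALID =
               "SELECT id, name FROM t"
                       + " OFFSET 0 ROWS FETCH NEXT 5 ROWS ONLY"; // rejected: no ORDER BY
   }
   ```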



##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/converter/SqlServerRowConverter.java:
##########
@@ -0,0 +1,51 @@
+package org.apache.flink.connector.jdbc.internal.converter;
+
+import org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+/**
+ * Runtime converter responsible for converting between JDBC objects and Flink internal objects
+ * for SQL Server (MsSql).
+ */
+public class SqlServerRowConverter extends AbstractJdbcRowConverter {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public String converterName() {
+        return "SqlServer";
+    }
+
+    public SqlServerRowConverter(RowType rowType) {
+        super(rowType);
+    }
+
+    @Override
+    protected JdbcDeserializationConverter createInternalConverter(LogicalType type) {
+        switch (type.getTypeRoot()) {
+            case TINYINT:
+                return val -> ((Short) val).byteValue();

Review Comment:
   TINYINT in SQL Server comes back from the JDBC driver as a Short rather than an Integer, hence the override here.
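   
   A minimal sketch of the conversion (the column name below is hypothetical):
   
   ```java
   import java.sql.ResultSet;
   import java.sql.SQLException;
   
   class TinyIntConversionSketch {
       // With the SQL Server JDBC driver, getObject(...) on a TINYINT column
       // yields a java.lang.Short, while Flink's TINYINT maps to byte; hence
       // the cast performed by the converter above.
       static byte readTinyInt(ResultSet rs, String column) throws SQLException {
           Object val = rs.getObject(column); // e.g. (short) 42
           return ((Short) val).byteValue();
       }
   }
   ```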



##########
flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/sqlserver/SqlServerTableSourceITCase.java:
##########
@@ -0,0 +1,229 @@
+package org.apache.flink.connector.jdbc.dialect.sqlserver;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.testcontainers.containers.MSSQLServerContainer;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** The Table Source ITCase for {@link SqlServerDialect}. */
+public class SqlServerTableSourceITCase extends AbstractTestBase {
+
+    private static final MSSQLServerContainer container =
+            new MSSQLServerContainer("mcr.microsoft.com/mssql/server:2019-GA-ubuntu-16.04")
+                    .acceptLicense();
+    private static String containerUrl;
+    private static final String INPUT_TABLE = "sql_test_table";
+
+    private static StreamExecutionEnvironment env;
+    private static TableEnvironment tEnv;
+
+    @BeforeClass
+    public static void beforeAll() throws ClassNotFoundException, SQLException {
+        container.start();
+        containerUrl = container.getJdbcUrl();
+        Class.forName(container.getDriverClassName());
+        try (Connection conn =
+                        DriverManager.getConnection(
+                                containerUrl, container.getUsername(), container.getPassword());
+                Statement statement = conn.createStatement()) {
+            statement.executeUpdate(
+                    "CREATE TABLE "
+                            + INPUT_TABLE
+                            + " ("
+                            + "id INT NOT NULL,"
+                            + "tiny_int TINYINT,"
+                            + "small_int SMALLINT,"
+                            + "big_int BIGINT,"
+                            + "float_col REAL,"
+                            + "double_col FLOAT ,"
+                            + "decimal_col DECIMAL(10, 4) NOT NULL,"
+                            + "bool BIT NOT NULL,"
+                            + "date_col DATE NOT NULL,"
+                            + "time_col TIME(5) NOT NULL,"
+                            + "datetime_col DATETIME,"
+                            + "datetime2_col DATETIME2,"
+                            + "char_col CHAR NOT NULL,"
+                            + "nchar_col NCHAR(3) NOT NULL,"
+                            + "varchar2_col VARCHAR(30) NOT NULL,"
+                            + "nvarchar2_col NVARCHAR(30) NOT NULL,"
+                            + "text_col TEXT,"
+                            + "ntext_col NTEXT,"
+                            + "binary_col BINARY(10)"
+                            + ")");

Review Comment:
   I've covered all the types that the docs claim are supported.



##########
flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/sqlserver/SqlServerTableSinkITCase.java:
##########
@@ -0,0 +1,492 @@
+package org.apache.flink.connector.jdbc.dialect.sqlserver;
+
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction;
+import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkContextUtil;
+import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
+import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.table.planner.runtime.utils.TestData;
+import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.testcontainers.containers.MSSQLServerContainer;
+
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.connector.jdbc.internal.JdbcTableOutputFormatTest.check;
+import static org.apache.flink.table.api.Expressions.$;
+import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
+
+/** The Table Sink ITCase for {@link SqlServerDialect}. */
+public class SqlServerTableSinkITCase extends AbstractTestBase {

Review Comment:
   This test is adapted from `OracleTableSinkITCase`.





[GitHub] [flink] qingwei91 commented on pull request #20235: [Flink-14101][Connectors][Jdbc] SQL Server dialect

Posted by GitBox <gi...@apache.org>.
qingwei91 commented on PR #20235:
URL: https://github.com/apache/flink/pull/20235#issuecomment-1233231023

   Hi @JingGe, thanks for the approval. What else do we need to be able to merge this? Should I reach out to another contributor/committer?




[GitHub] [flink] qingwei91 commented on pull request #20235: [Flink 14101][Connectors][Jdbc] SQL Server dialect

Posted by GitBox <gi...@apache.org>.
qingwei91 commented on PR #20235:
URL: https://github.com/apache/flink/pull/20235#issuecomment-1186440169

   @flinkbot run azure




[GitHub] [flink] MartijnVisser commented on pull request #20235: [Flink-14101][Connectors][Jdbc] SQL Server dialect

Posted by GitBox <gi...@apache.org>.
MartijnVisser commented on PR #20235:
URL: https://github.com/apache/flink/pull/20235#issuecomment-1233254758

   Since we are currently in feature freeze, this can only be merged once the release branch is cut, in the next week or so.




[GitHub] [flink] maver1ck commented on a diff in pull request #20235: [Flink 14101][Connectors][Jdbc] SQL Server dialect

Posted by GitBox <gi...@apache.org>.
maver1ck commented on code in PR #20235:
URL: https://github.com/apache/flink/pull/20235#discussion_r936621194


##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/sqlserver/SqlServerDialect.java:
##########
@@ -0,0 +1,162 @@
+    @Override
+    public String getLimitClause(long limit) {
+        throw new IllegalArgumentException("SqlServerDialect does not support limit clause");

Review Comment:
   @qingwei91 Can we use `TOP` syntax ?





[GitHub] [flink] qingwei91 commented on a diff in pull request #20235: [Flink 14101][Connectors][Jdbc] SQL Server dialect

Posted by GitBox <gi...@apache.org>.
qingwei91 commented on code in PR #20235:
URL: https://github.com/apache/flink/pull/20235#discussion_r924890763


##########
flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/sqlserver/SqlServerPreparedStatementTest.java:
##########
@@ -0,0 +1,93 @@
+package org.apache.flink.connector.jdbc.dialect.sqlserver;
+
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialectLoader;
+
+import org.junit.Test;

Review Comment:
   I've changed it here: https://github.com/apache/flink/pull/20235/commits/32f67750e5ffc7b9192d4e03fcaacac8a2f7aff2





[GitHub] [flink] qingwei91 commented on pull request #20235: [Flink 14101][Connectors][Jdbc] SQL Server dialect

Posted by GitBox <gi...@apache.org>.
qingwei91 commented on PR #20235:
URL: https://github.com/apache/flink/pull/20235#issuecomment-1215758784

   Hi @JingGe, do we just need one more review on this?




[GitHub] [flink] qingwei91 commented on pull request #20235: [Flink 14101][Connectors][Jdbc] SQL Server dialect

Posted by GitBox <gi...@apache.org>.
qingwei91 commented on PR #20235:
URL: https://github.com/apache/flink/pull/20235#issuecomment-1193085665

   Hi @RocMarshal, will you be able to review this again? I've addressed your comments.
   
   Thanks for your help!




[GitHub] [flink] JingGe commented on pull request #20235: [Flink-14101][Connectors][Jdbc] SQL Server dialect

Posted by GitBox <gi...@apache.org>.
JingGe commented on PR #20235:
URL: https://github.com/apache/flink/pull/20235#issuecomment-1236838113

   @MartijnVisser the 1.16 branch was cut today. This PR can be merged now. Thanks!




[GitHub] [flink] RocMarshal commented on a diff in pull request #20235: [Flink 14101][Connectors][Jdbc] SQL Server dialect

Posted by GitBox <gi...@apache.org>.
RocMarshal commented on code in PR #20235:
URL: https://github.com/apache/flink/pull/20235#discussion_r922667181


##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/sqlserver/SqlServerDialect.java:
##########
@@ -0,0 +1,162 @@
+    @Override
+    public String getLimitClause(long limit) {
+        throw new IllegalArgumentException("SqlServerDialect does not support limit clause");

Review Comment:
   IMO, +1 for ```not supporting it in this PR for now```.
   I'd prefer to split this feature into a new PR.
   This is not only related to the dialect; it would also require changing, or introducing a new, `pushdownXXX` hook in the relevant pushdown mechanism.
   cc @bowenli86 @JingGe
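   
   One possible shape for such a hook, purely as a sketch for discussion (none of these names exist in Flink today): let the dialect rewrite the whole SELECT instead of appending a clause, so that prefix-style syntaxes like TOP also fit.
   
   ```java
   // Hypothetical API sketch, not an actual Flink interface:
   interface LimitPushDownRewriter {
       /** Rewrites a full SELECT statement so it returns at most {@code limit} rows. */
       String applyLimit(String selectStatement, long limit);
   }
   
   // A SQL Server flavor could then splice TOP in right after SELECT:
   class SqlServerLimitRewriter implements LimitPushDownRewriter {
       @Override
       public String applyLimit(String selectStatement, long limit) {
           // naive sketch; assumes the statement starts with "SELECT "
           return selectStatement.replaceFirst("(?i)^SELECT ", "SELECT TOP " + limit + " ");
       }
   }
   ```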
   



##########
flink-connectors/flink-connector-jdbc/pom.xml:
##########
@@ -68,6 +68,14 @@ under the License.
 			<scope>provided</scope>
 		</dependency>
 
+		<!-- SQL Server -->
+		<dependency>
+			<groupId>com.microsoft.sqlserver</groupId>
+			<artifactId>mssql-jdbc</artifactId>
+			<version>10.2.0.jre8</version>

Review Comment:
   Could the version be bumped to the latest one?



##########
flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/sqlserver/SqlServerPreparedStatementTest.java:
##########
@@ -0,0 +1,93 @@
+package org.apache.flink.connector.jdbc.dialect.sqlserver;
+
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialectLoader;
+
+import org.junit.Test;

Review Comment:
   The test suites in use are JUnit 5 (```jupiter```) now.
   Please migrate this test to ```jupiter```.
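   
   For reference, for a simple test class the migration is mostly an import swap (a sketch; the class and assertion below are placeholders, not the PR's actual test):
   
   ```java
   // before: import org.junit.Test;    (JUnit 4)
   // after:
   import org.junit.jupiter.api.Test; // JUnit 5 (Jupiter)
   
   import static org.assertj.core.api.Assertions.assertThat;
   
   class JupiterMigrationSketch {
       @Test
       void plainTestMethodsNeedNoOtherChanges() {
           assertThat("SqlServer").startsWith("Sql");
       }
   }
   ```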





[GitHub] [flink] qingwei91 commented on pull request #20235: [Flink 14101][Connectors][Jdbc] SQL Server dialect

Posted by GitBox <gi...@apache.org>.
qingwei91 commented on PR #20235:
URL: https://github.com/apache/flink/pull/20235#issuecomment-1185740486

   @flinkbot run azure




[GitHub] [flink] qingwei91 commented on pull request #20235: [Flink 14101][Connectors][Jdbc] SQL Server dialect

Posted by GitBox <gi...@apache.org>.
qingwei91 commented on PR #20235:
URL: https://github.com/apache/flink/pull/20235#issuecomment-1183518674

   @flinkbot run azure




[GitHub] [flink] qingwei91 commented on a diff in pull request #20235: [Flink 14101][Connectors][Jdbc] SQL Server dialect

Posted by GitBox <gi...@apache.org>.
qingwei91 commented on code in PR #20235:
URL: https://github.com/apache/flink/pull/20235#discussion_r928283816


##########
flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/sqlserver/SqlServerTableSourceITCase.java:
##########
@@ -0,0 +1,229 @@
+/** The Table Source ITCase for {@link SqlServerDialect}. */
+public class SqlServerTableSourceITCase extends AbstractTestBase {

Review Comment:
   This test case depends on `AbstractTestBase`, which itself still depends on JUnit 4, so I guess we cannot apply the migration here?
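   
   For context, this is roughly the JUnit 4 coupling in the base class (a hedged sketch of the 1.15-era code, not an exact copy):
   
   ```java
   import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
   import org.apache.flink.test.util.MiniClusterWithClientResource;
   
   import org.junit.ClassRule;
   
   public abstract class AbstractTestBaseSketch {
       // The test mini cluster is wired through a JUnit 4 @ClassRule, so every
       // subclass stays tied to the JUnit 4 runner until the base class migrates.
       @ClassRule
       public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
               new MiniClusterWithClientResource(
                       new MiniClusterResourceConfiguration.Builder().build());
   }
   ```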





[GitHub] [flink] JingGe commented on pull request #20235: [Flink 14101][Connectors][Jdbc] SQL Server dialect

Posted by GitBox <gi...@apache.org>.
JingGe commented on PR #20235:
URL: https://github.com/apache/flink/pull/20235#issuecomment-1217195964

   @qingwei91 would you like to do the following clean-up:
   1. Squash the commits and make sure the commit message follows the Flink rules.
   2. Rebase onto master.
   3. Replace "[Flink 14101]" with "[Flink-14101]" in the PR title.
   
   Thanks for your effort!
   




[GitHub] [flink] JingGe commented on pull request #20235: [Flink 14101][Connectors][Jdbc] SQL Server dialect

Posted by GitBox <gi...@apache.org>.
JingGe commented on PR #20235:
URL: https://github.com/apache/flink/pull/20235#issuecomment-1184266882

   @RocMarshal since you are working on FLIP-239, would you like to review this PR?




[GitHub] [flink] MartijnVisser commented on pull request #20235: [Flink 14101][Connectors][Jdbc] SQL Server dialect

Posted by GitBox <gi...@apache.org>.
MartijnVisser commented on PR #20235:
URL: https://github.com/apache/flink/pull/20235#issuecomment-1199075388

   @fpompermaier Do you also want to have another look at this PR? 




[GitHub] [flink] gaoyunhaii commented on pull request #20235: [Flink-14101][Connectors][Jdbc] SQL Server dialect

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on PR #20235:
URL: https://github.com/apache/flink/pull/20235#issuecomment-1239030600

   Thanks @qingwei91 for the PR and everyone for reviewing and tracking! I can help merge the PR~




[GitHub] [flink] RocMarshal commented on a diff in pull request #20235: [Flink 14101][Connectors][Jdbc] SQL Server dialect

Posted by GitBox <gi...@apache.org>.
RocMarshal commented on code in PR #20235:
URL: https://github.com/apache/flink/pull/20235#discussion_r928255614


##########
flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/sqlserver/SqlServerTableSourceITCase.java:
##########
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.dialect.sqlserver;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.testcontainers.containers.MSSQLServerContainer;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** The Table Source ITCase for {@link SqlServerDialect}. */
+public class SqlServerTableSourceITCase extends AbstractTestBase {

Review Comment:
   Could the ```public``` modifier on this class be removed?



##########
flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/sqlserver/SqlServerTableSinkITCase.java:
##########
@@ -0,0 +1,492 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.dialect.sqlserver;
+
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction;
+import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkContextUtil;
+import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
+import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.table.planner.runtime.utils.TestData;
+import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.testcontainers.containers.MSSQLServerContainer;
+
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.connector.jdbc.internal.JdbcTableOutputFormatTest.check;
+import static org.apache.flink.table.api.Expressions.$;
+import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
+
+/** The Table Sink ITCase for {@link SqlServerDialect}. */
+public class SqlServerTableSinkITCase extends AbstractTestBase {

Review Comment:
   We also need to consider whether the ```public``` modifier should be removed here as well.



##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/sqlserver/SqlServerDialect.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.dialect.sqlserver;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.converter.JdbcRowConverter;
+import org.apache.flink.connector.jdbc.dialect.AbstractDialect;
+import org.apache.flink.connector.jdbc.internal.converter.SqlServerRowConverter;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/** JDBC dialect for SqlServer. */
+@Internal
+public class SqlServerDialect extends AbstractDialect {
+    @Override
+    public String dialectName() {
+        return "SqlServer";
+    }
+
+    /**
+     * The maximum precision is supported by datetime2.
+     * https://docs.microsoft.com/en-us/sql/t-sql/data-types/datetime2-transact-sql?view=sql-server-ver16
+     */
+    @Override
+    public Optional<Range> timestampPrecisionRange() {
+        return Optional.of(Range.of(0, 7));
+    }
+
+    /**
+     * The maximum precision is supported by decimal.
+     * https://docs.microsoft.com/en-us/sql/t-sql/data-types/precision-scale-and-length-transact-sql?view=sql-server-ver16
+     *
+     * @return

Review Comment:
   Please complete the ```@return``` description or remove the empty tag.
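   
   For instance, the Javadoc could be completed like this (a sketch; the method body is taken from the hunk quoted later in this thread):
   
   ```java
   /**
    * The maximum precision supported by the SQL Server decimal type.
    * https://docs.microsoft.com/en-us/sql/t-sql/data-types/precision-scale-and-length-transact-sql?view=sql-server-ver16
    *
    * @return the supported decimal precision range, 0 to 38
    */
   @Override
   public Optional<Range> decimalPrecisionRange() {
       return Optional.of(Range.of(0, 38));
   }
   ```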





[GitHub] [flink] qingwei91 commented on a diff in pull request #20235: [Flink 14101][Connectors][Jdbc] SQL Server dialect

Posted by GitBox <gi...@apache.org>.
qingwei91 commented on code in PR #20235:
URL: https://github.com/apache/flink/pull/20235#discussion_r928235749


##########
flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/sqlserver/SqlServerPreparedStatementTest.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.dialect.sqlserver;
+
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialectLoader;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link SqlServerPreparedStatementTest}. */
+public class SqlServerPreparedStatementTest {

Review Comment:
   Hi @RocMarshal, I've addressed this comment here: https://github.com/apache/flink/pull/20235/commits/548361b838146b705cd8c54b98d42b8e1cec3481
   
   Thanks for the tip!





[GitHub] [flink] qingwei91 commented on pull request #20235: [Flink 14101][Connectors][Jdbc] SQL Server dialect

Posted by GitBox <gi...@apache.org>.
qingwei91 commented on PR #20235:
URL: https://github.com/apache/flink/pull/20235#issuecomment-1221245897

   Hi @JingGe, thanks for the feedback. I've done the 1st and 2nd points.
   
   But I don't know how to do the 3rd point; I don't think PR titles support Markdown, so maybe I am missing something.




[GitHub] [flink] qingwei91 commented on pull request #20235: [Flink 14101][Connectors][Jdbc] SQL Server dialect

Posted by GitBox <gi...@apache.org>.
qingwei91 commented on PR #20235:
URL: https://github.com/apache/flink/pull/20235#issuecomment-1184422442

   Hi JingGe, sure, I will give it a go




[GitHub] [flink] RocMarshal commented on a diff in pull request #20235: [Flink 14101][Connectors][Jdbc] SQL Server dialect

Posted by GitBox <gi...@apache.org>.
RocMarshal commented on code in PR #20235:
URL: https://github.com/apache/flink/pull/20235#discussion_r922766254


##########
flink-connectors/flink-connector-jdbc/pom.xml:
##########
@@ -68,6 +68,14 @@ under the License.
 			<scope>provided</scope>
 		</dependency>
 
+		<!-- SQL Server -->
+		<dependency>
+			<groupId>com.microsoft.sqlserver</groupId>
+			<artifactId>mssql-jdbc</artifactId>
+			<version>10.2.0.jre8</version>

Review Comment:
   Okay~ Let's just keep the current version then.





[GitHub] [flink] JingGe commented on pull request #20235: [Flink-14101][Connectors][Jdbc] SQL Server dialect

Posted by GitBox <gi...@apache.org>.
JingGe commented on PR #20235:
URL: https://github.com/apache/flink/pull/20235#issuecomment-1239017424

   @PatrickRen Would you like to merge this PR? Thanks!




[GitHub] [flink] gaoyunhaii commented on a diff in pull request #20235: [Flink-14101][Connectors][Jdbc] SQL Server dialect

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on code in PR #20235:
URL: https://github.com/apache/flink/pull/20235#discussion_r939799891


##########
flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/sqlserver/SqlServerTableSinkITCase.java:
##########
@@ -0,0 +1,492 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.dialect.sqlserver;
+
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction;
+import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkContextUtil;
+import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
+import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.table.planner.runtime.utils.TestData;
+import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.testcontainers.containers.MSSQLServerContainer;
+
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.connector.jdbc.internal.JdbcTableOutputFormatTest.check;
+import static org.apache.flink.table.api.Expressions.$;
+import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
+
+/** The Table Sink ITCase for {@link SqlServerDialect}. */
+public class SqlServerTableSinkITCase extends AbstractTestBase {
+
+    private static final MSSQLServerContainer container =
+            new MSSQLServerContainer("mcr.microsoft.com/mssql/server:2019-GA-ubuntu-16.04")
+                    .acceptLicense();
+    private static String containerUrl;
+
+    public static final String OUTPUT_TABLE1 = "dynamicSinkForUpsert";
+    public static final String OUTPUT_TABLE2 = "dynamicSinkForAppend";
+    public static final String OUTPUT_TABLE3 = "dynamicSinkForBatch";
+    public static final String OUTPUT_TABLE4 = "REAL_TABLE";
+    public static final String OUTPUT_TABLE5 = "checkpointTable";
+    public static final String USER_TABLE = "USER_TABLE";
+
+    @BeforeClass
+    public static void beforeAll() throws ClassNotFoundException, SQLException {
+        container.start();
+        containerUrl =
+                String.format(
+                        "%s;username=%s;password=%s",
+                        container.getJdbcUrl(), container.getUsername(), container.getPassword());
+        Class.forName(container.getDriverClassName());
+        try (Connection conn =
+                        DriverManager.getConnection(
+                                containerUrl, container.getUsername(), container.getPassword());
+                Statement stat = conn.createStatement()) {
+            stat.executeUpdate(
+                    "CREATE TABLE "
+                            + OUTPUT_TABLE1
+                            + " ("
+                            + "cnt FLOAT DEFAULT 0 NOT NULL,"
+                            + "lencnt FLOAT DEFAULT 0 NOT NULL,"
+                            + "cTag INT DEFAULT 0 NOT NULL,"
+                            + "ts DATETIME2,"
+                            + "CONSTRAINT PK1 PRIMARY KEY CLUSTERED (cnt, cTag))");
+
+            stat.executeUpdate(
+                    "CREATE TABLE "
+                            + OUTPUT_TABLE2
+                            + " ("
+                            + "id INT DEFAULT 0 NOT NULL,"
+                            + "num INT DEFAULT 0 NOT NULL,"
+                            + "ts DATETIME2)");
+
+            stat.executeUpdate(
+                    "CREATE TABLE "
+                            + OUTPUT_TABLE3
+                            + " ("
+                            + "NAME VARCHAR(20) NOT NULL,"
+                            + "SCORE INT DEFAULT 0 NOT NULL)");
+
+            stat.executeUpdate("CREATE TABLE " + OUTPUT_TABLE4 + " (real_data REAL)");
+
+            stat.executeUpdate(
+                    "CREATE TABLE " + OUTPUT_TABLE5 + " (" + "id BIGINT DEFAULT 0 NOT NULL)");
+
+            stat.executeUpdate(
+                    "CREATE TABLE "
+                            + USER_TABLE
+                            + " ("
+                            + "user_id VARCHAR(20) NOT NULL,"
+                            + "user_name VARCHAR(20) NOT NULL,"
+                            + "email VARCHAR(255),"
+                            + "balance DECIMAL(18,2),"
+                            + "balance2 DECIMAL(18,2),"
+                            + "CONSTRAINT PK2 PRIMARY KEY CLUSTERED (user_id))");
+        }
+    }
+
+    @AfterClass
+    public static void afterAll() throws Exception {
+        TestValuesTableFactory.clearAllData();
+        Class.forName(container.getDriverClassName());
+        try (Connection conn =
+                        DriverManager.getConnection(
+                                containerUrl, container.getUsername(), container.getPassword());
+                Statement stat = conn.createStatement()) {
+            stat.execute("DROP TABLE " + OUTPUT_TABLE1);
+            stat.execute("DROP TABLE " + OUTPUT_TABLE2);
+            stat.execute("DROP TABLE " + OUTPUT_TABLE3);
+            stat.execute("DROP TABLE " + OUTPUT_TABLE4);
+            stat.execute("DROP TABLE " + OUTPUT_TABLE5);
+            stat.execute("DROP TABLE " + USER_TABLE);
+        }
+        container.stop();
+    }
+
+    public static DataStream<Tuple4<Integer, Long, String, Timestamp>> get4TupleDataStream(
+            StreamExecutionEnvironment env) {
+        List<Tuple4<Integer, Long, String, Timestamp>> data = new ArrayList<>();
+        data.add(new Tuple4<>(1, 1L, "Hi", Timestamp.valueOf("1970-01-01 00:00:00.001")));
+        data.add(new Tuple4<>(2, 2L, "Hello", Timestamp.valueOf("1970-01-01 00:00:00.002")));
+        data.add(new Tuple4<>(3, 2L, "Hello world", Timestamp.valueOf("1970-01-01 00:00:00.003")));
+        data.add(
+                new Tuple4<>(
+                        4,
+                        3L,
+                        "Hello world, how are you?",
+                        Timestamp.valueOf("1970-01-01 00:00:00.004")));
+        data.add(new Tuple4<>(5, 3L, "I am fine.", Timestamp.valueOf("1970-01-01 00:00:00.005")));
+        data.add(
+                new Tuple4<>(
+                        6, 3L, "Luke Skywalker", Timestamp.valueOf("1970-01-01 00:00:00.006")));
+        data.add(new Tuple4<>(7, 4L, "Comment#1", Timestamp.valueOf("1970-01-01 00:00:00.007")));
+        data.add(new Tuple4<>(8, 4L, "Comment#2", Timestamp.valueOf("1970-01-01 00:00:00.008")));
+        data.add(new Tuple4<>(9, 4L, "Comment#3", Timestamp.valueOf("1970-01-01 00:00:00.009")));
+        data.add(new Tuple4<>(10, 4L, "Comment#4", Timestamp.valueOf("1970-01-01 00:00:00.010")));
+        data.add(new Tuple4<>(11, 5L, "Comment#5", Timestamp.valueOf("1970-01-01 00:00:00.011")));
+        data.add(new Tuple4<>(12, 5L, "Comment#6", Timestamp.valueOf("1970-01-01 00:00:00.012")));
+        data.add(new Tuple4<>(13, 5L, "Comment#7", Timestamp.valueOf("1970-01-01 00:00:00.013")));
+        data.add(new Tuple4<>(14, 5L, "Comment#8", Timestamp.valueOf("1970-01-01 00:00:00.014")));
+        data.add(new Tuple4<>(15, 5L, "Comment#9", Timestamp.valueOf("1970-01-01 00:00:00.015")));
+        data.add(new Tuple4<>(16, 6L, "Comment#10", Timestamp.valueOf("1970-01-01 00:00:00.016")));
+        data.add(new Tuple4<>(17, 6L, "Comment#11", Timestamp.valueOf("1970-01-01 00:00:00.017")));
+        data.add(new Tuple4<>(18, 6L, "Comment#12", Timestamp.valueOf("1970-01-01 00:00:00.018")));
+        data.add(new Tuple4<>(19, 6L, "Comment#13", Timestamp.valueOf("1970-01-01 00:00:00.019")));
+        data.add(new Tuple4<>(20, 6L, "Comment#14", Timestamp.valueOf("1970-01-01 00:00:00.020")));
+        data.add(new Tuple4<>(21, 6L, "Comment#15", Timestamp.valueOf("1970-01-01 00:00:00.021")));
+
+        Collections.shuffle(data);
+        return env.fromCollection(data);
+    }
+
+    @Test
+    public void testReal() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.getConfig().enableObjectReuse();
+        StreamTableEnvironment tEnv =
+                StreamTableEnvironment.create(env, EnvironmentSettings.inStreamingMode());
+
+        tEnv.executeSql(
+                "CREATE TABLE upsertSink ("
+                        + "  real_data float"
+                        + ") WITH ("
+                        + "  'connector'='jdbc',"
+                        + "  'url'='"
+                        + containerUrl
+                        + "',"
+                        + "  'table-name'='"
+                        + OUTPUT_TABLE4
+                        + "',"
+                        + "  'username'='"
+                        + container.getUsername()
+                        + "',"
+                        + "  'password'='"
+                        + container.getPassword()
+                        + "'"
+                        + ")");
+
+        tEnv.executeSql("INSERT INTO upsertSink SELECT CAST(1.1 as FLOAT)").await();
+        check(new Row[] {Row.of(1.1f)}, containerUrl, "REAL_TABLE", new String[] {"real_data"});
+    }
+
+    @Test
+    public void testUpsert() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.getConfig().enableObjectReuse();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        Table t =
+                tEnv.fromDataStream(

Review Comment:
   Might we change `fromDataStream` and `assignTimestampsAndWatermarks` to the non-deprecated versions?
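   
   Roughly like the following sketch, which reuses `env`, `tEnv`, and `get4TupleDataStream` from the quoted test; the `rowtime` column name and the cast to `TIMESTAMP(3)` are assumptions, not part of the PR:
   
   ```java
   import org.apache.flink.api.common.eventtime.WatermarkStrategy;
   import org.apache.flink.table.api.Schema;
   
   // WatermarkStrategy replaces the deprecated AscendingTimestampExtractor.
   DataStream<Tuple4<Integer, Long, String, Timestamp>> source =
           get4TupleDataStream(env)
                   .assignTimestampsAndWatermarks(
                           WatermarkStrategy
                                   .<Tuple4<Integer, Long, String, Timestamp>>forMonotonousTimestamps()
                                   .withTimestampAssigner((event, ts) -> event.f3.getTime()));
   
   // The non-deprecated fromDataStream takes a Schema instead of field expressions.
   Table t =
           tEnv.fromDataStream(
                   source,
                   Schema.newBuilder()
                           .columnByExpression("rowtime", "CAST(f3 AS TIMESTAMP(3))")
                           .watermark("rowtime", "SOURCE_WATERMARK()")
                           .build());
   ```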





[GitHub] [flink] flinkbot commented on pull request #20235: Jdbc dialects

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #20235:
URL: https://github.com/apache/flink/pull/20235#issuecomment-1179684697

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8f02bdfb5e081446be04726b0dc703f554e0e934",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "8f02bdfb5e081446be04726b0dc703f554e0e934",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 8f02bdfb5e081446be04726b0dc703f554e0e934 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] RocMarshal commented on a diff in pull request #20235: [Flink 14101][Connectors][Jdbc] SQL Server dialect

Posted by GitBox <gi...@apache.org>.
RocMarshal commented on code in PR #20235:
URL: https://github.com/apache/flink/pull/20235#discussion_r928109601


##########
flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/sqlserver/SqlServerPreparedStatementTest.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.dialect.sqlserver;
+
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialectLoader;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link SqlServerPreparedStatementTest}. */
+public class SqlServerPreparedStatementTest {

Review Comment:
   Hi @qingwei91, thanks for the update.
   
   We should remove the access modifier ```public``` from class definitions and method declarations in test classes, based on the [items](https://docs.google.com/document/d/1514Wa_aNB9bJUen4xm5uiuXOooOJTtXqS_Jqk9KJitU/edit) from
   https://issues.apache.org/jira/browse/FLINK-25325
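   
   Concretely, the JUnit 5 version would look roughly like this; the assertion is only a placeholder to illustrate the visibility change:
   
   ```java
   import org.junit.jupiter.api.Test;
   
   import static org.assertj.core.api.Assertions.assertThat;
   
   // JUnit 5 discovers package-private test classes and methods,
   // so the `public` modifiers can simply be dropped.
   class SqlServerPreparedStatementTest {
   
       @Test
       void dialectNameIsSqlServer() {
           assertThat(new SqlServerDialect().dialectName()).isEqualTo("SqlServer");
       }
   }
   ```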





[GitHub] [flink] maver1ck commented on a diff in pull request #20235: [Flink 14101][Connectors][Jdbc] SQL Server dialect

Posted by GitBox <gi...@apache.org>.
maver1ck commented on code in PR #20235:
URL: https://github.com/apache/flink/pull/20235#discussion_r936922685


##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/sqlserver/SqlServerDialect.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.dialect.sqlserver;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.converter.JdbcRowConverter;
+import org.apache.flink.connector.jdbc.dialect.AbstractDialect;
+import org.apache.flink.connector.jdbc.internal.converter.SqlServerRowConverter;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/** JDBC dialect for SqlServer. */
+@Internal
+public class SqlServerDialect extends AbstractDialect {
+    @Override
+    public String dialectName() {
+        return "SqlServer";
+    }
+
+    /**
+     * The maximum precision is supported by datetime2.
+     * https://docs.microsoft.com/en-us/sql/t-sql/data-types/datetime2-transact-sql?view=sql-server-ver16
+     */
+    @Override
+    public Optional<Range> timestampPrecisionRange() {
+        return Optional.of(Range.of(0, 7));
+    }
+
+    /**
+     * The maximum precision is supported by decimal.
+     * https://docs.microsoft.com/en-us/sql/t-sql/data-types/precision-scale-and-length-transact-sql?view=sql-server-ver16
+     *
+     * @return
+     */
+    @Override
+    public Optional<Range> decimalPrecisionRange() {
+        return Optional.of(Range.of(0, 38));
+    }
+
+    @Override
+    public Optional<String> defaultDriverName() {
+        return Optional.of("com.microsoft.sqlserver.jdbc.SQLServerDriver");
+    }
+
+    @Override
+    public String quoteIdentifier(String identifier) {
+        return identifier;
+    }
+
+    @Override
+    public Optional<String> getUpsertStatement(
+            String tableName, String[] fieldNames, String[] uniqueKeyFields) {
+        List<String> nonUniqueKeyFields =
+                Arrays.stream(fieldNames)
+                        .filter(f -> !Arrays.asList(uniqueKeyFields).contains(f))
+                        .collect(Collectors.toList());
+        String fieldsProjection =
+                Arrays.stream(fieldNames)
+                        .map(this::quoteIdentifier)
+                        .collect(Collectors.joining(", "));
+
+        String valuesBinding =
+                Arrays.stream(fieldNames)
+                        .map(f -> ":" + f + " " + quoteIdentifier(f))
+                        .collect(Collectors.joining(", "));
+
+        String usingClause = String.format("SELECT %s", valuesBinding);
+        String onConditions =
+                Arrays.stream(uniqueKeyFields)
+                        .map(
+                                f ->
+                                        "[TARGET]."
+                                                + quoteIdentifier(f)
+                                                + "=[SOURCE]."
+                                                + quoteIdentifier(f))
+                        .collect(Collectors.joining(" AND "));
+        String updateSetClause =
+                nonUniqueKeyFields.stream()
+                        .map(
+                                f ->
+                                        "[TARGET]."
+                                                + quoteIdentifier(f)
+                                                + "=[SOURCE]."
+                                                + quoteIdentifier(f))
+                        .collect(Collectors.joining(", "));
+
+        String insertValues =
+                Arrays.stream(fieldNames)
+                        .map(f -> "[SOURCE]." + quoteIdentifier(f))
+                        .collect(Collectors.joining(", "));
+        return Optional.of(
+                String.format(
+                        "MERGE INTO %s AS [TARGET]"
+                                + " USING (%s) AS [SOURCE]"
+                                + " ON (%s)"
+                                + " WHEN MATCHED THEN"
+                                + " UPDATE SET %s"
+                                + " WHEN NOT MATCHED THEN"
+                                + " INSERT (%s) VALUES (%s);",
+                        quoteIdentifier(tableName),
+                        usingClause,
+                        onConditions,
+                        updateSetClause,
+                        fieldsProjection,
+                        insertValues));
+    }
+
+    @Override
+    public JdbcRowConverter getRowConverter(RowType rowType) {
+        return new SqlServerRowConverter(rowType);
+    }
+
+    @Override
+    public String getLimitClause(long limit) {
+        throw new IllegalArgumentException("SqlServerDialect does not support limit clause");

Review Comment:
   I agree. We probably need to refactor the limit-clause handling.
   Let's make that a separate PR.
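   
   To illustrate the problem: SQL Server has no trailing `LIMIT` clause, so a refactored API would have to rewrite the query rather than append a suffix. A hypothetical helper (not existing Flink API, just a sketch of the idea):
   
   ```java
   // Hypothetical sketch: inject TOP right after SELECT, since SQL Server
   // supports "SELECT TOP n ..." but not "... LIMIT n".
   static String applyRowLimit(String selectQuery, long limit) {
       // e.g. "SELECT a, b FROM t" -> "SELECT TOP 10 a, b FROM t"
       return selectQuery.replaceFirst("(?i)^SELECT", "SELECT TOP " + limit);
   }
   ```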





[GitHub] [flink] qingwei91 commented on pull request #20235: Jdbc dialects

Posted by GitBox <gi...@apache.org>.
qingwei91 commented on PR #20235:
URL: https://github.com/apache/flink/pull/20235#issuecomment-1180172085

   @flinkbot run azure




[GitHub] [flink] qingwei91 commented on a diff in pull request #20235: [Flink 14101][Connectors][Jdbc] SQL Server dialect

Posted by GitBox <gi...@apache.org>.
qingwei91 commented on code in PR #20235:
URL: https://github.com/apache/flink/pull/20235#discussion_r922704584


##########
flink-connectors/flink-connector-jdbc/pom.xml:
##########
@@ -68,6 +68,14 @@ under the License.
 			<scope>provided</scope>
 		</dependency>
 
+		<!-- SQL Server -->
+		<dependency>
+			<groupId>com.microsoft.sqlserver</groupId>
+			<artifactId>mssql-jdbc</artifactId>
+			<version>10.2.0.jre8</version>

Review Comment:
   Hi @RocMarshal, are you referring to v11.x.x? From my understanding, those are still preview releases, and 10.2.x is the latest stable release. Should we use v11?
   
   https://github.com/microsoft/mssql-jdbc/releases/tag/v.11.1.0
   
   I've bumped it to 10.2.1 though





[GitHub] [flink] RocMarshal commented on pull request #20235: [Flink 14101][Connectors][Jdbc] SQL Server dialect

Posted by GitBox <gi...@apache.org>.
RocMarshal commented on PR #20235:
URL: https://github.com/apache/flink/pull/20235#issuecomment-1185144498

   > @RocMarshal since you are working on FLIP-239, would you like to review this PR?
   
   Thanks @qingwei91 for the PR and @JingGe for the ping. I'll check it ASAP.




[GitHub] [flink] qingwei91 commented on pull request #20235: [Flink 14101][Connectors][Jdbc] SQL Server dialect

Posted by GitBox <gi...@apache.org>.
qingwei91 commented on PR #20235:
URL: https://github.com/apache/flink/pull/20235#issuecomment-1184088857

   @flinkbot run azure




[GitHub] [flink] gaoyunhaii closed pull request #20235: [Flink-14101][Connectors][Jdbc] SQL Server dialect

Posted by GitBox <gi...@apache.org>.
gaoyunhaii closed pull request #20235: [Flink-14101][Connectors][Jdbc] SQL Server dialect
URL: https://github.com/apache/flink/pull/20235




[GitHub] [flink] JingGe commented on a diff in pull request #20235: [Flink 14101][Connectors][Jdbc] SQL Server dialect

Posted by GitBox <gi...@apache.org>.
JingGe commented on code in PR #20235:
URL: https://github.com/apache/flink/pull/20235#discussion_r947270277


##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/sqlserver/SqlServerDialect.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.dialect.sqlserver;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.converter.JdbcRowConverter;
+import org.apache.flink.connector.jdbc.dialect.AbstractDialect;
+import org.apache.flink.connector.jdbc.internal.converter.SqlServerRowConverter;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/** JDBC dialect for SqlServer. */
+@Internal
+public class SqlServerDialect extends AbstractDialect {
+    @Override
+    public String dialectName() {
+        return "SqlServer";
+    }
+
+    /**
+     * The maximum precision is supported by datetime2.
+     * https://docs.microsoft.com/en-us/sql/t-sql/data-types/datetime2-transact-sql?view=sql-server-ver16
+     */
+    @Override
+    public Optional<Range> timestampPrecisionRange() {
+        return Optional.of(Range.of(0, 7));
+    }
+
+    /**
+     * The maximum precision is supported by decimal.
+     * https://docs.microsoft.com/en-us/sql/t-sql/data-types/precision-scale-and-length-transact-sql?view=sql-server-ver16
+     *
+     * @return
+     */
+    @Override
+    public Optional<Range> decimalPrecisionRange() {
+        return Optional.of(Range.of(0, 38));
+    }
+
+    @Override
+    public Optional<String> defaultDriverName() {
+        return Optional.of("com.microsoft.sqlserver.jdbc.SQLServerDriver");
+    }
+
+    @Override
+    public String quoteIdentifier(String identifier) {
+        return identifier;
+    }
+
+    @Override
+    public Optional<String> getUpsertStatement(
+            String tableName, String[] fieldNames, String[] uniqueKeyFields) {
+        List<String> nonUniqueKeyFields =
+                Arrays.stream(fieldNames)
+                        .filter(f -> !Arrays.asList(uniqueKeyFields).contains(f))
+                        .collect(Collectors.toList());
+        String fieldsProjection =
+                Arrays.stream(fieldNames)
+                        .map(this::quoteIdentifier)
+                        .collect(Collectors.joining(", "));
+
+        String valuesBinding =
+                Arrays.stream(fieldNames)
+                        .map(f -> ":" + f + " " + quoteIdentifier(f))
+                        .collect(Collectors.joining(", "));
+
+        String usingClause = String.format("SELECT %s", valuesBinding);
+        String onConditions =
+                Arrays.stream(uniqueKeyFields)
+                        .map(
+                                f ->
+                                        "[TARGET]."
+                                                + quoteIdentifier(f)
+                                                + "=[SOURCE]."
+                                                + quoteIdentifier(f))
+                        .collect(Collectors.joining(" AND "));
+        String updateSetClause =
+                nonUniqueKeyFields.stream()
+                        .map(
+                                f ->
+                                        "[TARGET]."
+                                                + quoteIdentifier(f)
+                                                + "=[SOURCE]."
+                                                + quoteIdentifier(f))
+                        .collect(Collectors.joining(", "));
+
+        String insertValues =
+                Arrays.stream(fieldNames)
+                        .map(f -> "[SOURCE]." + quoteIdentifier(f))
+                        .collect(Collectors.joining(", "));
+        return Optional.of(
+                String.format(
+                        "MERGE INTO %s AS [TARGET]"
+                                + " USING (%s) AS [SOURCE]"
+                                + " ON (%s)"
+                                + " WHEN MATCHED THEN"
+                                + " UPDATE SET %s"
+                                + " WHEN NOT MATCHED THEN"
+                                + " INSERT (%s) VALUES (%s);",
+                        quoteIdentifier(tableName),
+                        usingClause,
+                        onConditions,
+                        updateSetClause,
+                        fieldsProjection,
+                        insertValues));
+    }
+
+    @Override
+    public JdbcRowConverter getRowConverter(RowType rowType) {
+        return new SqlServerRowConverter(rowType);
+    }
+
+    @Override
+    public String getLimitClause(long limit) {
+        throw new IllegalArgumentException("SqlServerDialect does not support limit clause");

Review Comment:
   A new ticket has been created: FLINK-29003.


