You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by le...@apache.org on 2022/08/09 08:19:09 UTC

[flink] branch master updated: [FLINK-25485][connector/jdbc] Add default jdbc option 'rewriteBatchedStatements' for MySQL dialect

This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 61374638c02 [FLINK-25485][connector/jdbc] Add default jdbc option 'rewriteBatchedStatements' for MySQL dialect
61374638c02 is described below

commit 61374638c02e5f996cad678253c99363f7ae01a5
Author: Luning (Lucas) Wang <wa...@gmail.com>
AuthorDate: Tue Aug 9 16:18:59 2022 +0800

    [FLINK-25485][connector/jdbc] Add default jdbc option 'rewriteBatchedStatements' for MySQL dialect
    
    This closes #18469.
---
 .../flink/connector/jdbc/dialect/JdbcDialect.java  | 12 +++++++
 .../connector/jdbc/dialect/mysql/MySqlDialect.java | 18 ++++++++++
 .../internal/options/JdbcConnectorOptions.java     |  2 +-
 .../jdbc/dialect/mysql/MySqlDialectTest.java       | 41 ++++++++++++++++++++++
 4 files changed, 72 insertions(+), 1 deletion(-)

diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialect.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialect.java
index 6d4c1db91b4..6cc6bbd57d1 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialect.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialect.java
@@ -142,4 +142,16 @@ public interface JdbcDialect extends Serializable {
      */
     String getSelectFromStatement(
             String tableName, String[] selectFields, String[] conditionFields);
+
+    /**
+     * Appends the default JDBC properties of the current dialect to the given url, e.g. the MySQL
+     * dialect appends 'rewriteBatchedStatements=true' to execute multiple MySQL statements in
+     * batch mode for better performance. By default the url is returned unchanged.
+     *
+     * @param url The JDBC url to append the default properties to.
+     * @return A JDBC url that has appended the default properties.
+     */
+    default String appendDefaultUrlProperties(String url) {
+        return url;
+    }
 }
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlDialect.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlDialect.java
index 918af883685..b44155913aa 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlDialect.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlDialect.java
@@ -47,6 +47,10 @@ public class MySqlDialect extends AbstractDialect {
     private static final int MAX_DECIMAL_PRECISION = 65;
     private static final int MIN_DECIMAL_PRECISION = 1;
 
+    // The JDBC option that enables executing multiple MySQL statements in batch mode:
+    // https://dev.mysql.com/doc/connector-j/8.0/en/connector-j-connp-props-performance-extensions.html#cj-conn-prop_rewriteBatchedStatements
+    private static final String REWRITE_BATCHED_STATEMENTS = "rewriteBatchedStatements";
+
     @Override
     public JdbcRowConverter getRowConverter(RowType rowType) {
         return new MySQLRowConverter(rowType);
@@ -126,4 +130,18 @@ public class MySqlDialect extends AbstractDialect {
                 LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE,
                 LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE);
     }
+
+    @Override
+    public String appendDefaultUrlProperties(String url) {
+        if (!url.contains(REWRITE_BATCHED_STATEMENTS)) { // plain substring check on the whole url
+            String defaultUrlProperties = REWRITE_BATCHED_STATEMENTS + "=true";
+            if (url.contains("?")) { // a '?' is already present: add as a further parameter
+                return url + "&" + defaultUrlProperties;
+            } else { // no '?' in the url yet: start the parameter list
+                return url + "?" + defaultUrlProperties;
+            }
+        } else { // option name already appears in the url: keep the url as given
+            return url;
+        }
+    }
 }
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/options/JdbcConnectorOptions.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/options/JdbcConnectorOptions.java
index 6158475a6aa..89d22bd3067 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/options/JdbcConnectorOptions.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/options/JdbcConnectorOptions.java
@@ -181,7 +181,7 @@ public class JdbcConnectorOptions extends JdbcConnectionOptions {
             }
 
             return new JdbcConnectorOptions(
-                    dbURL,
+                    dialect.appendDefaultUrlProperties(dbURL),
                     tableName,
                     driverName,
                     username,
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlDialectTest.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlDialectTest.java
new file mode 100644
index 00000000000..006cc7685c6
--- /dev/null
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlDialectTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.dialect.mysql;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link MySqlDialect}, covering the default JDBC url properties it appends. */
+public class MySqlDialectTest {
+
+    @Test
+    void testAppendDefaultUrlProperties() {
+        MySqlDialect dialect = new MySqlDialect();
+        assertThat(dialect.appendDefaultUrlProperties("jdbc:mysql://localhost:3306/foo"))
+                .isEqualTo("jdbc:mysql://localhost:3306/foo?rewriteBatchedStatements=true");
+        assertThat(dialect.appendDefaultUrlProperties("jdbc:mysql://localhost:3306/foo?foo=bar"))
+                .isEqualTo("jdbc:mysql://localhost:3306/foo?foo=bar&rewriteBatchedStatements=true");
+        assertThat( // a url that already names the option must come back unchanged
+                        dialect.appendDefaultUrlProperties(
+                                "jdbc:mysql://localhost:3306/foo?foo=bar&rewriteBatchedStatements=false"))
+                .isEqualTo(
+                        "jdbc:mysql://localhost:3306/foo?foo=bar&rewriteBatchedStatements=false");
+    }
+}