You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by le...@apache.org on 2022/08/09 08:19:09 UTC
[flink] branch master updated: [FLINK-25485][connector/jdbc] Add default jdbc option 'rewriteBatchedStatements' for MySQL dialect
This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 61374638c02 [FLINK-25485][connector/jdbc] Add default jdbc option 'rewriteBatchedStatements' for MySQL dialect
61374638c02 is described below
commit 61374638c02e5f996cad678253c99363f7ae01a5
Author: Luning (Lucas) Wang <wa...@gmail.com>
AuthorDate: Tue Aug 9 16:18:59 2022 +0800
[FLINK-25485][connector/jdbc] Add default jdbc option 'rewriteBatchedStatements' for MySQL dialect
This closes #18469.
---
.../flink/connector/jdbc/dialect/JdbcDialect.java | 12 +++++++
.../connector/jdbc/dialect/mysql/MySqlDialect.java | 18 ++++++++++
.../internal/options/JdbcConnectorOptions.java | 2 +-
.../jdbc/dialect/mysql/MySqlDialectTest.java | 41 ++++++++++++++++++++++
4 files changed, 72 insertions(+), 1 deletion(-)
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialect.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialect.java
index 6d4c1db91b4..6cc6bbd57d1 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialect.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialect.java
@@ -142,4 +142,16 @@ public interface JdbcDialect extends Serializable {
*/
String getSelectFromStatement(
String tableName, String[] selectFields, String[] conditionFields);
+
+ /**
+ * Appends default JDBC properties to the URL for the current dialect. Some database dialects
+ * set default JDBC properties for performance or optimization reasons; for example, the MySQL
+ * dialect uses 'rewriteBatchedStatements=true' to enable executing multiple MySQL statements
+ * in batch mode.
+ *
+ * @return The JDBC URL with the default properties appended.
+ */
+ default String appendDefaultUrlProperties(String url) {
+ return url;
+ }
}
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlDialect.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlDialect.java
index 918af883685..b44155913aa 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlDialect.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlDialect.java
@@ -47,6 +47,10 @@ public class MySqlDialect extends AbstractDialect {
private static final int MAX_DECIMAL_PRECISION = 65;
private static final int MIN_DECIMAL_PRECISION = 1;
+ // The JDBC option that enables executing multiple MySQL statements in batch mode:
+ // https://dev.mysql.com/doc/connector-j/8.0/en/connector-j-connp-props-performance-extensions.html#cj-conn-prop_rewriteBatchedStatements
+ private static final String REWRITE_BATCHED_STATEMENTS = "rewriteBatchedStatements";
+
@Override
public JdbcRowConverter getRowConverter(RowType rowType) {
return new MySQLRowConverter(rowType);
@@ -126,4 +130,18 @@ public class MySqlDialect extends AbstractDialect {
LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE,
LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE);
}
+
+ @Override
+ public String appendDefaultUrlProperties(String url) {
+ if (!url.contains(REWRITE_BATCHED_STATEMENTS)) {
+ String defaultUrlProperties = REWRITE_BATCHED_STATEMENTS + "=true";
+ if (url.contains("?")) {
+ return url + "&" + defaultUrlProperties;
+ } else {
+ return url + "?" + defaultUrlProperties;
+ }
+ } else {
+ return url;
+ }
+ }
}
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/options/JdbcConnectorOptions.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/options/JdbcConnectorOptions.java
index 6158475a6aa..89d22bd3067 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/options/JdbcConnectorOptions.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/options/JdbcConnectorOptions.java
@@ -181,7 +181,7 @@ public class JdbcConnectorOptions extends JdbcConnectionOptions {
}
return new JdbcConnectorOptions(
- dbURL,
+ dialect.appendDefaultUrlProperties(dbURL),
tableName,
driverName,
username,
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlDialectTest.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlDialectTest.java
new file mode 100644
index 00000000000..006cc7685c6
--- /dev/null
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlDialectTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.dialect.mysql;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link MySqlDialect}. */
+public class MySqlDialectTest {
+
+ @Test
+ void testAppendDefaultUrlProperties() {
+ MySqlDialect dialect = new MySqlDialect();
+ assertThat(dialect.appendDefaultUrlProperties("jdbc:mysql://localhost:3306/foo"))
+ .isEqualTo("jdbc:mysql://localhost:3306/foo?rewriteBatchedStatements=true");
+ assertThat(dialect.appendDefaultUrlProperties("jdbc:mysql://localhost:3306/foo?foo=bar"))
+ .isEqualTo("jdbc:mysql://localhost:3306/foo?foo=bar&rewriteBatchedStatements=true");
+ assertThat(
+ dialect.appendDefaultUrlProperties(
+ "jdbc:mysql://localhost:3306/foo?foo=bar&rewriteBatchedStatements=false"))
+ .isEqualTo(
+ "jdbc:mysql://localhost:3306/foo?foo=bar&rewriteBatchedStatements=false");
+ }
+}