You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2021/09/16 16:05:37 UTC
[skywalking] branch master updated: Improve the speed of writing
TiDB by batching the SQL execution (#7691)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 224b6c6 Improve the speed of writing TiDB by batching the SQL execution (#7691)
224b6c6 is described below
commit 224b6c6b3a7f72ef02d4dca5f73b32c76767d08e
Author: nicolchen <40...@qq.com>
AuthorDate: Fri Sep 17 00:05:18 2021 +0800
Improve the speed of writing TiDB by batching the SQL execution (#7691)
---
CHANGES.md | 1 +
docs/en/setup/backend/backend-storage.md | 14 +++-
docs/en/setup/backend/configuration-vocabulary.md | 6 ++
.../src/main/resources/application.yml | 12 +++-
.../config/ApplicationConfigLoaderTestCase.java | 2 +-
.../storage/plugin/jdbc/BatchSQLExecutor.java | 76 ++++++++++++++++++++++
.../server/storage/plugin/jdbc/SQLExecutor.java | 17 +++--
.../storage/plugin/jdbc/h2/H2StorageConfig.java | 12 ++++
.../storage/plugin/jdbc/h2/H2StorageProvider.java | 2 +-
.../storage/plugin/jdbc/h2/dao/H2BatchDAO.java | 45 ++++++-------
.../plugin/jdbc/mysql/MySQLStorageConfig.java | 12 ++++
.../plugin/jdbc/mysql/MySQLStorageProvider.java | 2 +-
.../jdbc/postgresql/PostgreSQLStorageConfig.java | 1 -
.../jdbc/postgresql/PostgreSQLStorageProvider.java | 2 +-
.../plugin/jdbc/tidb/TiDBStorageProvider.java | 2 +-
15 files changed, 170 insertions(+), 36 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 15e11e7..6d1e8b7 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -66,6 +66,7 @@ Release Notes.
* Support gRPC sync grouped dynamic configurations.
* Fix `H2EventQueryDAO` doesn't sort data by Event.START_TIME and uses a wrong pagination query.
* Fix `LogHandler` of `kafka-fetcher-plugin` cannot recognize namespace.
+* Improve the speed of writing TiDB by batching the SQL execution.
#### UI
diff --git a/docs/en/setup/backend/backend-storage.md b/docs/en/setup/backend/backend-storage.md
index 45468f5..5151546 100644
--- a/docs/en/setup/backend/backend-storage.md
+++ b/docs/en/setup/backend/backend-storage.md
@@ -30,6 +30,8 @@ storage:
driver: org.h2.jdbcx.JdbcDataSource
url: jdbc:h2:mem:skywalking-oap-db
user: sa
+ maxSizeOfBatchSql: ${SW_STORAGE_MAX_SIZE_OF_BATCH_SQL:100}
+ asyncBatchPersistentPoolSize: ${SW_STORAGE_ASYNC_BATCH_PERSISTENT_POOL_SIZE:1}
```
## OpenSearch
@@ -194,7 +196,7 @@ storage:
selector: ${SW_STORAGE:mysql}
mysql:
properties:
- jdbcUrl: ${SW_JDBC_URL:"jdbc:mysql://localhost:3306/swtest"}
+ jdbcUrl: ${SW_JDBC_URL:"jdbc:mysql://localhost:3306/swtest?rewriteBatchedStatements=true"}
dataSource.user: ${SW_DATA_SOURCE_USER:root}
dataSource.password: ${SW_DATA_SOURCE_PASSWORD:root@1234}
dataSource.cachePrepStmts: ${SW_DATA_SOURCE_CACHE_PREP_STMTS:true}
@@ -202,9 +204,12 @@ storage:
dataSource.prepStmtCacheSqlLimit: ${SW_DATA_SOURCE_PREP_STMT_CACHE_SQL_LIMIT:2048}
dataSource.useServerPrepStmts: ${SW_DATA_SOURCE_USE_SERVER_PREP_STMTS:true}
metadataQueryMaxSize: ${SW_STORAGE_MYSQL_QUERY_MAX_SIZE:5000}
+ maxSizeOfBatchSql: ${SW_STORAGE_MAX_SIZE_OF_BATCH_SQL:2000}
+ asyncBatchPersistentPoolSize: ${SW_STORAGE_ASYNC_BATCH_PERSISTENT_POOL_SIZE:4}
```
All connection-related settings, including URL link, username, and password are found in `application.yml`.
Only part of the settings are listed here. See the [HikariCP](https://github.com/brettwooldridge/HikariCP) connection pool document for full settings.
+To understand the function of the parameter `rewriteBatchedStatements=true` in MySQL, see the [MySQL official document](https://dev.mysql.com/doc/connector-j/8.0/en/connector-j-connp-props-performance-extensions.html#cj-conn-prop_rewriteBatchedStatements).
## TiDB
Tested TiDB Server 4.0.8 version and MySQL Client driver 8.0.13 version are currently available.
@@ -215,7 +220,7 @@ storage:
selector: ${SW_STORAGE:tidb}
tidb:
properties:
- jdbcUrl: ${SW_JDBC_URL:"jdbc:mysql://localhost:4000/swtest"}
+ jdbcUrl: ${SW_JDBC_URL:"jdbc:mysql://localhost:4000/swtest?rewriteBatchedStatements=true"}
dataSource.user: ${SW_DATA_SOURCE_USER:root}
dataSource.password: ${SW_DATA_SOURCE_PASSWORD:""}
dataSource.cachePrepStmts: ${SW_DATA_SOURCE_CACHE_PREP_STMTS:true}
@@ -226,9 +231,12 @@ storage:
metadataQueryMaxSize: ${SW_STORAGE_MYSQL_QUERY_MAX_SIZE:5000}
maxSizeOfArrayColumn: ${SW_STORAGE_MAX_SIZE_OF_ARRAY_COLUMN:20}
numOfSearchableValuesPerTag: ${SW_STORAGE_NUM_OF_SEARCHABLE_VALUES_PER_TAG:2}
+ maxSizeOfBatchSql: ${SW_STORAGE_MAX_SIZE_OF_BATCH_SQL:2000}
+ asyncBatchPersistentPoolSize: ${SW_STORAGE_ASYNC_BATCH_PERSISTENT_POOL_SIZE:4}
```
All connection-related settings, including URL link, username, and password are found in `application.yml`.
For details on settings, refer to the configuration of *MySQL* above.
+To understand the function of the parameter `rewriteBatchedStatements=true` in TiDB, see the document of [TiDB best practices](https://docs.pingcap.com/tidb/stable/java-app-best-practices#use-batch-api).
## InfluxDB
InfluxDB storage provides a time-series database as a new storage option.
@@ -266,6 +274,8 @@ storage:
metadataQueryMaxSize: ${SW_STORAGE_MYSQL_QUERY_MAX_SIZE:5000}
maxSizeOfArrayColumn: ${SW_STORAGE_MAX_SIZE_OF_ARRAY_COLUMN:20}
numOfSearchableValuesPerTag: ${SW_STORAGE_NUM_OF_SEARCHABLE_VALUES_PER_TAG:2}
+ maxSizeOfBatchSql: ${SW_STORAGE_MAX_SIZE_OF_BATCH_SQL:2000}
+ asyncBatchPersistentPoolSize: ${SW_STORAGE_ASYNC_BATCH_PERSISTENT_POOL_SIZE:4}
```
All connection-related settings, including URL link, username, and password are found in `application.yml`.
Only part of the settings are listed here. Please follow [HikariCP](https://github.com/brettwooldridge/HikariCP) connection pool document for full settings.
diff --git a/docs/en/setup/backend/configuration-vocabulary.md b/docs/en/setup/backend/configuration-vocabulary.md
index 5387a69..1e48ed0 100644
--- a/docs/en/setup/backend/configuration-vocabulary.md
+++ b/docs/en/setup/backend/configuration-vocabulary.md
@@ -114,16 +114,22 @@ core|default|role|Option values: `Mixed/Receiver/Aggregator`. **Receiver** mode
| - | - | metadataQueryMaxSize | The maximum size of metadata per query. | SW_STORAGE_H2_QUERY_MAX_SIZE | 5000 |
| - | - | maxSizeOfArrayColumn | Some entities (e.g. trace segments) include the logic column with multiple values. In H2, we use multiple physical columns to host the values: e.g. change column_a with values [1,2,3,4,5] to `column_a_0 = 1, column_a_1 = 2, column_a_2 = 3 , column_a_3 = 4, column_a_4 = 5`. | SW_STORAGE_MAX_SIZE_OF_ARRAY_COLUMN | 20 |
| - | - | numOfSearchableValuesPerTag | In a trace segment, this includes multiple spans with multiple tags. Different spans may have the same tag key, e.g. multiple HTTP exit spans all have their own `http.method` tags. This configuration sets the limit on the maximum number of values for the same tag key. | SW_STORAGE_NUM_OF_SEARCHABLE_VALUES_PER_TAG | 2 |
+| - | - | maxSizeOfBatchSql | The maximum size of batch size of SQL execution | SW_STORAGE_MAX_SIZE_OF_BATCH_SQL | 100 |
+| - | - | asyncBatchPersistentPoolSize | async flush data into database thread size | SW_STORAGE_ASYNC_BATCH_PERSISTENT_POOL_SIZE | 1 |
| - |mysql| - | MySQL Storage. The MySQL JDBC Driver is not in the dist. Please copy it into the oap-lib folder manually. | - | - |
| - | - | properties | Hikari connection pool configurations. | - | Listed in the `application.yaml`. |
| - | - | metadataQueryMaxSize | The maximum size of metadata per query. | SW_STORAGE_MYSQL_QUERY_MAX_SIZE | 5000 |
| - | - | maxSizeOfArrayColumn | Some entities (e.g. trace segments) include the logic column with multiple values. In MySQL, we use multiple physical columns to host the values, e.g. change column_a with values [1,2,3,4,5] to `column_a_0 = 1, column_a_1 = 2, column_a_2 = 3 , column_a_3 = 4, column_a_4 = 5`. | SW_STORAGE_MAX_SIZE_OF_ARRAY_COLUMN | 20 |
| - | - | numOfSearchableValuesPerTag | In a trace segment, this includes multiple spans with multiple tags. Different spans may have same tag key, e.g. multiple HTTP exit spans all have their own `http.method` tags. This configuration sets the limit on the maximum number of values for the same tag key. | SW_STORAGE_NUM_OF_SEARCHABLE_VALUES_PER_TAG | 2 |
+| - | - | maxSizeOfBatchSql | The maximum size of batch size of SQL execution | SW_STORAGE_MAX_SIZE_OF_BATCH_SQL | 2000 |
+| - | - | asyncBatchPersistentPoolSize | async flush data into database thread size | SW_STORAGE_ASYNC_BATCH_PERSISTENT_POOL_SIZE | 4 |
| - |postgresql| - | PostgreSQL storage. | - | - |
| - | - | properties | Hikari connection pool configurations. | - | Listed in the `application.yaml`. |
| - | - | metadataQueryMaxSize | The maximum size of metadata per query. | SW_STORAGE_MYSQL_QUERY_MAX_SIZE | 5000 |
| - | - | maxSizeOfArrayColumn | Some entities (e.g. trace segments) include the logic column with multiple values. In PostgreSQL, we use multiple physical columns to host the values, e.g. change column_a with values [1,2,3,4,5] to `column_a_0 = 1, column_a_1 = 2, column_a_2 = 3 , column_a_3 = 4, column_a_4 = 5` | SW_STORAGE_MAX_SIZE_OF_ARRAY_COLUMN | 20 |
| - | - | numOfSearchableValuesPerTag | In a trace segment, this includes multiple spans with multiple tags. Different spans may have same tag key, e.g. multiple HTTP exit spans all have their own `http.method` tags. This configuration sets the limit on the maximum number of values for the same tag key. | SW_STORAGE_NUM_OF_SEARCHABLE_VALUES_PER_TAG | 2 |
+| - | - | maxSizeOfBatchSql | The maximum size of batch size of SQL execution | SW_STORAGE_MAX_SIZE_OF_BATCH_SQL | 2000 |
+| - | - | asyncBatchPersistentPoolSize | async flush data into database thread size | SW_STORAGE_ASYNC_BATCH_PERSISTENT_POOL_SIZE | 4 |
| - |influxdb| - | InfluxDB storage. |- | - |
| - | - | url| InfluxDB connection URL. | SW_STORAGE_INFLUXDB_URL | http://localhost:8086|
| - | - | user | User name of InfluxDB. | SW_STORAGE_INFLUXDB_USER | root|
diff --git a/oap-server/server-starter/src/main/resources/application.yml b/oap-server/server-starter/src/main/resources/application.yml
index dc13b21..979b174 100755
--- a/oap-server/server-starter/src/main/resources/application.yml
+++ b/oap-server/server-starter/src/main/resources/application.yml
@@ -158,9 +158,11 @@ storage:
metadataQueryMaxSize: ${SW_STORAGE_H2_QUERY_MAX_SIZE:5000}
maxSizeOfArrayColumn: ${SW_STORAGE_MAX_SIZE_OF_ARRAY_COLUMN:20}
numOfSearchableValuesPerTag: ${SW_STORAGE_NUM_OF_SEARCHABLE_VALUES_PER_TAG:2}
+ maxSizeOfBatchSql: ${SW_STORAGE_MAX_SIZE_OF_BATCH_SQL:100}
+ asyncBatchPersistentPoolSize: ${SW_STORAGE_ASYNC_BATCH_PERSISTENT_POOL_SIZE:1}
mysql:
properties:
- jdbcUrl: ${SW_JDBC_URL:"jdbc:mysql://localhost:3306/swtest"}
+ jdbcUrl: ${SW_JDBC_URL:"jdbc:mysql://localhost:3306/swtest?rewriteBatchedStatements=true"}
dataSource.user: ${SW_DATA_SOURCE_USER:root}
dataSource.password: ${SW_DATA_SOURCE_PASSWORD:root@1234}
dataSource.cachePrepStmts: ${SW_DATA_SOURCE_CACHE_PREP_STMTS:true}
@@ -170,9 +172,11 @@ storage:
metadataQueryMaxSize: ${SW_STORAGE_MYSQL_QUERY_MAX_SIZE:5000}
maxSizeOfArrayColumn: ${SW_STORAGE_MAX_SIZE_OF_ARRAY_COLUMN:20}
numOfSearchableValuesPerTag: ${SW_STORAGE_NUM_OF_SEARCHABLE_VALUES_PER_TAG:2}
+ maxSizeOfBatchSql: ${SW_STORAGE_MAX_SIZE_OF_BATCH_SQL:2000}
+ asyncBatchPersistentPoolSize: ${SW_STORAGE_ASYNC_BATCH_PERSISTENT_POOL_SIZE:4}
tidb:
properties:
- jdbcUrl: ${SW_JDBC_URL:"jdbc:mysql://localhost:4000/tidbswtest"}
+ jdbcUrl: ${SW_JDBC_URL:"jdbc:mysql://localhost:4000/tidbswtest?rewriteBatchedStatements=true"}
dataSource.user: ${SW_DATA_SOURCE_USER:root}
dataSource.password: ${SW_DATA_SOURCE_PASSWORD:""}
dataSource.cachePrepStmts: ${SW_DATA_SOURCE_CACHE_PREP_STMTS:true}
@@ -183,6 +187,8 @@ storage:
metadataQueryMaxSize: ${SW_STORAGE_MYSQL_QUERY_MAX_SIZE:5000}
maxSizeOfArrayColumn: ${SW_STORAGE_MAX_SIZE_OF_ARRAY_COLUMN:20}
numOfSearchableValuesPerTag: ${SW_STORAGE_NUM_OF_SEARCHABLE_VALUES_PER_TAG:2}
+ maxSizeOfBatchSql: ${SW_STORAGE_MAX_SIZE_OF_BATCH_SQL:2000}
+ asyncBatchPersistentPoolSize: ${SW_STORAGE_ASYNC_BATCH_PERSISTENT_POOL_SIZE:4}
influxdb:
# InfluxDB configuration
url: ${SW_STORAGE_INFLUXDB_URL:http://localhost:8086}
@@ -206,6 +212,8 @@ storage:
metadataQueryMaxSize: ${SW_STORAGE_MYSQL_QUERY_MAX_SIZE:5000}
maxSizeOfArrayColumn: ${SW_STORAGE_MAX_SIZE_OF_ARRAY_COLUMN:20}
numOfSearchableValuesPerTag: ${SW_STORAGE_NUM_OF_SEARCHABLE_VALUES_PER_TAG:2}
+ maxSizeOfBatchSql: ${SW_STORAGE_MAX_SIZE_OF_BATCH_SQL:2000}
+ asyncBatchPersistentPoolSize: ${SW_STORAGE_ASYNC_BATCH_PERSISTENT_POOL_SIZE:4}
zipkin-elasticsearch:
namespace: ${SW_NAMESPACE:""}
clusterNodes: ${SW_STORAGE_ES_CLUSTER_NODES:localhost:9200}
diff --git a/oap-server/server-starter/src/test/java/org/apache/skywalking/oap/server/starter/config/ApplicationConfigLoaderTestCase.java b/oap-server/server-starter/src/test/java/org/apache/skywalking/oap/server/starter/config/ApplicationConfigLoaderTestCase.java
index ccc7bf2..52b61c5 100644
--- a/oap-server/server-starter/src/test/java/org/apache/skywalking/oap/server/starter/config/ApplicationConfigLoaderTestCase.java
+++ b/oap-server/server-starter/src/test/java/org/apache/skywalking/oap/server/starter/config/ApplicationConfigLoaderTestCase.java
@@ -49,7 +49,7 @@ public class ApplicationConfigLoaderTestCase {
assertThat(providerConfig.get("metadataQueryMaxSize"), is(5000));
assertThat(providerConfig.get("properties"), instanceOf(Properties.class));
Properties properties = (Properties) providerConfig.get("properties");
- assertThat(properties.get("jdbcUrl"), is("jdbc:mysql://localhost:3306/swtest"));
+ assertThat(properties.get("jdbcUrl"), is("jdbc:mysql://localhost:3306/swtest?rewriteBatchedStatements=true"));
}
@Test
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/BatchSQLExecutor.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/BatchSQLExecutor.java
new file mode 100644
index 0000000..a3d4848
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/BatchSQLExecutor.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.jdbc;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
+import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
+import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.List;
+
+/**
+ * A Batch SQL executor.
+ */
+@Slf4j
+@RequiredArgsConstructor
+public class BatchSQLExecutor implements InsertRequest, UpdateRequest {
+
+ private final List<PrepareRequest> prepareRequests;
+
+ public void invoke(Connection connection, int maxBatchSqlSize) throws SQLException {
+ if (log.isDebugEnabled()) {
+ log.debug("execute sql batch. sql by key size: {}", prepareRequests.size());
+ }
+ if (prepareRequests.size() == 0) {
+ return;
+ }
+ String sql = prepareRequests.get(0).toString();
+ try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
+ int pendingCount = 0;
+ for (int k = 0; k < prepareRequests.size(); k++) {
+ SQLExecutor sqlExecutor = (SQLExecutor) prepareRequests.get(k);
+ sqlExecutor.setParameters(preparedStatement);
+ preparedStatement.addBatch();
+ if (k > 0 && k % maxBatchSqlSize == 0) {
+ executeBatch(preparedStatement, maxBatchSqlSize, sql);
+ pendingCount = 0;
+ } else {
+ pendingCount++;
+ }
+ }
+ if (pendingCount > 0) {
+ executeBatch(preparedStatement, pendingCount, sql);
+ }
+ }
+ }
+
+ private void executeBatch(PreparedStatement preparedStatement, int pendingCount, String sql) throws SQLException {
+ long start = System.currentTimeMillis();
+ preparedStatement.executeBatch();
+ if (log.isDebugEnabled()) {
+ long end = System.currentTimeMillis();
+ long cost = end - start;
+ log.debug("execute batch sql, batch size: {}, cost:{}ms, sql: {}", pendingCount, cost, sql);
+ }
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/SQLExecutor.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/SQLExecutor.java
index 65c5d02..8891b95 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/SQLExecutor.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/SQLExecutor.java
@@ -22,6 +22,7 @@ import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;
+import lombok.EqualsAndHashCode;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
import org.slf4j.Logger;
@@ -30,6 +31,7 @@ import org.slf4j.LoggerFactory;
/**
* A SQL executor.
*/
+@EqualsAndHashCode(of = "sql")
public class SQLExecutor implements InsertRequest, UpdateRequest {
private static final Logger LOGGER = LoggerFactory.getLogger(SQLExecutor.class);
@@ -44,14 +46,21 @@ public class SQLExecutor implements InsertRequest, UpdateRequest {
public void invoke(Connection connection) throws SQLException {
PreparedStatement preparedStatement = connection.prepareStatement(sql);
+ setParameters(preparedStatement);
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("execute sql in batch: {}, parameters: {}", sql, param);
+ }
+ preparedStatement.execute();
+ }
+ public void setParameters(PreparedStatement preparedStatement) throws SQLException {
for (int i = 0; i < param.size(); i++) {
preparedStatement.setObject(i + 1, param.get(i));
}
+ }
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("execute sql in batch: {}, parameters: {}", sql, param);
- }
- preparedStatement.execute();
+ @Override
+ public String toString() {
+ return sql;
}
}
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageConfig.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageConfig.java
index 06b8b26..90f400e 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageConfig.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageConfig.java
@@ -59,4 +59,16 @@ public class H2StorageConfig extends ModuleConfig {
* @since 8.2.0
*/
private int numOfSearchableValuesPerTag = 2;
+ /**
+ * The maximum size of batch size of SQL execution
+ *
+ * @since 8.8.0
+ */
+ private int maxSizeOfBatchSql = 100;
+ /**
+ * async batch execute pool size
+ *
+ * @since 8.8.0
+ */
+ private int asyncBatchPersistentPoolSize = 1;
}
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java
index b074566..1a93880 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java
@@ -117,7 +117,7 @@ public class H2StorageProvider extends ModuleProvider {
settings.setProperty("dataSource.password", config.getPassword());
h2Client = new JDBCHikariCPClient(settings);
- this.registerServiceImplementation(IBatchDAO.class, new H2BatchDAO(h2Client));
+ this.registerServiceImplementation(IBatchDAO.class, new H2BatchDAO(h2Client, config.getMaxSizeOfBatchSql(), config.getAsyncBatchPersistentPoolSize()));
this.registerServiceImplementation(
StorageDAO.class,
new H2StorageDAO(
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java
index 2101e68..cd7d4fb 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java
@@ -22,38 +22,35 @@ import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
import java.util.Properties;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.BulkConsumePool;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.ConsumerPoolFactory;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
-import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLExecutor;
+import org.apache.skywalking.oap.server.storage.plugin.jdbc.BatchSQLExecutor;
@Slf4j
public class H2BatchDAO implements IBatchDAO {
private JDBCHikariCPClient h2Client;
private final DataCarrier<PrepareRequest> dataCarrier;
+ private final int maxBatchSqlSize;
- public H2BatchDAO(JDBCHikariCPClient h2Client) {
+ public H2BatchDAO(JDBCHikariCPClient h2Client, int maxBatchSqlSize, int asyncBatchPersistentPoolSize) {
this.h2Client = h2Client;
-
String name = "H2_ASYNCHRONOUS_BATCH_PERSISTENT";
- BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, 1, 20);
- try {
- ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
- } catch (Exception e) {
- throw new UnexpectedException(e.getMessage(), e);
+ if (log.isDebugEnabled()) {
+ log.debug("H2_ASYNCHRONOUS_BATCH_PERSISTENT poolSize: {}, maxBatchSqlSize:{}", asyncBatchPersistentPoolSize, maxBatchSqlSize);
}
-
- this.dataCarrier = new DataCarrier<>(1, 10000);
- this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new H2BatchDAO.H2BatchConsumer(this));
+ this.maxBatchSqlSize = maxBatchSqlSize;
+ this.dataCarrier = new DataCarrier<>(name, asyncBatchPersistentPoolSize, 10000);
+ this.dataCarrier.consume(new H2BatchDAO.H2BatchConsumer(this), asyncBatchPersistentPoolSize, 20);
}
@Override
@@ -61,23 +58,27 @@ public class H2BatchDAO implements IBatchDAO {
if (CollectionUtils.isEmpty(prepareRequests)) {
return;
}
-
if (log.isDebugEnabled()) {
- log.debug("batch sql statements execute, data size: {}", prepareRequests.size());
+ log.debug("to execute sql statements execute, data size: {}, maxBatchSqlSize: {}", prepareRequests.size(), maxBatchSqlSize);
}
-
+ final Map<PrepareRequest, List<PrepareRequest>> batchRequestMap =
+ prepareRequests.stream().collect(Collectors.groupingBy(Function.identity()));
try (Connection connection = h2Client.getConnection()) {
- for (PrepareRequest prepareRequest : prepareRequests) {
+ batchRequestMap.forEach((key, requests) -> {
try {
- SQLExecutor sqlExecutor = (SQLExecutor) prepareRequest;
- sqlExecutor.invoke(connection);
+ BatchSQLExecutor batchSQLExecutor =
+ new BatchSQLExecutor(requests);
+ batchSQLExecutor.invoke(connection, maxBatchSqlSize);
} catch (SQLException e) {
// Just avoid one execution failure makes the rest of batch failure.
log.error(e.getMessage(), e);
}
- }
+ });
} catch (SQLException | JDBCClientException e) {
- log.error(e.getMessage(), e);
+ log.warn("execute sql failed, discard data size: {}", prepareRequests.size(), e);
+ }
+ if (log.isDebugEnabled()) {
+ log.debug("execute sql statements done, data size: {}, maxBatchSqlSize: {}", prepareRequests.size(), maxBatchSqlSize);
}
}
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageConfig.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageConfig.java
index bf0cccb..3e0ddfb 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageConfig.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageConfig.java
@@ -39,5 +39,17 @@ public class MySQLStorageConfig extends ModuleConfig {
* @since 8.2.0
*/
private int numOfSearchableValuesPerTag = 2;
+ /**
+ * The maximum size of batch size of SQL execution
+ *
+ * @since 8.8.0
+ */
+ private int maxSizeOfBatchSql = 2000;
+ /**
+ * async batch execute pool size
+ *
+ * @since 8.8.0
+ */
+ private int asyncBatchPersistentPoolSize = 4;
private Properties properties;
}
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageProvider.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageProvider.java
index 233431a..c9745b8 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageProvider.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageProvider.java
@@ -103,7 +103,7 @@ public class MySQLStorageProvider extends ModuleProvider {
mysqlClient = new JDBCHikariCPClient(config.getProperties());
- this.registerServiceImplementation(IBatchDAO.class, new H2BatchDAO(mysqlClient));
+ this.registerServiceImplementation(IBatchDAO.class, new H2BatchDAO(mysqlClient, config.getMaxSizeOfBatchSql(), config.getAsyncBatchPersistentPoolSize()));
this.registerServiceImplementation(
StorageDAO.class,
new H2StorageDAO(
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/postgresql/PostgreSQLStorageConfig.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/postgresql/PostgreSQLStorageConfig.java
index 67465bf..0715923 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/postgresql/PostgreSQLStorageConfig.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/postgresql/PostgreSQLStorageConfig.java
@@ -25,5 +25,4 @@ import org.apache.skywalking.oap.server.storage.plugin.jdbc.mysql.MySQLStorageCo
@Setter
@Getter
public class PostgreSQLStorageConfig extends MySQLStorageConfig {
-
}
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/postgresql/PostgreSQLStorageProvider.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/postgresql/PostgreSQLStorageProvider.java
index 52800b3..031f510 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/postgresql/PostgreSQLStorageProvider.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/postgresql/PostgreSQLStorageProvider.java
@@ -103,7 +103,7 @@ public class PostgreSQLStorageProvider extends ModuleProvider {
postgresqlClient = new JDBCHikariCPClient(config.getProperties());
- this.registerServiceImplementation(IBatchDAO.class, new H2BatchDAO(postgresqlClient));
+ this.registerServiceImplementation(IBatchDAO.class, new H2BatchDAO(postgresqlClient, config.getMaxSizeOfBatchSql(), config.getAsyncBatchPersistentPoolSize()));
this.registerServiceImplementation(
StorageDAO.class,
new H2StorageDAO(
diff --git a/oap-server/server-storage-plugin/storage-tidb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/tidb/TiDBStorageProvider.java b/oap-server/server-storage-plugin/storage-tidb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/tidb/TiDBStorageProvider.java
index 05d057c..935ec7a 100644
--- a/oap-server/server-storage-plugin/storage-tidb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/tidb/TiDBStorageProvider.java
+++ b/oap-server/server-storage-plugin/storage-tidb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/tidb/TiDBStorageProvider.java
@@ -106,7 +106,7 @@ public class TiDBStorageProvider extends ModuleProvider {
mysqlClient = new JDBCHikariCPClient(config.getProperties());
- this.registerServiceImplementation(IBatchDAO.class, new H2BatchDAO(mysqlClient));
+ this.registerServiceImplementation(IBatchDAO.class, new H2BatchDAO(mysqlClient, config.getMaxSizeOfBatchSql(), config.getAsyncBatchPersistentPoolSize()));
this.registerServiceImplementation(
StorageDAO.class,
new H2StorageDAO(