Posted to notifications@skywalking.apache.org by GitBox <gi...@apache.org> on 2021/09/11 13:55:46 UTC

[GitHub] [skywalking] wu-sheng commented on a change in pull request #7691: tidb write by batch sql ,improve write speed.

wu-sheng commented on a change in pull request #7691:
URL: https://github.com/apache/skywalking/pull/7691#discussion_r706605537



##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/BatchSQLExecutor.java
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.jdbc;
+
+import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
+import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
+import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.List;
+
+/**
+ * A Batch SQL executor.
+ */
+public class BatchSQLExecutor implements InsertRequest, UpdateRequest {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(BatchSQLExecutor.class);

Review comment:
       I recommend using Lombok's `@Slf4j` here instead of declaring the logger by hand.

##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageConfig.java
##########
@@ -59,4 +59,16 @@
      * @since 8.2.0
      */
     private int numOfSearchableValuesPerTag = 2;
+    /**
+     * max size per batch execute sql
+     */
+    private int maxSizeOfBatchSql = 2000;

Review comment:
       New configuration items require `@since 8.8.0` in their Javadoc.
   
   The storage docs and `configuration-vocabulary.md` should be updated accordingly.
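A minimal sketch of what a new field documented this way might look like, reusing the `@since` Javadoc convention that `numOfSearchableValuesPerTag` already follows. The class `BatchConfigSketch` and its plain getter are hypothetical; the real config classes may generate accessors differently.

```java
// Hypothetical, trimmed-down stand-in for a storage config class such as H2StorageConfig.
final class BatchConfigSketch {
    /**
     * Maximum number of SQL statements sent in one JDBC batch.
     *
     * @since 8.8.0
     */
    private int maxSizeOfBatchSql = 2000;

    int getMaxSizeOfBatchSql() {
        return maxSizeOfBatchSql;
    }
}
```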

##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java
##########
@@ -81,6 +92,36 @@ public void flush(List<PrepareRequest> prepareRequests) {
         }
     }
 
+    private void executeSql(List<PrepareRequest> prepareRequests, int maxBatchSqlSize) {
+        if (log.isDebugEnabled()) {
+            log.debug("execute sql batch. data size:{}", prepareRequests.size());
+        }
+        Map<String, List<PrepareRequest>> batchRequestMap = new HashMap<>();
+        for (PrepareRequest prepareRequest : prepareRequests) {
+            SQLExecutor sqlExecutor = (SQLExecutor) prepareRequest;
+            if (batchRequestMap.get(sqlExecutor.getSql()) == null) {

Review comment:
       When could this be null?
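Assuming the loop only ever stores non-null lists in `batchRequestMap`, the lookup returns `null` only the first time a given SQL string is seen. A minimal sketch of the same grouping step using `Map.computeIfAbsent`, which covers that first-time case without an explicit null check. `Request` is a hypothetical stand-in for `SQLExecutor`, and `GroupBySqlSketch` is not code from the PR.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: group requests by SQL text so each group can share one PreparedStatement.
final class GroupBySqlSketch {
    // Hypothetical stand-in for SQLExecutor: an SQL template plus its bound parameters.
    static final class Request {
        final String sql;
        final List<?> params;

        Request(String sql, List<?> params) {
            this.sql = sql;
            this.params = params;
        }
    }

    static Map<String, List<Request>> groupBySql(List<Request> requests) {
        // LinkedHashMap keeps groups in first-seen order, so iteration is deterministic.
        Map<String, List<Request>> bySql = new LinkedHashMap<>();
        for (Request request : requests) {
            // computeIfAbsent creates the list on the first occurrence of this SQL string,
            // which is exactly the case where a plain get() would return null.
            bySql.computeIfAbsent(request.sql, sql -> new ArrayList<>()).add(request);
        }
        return bySql;
    }

    public static void main(String[] args) {
        List<Request> requests = List.of(
                new Request("INSERT INTO a VALUES (?)", List.of(1)),
                new Request("INSERT INTO b VALUES (?)", List.of(2)),
                new Request("INSERT INTO a VALUES (?)", List.of(3)));
        Map<String, List<Request>> groups = groupBySql(requests);
        System.out.println(groups.keySet()); // [INSERT INTO a VALUES (?), INSERT INTO b VALUES (?)]
        System.out.println(groups.get("INSERT INTO a VALUES (?)").size()); // 2
    }
}
```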

##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageConfig.java
##########
@@ -59,4 +59,16 @@
      * @since 8.2.0
      */
     private int numOfSearchableValuesPerTag = 2;
+    /**
+     * max size per batch execute sql
+     */
+    private int maxSizeOfBatchSql = 2000;
+    /**
+     * async batch execute pool size
+     */
+    private int h2AsyncBatchPersistentPoolSize = 4;
+    /**
+     * async batch execute channel size
+     */
+    private int h2AsyncBatchPersistentChannelSize = 4;

Review comment:
       ```suggestion
       /**
        * max size per batch execute sql
        */
       private int maxSizeOfBatchSql = 100;
       /**
        * async batch execute pool size
        */
       private int h2AsyncBatchPersistentPoolSize = 1;
       /**
        * async batch execute channel size
        */
       private int h2AsyncBatchPersistentChannelSize = 1;
   ```
   
   H2 is not that powerful, so I recommend using fewer threads and smaller sizes by default.
   ___
   Also, given the DataCarrier implementation, it is better to merge `h2AsyncBatchPersistentPoolSize` and `h2AsyncBatchPersistentChannelSize` into a single `asyncBatchPersistentPoolSize`. There is no point in creating more threads than there are channels.
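To make the threads-versus-channels point concrete, here is a simplified model. It assumes each channel is drained by exactly one consumer thread and that channels are handed out round-robin (channel `c` goes to thread `c % threadCount`); that assignment rule is an illustrative assumption, not DataCarrier's actual code. Under this model, every thread beyond the channel count owns nothing and sits idle.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model (an assumption, not DataCarrier's code): each channel is owned by one
// consumer thread, assigned round-robin as channel c -> thread (c % threadCount).
final class ChannelAssignmentSketch {
    static List<List<Integer>> assign(int channelCount, int threadCount) {
        if (threadCount < 1) {
            throw new IllegalArgumentException("threadCount must be >= 1");
        }
        List<List<Integer>> channelsPerThread = new ArrayList<>();
        for (int t = 0; t < threadCount; t++) {
            channelsPerThread.add(new ArrayList<>());
        }
        for (int c = 0; c < channelCount; c++) {
            channelsPerThread.get(c % threadCount).add(c);
        }
        return channelsPerThread;
    }

    // Number of threads that own no channel and therefore never do any work.
    static long idleThreads(int channelCount, int threadCount) {
        return assign(channelCount, threadCount).stream().filter(List::isEmpty).count();
    }

    public static void main(String[] args) {
        System.out.println(assign(4, 6));      // [[0], [1], [2], [3], [], []]
        System.out.println(idleThreads(4, 6)); // 2
        System.out.println(idleThreads(4, 4)); // 0
    }
}
```

With a single `asyncBatchPersistentPoolSize` driving both values, the idle-thread case simply cannot be configured.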

##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/BatchSQLExecutor.java
##########
@@ -0,0 +1,82 @@
+
+package org.apache.skywalking.oap.server.storage.plugin.jdbc;
+
+import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
+import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
+import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.List;
+
+/**
+ * A Batch SQL executor.
+ */
+public class BatchSQLExecutor implements InsertRequest, UpdateRequest {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(BatchSQLExecutor.class);
+
+    private String sql;
+    private List<PrepareRequest> prepareRequests;
+
+    public BatchSQLExecutor(String sql, List<PrepareRequest> prepareRequests) {
+        this.sql = sql;
+        this.prepareRequests = prepareRequests;
+    }
+
+    public void invoke(Connection connection, int maxBatchSqlSize) throws SQLException {
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("execute sql batch.sql by key size: {},sql:{}", prepareRequests.size(), sql);
+        }
+        PreparedStatement preparedStatement = connection.prepareStatement(sql);
+        int index = 0;
+        for (int k = 0; k < prepareRequests.size(); k++) {
+            SQLExecutor sqlExecutor = (SQLExecutor) prepareRequests.get(k);
+            for (int i = 0; i < sqlExecutor.getParam().size(); i++) {
+                preparedStatement.setObject(i + 1, sqlExecutor.getParam().get(i));
+            }
+            preparedStatement.addBatch();
+            if (k > 0 && k % maxBatchSqlSize == 0) {
+                long start = System.currentTimeMillis();
+                preparedStatement.executeBatch();
+                long end = System.currentTimeMillis();
+                long cost = end - start;
+                if (LOGGER.isDebugEnabled()) {
+                    LOGGER.debug("execute batch sql,batch size: {}, cost:{},sql: {}", maxBatchSqlSize, cost, sql);
+                }
+                index = 0;
+            } else {
+                index = index + 1;

Review comment:
       ```suggestion
                   index++;
   ```

##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageConfig.java
##########
@@ -39,5 +39,18 @@
      * @since 8.2.0
      */
     private int numOfSearchableValuesPerTag = 2;
+    /**
+     * max size per batch execute sql
+     */
+    private int maxSizeOfBatchSql = 2000;
+    /**
+     * async batch execute pool size
+     */
+    private int h2AsyncBatchPersistentPoolSize = 4;
+    /**
+     * async batch execute channel size
+     */
+    private int h2AsyncBatchPersistentChannelSize = 4;
+

Review comment:
       Same as above: you should consider merging these two settings. Also, the `h2` prefix is wrong in `MySQLStorageConfig`.

##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java
##########
@@ -22,50 +22,61 @@
 import java.sql.SQLException;
 import java.util.List;
 import java.util.Properties;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.BulkConsumePool;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.ConsumerPoolFactory;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
-import org.apache.skywalking.oap.server.core.UnexpectedException;
 import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
 import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
 import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
 import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
 import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
 import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+import org.apache.skywalking.oap.server.storage.plugin.jdbc.BatchSQLExecutor;
 import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLExecutor;
 
 @Slf4j
 public class H2BatchDAO implements IBatchDAO {
     private JDBCHikariCPClient h2Client;
     private final DataCarrier<PrepareRequest> dataCarrier;
+    private int maxBatchSqlSize = 2000;
+    private int h2AsyncBatchPersistentPoolSize = 4;
+    private int h2AsyncBatchPersistentChannelSize = 4;
 
-    public H2BatchDAO(JDBCHikariCPClient h2Client) {
+    public H2BatchDAO(JDBCHikariCPClient h2Client, int batchSqlSize, int h2AsyncBatchPersistentPoolSize, int h2AsyncBatchPersistentChannelSize) {
         this.h2Client = h2Client;
-
         String name = "H2_ASYNCHRONOUS_BATCH_PERSISTENT";
-        BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, 1, 20);
-        try {
-            ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
-        } catch (Exception e) {
-            throw new UnexpectedException(e.getMessage(), e);
+        if (log.isDebugEnabled()) {
+            log.debug("H2_ASYNCHRONOUS_BATCH_PERSISTENT poolSize: {},channelSize: {},maxBatchSqlSize:{}", h2AsyncBatchPersistentPoolSize, h2AsyncBatchPersistentChannelSize, batchSqlSize);
         }
-
-        this.dataCarrier = new DataCarrier<>(1, 10000);
-        this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new H2BatchDAO.H2BatchConsumer(this));
+        this.dataCarrier = new DataCarrier<>(name, h2AsyncBatchPersistentChannelSize, 10000);
+        this.dataCarrier.consume(new H2BatchDAO.H2BatchConsumer(this), h2AsyncBatchPersistentPoolSize, 20);
     }
 
     @Override
     public void flush(List<PrepareRequest> prepareRequests) {
         if (CollectionUtils.isEmpty(prepareRequests)) {
             return;
         }
-
         if (log.isDebugEnabled()) {
-            log.debug("batch sql statements execute, data size: {}", prepareRequests.size());
+            log.debug("to execute sql statements execute, data size: {}, maxBatchSqlSize: {}", prepareRequests.size(), maxBatchSqlSize);
+        }
+        if (maxBatchSqlSize <= 1) {
+            executeSql(prepareRequests);
+        } else {
+            executeSql(prepareRequests, maxBatchSqlSize);
+        }

Review comment:
       I don't think treating `size == 1` as a special case makes sense.
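One general batching path already covers a limit of 1: every statement simply becomes its own batch. A minimal sketch, with the hypothetical helper `chunk` standing in for however the DAO splits requests; rejecting limits below 1 instead of treating them as a separate mode is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: split items into consecutive batches of at most maxBatchSize elements.
final class ChunkSketch {
    static <T> List<List<T>> chunk(List<T> items, int maxBatchSize) {
        if (maxBatchSize < 1) {
            throw new IllegalArgumentException("maxBatchSize must be >= 1");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < items.size(); from += maxBatchSize) {
            int to = Math.min(from + maxBatchSize, items.size());
            // Copy the view so later changes to `items` cannot leak into the batch.
            batches.add(new ArrayList<>(items.subList(from, to)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> sql = List.of("a", "b", "c", "d", "e");
        System.out.println(chunk(sql, 2)); // [[a, b], [c, d], [e]]
        System.out.println(chunk(sql, 1)); // [[a], [b], [c], [d], [e]]
    }
}
```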

##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/BatchSQLExecutor.java
##########
@@ -0,0 +1,82 @@
+
+package org.apache.skywalking.oap.server.storage.plugin.jdbc;
+
+import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
+import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
+import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.List;
+
+/**
+ * A Batch SQL executor.
+ */
+public class BatchSQLExecutor implements InsertRequest, UpdateRequest {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(BatchSQLExecutor.class);
+
+    private String sql;
+    private List<PrepareRequest> prepareRequests;
+
+    public BatchSQLExecutor(String sql, List<PrepareRequest> prepareRequests) {
+        this.sql = sql;
+        this.prepareRequests = prepareRequests;
+    }
+
+    public void invoke(Connection connection, int maxBatchSqlSize) throws SQLException {
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("execute sql batch.sql by key size: {},sql:{}", prepareRequests.size(), sql);
+        }
+        PreparedStatement preparedStatement = connection.prepareStatement(sql);
+        int index = 0;

Review comment:
       Why name this `index`? It should be `pendingCount` or similar.
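With a `pendingCount`, the flush rule becomes easy to state: increment after each `addBatch()`, execute once the count reaches the limit, and execute whatever is still pending after the loop. Comparing a counter against the limit also avoids the off-by-one in the quoted `k > 0 && k % maxBatchSqlSize == 0` check, which first fires only after `maxBatchSqlSize + 1` statements have been added. A minimal sketch, with JDBC abstracted behind a hypothetical `BatchTarget` interface (standing in for `PreparedStatement#addBatch` and `#executeBatch`) so it runs without a database:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the flush rule; not the PR's BatchSQLExecutor.
final class PendingCountSketch {
    // Hypothetical stand-in for PreparedStatement#addBatch / #executeBatch.
    interface BatchTarget {
        void addBatch(List<?> params);

        void executeBatch();
    }

    static void writeAll(List<List<?>> paramRows, int maxBatchSqlSize, BatchTarget target) {
        int pendingCount = 0; // statements added since the last executeBatch()
        for (List<?> params : paramRows) {
            target.addBatch(params);
            pendingCount++;
            if (pendingCount >= maxBatchSqlSize) {
                target.executeBatch();
                pendingCount = 0;
            }
        }
        if (pendingCount > 0) {
            target.executeBatch(); // flush the remainder that did not fill a whole batch
        }
    }

    // Test double that records the size of every executed batch.
    static final class RecordingTarget implements BatchTarget {
        final List<Integer> executedBatchSizes = new ArrayList<>();
        private int current = 0;

        @Override
        public void addBatch(List<?> params) {
            current++;
        }

        @Override
        public void executeBatch() {
            executedBatchSizes.add(current);
            current = 0;
        }
    }

    public static void main(String[] args) {
        List<List<?>> rows = new ArrayList<>();
        for (int i = 0; i < 7; i++) {
            rows.add(List.of(i));
        }
        RecordingTarget target = new RecordingTarget();
        writeAll(rows, 3, target);
        System.out.println(target.executedBatchSizes); // [3, 3, 1]
    }
}
```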




-- 
This is an automated message from the Apache Git Service.