Posted to notifications@skywalking.apache.org by GitBox <gi...@apache.org> on 2021/09/10 17:18:18 UTC

[GitHub] [skywalking] chenyi19851209 opened a new pull request #7691: tidb write by batch sql ,improve write speed.

chenyi19851209 opened a new pull request #7691:
URL: https://github.com/apache/skywalking/pull/7691


   <!--
       ⚠️ Please make sure to read this template first, pull requests that don't accord with this template
       maybe closed without notice.
       Texts surrounded by `<` and `>` are meant to be replaced by you, e.g. <framework name>, <issue number>.
       Put an `x` in the `[ ]` to mark the item as CHECKED. `[x]`
   -->
   
   <!-- ==== 🐛 Remove this line WHEN AND ONLY WHEN you're fixing a bug, follow the checklist 👇 ====
   ### Fix <bug description or the bug issue number or bug issue link>
   - [ ] Add a unit test to verify that the fix works.
   - [ ] Explain briefly why the bug exists and how to fix it.
        ==== 🐛 Remove this line WHEN AND ONLY WHEN you're fixing a bug, follow the checklist 👆 ==== -->
   
   <!-- ==== 📈 Remove this line WHEN AND ONLY WHEN you're improving the performance, follow the checklist 👇 ====
   ### Improve the performance of <storage-tidb-plugin>
   - [x] The benchmark result.
   - 
   ```text
   <
   Before the change: one OAP process writing to TiDB sustains 1500-2000 TPS at 140% CPU.
   After the change:  one OAP process writing to TiDB sustains 4000-4600 TPS at 260% CPU.
   >
   ```
   - [ ] Links/URLs to the theory proof or discussion articles/blogs. <links/URLs here>
        
   
   <!-- ==== 🆕 Remove this line WHEN AND ONLY WHEN you're adding a new feature, follow the checklist 👇 ====
   ### <Feature description>
   - [ ] If this is non-trivial feature, paste the links/URLs to the design doc.
   - [ ] Update the documentation to include this new feature.
   - [ ] Tests(including UT, IT, E2E) are added to verify the new feature.
   - [ ] If it's UI related, attach the screenshots below.
        ==== 🆕 Remove this line WHEN AND ONLY WHEN you're adding a new feature, follow the checklist 👆 ==== -->
   
   - [x] If this pull request closes/resolves/fixes an existing issue, replace the issue number. Closes #<issue number>.
   - issue 7650
   - [ ] Update the [`CHANGES` log](https://github.com/apache/skywalking/blob/master/CHANGES.md).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [skywalking] chenyi19851209 commented on a change in pull request #7691: tidb write by batch sql ,improve write speed.

Posted by GitBox <gi...@apache.org>.
chenyi19851209 commented on a change in pull request #7691:
URL: https://github.com/apache/skywalking/pull/7691#discussion_r706850687



##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageConfig.java
##########
@@ -59,4 +59,16 @@
      * @since 8.2.0
      */
     private int numOfSearchableValuesPerTag = 2;
+    /**
+     * max size per batch execute sql
+     */
+    private int maxSizeOfBatchSql = 2000;
+    /**
+     * async batch execute pool size
+     */
+    private int h2AsyncBatchPersistentPoolSize = 4;
+    /**
+     * async batch execute channel size
+     */
+    private int h2AsyncBatchPersistentChannelSize = 4;

Review comment:
       ok







[GitHub] [skywalking] chenyi19851209 commented on a change in pull request #7691: tidb write by batch sql ,improve write speed.

Posted by GitBox <gi...@apache.org>.
chenyi19851209 commented on a change in pull request #7691:
URL: https://github.com/apache/skywalking/pull/7691#discussion_r708437545



##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java
##########
@@ -22,61 +22,85 @@
 import java.sql.SQLException;
 import java.util.List;
 import java.util.Properties;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.Iterator;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.BulkConsumePool;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.ConsumerPoolFactory;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
-import org.apache.skywalking.oap.server.core.UnexpectedException;
 import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
 import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
 import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
 import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
 import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
 import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+import org.apache.skywalking.oap.server.storage.plugin.jdbc.BatchSQLExecutor;
 import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLExecutor;
 
 @Slf4j
 public class H2BatchDAO implements IBatchDAO {
     private JDBCHikariCPClient h2Client;
     private final DataCarrier<PrepareRequest> dataCarrier;
+    private int maxBatchSqlSize = 2000;
+    private int asyncBatchPersistentPoolSize = 4;

Review comment:
       ok. done







[GitHub] [skywalking] chenyi19851209 commented on a change in pull request #7691: tidb write by batch sql ,improve write speed.

Posted by GitBox <gi...@apache.org>.
chenyi19851209 commented on a change in pull request #7691:
URL: https://github.com/apache/skywalking/pull/7691#discussion_r708407577



##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/BatchSQLExecutor.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.jdbc;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
+import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
+import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.List;
+
+/**
+ * A Batch SQL executor.
+ */
+@Slf4j
+public class BatchSQLExecutor implements InsertRequest, UpdateRequest {
+
+    private String sql;
+    private List<PrepareRequest> prepareRequests;
+
+    public BatchSQLExecutor(String sql, List<PrepareRequest> prepareRequests) {
+        this.sql = sql;
+        this.prepareRequests = prepareRequests;
+    }
+
+    public void invoke(Connection connection, int maxBatchSqlSize) throws SQLException {
+        if (log.isDebugEnabled()) {
+            log.debug("execute sql batch.sql by key size: {},sql:{}", prepareRequests.size(), sql);
+        }
+        PreparedStatement preparedStatement = connection.prepareStatement(sql);
+        int pendingCount = 0;
+        for (int k = 0; k < prepareRequests.size(); k++) {
+            SQLExecutor sqlExecutor = (SQLExecutor) prepareRequests.get(k);
+            for (int i = 0; i < sqlExecutor.getParam().size(); i++) {
+                preparedStatement.setObject(i + 1, sqlExecutor.getParam().get(i));
+            }

Review comment:
       I changed it as follows; please check whether this is what you expected.
   
   SQLExecutor#setParameters:
   
   public void setParameters(PreparedStatement preparedStatement) throws SQLException {
       for (int i = 0; i < param.size(); i++) {
           preparedStatement.setObject(i + 1, param.get(i));
       }
   }
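
The batch loop that this comment refers to queues one row at a time and flushes once a maximum batch size is reached. Below is a minimal sketch of that chunking pattern, not the code of the pull request. The names `ChunkedBatchSketch`, `BatchTarget`, `CountingTarget` and `writeInChunks` are hypothetical; `BatchTarget` only stands in for the `addBatch()`/`executeBatch()` pair of a JDBC `PreparedStatement` so that the sketch runs without a database.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ChunkedBatchSketch {

    /** Hypothetical stand-in for the addBatch()/executeBatch() pair of a PreparedStatement. */
    public interface BatchTarget<T> {
        void addBatch(T row);

        void executeBatch();
    }

    /** Test double that records how many rows each executeBatch() call flushed. */
    public static class CountingTarget<T> implements BatchTarget<T> {
        public final List<Integer> flushSizes = new ArrayList<>();
        private int pending = 0;

        @Override
        public void addBatch(T row) {
            pending++;
        }

        @Override
        public void executeBatch() {
            flushSizes.add(pending);
            pending = 0;
        }
    }

    /** Queues rows and flushes whenever maxBatchSize rows are pending; returns the flush count. */
    public static <T> int writeInChunks(List<T> rows, int maxBatchSize, BatchTarget<T> target) {
        int limit = Math.max(1, maxBatchSize); // guard against zero or negative sizes
        int pending = 0;
        int flushes = 0;
        for (T row : rows) {
            target.addBatch(row);
            pending++;
            if (pending >= limit) {
                target.executeBatch();
                flushes++;
                pending = 0;
            }
        }
        if (pending > 0) { // flush the final partial chunk
            target.executeBatch();
            flushes++;
        }
        return flushes;
    }

    public static void main(String[] args) {
        CountingTarget<String> target = new CountingTarget<>();
        int flushes = writeInChunks(Arrays.asList("a", "b", "c", "d", "e"), 2, target);
        System.out.println(flushes + " flushes, sizes " + target.flushSizes);
    }
}
```

With a real `PreparedStatement`, the parameter binding done by `setParameters` would happen just before each `addBatch()` call.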







[GitHub] [skywalking] chenyi19851209 commented on a change in pull request #7691: tidb write by batch sql ,improve write speed.

Posted by GitBox <gi...@apache.org>.
chenyi19851209 commented on a change in pull request #7691:
URL: https://github.com/apache/skywalking/pull/7691#discussion_r706852520



##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java
##########
@@ -22,50 +22,61 @@
 import java.sql.SQLException;
 import java.util.List;
 import java.util.Properties;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.BulkConsumePool;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.ConsumerPoolFactory;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
-import org.apache.skywalking.oap.server.core.UnexpectedException;
 import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
 import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
 import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
 import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
 import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
 import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+import org.apache.skywalking.oap.server.storage.plugin.jdbc.BatchSQLExecutor;
 import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLExecutor;
 
 @Slf4j
 public class H2BatchDAO implements IBatchDAO {
     private JDBCHikariCPClient h2Client;
     private final DataCarrier<PrepareRequest> dataCarrier;
+    private int maxBatchSqlSize = 2000;
+    private int h2AsyncBatchPersistentPoolSize = 4;
+    private int h2AsyncBatchPersistentChannelSize = 4;
 
-    public H2BatchDAO(JDBCHikariCPClient h2Client) {
+    public H2BatchDAO(JDBCHikariCPClient h2Client, int batchSqlSize, int h2AsyncBatchPersistentPoolSize, int h2AsyncBatchPersistentChannelSize) {
         this.h2Client = h2Client;
-
         String name = "H2_ASYNCHRONOUS_BATCH_PERSISTENT";
-        BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, 1, 20);
-        try {
-            ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
-        } catch (Exception e) {
-            throw new UnexpectedException(e.getMessage(), e);
+        if (log.isDebugEnabled()) {
+            log.debug("H2_ASYNCHRONOUS_BATCH_PERSISTENT poolSize: {},channelSize: {},maxBatchSqlSize:{}", h2AsyncBatchPersistentPoolSize, h2AsyncBatchPersistentChannelSize, batchSqlSize);
         }
-
-        this.dataCarrier = new DataCarrier<>(1, 10000);
-        this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new H2BatchDAO.H2BatchConsumer(this));
+        this.dataCarrier = new DataCarrier<>(name, h2AsyncBatchPersistentChannelSize, 10000);
+        this.dataCarrier.consume(new H2BatchDAO.H2BatchConsumer(this), h2AsyncBatchPersistentPoolSize, 20);
     }
 
     @Override
     public void flush(List<PrepareRequest> prepareRequests) {
         if (CollectionUtils.isEmpty(prepareRequests)) {
             return;
         }
-
         if (log.isDebugEnabled()) {
-            log.debug("batch sql statements execute, data size: {}", prepareRequests.size());
+            log.debug("to execute sql statements execute, data size: {}, maxBatchSqlSize: {}", prepareRequests.size(), maxBatchSqlSize);
+        }
+        if (maxBatchSqlSize <= 1) {
+            executeSql(prepareRequests);
+        } else {
+            executeSql(prepareRequests, maxBatchSqlSize);
+        }

Review comment:
       OK, I removed this `if`.







[GitHub] [skywalking] chenyi19851209 commented on a change in pull request #7691: tidb write by batch sql ,improve write speed.

Posted by GitBox <gi...@apache.org>.
chenyi19851209 commented on a change in pull request #7691:
URL: https://github.com/apache/skywalking/pull/7691#discussion_r706857092



##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageConfig.java
##########
@@ -59,4 +59,16 @@
      * @since 8.2.0
      */
     private int numOfSearchableValuesPerTag = 2;
+    /**
+     * max size per batch execute sql
+     */
+    private int maxSizeOfBatchSql = 2000;

Review comment:
       OK, I updated backend-storage.md and configuration-vocabulary.md.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [skywalking] chenyi19851209 commented on a change in pull request #7691: tidb write by batch sql ,improve write speed.

Posted by GitBox <gi...@apache.org>.
chenyi19851209 commented on a change in pull request #7691:
URL: https://github.com/apache/skywalking/pull/7691#discussion_r708383453



##########
File path: oap-server/server-starter/src/main/resources/application.yml
##########
@@ -170,9 +172,11 @@ storage:
     metadataQueryMaxSize: ${SW_STORAGE_MYSQL_QUERY_MAX_SIZE:5000}
     maxSizeOfArrayColumn: ${SW_STORAGE_MAX_SIZE_OF_ARRAY_COLUMN:20}
     numOfSearchableValuesPerTag: ${SW_STORAGE_NUM_OF_SEARCHABLE_VALUES_PER_TAG:2}
+    maxSizeOfBatchSql: ${SW_STORAGE_MAX_SIZE_OF_BATCH_SQL:2000}
+    asyncBatchPersistentPoolSize: ${SW_STORAGE_ASYNC_BATCH_PERSISTENT_POOL_SIZE:4}
   tidb:
     properties:
-      jdbcUrl: ${SW_JDBC_URL:"jdbc:mysql://localhost:4000/tidbswtest"}
+      jdbcUrl: ${SW_JDBC_URL:"jdbc:mysql://localhost:4000/tidbswtest?rewriteBatchedStatements=true"}

Review comment:
       I have verified that, in the TiDB write scenario, batch execution only gets faster when the parameter **rewriteBatchedStatements=true** is set.
   
   Explanation:
   The rewriteBatchedStatements parameter must be added to the URL of the MySQL JDBC connection. By default, the MySQL JDBC driver does not really batch on executeBatch(): it splits the set of SQL statements that we expect to run as one batch and sends them to the MySQL database one by one. The batch insert is then just a series of single inserts, which directly lowers performance. Only when rewriteBatchedStatements is set to true does the driver send the statements as a batch.
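
A minimal sketch of the two pieces this comment combines: the URL parameter and the standard JDBC batch calls. It assumes the MySQL JDBC driver is on the classpath when a real connection is used. The class `BatchUrlSketch`, the helper `withRewriteBatchedStatements`, the method `insertAll` and the table `demo_metrics` are hypothetical and are not part of the pull request.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

public class BatchUrlSketch {

    /** Hypothetical helper: appends rewriteBatchedStatements=true unless the URL already sets it. */
    public static String withRewriteBatchedStatements(String jdbcUrl) {
        if (jdbcUrl.contains("rewriteBatchedStatements=")) {
            return jdbcUrl;
        }
        String separator = jdbcUrl.contains("?") ? "&" : "?";
        return jdbcUrl + separator + "rewriteBatchedStatements=true";
    }

    /**
     * Standard JDBC batching: bind and queue each row with addBatch(), then send with executeBatch().
     * The table and column names are assumptions for illustration only.
     */
    public static int[] insertAll(Connection connection, List<Object[]> rows) throws SQLException {
        String sql = "INSERT INTO demo_metrics (id, value) VALUES (?, ?)";
        try (PreparedStatement statement = connection.prepareStatement(sql)) {
            for (Object[] row : rows) {
                statement.setObject(1, row[0]);
                statement.setObject(2, row[1]);
                statement.addBatch();
            }
            return statement.executeBatch();
        }
    }

    public static void main(String[] args) {
        // Only the URL helper is exercised here; insertAll needs a live connection.
        System.out.println(withRewriteBatchedStatements("jdbc:mysql://localhost:4000/tidbswtest"));
    }
}
```

Whether the parameter helps in a given deployment is best confirmed with a benchmark like the one in the pull request description.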







[GitHub] [skywalking] chenyi19851209 commented on a change in pull request #7691: tidb write by batch sql ,improve write speed.

Posted by GitBox <gi...@apache.org>.
chenyi19851209 commented on a change in pull request #7691:
URL: https://github.com/apache/skywalking/pull/7691#discussion_r709241001



##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/SQLExecutor.java
##########
@@ -54,4 +54,26 @@ public void invoke(Connection connection) throws SQLException {
         }
         preparedStatement.execute();
     }
+
+    public void setParameters(PreparedStatement preparedStatement) throws SQLException {
+        for (int i = 0; i < param.size(); i++) {
+            preparedStatement.setObject(i + 1, param.get(i));
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        SQLExecutor s = (SQLExecutor) o;
+        return sql == s.sql;
+    }
+
+    @Override
+    public int hashCode() {
+        return sql.hashCode();
+    }

Review comment:
       OK, I now use @EqualsAndHashCode(of = "sql") to generate the equals and hashCode methods, and I deleted the hand-written ones.
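
For context, the hand-written `equals` in the hunk above compares the SQL strings with `==` (reference equality) and casts without a type check. A hand-written sketch of an equality contract keyed on the SQL text alone follows. It only approximates what a generated implementation provides, and the names `SqlKeySketch` and `SqlKey` are hypothetical, not classes from the pull request.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class SqlKeySketch {

    /** Hypothetical value class whose identity is its SQL text only. */
    public static final class SqlKey {
        private final String sql;

        public SqlKey(String sql) {
            this.sql = sql;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof SqlKey)) {
                return false; // also covers null
            }
            // Compare by content, not by reference.
            return Objects.equals(sql, ((SqlKey) o).sql);
        }

        @Override
        public int hashCode() {
            return Objects.hashCode(sql);
        }
    }

    public static void main(String[] args) {
        Map<SqlKey, Integer> counts = new HashMap<>();
        counts.merge(new SqlKey("INSERT INTO t VALUES (?)"), 1, Integer::sum);
        // A distinct String instance with the same content maps to the same key.
        counts.merge(new SqlKey(new String("INSERT INTO t VALUES (?)")), 1, Integer::sum);
        System.out.println(counts.size());
    }
}
```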







[GitHub] [skywalking] wu-sheng commented on a change in pull request #7691: tidb write by batch sql ,improve write speed.

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on a change in pull request #7691:
URL: https://github.com/apache/skywalking/pull/7691#discussion_r706946863



##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java
##########
@@ -22,61 +22,85 @@
 import java.sql.SQLException;
 import java.util.List;
 import java.util.Properties;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.Iterator;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.BulkConsumePool;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.ConsumerPoolFactory;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
-import org.apache.skywalking.oap.server.core.UnexpectedException;
 import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
 import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
 import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
 import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
 import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
 import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+import org.apache.skywalking.oap.server.storage.plugin.jdbc.BatchSQLExecutor;
 import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLExecutor;
 
 @Slf4j
 public class H2BatchDAO implements IBatchDAO {
     private JDBCHikariCPClient h2Client;
     private final DataCarrier<PrepareRequest> dataCarrier;
+    private int maxBatchSqlSize = 2000;
+    private int asyncBatchPersistentPoolSize = 4;
 
-    public H2BatchDAO(JDBCHikariCPClient h2Client) {
+    public H2BatchDAO(JDBCHikariCPClient h2Client, int batchSqlSize, int asyncBatchPersistentPoolSize) {
         this.h2Client = h2Client;
-
         String name = "H2_ASYNCHRONOUS_BATCH_PERSISTENT";
-        BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, 1, 20);
-        try {
-            ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
-        } catch (Exception e) {
-            throw new UnexpectedException(e.getMessage(), e);
+        if (log.isDebugEnabled()) {
+            log.debug("H2_ASYNCHRONOUS_BATCH_PERSISTENT poolSize: {},maxBatchSqlSize:{}", asyncBatchPersistentPoolSize, batchSqlSize);
         }
-
-        this.dataCarrier = new DataCarrier<>(1, 10000);
-        this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new H2BatchDAO.H2BatchConsumer(this));
+        this.dataCarrier = new DataCarrier<>(name, asyncBatchPersistentPoolSize, 10000);
+        this.dataCarrier.consume(new H2BatchDAO.H2BatchConsumer(this), asyncBatchPersistentPoolSize, 20);
     }
 
     @Override
     public void flush(List<PrepareRequest> prepareRequests) {
         if (CollectionUtils.isEmpty(prepareRequests)) {
             return;
         }
-
         if (log.isDebugEnabled()) {
-            log.debug("batch sql statements execute, data size: {}", prepareRequests.size());
+            log.debug("to execute sql statements execute, data size: {}, maxBatchSqlSize: {}", prepareRequests.size(), maxBatchSqlSize);
+        }
+        executeSql(prepareRequests, maxBatchSqlSize);
+        if (log.isDebugEnabled()) {
+            log.debug("execute sql statements done, data size: {}, maxBatchSqlSize: {}", prepareRequests.size(), maxBatchSqlSize);
         }
+    }
 
+    private void executeSql(List<PrepareRequest> prepareRequests, int maxBatchSqlSize) {

Review comment:
       Why do we need a new method out of `flush`, but only called by it without any extra logic? Please merge them.







[GitHub] [skywalking] chenyi19851209 commented on a change in pull request #7691: tidb write by batch sql ,improve write speed.

Posted by GitBox <gi...@apache.org>.
chenyi19851209 commented on a change in pull request #7691:
URL: https://github.com/apache/skywalking/pull/7691#discussion_r706851697



##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/BatchSQLExecutor.java
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.jdbc;
+
+import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
+import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
+import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.List;
+
+/**
+ * A Batch SQL executor.
+ */
+public class BatchSQLExecutor implements InsertRequest, UpdateRequest {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(BatchSQLExecutor.class);

Review comment:
       ok







[GitHub] [skywalking] chenyi19851209 commented on a change in pull request #7691: tidb write by batch sql ,improve write speed.

Posted by GitBox <gi...@apache.org>.
chenyi19851209 commented on a change in pull request #7691:
URL: https://github.com/apache/skywalking/pull/7691#discussion_r706852032



##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/BatchSQLExecutor.java
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.jdbc;
+
+import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
+import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
+import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.List;
+
+/**
+ * A Batch SQL executor.
+ */
+public class BatchSQLExecutor implements InsertRequest, UpdateRequest {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(BatchSQLExecutor.class);
+
+    private String sql;
+    private List<PrepareRequest> prepareRequests;
+
+    public BatchSQLExecutor(String sql, List<PrepareRequest> prepareRequests) {
+        this.sql = sql;
+        this.prepareRequests = prepareRequests;
+    }
+
+    public void invoke(Connection connection, int maxBatchSqlSize) throws SQLException {
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("execute sql batch.sql by key size: {},sql:{}", prepareRequests.size(), sql);
+        }
+        PreparedStatement preparedStatement = connection.prepareStatement(sql);
+        int index = 0;

Review comment:
       ok







[GitHub] [skywalking] kezhenxu94 commented on a change in pull request #7691: tidb write by batch sql ,improve write speed.

Posted by GitBox <gi...@apache.org>.
kezhenxu94 commented on a change in pull request #7691:
URL: https://github.com/apache/skywalking/pull/7691#discussion_r709179946



##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/SQLExecutor.java
##########
@@ -54,4 +54,26 @@ public void invoke(Connection connection) throws SQLException {
         }
         preparedStatement.execute();
     }
+
+    public void setParameters(PreparedStatement preparedStatement) throws SQLException {

Review comment:
       Can you also replace lines 48~50 with a call to this `setParameters` method?







[GitHub] [skywalking] wu-sheng commented on a change in pull request #7691: tidb write by batch sql ,improve write speed.

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on a change in pull request #7691:
URL: https://github.com/apache/skywalking/pull/7691#discussion_r706947793



##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/SQLExecutor.java
##########
@@ -54,4 +54,12 @@ public void invoke(Connection connection) throws SQLException {
         }
         preparedStatement.execute();
     }
+
+    public String getSql() {
+        return sql;
+    }

Review comment:
       You could override the `hashCode` and `equals` methods using the SQL as the only condition. Then this object could be used as the `key` of a HashMap. From my understanding, we don't actually need another `getSql` method.







[GitHub] [skywalking] chenyi19851209 commented on a change in pull request #7691: tidb write by batch sql ,improve write speed.

Posted by GitBox <gi...@apache.org>.
chenyi19851209 commented on a change in pull request #7691:
URL: https://github.com/apache/skywalking/pull/7691#discussion_r706850421



##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java
##########
@@ -81,6 +92,36 @@ public void flush(List<PrepareRequest> prepareRequests) {
         }
     }
 
+    private void executeSql(List<PrepareRequest> prepareRequests, int maxBatchSqlSize) {
+        if (log.isDebugEnabled()) {
+            log.debug("execute sql batch. data size:{}", prepareRequests.size());
+        }
+        Map<String, List<PrepareRequest>> batchRequestMap = new HashMap<>();
+        for (PrepareRequest prepareRequest : prepareRequests) {
+            SQLExecutor sqlExecutor = (SQLExecutor) prepareRequest;
+            if (batchRequestMap.get(sqlExecutor.getSql()) == null) {
+                List<PrepareRequest> prepareRequestList = new ArrayList<>();
+                batchRequestMap.put(sqlExecutor.getSql(), prepareRequestList);
+            }
+            batchRequestMap.get(sqlExecutor.getSql()).add(prepareRequest);

Review comment:
       I will modify it like this:
   
   if (batchRequestMap.containsKey(sqlExecutor.getSql())) {
                   batchRequestMap.get(sqlExecutor.getSql()).add(prepareRequest);
               } else {
                   List<PrepareRequest> prepareRequestList = new ArrayList<>();
                   prepareRequestList.add(prepareRequest);
                   batchRequestMap.put(sqlExecutor.getSql(), prepareRequestList);
               }







[GitHub] [skywalking] chenyi19851209 commented on a change in pull request #7691: tidb write by batch sql ,improve write speed.

Posted by GitBox <gi...@apache.org>.
chenyi19851209 commented on a change in pull request #7691:
URL: https://github.com/apache/skywalking/pull/7691#discussion_r706852467



##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java
##########
@@ -81,6 +92,36 @@ public void flush(List<PrepareRequest> prepareRequests) {
         }
     }
 
+    private void executeSql(List<PrepareRequest> prepareRequests, int maxBatchSqlSize) {
+        if (log.isDebugEnabled()) {
+            log.debug("execute sql batch. data size:{}", prepareRequests.size());
+        }
+        Map<String, List<PrepareRequest>> batchRequestMap = new HashMap<>();
+        for (PrepareRequest prepareRequest : prepareRequests) {
+            SQLExecutor sqlExecutor = (SQLExecutor) prepareRequest;
+            if (batchRequestMap.get(sqlExecutor.getSql()) == null) {

Review comment:
       Within one batch of prepareRequests, this `get` returns null the first time a given SQL statement is inserted into `batchRequestMap`.
   I have changed this following the sonatype-lift review advice. After the change:
               if (batchRequestMap.containsKey(sqlExecutor.getSql())) {
                   batchRequestMap.get(sqlExecutor.getSql()).add(prepareRequest);
               } else {
                   List<PrepareRequest> prepareRequestList = new ArrayList<>();
                   prepareRequestList.add(prepareRequest);
                   batchRequestMap.put(sqlExecutor.getSql(), prepareRequestList);
               }
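
The containsKey/get/put sequence discussed in this thread can also be written as a single `Map.computeIfAbsent` call, which creates the list on first sight of a key and returns the existing list afterwards. The following is only a minimal sketch: `GroupBySqlSketch` and its nested `Request` class are hypothetical stand-ins for illustration, not the SkyWalking `SQLExecutor` or `PrepareRequest` types.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class GroupBySqlSketch {
    // Hypothetical stand-in for a prepared request: SQL text plus one parameter.
    static final class Request {
        final String sql;
        final Object param;

        Request(String sql, Object param) {
            this.sql = sql;
            this.param = param;
        }
    }

    // Groups requests by SQL text. computeIfAbsent never returns null here,
    // so the first request of each group is added like every later one.
    static Map<String, List<Request>> groupBySql(List<Request> requests) {
        Map<String, List<Request>> grouped = new HashMap<>();
        for (Request request : requests) {
            grouped.computeIfAbsent(request.sql, key -> new ArrayList<>()).add(request);
        }
        return grouped;
    }
}
```

Because the list lookup and the `add` happen in one expression, there is no branch in which a request can be forgotten.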







[GitHub] [skywalking] kezhenxu94 commented on a change in pull request #7691: Improve the speed of writing TiDB by batching the SQL execution

Posted by GitBox <gi...@apache.org>.
kezhenxu94 commented on a change in pull request #7691:
URL: https://github.com/apache/skywalking/pull/7691#discussion_r709314818



##########
File path: docs/en/setup/backend/backend-storage.md
##########
@@ -226,9 +231,12 @@ storage:
     metadataQueryMaxSize: ${SW_STORAGE_MYSQL_QUERY_MAX_SIZE:5000}
     maxSizeOfArrayColumn: ${SW_STORAGE_MAX_SIZE_OF_ARRAY_COLUMN:20}
     numOfSearchableValuesPerTag: ${SW_STORAGE_NUM_OF_SEARCHABLE_VALUES_PER_TAG:2}
+    maxSizeOfBatchSql: ${SW_STORAGE_MAX_SIZE_OF_BATCH_SQL:2000}
+    asyncBatchPersistentPoolSize: ${SW_STORAGE_ASYNC_BATCH_PERSISTENT_POOL_SIZE:4}
 ```
 All connection-related settings, including URL link, username, and password are found in `application.yml`. 
 For details on settings, refer to the configuration of *MySQL* above.
+The function of the parameter [rewriteBatchedStatements=true] in TIDB .see document TIDB best practices(https://docs.pingcap.com/tidb/stable/java-app-best-practices#use-batch-api)

Review comment:
       ```suggestion
   To understand the function of the parameter `rewriteBatchedStatements=true` in TiDB, see the document of [TiDB best practices](https://docs.pingcap.com/tidb/stable/java-app-best-practices#use-batch-api).
   ```

##########
File path: CHANGES.md
##########
@@ -65,6 +65,7 @@ Release Notes.
 * Fix `ProfileThreadSnapshotQuery.queryProfiledSegments` adopts a wrong sort function
 * Support gRPC sync grouped dynamic configurations.
 * Fix `H2EventQueryDAO` doesn't sort data by Event.START_TIME and uses a wrong pagination query.
+* improve write tidb speed by batch execute sql.

Review comment:
       ```suggestion
   * Improve the speed of writing TiDB by batching the SQL execution.
   ```

##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java
##########
@@ -22,62 +22,63 @@
 import java.sql.SQLException;
 import java.util.List;
 import java.util.Properties;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.BulkConsumePool;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.ConsumerPoolFactory;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
-import org.apache.skywalking.oap.server.core.UnexpectedException;
 import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
 import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
 import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
 import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
 import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
 import org.apache.skywalking.oap.server.library.util.CollectionUtils;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLExecutor;
+import org.apache.skywalking.oap.server.storage.plugin.jdbc.BatchSQLExecutor;
 
 @Slf4j
 public class H2BatchDAO implements IBatchDAO {
     private JDBCHikariCPClient h2Client;
     private final DataCarrier<PrepareRequest> dataCarrier;
+    private int maxBatchSqlSize;

Review comment:
       ```suggestion
       private final int maxBatchSqlSize;
   ```

##########
File path: docs/en/setup/backend/backend-storage.md
##########
@@ -194,17 +196,20 @@ storage:
   selector: ${SW_STORAGE:mysql}
   mysql:
     properties:
-      jdbcUrl: ${SW_JDBC_URL:"jdbc:mysql://localhost:3306/swtest"}
+      jdbcUrl: ${SW_JDBC_URL:"jdbc:mysql://localhost:3306/swtest?rewriteBatchedStatements=true"}
       dataSource.user: ${SW_DATA_SOURCE_USER:root}
       dataSource.password: ${SW_DATA_SOURCE_PASSWORD:root@1234}
       dataSource.cachePrepStmts: ${SW_DATA_SOURCE_CACHE_PREP_STMTS:true}
       dataSource.prepStmtCacheSize: ${SW_DATA_SOURCE_PREP_STMT_CACHE_SQL_SIZE:250}
       dataSource.prepStmtCacheSqlLimit: ${SW_DATA_SOURCE_PREP_STMT_CACHE_SQL_LIMIT:2048}
       dataSource.useServerPrepStmts: ${SW_DATA_SOURCE_USE_SERVER_PREP_STMTS:true}
     metadataQueryMaxSize: ${SW_STORAGE_MYSQL_QUERY_MAX_SIZE:5000}
+    maxSizeOfBatchSql: ${SW_STORAGE_MAX_SIZE_OF_BATCH_SQL:2000}
+    asyncBatchPersistentPoolSize: ${SW_STORAGE_ASYNC_BATCH_PERSISTENT_POOL_SIZE:4}
 ```
 All connection-related settings, including URL link, username, and password are found in `application.yml`. 
 Only part of the settings are listed here. See the [HikariCP](https://github.com/brettwooldridge/HikariCP) connection pool document for full settings.
+The function of the parameter [rewriteBatchedStatements=true] in MYSQL .see MYSQL official document (https://dev.mysql.com/doc/connector-j/8.0/en/connector-j-connp-props-performance-extensions.html#cj-conn-prop_rewriteBatchedStatements) 

Review comment:
       ```suggestion
   To understand the function of the parameter `rewriteBatchedStatements=true` in MySQL, see the [MySQL official document](https://dev.mysql.com/doc/connector-j/8.0/en/connector-j-connp-props-performance-extensions.html#cj-conn-prop_rewriteBatchedStatements).
   ```
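
For context, my understanding is that `rewriteBatchedStatements=true` only matters when statements go through the JDBC batch API (`addBatch` / `executeBatch`), which is what this pull request switches to. The sketch below shows that API with a cap on the batch size, in the spirit of `maxSizeOfBatchSql`. The class and method names (`JdbcBatchSketch`, `insertInBatches`) and the returned flush count are assumptions made for illustration; this is not the `BatchSQLExecutor` from the pull request.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

class JdbcBatchSketch {
    // Sends all rows through one PreparedStatement and flushes every
    // maxBatchSize rows. Returns how many times executeBatch() was called.
    static int insertInBatches(Connection connection, String sql,
                               List<Object[]> rows, int maxBatchSize) throws SQLException {
        int flushes = 0;
        try (PreparedStatement statement = connection.prepareStatement(sql)) {
            int pending = 0;
            for (Object[] row : rows) {
                // JDBC parameter indexes start at 1.
                for (int i = 0; i < row.length; i++) {
                    statement.setObject(i + 1, row[i]);
                }
                statement.addBatch();
                pending++;
                if (pending == maxBatchSize) {
                    statement.executeBatch();
                    flushes++;
                    pending = 0;
                }
            }
            // Flush whatever is left after the last full batch.
            if (pending > 0) {
                statement.executeBatch();
                flushes++;
            }
        }
        return flushes;
    }
}
```

Counting pending rows directly, rather than testing the loop index, keeps every flushed batch at exactly `maxBatchSize` rows except possibly the last one.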







[GitHub] [skywalking] sonatype-lift[bot] commented on a change in pull request #7691: tidb write by batch sql ,improve write speed.

Posted by GitBox <gi...@apache.org>.
sonatype-lift[bot] commented on a change in pull request #7691:
URL: https://github.com/apache/skywalking/pull/7691#discussion_r706380764



##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java
##########
@@ -81,6 +92,36 @@ public void flush(List<PrepareRequest> prepareRequests) {
         }
     }
 
+    private void executeSql(List<PrepareRequest> prepareRequests, int maxBatchSqlSize) {
+        if (log.isDebugEnabled()) {
+            log.debug("execute sql batch. data size:{}", prepareRequests.size());
+        }
+        Map<String, List<PrepareRequest>> batchRequestMap = new HashMap<>();
+        for (PrepareRequest prepareRequest : prepareRequests) {
+            SQLExecutor sqlExecutor = (SQLExecutor) prepareRequest;
+            if (batchRequestMap.get(sqlExecutor.getSql()) == null) {
+                List<PrepareRequest> prepareRequestList = new ArrayList<>();
+                batchRequestMap.put(sqlExecutor.getSql(), prepareRequestList);
+            }
+            batchRequestMap.get(sqlExecutor.getSql()).add(prepareRequest);
+        }
+        try (Connection connection = h2Client.getConnection()) {
+            try {
+                for (String key : batchRequestMap.keySet()) {
+                    BatchSQLExecutor batchSQLExecutor = new BatchSQLExecutor(key, batchRequestMap.get(key));

Review comment:
       *INEFFICIENT_KEYSET_ITERATOR:*  Accessing a value using a key that was retrieved from a `keySet` iterator. It is more efficient to use an iterator on the `entrySet` of the map, avoiding the extra `HashMap.get(key)` lookup.
   (at-me [in a reply](https://help.sonatype.com/lift) with `help` or `ignore`)
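
As an illustration of the `entrySet` iteration this finding recommends, here is a small self-contained sketch. `EntrySetSketch` and `describe` are hypothetical names; the map simply mirrors the shape of `batchRequestMap` (SQL text mapped to a list of grouped items).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class EntrySetSketch {
    // Builds one summary line per SQL statement. Each entry already carries
    // both key and value, so no second map lookup per key is needed.
    static List<String> describe(Map<String, List<Integer>> batches) {
        List<String> lines = new ArrayList<>();
        for (Map.Entry<String, List<Integer>> entry : batches.entrySet()) {
            lines.add(entry.getKey() + " -> " + entry.getValue().size());
        }
        return lines;
    }
}
```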

##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java
##########
@@ -81,6 +92,36 @@ public void flush(List<PrepareRequest> prepareRequests) {
         }
     }
 
+    private void executeSql(List<PrepareRequest> prepareRequests, int maxBatchSqlSize) {
+        if (log.isDebugEnabled()) {
+            log.debug("execute sql batch. data size:{}", prepareRequests.size());
+        }
+        Map<String, List<PrepareRequest>> batchRequestMap = new HashMap<>();
+        for (PrepareRequest prepareRequest : prepareRequests) {
+            SQLExecutor sqlExecutor = (SQLExecutor) prepareRequest;
+            if (batchRequestMap.get(sqlExecutor.getSql()) == null) {
+                List<PrepareRequest> prepareRequestList = new ArrayList<>();
+                batchRequestMap.put(sqlExecutor.getSql(), prepareRequestList);
+            }
+            batchRequestMap.get(sqlExecutor.getSql()).add(prepareRequest);

Review comment:
       *NULL_DEREFERENCE:*  object returned by `batchRequestMap.get(sqlExecutor.getSql())` could be null and is dereferenced at line 106.
   (at-me [in a reply](https://help.sonatype.com/lift) with `help` or `ignore`)







[GitHub] [skywalking] wu-sheng merged pull request #7691: Improve the speed of writing TiDB by batching the SQL execution

Posted by GitBox <gi...@apache.org>.
wu-sheng merged pull request #7691:
URL: https://github.com/apache/skywalking/pull/7691


   





[GitHub] [skywalking] wu-sheng commented on a change in pull request #7691: tidb write by batch sql ,improve write speed.

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on a change in pull request #7691:
URL: https://github.com/apache/skywalking/pull/7691#discussion_r708796306



##########
File path: oap-server/server-starter/src/main/resources/application.yml
##########
@@ -170,9 +172,11 @@ storage:
     metadataQueryMaxSize: ${SW_STORAGE_MYSQL_QUERY_MAX_SIZE:5000}
     maxSizeOfArrayColumn: ${SW_STORAGE_MAX_SIZE_OF_ARRAY_COLUMN:20}
     numOfSearchableValuesPerTag: ${SW_STORAGE_NUM_OF_SEARCHABLE_VALUES_PER_TAG:2}
+    maxSizeOfBatchSql: ${SW_STORAGE_MAX_SIZE_OF_BATCH_SQL:2000}
+    asyncBatchPersistentPoolSize: ${SW_STORAGE_ASYNC_BATCH_PERSISTENT_POOL_SIZE:4}
   tidb:
     properties:
-      jdbcUrl: ${SW_JDBC_URL:"jdbc:mysql://localhost:4000/tidbswtest"}
+      jdbcUrl: ${SW_JDBC_URL:"jdbc:mysql://localhost:4000/tidbswtest?rewriteBatchedStatements=true"}

Review comment:
       Also, this URL suffix is missing in the `backend-storage.md`.







[GitHub] [skywalking] chenyi19851209 commented on a change in pull request #7691: tidb write by batch sql ,improve write speed.

Posted by GitBox <gi...@apache.org>.
chenyi19851209 commented on a change in pull request #7691:
URL: https://github.com/apache/skywalking/pull/7691#discussion_r709295152



##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/SQLExecutor.java
##########
@@ -54,4 +54,26 @@ public void invoke(Connection connection) throws SQLException {
         }
         preparedStatement.execute();
     }
+
+    public void setParameters(PreparedStatement preparedStatement) throws SQLException {
+        for (int i = 0; i < param.size(); i++) {
+            preparedStatement.setObject(i + 1, param.get(i));
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        SQLExecutor s = (SQLExecutor) o;
+        return sql == s.sql;
+    }
+
+    @Override
+    public int hashCode() {
+        return sql.hashCode();
+    }

Review comment:
       > @chenyi19851209 We still need you to polish a little more.
   
   OK, I have made the changes based on the review advice.







[GitHub] [skywalking] kezhenxu94 commented on a change in pull request #7691: tidb write by batch sql ,improve write speed.

Posted by GitBox <gi...@apache.org>.
kezhenxu94 commented on a change in pull request #7691:
URL: https://github.com/apache/skywalking/pull/7691#discussion_r708960981



##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java
##########
@@ -22,62 +22,73 @@
 import java.sql.SQLException;
 import java.util.List;
 import java.util.Properties;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.BulkConsumePool;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.ConsumerPoolFactory;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
-import org.apache.skywalking.oap.server.core.UnexpectedException;
 import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
 import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
 import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
 import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
 import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
 import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+import org.apache.skywalking.oap.server.storage.plugin.jdbc.BatchSQLExecutor;
 import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLExecutor;
 
 @Slf4j
 public class H2BatchDAO implements IBatchDAO {
     private JDBCHikariCPClient h2Client;
     private final DataCarrier<PrepareRequest> dataCarrier;
+    private int maxBatchSqlSize;
 
-    public H2BatchDAO(JDBCHikariCPClient h2Client) {
+    public H2BatchDAO(JDBCHikariCPClient h2Client, int maxBatchSqlSize, int asyncBatchPersistentPoolSize) {
         this.h2Client = h2Client;
-
         String name = "H2_ASYNCHRONOUS_BATCH_PERSISTENT";
-        BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, 1, 20);
-        try {
-            ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
-        } catch (Exception e) {
-            throw new UnexpectedException(e.getMessage(), e);
+        if (log.isDebugEnabled()) {
+            log.debug("H2_ASYNCHRONOUS_BATCH_PERSISTENT poolSize: {},maxBatchSqlSize:{}", asyncBatchPersistentPoolSize, maxBatchSqlSize);

Review comment:
       ```suggestion
               log.debug("H2_ASYNCHRONOUS_BATCH_PERSISTENT poolSize: {}, maxBatchSqlSize:{}", asyncBatchPersistentPoolSize, maxBatchSqlSize);
   ```

##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/postgresql/PostgreSQLStorageConfig.java
##########
@@ -25,5 +25,16 @@
 @Setter
 @Getter
 public class PostgreSQLStorageConfig extends MySQLStorageConfig {
-
+    /**
+     * The maximum size of batch size of SQL execution
+     *
+     * @since 8.8.0
+     */
+    private int maxSizeOfBatchSql = 2000;
+    /**
+     * async batch execute pool size
+     *
+     * @since 8.8.0
+     */
+    private int asyncBatchPersistentPoolSize  = 4;

Review comment:
       `PostgreSQLStorageConfig extends MySQLStorageConfig` and these 2 fields are already in `MySQLStorageConfig`, so I guess we don't need to declare them here again.

##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/SQLExecutor.java
##########
@@ -54,4 +54,26 @@ public void invoke(Connection connection) throws SQLException {
         }
         preparedStatement.execute();
     }
+
+    public void setParameters(PreparedStatement preparedStatement) throws SQLException {
+        for (int i = 0; i < param.size(); i++) {
+            preparedStatement.setObject(i + 1, param.get(i));
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        SQLExecutor s = (SQLExecutor) o;
+        return sql == s.sql;
+    }
+
+    @Override
+    public int hashCode() {
+        return sql.hashCode();
+    }

Review comment:
       The `equals` and `hashCode` methods don't take many edge cases into consideration; prefer annotating this class with `@EqualsAndHashCode(of = "sql")`, which generates the boilerplate code.
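
To make the edge cases concrete: the `equals` in the diff compares strings with `==` (reference identity), casts without a type check, and dereferences a possibly null argument, while `hashCode` fails if `sql` is null. A hand-written version that covers these cases might look like the sketch below. `SqlKey` is a hypothetical class used only for illustration, and the code Lombok actually generates differs in detail.

```java
import java.util.Objects;

class SqlKey {
    private final String sql;

    SqlKey(String sql) {
        this.sql = sql;
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        // Rejects null and unrelated types instead of throwing.
        if (other == null || getClass() != other.getClass()) {
            return false;
        }
        // Compares string content, not references, and tolerates a null sql.
        return Objects.equals(sql, ((SqlKey) other).sql);
    }

    @Override
    public int hashCode() {
        // Objects.hashCode returns 0 for null instead of throwing.
        return Objects.hashCode(sql);
    }
}
```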

##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java
##########
@@ -22,62 +22,73 @@
 import java.sql.SQLException;
 import java.util.List;
 import java.util.Properties;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.BulkConsumePool;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.ConsumerPoolFactory;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
-import org.apache.skywalking.oap.server.core.UnexpectedException;
 import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
 import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
 import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
 import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
 import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
 import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+import org.apache.skywalking.oap.server.storage.plugin.jdbc.BatchSQLExecutor;
 import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLExecutor;
 
 @Slf4j
 public class H2BatchDAO implements IBatchDAO {
     private JDBCHikariCPClient h2Client;
     private final DataCarrier<PrepareRequest> dataCarrier;
+    private int maxBatchSqlSize;
 
-    public H2BatchDAO(JDBCHikariCPClient h2Client) {
+    public H2BatchDAO(JDBCHikariCPClient h2Client, int maxBatchSqlSize, int asyncBatchPersistentPoolSize) {
         this.h2Client = h2Client;
-
         String name = "H2_ASYNCHRONOUS_BATCH_PERSISTENT";
-        BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, 1, 20);
-        try {
-            ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
-        } catch (Exception e) {
-            throw new UnexpectedException(e.getMessage(), e);
+        if (log.isDebugEnabled()) {
+            log.debug("H2_ASYNCHRONOUS_BATCH_PERSISTENT poolSize: {},maxBatchSqlSize:{}", asyncBatchPersistentPoolSize, maxBatchSqlSize);
         }
-
-        this.dataCarrier = new DataCarrier<>(1, 10000);
-        this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new H2BatchDAO.H2BatchConsumer(this));
+        this.maxBatchSqlSize = maxBatchSqlSize;
+        this.dataCarrier = new DataCarrier<>(name, asyncBatchPersistentPoolSize, 10000);
+        this.dataCarrier.consume(new H2BatchDAO.H2BatchConsumer(this), asyncBatchPersistentPoolSize, 20);
     }
 
     @Override
     public void flush(List<PrepareRequest> prepareRequests) {
         if (CollectionUtils.isEmpty(prepareRequests)) {
             return;
         }
-
         if (log.isDebugEnabled()) {
-            log.debug("batch sql statements execute, data size: {}", prepareRequests.size());
+            log.debug("to execute sql statements execute, data size: {}, maxBatchSqlSize: {}", prepareRequests.size(), maxBatchSqlSize);
+        }
+        Map<Integer, List<PrepareRequest>> batchRequestMap = new HashMap<>();

Review comment:
       You can use `SQLExecutor` as the key directly.

##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java
##########
@@ -22,62 +22,73 @@
 import java.sql.SQLException;
 import java.util.List;
 import java.util.Properties;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.BulkConsumePool;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.ConsumerPoolFactory;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
-import org.apache.skywalking.oap.server.core.UnexpectedException;
 import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
 import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
 import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
 import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
 import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
 import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+import org.apache.skywalking.oap.server.storage.plugin.jdbc.BatchSQLExecutor;
 import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLExecutor;
 
 @Slf4j
 public class H2BatchDAO implements IBatchDAO {
     private JDBCHikariCPClient h2Client;
     private final DataCarrier<PrepareRequest> dataCarrier;
+    private int maxBatchSqlSize;
 
-    public H2BatchDAO(JDBCHikariCPClient h2Client) {
+    public H2BatchDAO(JDBCHikariCPClient h2Client, int maxBatchSqlSize, int asyncBatchPersistentPoolSize) {
         this.h2Client = h2Client;
-
         String name = "H2_ASYNCHRONOUS_BATCH_PERSISTENT";
-        BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, 1, 20);
-        try {
-            ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
-        } catch (Exception e) {
-            throw new UnexpectedException(e.getMessage(), e);
+        if (log.isDebugEnabled()) {
+            log.debug("H2_ASYNCHRONOUS_BATCH_PERSISTENT poolSize: {},maxBatchSqlSize:{}", asyncBatchPersistentPoolSize, maxBatchSqlSize);
         }
-
-        this.dataCarrier = new DataCarrier<>(1, 10000);
-        this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new H2BatchDAO.H2BatchConsumer(this));
+        this.maxBatchSqlSize = maxBatchSqlSize;
+        this.dataCarrier = new DataCarrier<>(name, asyncBatchPersistentPoolSize, 10000);
+        this.dataCarrier.consume(new H2BatchDAO.H2BatchConsumer(this), asyncBatchPersistentPoolSize, 20);
     }
 
     @Override
     public void flush(List<PrepareRequest> prepareRequests) {
         if (CollectionUtils.isEmpty(prepareRequests)) {
             return;
         }
-
         if (log.isDebugEnabled()) {
-            log.debug("batch sql statements execute, data size: {}", prepareRequests.size());
+            log.debug("to execute sql statements execute, data size: {}, maxBatchSqlSize: {}", prepareRequests.size(), maxBatchSqlSize);
+        }
+        Map<Integer, List<PrepareRequest>> batchRequestMap = new HashMap<>();
+        for (PrepareRequest prepareRequest : prepareRequests) {
+            SQLExecutor sqlExecutor = (SQLExecutor) prepareRequest;
+            if (batchRequestMap.containsKey(sqlExecutor.hashCode())) {
+                batchRequestMap.get(sqlExecutor.hashCode()).add(prepareRequest);
+            } else {
+                List<PrepareRequest> prepareRequestList = new ArrayList<>();
+                prepareRequestList.add(sqlExecutor);
+                batchRequestMap.put(sqlExecutor.hashCode(), prepareRequestList);
+            }

Review comment:
       All these lines group the list into a map, which can be simplified to:
   
   ```java
           final Map<PrepareRequest, List<PrepareRequest>> batchRequestMap =
               prepareRequests.stream().collect(groupingBy(Function.identity()));
   ```
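
A runnable sketch of the same idea follows. It assumes the grouped type defines `equals` and `hashCode` on the SQL text only, which is what makes `Function.identity()` usable as the classifier. `GroupingByIdentitySketch` and its nested `Request` class are hypothetical and only stand in for the real request types.

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;

class GroupingByIdentitySketch {
    // Hypothetical request: equality is based on the SQL text alone,
    // so requests with different parameters share one map key.
    static final class Request {
        final String sql;
        final int param;

        Request(String sql, int param) {
            this.sql = sql;
            this.param = param;
        }

        @Override
        public boolean equals(Object other) {
            return other instanceof Request && Objects.equals(sql, ((Request) other).sql);
        }

        @Override
        public int hashCode() {
            return Objects.hashCode(sql);
        }
    }

    // Each element is its own classifier; equal elements fall into one list.
    static Map<Request, List<Request>> group(List<Request> requests) {
        return requests.stream().collect(Collectors.groupingBy(Function.identity()));
    }
}
```

The key of each group is simply the first request seen with that SQL text, so the statement can be read from the key when the batch is executed.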

##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/BatchSQLExecutor.java
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.jdbc;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
+import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
+import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.List;
+
+/**
+ * A Batch SQL executor.
+ */
+@Slf4j
+@RequiredArgsConstructor
+public class BatchSQLExecutor implements InsertRequest, UpdateRequest {
+
+    private final List<PrepareRequest> prepareRequests;
+
+    public void invoke(Connection connection, int maxBatchSqlSize) throws SQLException {
+        if (log.isDebugEnabled()) {
+            log.debug("execute sql batch. sql by key size: {}", prepareRequests.size());
+        }
+        if (prepareRequests.size() == 0) {
+            return;
+        }
+        String sql = prepareRequests.get(0).toString();
+        try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
+            int pendingCount = 0;
+            for (int k = 0; k < prepareRequests.size(); k++) {
+                SQLExecutor sqlExecutor = (SQLExecutor) prepareRequests.get(k);
+                sqlExecutor.setParameters(preparedStatement);
+                preparedStatement.addBatch();
+                if (k > 0 && k % maxBatchSqlSize == 0) {
+                    executeBatch(preparedStatement, maxBatchSqlSize, sql);
+                    pendingCount = 0;
+                } else {
+                    pendingCount++;
+                }
+            }
+            if (pendingCount > 0) {
+                executeBatch(preparedStatement, pendingCount, sql);
+            }
+        } catch (SQLException e) {
+            throw e;

Review comment:
       There is no point in catching an exception only to rethrow it immediately. You can remove the `catch` clause; `try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) { xxxxx }` without a `catch` is legal in Java.
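
A small sketch of this point: a try-with-resources statement needs neither `catch` nor `finally`, the resource is still closed, and the exception still reaches the caller. `TryWithResourcesSketch` and its `Resource` class are hypothetical; a real `PreparedStatement` behaves the same way because it is `AutoCloseable`.

```java
import java.sql.SQLException;

class TryWithResourcesSketch {
    // Hypothetical resource that records whether close() was called.
    static final class Resource implements AutoCloseable {
        boolean closed;

        @Override
        public void close() {
            closed = true;
        }
    }

    // No catch clause: close() runs first, then the SQLException propagates.
    static void useAndFail(Resource resource) throws SQLException {
        try (Resource managed = resource) {
            throw new SQLException("simulated failure");
        }
    }
}
```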

##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/BatchSQLExecutor.java
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.jdbc;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
+import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
+import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.List;
+
+/**
+ * A Batch SQL executor.
+ */
+@Slf4j
+@RequiredArgsConstructor
+public class BatchSQLExecutor implements InsertRequest, UpdateRequest {
+
+    private final List<PrepareRequest> prepareRequests;
+
+    public void invoke(Connection connection, int maxBatchSqlSize) throws SQLException {
+        if (log.isDebugEnabled()) {
+            log.debug("execute sql batch. sql by key size: {}", prepareRequests.size());
+        }
+        if (prepareRequests.size() == 0) {
+            return;
+        }
+        String sql = prepareRequests.get(0).toString();
+        try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
+            int pendingCount = 0;
+            for (int k = 0; k < prepareRequests.size(); k++) {
+                SQLExecutor sqlExecutor = (SQLExecutor) prepareRequests.get(k);
+                sqlExecutor.setParameters(preparedStatement);
+                preparedStatement.addBatch();
+                if (k > 0 && k % maxBatchSqlSize == 0) {
+                    executeBatch(preparedStatement, maxBatchSqlSize, sql);
+                    pendingCount = 0;
+                } else {
+                    pendingCount++;
+                }
+            }
+            if (pendingCount > 0) {
+                executeBatch(preparedStatement, pendingCount, sql);
+            }
+        } catch (SQLException e) {
+            throw e;
+        }
+    }
+
+    private void executeBatch(PreparedStatement preparedStatement, int pendingCount, String sql) throws SQLException {
+        long start = System.currentTimeMillis();
+        preparedStatement.executeBatch();
+        long end = System.currentTimeMillis();
+        long cost = end - start;
+        if (log.isDebugEnabled()) {
+            log.debug("execute batch sql, batch size: {}, cost:{}ms, sql: {}", pendingCount, cost, sql);
+        }
+    }

Review comment:
       Move the calculation into the `if` block so that it runs only when debug logging is enabled.
   
   ```suggestion
           if (log.isDebugEnabled()) {
               long end = System.currentTimeMillis();
               long cost = end - start;
               log.debug("execute batch sql, batch size: {}, cost:{}ms, sql: {}", pendingCount, cost, sql);
           }
   ```

##########
File path: oap-server/server-storage-plugin/storage-tidb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/tidb/TiDBStorageConfig.java
##########
@@ -25,4 +25,16 @@
 @Setter
 @Getter
 public class TiDBStorageConfig extends MySQLStorageConfig {
+    /**
+     * The maximum size of batch size of SQL execution
+     *
+     * @since 8.8.0
+     */
+    private int maxSizeOfBatchSql = 2000;
+    /**
+     * async batch execute pool size
+     *
+     * @since 8.8.0
+     */
+    private int asyncBatchPersistentPoolSize  = 4;

Review comment:
       `TiDBStorageConfig extends MySQLStorageConfig`, and these two fields are already declared in `MySQLStorageConfig`, so I guess we don't need to declare them here again.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [skywalking] wu-sheng commented on a change in pull request #7691: tidb write by batch sql ,improve write speed.

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on a change in pull request #7691:
URL: https://github.com/apache/skywalking/pull/7691#discussion_r708758242



##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/SQLExecutor.java
##########
@@ -54,4 +54,26 @@ public void invoke(Connection connection) throws SQLException {
         }
         preparedStatement.execute();
     }
+
+    public void setParameters(PreparedStatement preparedStatement) throws SQLException {
+        for (int i = 0; i < param.size(); i++) {
+            preparedStatement.setObject(i + 1, param.get(i));
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        SQLExecutor s = (SQLExecutor) o;
+        return sql == s.sql;

Review comment:
       This is reference equality; you should use `equals`.
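   
   As an illustration only, here is a minimal sketch of a value-based `equals`, assuming equality should depend on the SQL text alone. `SqlKeySketch` is a hypothetical, simplified stand-in for `SQLExecutor`, not the class in this PR. It also guards the cast with `instanceof` and overrides `hashCode` to match, which hash-based collections rely on.
   
   ```java
   import java.util.Objects;

   // Hypothetical, simplified stand-in for SQLExecutor; only the sql field is modelled.
   final class SqlKeySketch {
       private final String sql;

       SqlKeySketch(String sql) {
           this.sql = sql;
       }

       @Override
       public boolean equals(Object o) {
           if (this == o) {
               return true;
           }
           if (!(o instanceof SqlKeySketch)) {
               return false; // also covers null and avoids a ClassCastException
           }
           // Compare the SQL text by value, not by reference.
           return Objects.equals(sql, ((SqlKeySketch) o).sql);
       }

       @Override
       public int hashCode() {
           // Must agree with equals so the object works as a HashMap key.
           return Objects.hashCode(sql);
       }
   }
   ```
   
   With `==`, two executors holding equal but distinct `String` instances would compare as different.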







[GitHub] [skywalking] wu-sheng commented on a change in pull request #7691: tidb write by batch sql ,improve write speed.

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on a change in pull request #7691:
URL: https://github.com/apache/skywalking/pull/7691#discussion_r706946308



##########
File path: docs/en/setup/backend/configuration-vocabulary.md
##########
@@ -114,16 +114,22 @@ core|default|role|Option values: `Mixed/Receiver/Aggregator`. **Receiver** mode
 | - | - | metadataQueryMaxSize | The maximum size of metadata per query. | SW_STORAGE_H2_QUERY_MAX_SIZE | 5000 |
 | - | - | maxSizeOfArrayColumn | Some entities (e.g. trace segments) include the logic column with multiple values. In H2, we use multiple physical columns to host the values: e.g. change column_a with values [1,2,3,4,5] to `column_a_0 = 1, column_a_1 = 2, column_a_2 = 3 , column_a_3 = 4, column_a_4 = 5`. | SW_STORAGE_MAX_SIZE_OF_ARRAY_COLUMN | 20 |
 | - | - | numOfSearchableValuesPerTag | In a trace segment, this includes multiple spans with multiple tags. Different spans may have the same tag key, e.g. multiple HTTP exit spans all have their own `http.method` tags. This configuration sets the limit on the maximum number of values for the same tag key. | SW_STORAGE_NUM_OF_SEARCHABLE_VALUES_PER_TAG | 2 |
+| - | - | maxSizeOfBatchSql | The maximum size of execute batch sql per execute sql | SW_STORAGE_MAX_SIZE_OF_BATCH_SQL | 100 |

Review comment:
       ```suggestion
   | - | - | maxSizeOfBatchSql | The maximum size of batch size of SQL execution | SW_STORAGE_MAX_SIZE_OF_BATCH_SQL | 100 |
   ```
   
   Please apply this to other comments and documents too.







[GitHub] [skywalking] wu-sheng commented on a change in pull request #7691: tidb write by batch sql ,improve write speed.

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on a change in pull request #7691:
URL: https://github.com/apache/skywalking/pull/7691#discussion_r708405489



##########
File path: oap-server/server-starter/src/main/resources/application.yml
##########
@@ -170,9 +172,11 @@ storage:
     metadataQueryMaxSize: ${SW_STORAGE_MYSQL_QUERY_MAX_SIZE:5000}
     maxSizeOfArrayColumn: ${SW_STORAGE_MAX_SIZE_OF_ARRAY_COLUMN:20}
     numOfSearchableValuesPerTag: ${SW_STORAGE_NUM_OF_SEARCHABLE_VALUES_PER_TAG:2}
+    maxSizeOfBatchSql: ${SW_STORAGE_MAX_SIZE_OF_BATCH_SQL:2000}
+    asyncBatchPersistentPoolSize: ${SW_STORAGE_ASYNC_BATCH_PERSISTENT_POOL_SIZE:4}
   tidb:
     properties:
-      jdbcUrl: ${SW_JDBC_URL:"jdbc:mysql://localhost:4000/tidbswtest"}
+      jdbcUrl: ${SW_JDBC_URL:"jdbc:mysql://localhost:4000/tidbswtest?rewriteBatchedStatements=true"}

Review comment:
       Then should we at least add them to MySQL too? Also, we should add a link to the explanation (MySQL or TiDB official doc).







[GitHub] [skywalking] kezhenxu94 commented on a change in pull request #7691: tidb write by batch sql ,improve write speed.

Posted by GitBox <gi...@apache.org>.
kezhenxu94 commented on a change in pull request #7691:
URL: https://github.com/apache/skywalking/pull/7691#discussion_r708132509



##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java
##########
@@ -22,61 +22,85 @@
 import java.sql.SQLException;
 import java.util.List;
 import java.util.Properties;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.Iterator;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.BulkConsumePool;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.ConsumerPoolFactory;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
-import org.apache.skywalking.oap.server.core.UnexpectedException;
 import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
 import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
 import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
 import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
 import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
 import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+import org.apache.skywalking.oap.server.storage.plugin.jdbc.BatchSQLExecutor;
 import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLExecutor;
 
 @Slf4j
 public class H2BatchDAO implements IBatchDAO {
     private JDBCHikariCPClient h2Client;
     private final DataCarrier<PrepareRequest> dataCarrier;
+    private int maxBatchSqlSize = 2000;
+    private int asyncBatchPersistentPoolSize = 4;
 
-    public H2BatchDAO(JDBCHikariCPClient h2Client) {
+    public H2BatchDAO(JDBCHikariCPClient h2Client, int batchSqlSize, int asyncBatchPersistentPoolSize) {
         this.h2Client = h2Client;
-
         String name = "H2_ASYNCHRONOUS_BATCH_PERSISTENT";
-        BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, 1, 20);
-        try {
-            ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
-        } catch (Exception e) {
-            throw new UnexpectedException(e.getMessage(), e);
+        if (log.isDebugEnabled()) {
+            log.debug("H2_ASYNCHRONOUS_BATCH_PERSISTENT poolSize: {},maxBatchSqlSize:{}", asyncBatchPersistentPoolSize, batchSqlSize);
         }
-
-        this.dataCarrier = new DataCarrier<>(1, 10000);
-        this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new H2BatchDAO.H2BatchConsumer(this));
+        this.dataCarrier = new DataCarrier<>(name, asyncBatchPersistentPoolSize, 10000);
+        this.dataCarrier.consume(new H2BatchDAO.H2BatchConsumer(this), asyncBatchPersistentPoolSize, 20);
     }
 
     @Override
     public void flush(List<PrepareRequest> prepareRequests) {
         if (CollectionUtils.isEmpty(prepareRequests)) {
             return;
         }
-
         if (log.isDebugEnabled()) {
-            log.debug("batch sql statements execute, data size: {}", prepareRequests.size());
+            log.debug("to execute sql statements execute, data size: {}, maxBatchSqlSize: {}", prepareRequests.size(), maxBatchSqlSize);
+        }
+        executeSql(prepareRequests, maxBatchSqlSize);
+        if (log.isDebugEnabled()) {
+            log.debug("execute sql statements done, data size: {}, maxBatchSqlSize: {}", prepareRequests.size(), maxBatchSqlSize);
         }
+    }
 
+    private void executeSql(List<PrepareRequest> prepareRequests, int maxBatchSqlSize) {
+        if (log.isDebugEnabled()) {
+            log.debug("execute sql batch. data size:{}", prepareRequests.size());
+        }
+        Map<String, List<PrepareRequest>> batchRequestMap = new HashMap<>();
+        for (PrepareRequest prepareRequest : prepareRequests) {
+            SQLExecutor sqlExecutor = (SQLExecutor) prepareRequest;
+            if (batchRequestMap.containsKey(sqlExecutor.getSql())) {
+                batchRequestMap.get(sqlExecutor.getSql()).add(prepareRequest);
+            } else {
+                List<PrepareRequest> prepareRequestList = new ArrayList<>();
+                prepareRequestList.add(sqlExecutor);
+                batchRequestMap.put(sqlExecutor.getSql(), prepareRequestList);
+            }
+        }
         try (Connection connection = h2Client.getConnection()) {
-            for (PrepareRequest prepareRequest : prepareRequests) {
-                try {
-                    SQLExecutor sqlExecutor = (SQLExecutor) prepareRequest;
-                    sqlExecutor.invoke(connection);
-                } catch (SQLException e) {
-                    // Just avoid one execution failure makes the rest of batch failure.
-                    log.error(e.getMessage(), e);
+            try {
+                Set<Map.Entry<String, List<PrepareRequest>>> entrySet = batchRequestMap.entrySet();
+                Iterator<Map.Entry<String, List<PrepareRequest>>> iterator = entrySet.iterator();
+                while (iterator.hasNext()) {

Review comment:
       Please also replace the iterator loop with an enhanced `for` loop, or with `forEach` as in the following:
   
   
   ```java
   
               batchRequestMap.forEach((sql, requests) -> {
                   try {
                       BatchSQLExecutor batchSQLExecutor =
                           new BatchSQLExecutor(sql, requests);
                       batchSQLExecutor.invoke(connection, maxBatchSqlSize);
                   } catch (SQLException e) {
                       // Just avoid one execution failure makes the rest of batch failure.
                       log.error(e.getMessage(), e);
                   }
               });
   ```







[GitHub] [skywalking] chenyi19851209 commented on a change in pull request #7691: tidb write by batch sql ,improve write speed.

Posted by GitBox <gi...@apache.org>.
chenyi19851209 commented on a change in pull request #7691:
URL: https://github.com/apache/skywalking/pull/7691#discussion_r709293199



##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java
##########
@@ -22,62 +22,73 @@
 import java.sql.SQLException;
 import java.util.List;
 import java.util.Properties;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.BulkConsumePool;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.ConsumerPoolFactory;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
-import org.apache.skywalking.oap.server.core.UnexpectedException;
 import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
 import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
 import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
 import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
 import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
 import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+import org.apache.skywalking.oap.server.storage.plugin.jdbc.BatchSQLExecutor;
 import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLExecutor;
 
 @Slf4j
 public class H2BatchDAO implements IBatchDAO {
     private JDBCHikariCPClient h2Client;
     private final DataCarrier<PrepareRequest> dataCarrier;
+    private int maxBatchSqlSize;
 
-    public H2BatchDAO(JDBCHikariCPClient h2Client) {
+    public H2BatchDAO(JDBCHikariCPClient h2Client, int maxBatchSqlSize, int asyncBatchPersistentPoolSize) {
         this.h2Client = h2Client;
-
         String name = "H2_ASYNCHRONOUS_BATCH_PERSISTENT";
-        BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, 1, 20);
-        try {
-            ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
-        } catch (Exception e) {
-            throw new UnexpectedException(e.getMessage(), e);
+        if (log.isDebugEnabled()) {
+            log.debug("H2_ASYNCHRONOUS_BATCH_PERSISTENT poolSize: {},maxBatchSqlSize:{}", asyncBatchPersistentPoolSize, maxBatchSqlSize);
         }
-
-        this.dataCarrier = new DataCarrier<>(1, 10000);
-        this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new H2BatchDAO.H2BatchConsumer(this));
+        this.maxBatchSqlSize = maxBatchSqlSize;
+        this.dataCarrier = new DataCarrier<>(name, asyncBatchPersistentPoolSize, 10000);
+        this.dataCarrier.consume(new H2BatchDAO.H2BatchConsumer(this), asyncBatchPersistentPoolSize, 20);
     }
 
     @Override
     public void flush(List<PrepareRequest> prepareRequests) {
         if (CollectionUtils.isEmpty(prepareRequests)) {
             return;
         }
-
         if (log.isDebugEnabled()) {
-            log.debug("batch sql statements execute, data size: {}", prepareRequests.size());
+            log.debug("to execute sql statements execute, data size: {}, maxBatchSqlSize: {}", prepareRequests.size(), maxBatchSqlSize);
+        }
+        Map<Integer, List<PrepareRequest>> batchRequestMap = new HashMap<>();

Review comment:
       OK, done. I used this code:
   final Map<PrepareRequest, List<PrepareRequest>> batchRequestMap =
                   prepareRequests.stream().collect(Collectors.groupingBy(Function.identity()));
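   
   As an illustration only, here is a minimal sketch of how `Collectors.groupingBy(Function.identity())` behaves. It groups elements by their own `equals`/`hashCode`, so it batches by SQL text only if the element type defines equality that way. `Request` and `GroupingSketch` are hypothetical names that assume equality on the SQL string; they are not the PR's classes.
   
   ```java
   import java.util.List;
   import java.util.Map;
   import java.util.Objects;
   import java.util.function.Function;
   import java.util.stream.Collectors;

   class GroupingSketch {
       // Hypothetical request type: equality is defined by the SQL text only,
       // so requests with different parameters still land in the same group.
       static final class Request {
           final String sql;
           final int param;

           Request(String sql, int param) {
               this.sql = sql;
               this.param = param;
           }

           @Override
           public boolean equals(Object o) {
               return o instanceof Request && Objects.equals(sql, ((Request) o).sql);
           }

           @Override
           public int hashCode() {
               return Objects.hashCode(sql);
           }
       }

       // The map key is the first-seen representative of each group.
       static Map<Request, List<Request>> group(List<Request> requests) {
           return requests.stream().collect(Collectors.groupingBy(Function.identity()));
       }
   }
   ```
   
   If `equals` compared references instead, every request would end up in its own single-element group and nothing would be batched.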







[GitHub] [skywalking] wu-sheng commented on pull request #7691: tidb write by batch sql ,improve write speed.

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on pull request #7691:
URL: https://github.com/apache/skywalking/pull/7691#issuecomment-919669605


   Please update `changes.md` in the root directory with a changelog entry.





[GitHub] [skywalking] chenyi19851209 commented on a change in pull request #7691: tidb write by batch sql ,improve write speed.

Posted by GitBox <gi...@apache.org>.
chenyi19851209 commented on a change in pull request #7691:
URL: https://github.com/apache/skywalking/pull/7691#discussion_r708385983



##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageConfig.java
##########
@@ -59,4 +59,12 @@
      * @since 8.2.0
      */
     private int numOfSearchableValuesPerTag = 2;
+    /**
+     * max size per batch execute sql
+     */

Review comment:
       OK, I added `@since 8.8.0`.







[GitHub] [skywalking] chenyi19851209 commented on a change in pull request #7691: tidb write by batch sql ,improve write speed.

Posted by GitBox <gi...@apache.org>.
chenyi19851209 commented on a change in pull request #7691:
URL: https://github.com/apache/skywalking/pull/7691#discussion_r708439107



##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java
##########
@@ -22,61 +22,85 @@
 import java.sql.SQLException;
 import java.util.List;
 import java.util.Properties;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.Iterator;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.BulkConsumePool;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.ConsumerPoolFactory;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
-import org.apache.skywalking.oap.server.core.UnexpectedException;
 import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
 import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
 import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
 import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
 import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
 import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+import org.apache.skywalking.oap.server.storage.plugin.jdbc.BatchSQLExecutor;
 import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLExecutor;
 
 @Slf4j
 public class H2BatchDAO implements IBatchDAO {
     private JDBCHikariCPClient h2Client;
     private final DataCarrier<PrepareRequest> dataCarrier;
+    private int maxBatchSqlSize = 2000;
+    private int asyncBatchPersistentPoolSize = 4;
 
-    public H2BatchDAO(JDBCHikariCPClient h2Client) {
+    public H2BatchDAO(JDBCHikariCPClient h2Client, int batchSqlSize, int asyncBatchPersistentPoolSize) {

Review comment:
       OK, done.







[GitHub] [skywalking] chenyi19851209 commented on a change in pull request #7691: tidb write by batch sql ,improve write speed.

Posted by GitBox <gi...@apache.org>.
chenyi19851209 commented on a change in pull request #7691:
URL: https://github.com/apache/skywalking/pull/7691#discussion_r709292456



##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java
##########
@@ -22,62 +22,73 @@
 import java.sql.SQLException;
 import java.util.List;
 import java.util.Properties;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.BulkConsumePool;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.ConsumerPoolFactory;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
-import org.apache.skywalking.oap.server.core.UnexpectedException;
 import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
 import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
 import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
 import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
 import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
 import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+import org.apache.skywalking.oap.server.storage.plugin.jdbc.BatchSQLExecutor;
 import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLExecutor;
 
 @Slf4j
 public class H2BatchDAO implements IBatchDAO {
     private JDBCHikariCPClient h2Client;
     private final DataCarrier<PrepareRequest> dataCarrier;
+    private int maxBatchSqlSize;
 
-    public H2BatchDAO(JDBCHikariCPClient h2Client) {
+    public H2BatchDAO(JDBCHikariCPClient h2Client, int maxBatchSqlSize, int asyncBatchPersistentPoolSize) {
         this.h2Client = h2Client;
-
         String name = "H2_ASYNCHRONOUS_BATCH_PERSISTENT";
-        BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, 1, 20);
-        try {
-            ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
-        } catch (Exception e) {
-            throw new UnexpectedException(e.getMessage(), e);
+        if (log.isDebugEnabled()) {
+            log.debug("H2_ASYNCHRONOUS_BATCH_PERSISTENT poolSize: {},maxBatchSqlSize:{}", asyncBatchPersistentPoolSize, maxBatchSqlSize);

Review comment:
       OK, done.







[GitHub] [skywalking] wu-sheng commented on pull request #7691: tidb write by batch sql ,improve write speed.

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on pull request #7691:
URL: https://github.com/apache/skywalking/pull/7691#issuecomment-917279180


   Please resolve conflicts first.





[GitHub] [skywalking] chenyi19851209 commented on a change in pull request #7691: tidb write by batch sql ,improve write speed.

Posted by GitBox <gi...@apache.org>.
chenyi19851209 commented on a change in pull request #7691:
URL: https://github.com/apache/skywalking/pull/7691#discussion_r708429039



##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/SQLExecutor.java
##########
@@ -54,4 +54,12 @@ public void invoke(Connection connection) throws SQLException {
         }
         preparedStatement.execute();
     }
+
+    public String getSql() {
+        return sql;
+    }
+
+    public List<Object> getParam() {
+        return param;
+    }

Review comment:
       OK, I removed the `getParam` method.







[GitHub] [skywalking] chenyi19851209 commented on a change in pull request #7691: tidb write by batch sql ,improve write speed.

Posted by GitBox <gi...@apache.org>.
chenyi19851209 commented on a change in pull request #7691:
URL: https://github.com/apache/skywalking/pull/7691#discussion_r709201136



##########
File path: oap-server/server-starter/src/main/resources/application.yml
##########
@@ -170,9 +172,11 @@ storage:
     metadataQueryMaxSize: ${SW_STORAGE_MYSQL_QUERY_MAX_SIZE:5000}
     maxSizeOfArrayColumn: ${SW_STORAGE_MAX_SIZE_OF_ARRAY_COLUMN:20}
     numOfSearchableValuesPerTag: ${SW_STORAGE_NUM_OF_SEARCHABLE_VALUES_PER_TAG:2}
+    maxSizeOfBatchSql: ${SW_STORAGE_MAX_SIZE_OF_BATCH_SQL:2000}
+    asyncBatchPersistentPoolSize: ${SW_STORAGE_ASYNC_BATCH_PERSISTENT_POOL_SIZE:4}
   tidb:
     properties:
-      jdbcUrl: ${SW_JDBC_URL:"jdbc:mysql://localhost:4000/tidbswtest"}
+      jdbcUrl: ${SW_JDBC_URL:"jdbc:mysql://localhost:4000/tidbswtest?rewriteBatchedStatements=true"}

Review comment:
       Yes, I added this to the MySQL storage as well.







[GitHub] [skywalking] chenyi19851209 commented on a change in pull request #7691: tidb write by batch sql ,improve write speed.

Posted by GitBox <gi...@apache.org>.
chenyi19851209 commented on a change in pull request #7691:
URL: https://github.com/apache/skywalking/pull/7691#discussion_r706846855



##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java
##########
@@ -81,6 +92,36 @@ public void flush(List<PrepareRequest> prepareRequests) {
         }
     }
 
+    private void executeSql(List<PrepareRequest> prepareRequests, int maxBatchSqlSize) {
+        if (log.isDebugEnabled()) {
+            log.debug("execute sql batch. data size:{}", prepareRequests.size());
+        }
+        Map<String, List<PrepareRequest>> batchRequestMap = new HashMap<>();
+        for (PrepareRequest prepareRequest : prepareRequests) {
+            SQLExecutor sqlExecutor = (SQLExecutor) prepareRequest;
+            if (batchRequestMap.get(sqlExecutor.getSql()) == null) {
+                List<PrepareRequest> prepareRequestList = new ArrayList<>();
+                batchRequestMap.put(sqlExecutor.getSql(), prepareRequestList);
+            }
+            batchRequestMap.get(sqlExecutor.getSql()).add(prepareRequest);
+        }
+        try (Connection connection = h2Client.getConnection()) {
+            try {
+                for (String key : batchRequestMap.keySet()) {
+                    BatchSQLExecutor batchSQLExecutor = new BatchSQLExecutor(key, batchRequestMap.get(key));

Review comment:
       ok.







[GitHub] [skywalking] wu-sheng commented on a change in pull request #7691: tidb write by batch sql ,improve write speed.

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on a change in pull request #7691:
URL: https://github.com/apache/skywalking/pull/7691#discussion_r706605537



##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/BatchSQLExecutor.java
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.jdbc;
+
+import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
+import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
+import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.List;
+
+/**
+ * A Batch SQL executor.
+ */
+public class BatchSQLExecutor implements InsertRequest, UpdateRequest {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(BatchSQLExecutor.class);

Review comment:
       I recommend using `@Slf4j`.

##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageConfig.java
##########
@@ -59,4 +59,16 @@
      * @since 8.2.0
      */
     private int numOfSearchableValuesPerTag = 2;
+    /**
+     * max size per batch execute sql
+     */
+    private int maxSizeOfBatchSql = 2000;

Review comment:
       New configuration requires `@since 8.8.0`.
   
   The storage doc and configuration-vocabulary.md should be updated accordingly.

##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java
##########
@@ -81,6 +92,36 @@ public void flush(List<PrepareRequest> prepareRequests) {
         }
     }
 
+    private void executeSql(List<PrepareRequest> prepareRequests, int maxBatchSqlSize) {
+        if (log.isDebugEnabled()) {
+            log.debug("execute sql batch. data size:{}", prepareRequests.size());
+        }
+        Map<String, List<PrepareRequest>> batchRequestMap = new HashMap<>();
+        for (PrepareRequest prepareRequest : prepareRequests) {
+            SQLExecutor sqlExecutor = (SQLExecutor) prepareRequest;
+            if (batchRequestMap.get(sqlExecutor.getSql()) == null) {

Review comment:
       When could this be null?
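
The null check in the hunk above is the usual "create the list on first sight of a key" idiom. A minimal sketch of the same grouping written with `Map.computeIfAbsent` follows. The `SqlGroupingSketch` class and its `Request` type are hypothetical stand-ins for illustration and are not part of the PR; only the idea of keying requests by their SQL text comes from the diff.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: groups requests by SQL text so that each group
// can later share one PreparedStatement. Not the PR's actual classes.
class SqlGroupingSketch {

    // Stand-in for SQLExecutor: only the SQL text and its parameters matter here.
    static final class Request {
        final String sql;
        final List<Object> params;

        Request(String sql, List<Object> params) {
            this.sql = sql;
            this.params = params;
        }
    }

    // computeIfAbsent creates the list the first time a SQL text is seen,
    // which replaces the explicit "get(...) == null" check.
    static Map<String, List<Request>> groupBySql(List<Request> requests) {
        Map<String, List<Request>> groups = new LinkedHashMap<>();
        for (Request request : requests) {
            groups.computeIfAbsent(request.sql, key -> new ArrayList<>()).add(request);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Request> requests = Arrays.asList(
            new Request("INSERT INTO a VALUES (?)", Arrays.<Object>asList(1)),
            new Request("INSERT INTO b VALUES (?)", Arrays.<Object>asList(2)),
            new Request("INSERT INTO a VALUES (?)", Arrays.<Object>asList(3)));
        // Print how many requests ended up under each SQL text.
        groupBySql(requests).forEach((sql, group) -> System.out.println(sql + " -> " + group.size()));
    }
}
```

Keying on the SQL string itself, rather than on a hash of it, means two different statements can never end up in the same group.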

##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageConfig.java
##########
@@ -59,4 +59,16 @@
      * @since 8.2.0
      */
     private int numOfSearchableValuesPerTag = 2;
+    /**
+     * max size per batch execute sql
+     */
+    private int maxSizeOfBatchSql = 2000;
+    /**
+     * async batch execute pool size
+     */
+    private int h2AsyncBatchPersistentPoolSize = 4;
+    /**
+     * async batch execute channel size
+     */
+    private int h2AsyncBatchPersistentChannelSize = 4;

Review comment:
       ```suggestion
       /**
        * max size per batch execute sql
        */
       private int maxSizeOfBatchSql = 100;
       /**
        * async batch execute pool size
        */
       private int h2AsyncBatchPersistentPoolSize = 1;
       /**
        * async batch execute channel size
        */
       private int h2AsyncBatchPersistentChannelSize = 1;
   ```
   
   H2 is not that powerful; I recommend using fewer threads and a smaller batch size by default.
   ___
   Also, given the DataCarrier implementation, it would be better to merge `h2AsyncBatchPersistentPoolSize` and `h2AsyncBatchPersistentChannelSize` into a single `asyncBatchPersistentPoolSize`. There is no point in creating more threads than there are channels.

##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/BatchSQLExecutor.java
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.jdbc;
+
+import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
+import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
+import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.List;
+
+/**
+ * A Batch SQL executor.
+ */
+public class BatchSQLExecutor implements InsertRequest, UpdateRequest {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(BatchSQLExecutor.class);
+
+    private String sql;
+    private List<PrepareRequest> prepareRequests;
+
+    public BatchSQLExecutor(String sql, List<PrepareRequest> prepareRequests) {
+        this.sql = sql;
+        this.prepareRequests = prepareRequests;
+    }
+
+    public void invoke(Connection connection, int maxBatchSqlSize) throws SQLException {
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("execute sql batch.sql by key size: {},sql:{}", prepareRequests.size(), sql);
+        }
+        PreparedStatement preparedStatement = connection.prepareStatement(sql);
+        int index = 0;
+        for (int k = 0; k < prepareRequests.size(); k++) {
+            SQLExecutor sqlExecutor = (SQLExecutor) prepareRequests.get(k);
+            for (int i = 0; i < sqlExecutor.getParam().size(); i++) {
+                preparedStatement.setObject(i + 1, sqlExecutor.getParam().get(i));
+            }
+            preparedStatement.addBatch();
+            if (k > 0 && k % maxBatchSqlSize == 0) {
+                long start = System.currentTimeMillis();
+                preparedStatement.executeBatch();
+                long end = System.currentTimeMillis();
+                long cost = end - start;
+                if (LOGGER.isDebugEnabled()) {
+                    LOGGER.debug("execute batch sql,batch size: {}, cost:{},sql: {}", maxBatchSqlSize, cost, sql);
+                }
+                index = 0;
+            } else {
+                index = index + 1;

Review comment:
       ```suggestion
                   index++;
   ```

##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageConfig.java
##########
@@ -39,5 +39,18 @@
      * @since 8.2.0
      */
     private int numOfSearchableValuesPerTag = 2;
+    /**
+     * max size per batch execute sql
+     */
+    private int maxSizeOfBatchSql = 2000;
+    /**
+     * async batch execute pool size
+     */
+    private int h2AsyncBatchPersistentPoolSize = 4;
+    /**
+     * async batch execute channel size
+     */
+    private int h2AsyncBatchPersistentChannelSize = 4;
+

Review comment:
       Same as above: you should consider merging these. Also, the `h2` prefix is incorrect here.

##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java
##########
@@ -22,50 +22,61 @@
 import java.sql.SQLException;
 import java.util.List;
 import java.util.Properties;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.BulkConsumePool;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.ConsumerPoolFactory;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
-import org.apache.skywalking.oap.server.core.UnexpectedException;
 import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
 import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
 import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
 import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
 import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
 import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+import org.apache.skywalking.oap.server.storage.plugin.jdbc.BatchSQLExecutor;
 import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLExecutor;
 
 @Slf4j
 public class H2BatchDAO implements IBatchDAO {
     private JDBCHikariCPClient h2Client;
     private final DataCarrier<PrepareRequest> dataCarrier;
+    private int maxBatchSqlSize = 2000;
+    private int h2AsyncBatchPersistentPoolSize = 4;
+    private int h2AsyncBatchPersistentChannelSize = 4;
 
-    public H2BatchDAO(JDBCHikariCPClient h2Client) {
+    public H2BatchDAO(JDBCHikariCPClient h2Client, int batchSqlSize, int h2AsyncBatchPersistentPoolSize, int h2AsyncBatchPersistentChannelSize) {
         this.h2Client = h2Client;
-
         String name = "H2_ASYNCHRONOUS_BATCH_PERSISTENT";
-        BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, 1, 20);
-        try {
-            ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
-        } catch (Exception e) {
-            throw new UnexpectedException(e.getMessage(), e);
+        if (log.isDebugEnabled()) {
+            log.debug("H2_ASYNCHRONOUS_BATCH_PERSISTENT poolSize: {},channelSize: {},maxBatchSqlSize:{}", h2AsyncBatchPersistentPoolSize, h2AsyncBatchPersistentChannelSize, batchSqlSize);
         }
-
-        this.dataCarrier = new DataCarrier<>(1, 10000);
-        this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new H2BatchDAO.H2BatchConsumer(this));
+        this.dataCarrier = new DataCarrier<>(name, h2AsyncBatchPersistentChannelSize, 10000);
+        this.dataCarrier.consume(new H2BatchDAO.H2BatchConsumer(this), h2AsyncBatchPersistentPoolSize, 20);
     }
 
     @Override
     public void flush(List<PrepareRequest> prepareRequests) {
         if (CollectionUtils.isEmpty(prepareRequests)) {
             return;
         }
-
         if (log.isDebugEnabled()) {
-            log.debug("batch sql statements execute, data size: {}", prepareRequests.size());
+            log.debug("to execute sql statements execute, data size: {}, maxBatchSqlSize: {}", prepareRequests.size(), maxBatchSqlSize);
+        }
+        if (maxBatchSqlSize <= 1) {
+            executeSql(prepareRequests);
+        } else {
+            executeSql(prepareRequests, maxBatchSqlSize);
+        }

Review comment:
       I don't think treating `size == 1` as a special case makes sense.

##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/BatchSQLExecutor.java
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.jdbc;
+
+import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
+import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
+import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.List;
+
+/**
+ * A Batch SQL executor.
+ */
+public class BatchSQLExecutor implements InsertRequest, UpdateRequest {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(BatchSQLExecutor.class);
+
+    private String sql;
+    private List<PrepareRequest> prepareRequests;
+
+    public BatchSQLExecutor(String sql, List<PrepareRequest> prepareRequests) {
+        this.sql = sql;
+        this.prepareRequests = prepareRequests;
+    }
+
+    public void invoke(Connection connection, int maxBatchSqlSize) throws SQLException {
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("execute sql batch.sql by key size: {},sql:{}", prepareRequests.size(), sql);
+        }
+        PreparedStatement preparedStatement = connection.prepareStatement(sql);
+        int index = 0;

Review comment:
       Why name this `index`? It should be `pendingCount` or similar.
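
To illustrate the naming point, here is a small sketch of a flush loop driven by a pending-row counter. It is an assumption-laden model, not the PR's code: `BatchFlushSketch` and `flushSizes` are hypothetical names, and the JDBC calls are replaced by comments so the logic can be checked without a database. Counting pending rows directly also sidesteps the `k > 0 && k % maxBatchSqlSize == 0` test in the hunk above, which lets the first batch grow to `maxBatchSqlSize + 1` rows before it is flushed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the batching loop: it records how many rows each
// executeBatch() call would send, instead of talking to a real database.
class BatchFlushSketch {

    // Returns the size of every flush for `total` rows with at most
    // `maxBatchSize` rows per flush.
    static List<Integer> flushSizes(int total, int maxBatchSize) {
        if (maxBatchSize < 1) {
            throw new IllegalArgumentException("maxBatchSize must be at least 1");
        }
        List<Integer> flushes = new ArrayList<>();
        int pendingCount = 0;
        for (int k = 0; k < total; k++) {
            pendingCount++;                  // stands in for preparedStatement.addBatch()
            if (pendingCount == maxBatchSize) {
                flushes.add(pendingCount);   // stands in for preparedStatement.executeBatch()
                pendingCount = 0;
            }
        }
        if (pendingCount > 0) {
            flushes.add(pendingCount);       // flush the remainder
        }
        return flushes;
    }

    public static void main(String[] args) {
        System.out.println(flushSizes(5, 2));
        System.out.println(flushSizes(3, 1));
    }
}
```

With this shape a batch size of 1 needs no separate code path: every row is simply flushed on its own.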







[GitHub] [skywalking] chenyi19851209 commented on a change in pull request #7691: tidb write by batch sql ,improve write speed.

Posted by GitBox <gi...@apache.org>.
chenyi19851209 commented on a change in pull request #7691:
URL: https://github.com/apache/skywalking/pull/7691#discussion_r709293392



##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/postgresql/PostgreSQLStorageConfig.java
##########
@@ -25,5 +25,16 @@
 @Setter
 @Getter
 public class PostgreSQLStorageConfig extends MySQLStorageConfig {
-
+    /**
+     * The maximum size of batch size of SQL execution
+     *
+     * @since 8.8.0
+     */
+    private int maxSizeOfBatchSql = 2000;
+    /**
+     * async batch execute pool size
+     *
+     * @since 8.8.0
+     */
+    private int asyncBatchPersistentPoolSize  = 4;

Review comment:
       OK, done.

##########
File path: oap-server/server-storage-plugin/storage-tidb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/tidb/TiDBStorageConfig.java
##########
@@ -25,4 +25,16 @@
 @Setter
 @Getter
 public class TiDBStorageConfig extends MySQLStorageConfig {
+    /**
+     * The maximum size of batch size of SQL execution
+     *
+     * @since 8.8.0
+     */
+    private int maxSizeOfBatchSql = 2000;
+    /**
+     * async batch execute pool size
+     *
+     * @since 8.8.0
+     */
+    private int asyncBatchPersistentPoolSize  = 4;

Review comment:
       OK, done.

##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java
##########
@@ -22,62 +22,73 @@
 import java.sql.SQLException;
 import java.util.List;
 import java.util.Properties;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.BulkConsumePool;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.ConsumerPoolFactory;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
-import org.apache.skywalking.oap.server.core.UnexpectedException;
 import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
 import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
 import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
 import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
 import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
 import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+import org.apache.skywalking.oap.server.storage.plugin.jdbc.BatchSQLExecutor;
 import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLExecutor;
 
 @Slf4j
 public class H2BatchDAO implements IBatchDAO {
     private JDBCHikariCPClient h2Client;
     private final DataCarrier<PrepareRequest> dataCarrier;
+    private int maxBatchSqlSize;
 
-    public H2BatchDAO(JDBCHikariCPClient h2Client) {
+    public H2BatchDAO(JDBCHikariCPClient h2Client, int maxBatchSqlSize, int asyncBatchPersistentPoolSize) {
         this.h2Client = h2Client;
-
         String name = "H2_ASYNCHRONOUS_BATCH_PERSISTENT";
-        BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, 1, 20);
-        try {
-            ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
-        } catch (Exception e) {
-            throw new UnexpectedException(e.getMessage(), e);
+        if (log.isDebugEnabled()) {
+            log.debug("H2_ASYNCHRONOUS_BATCH_PERSISTENT poolSize: {},maxBatchSqlSize:{}", asyncBatchPersistentPoolSize, maxBatchSqlSize);
         }
-
-        this.dataCarrier = new DataCarrier<>(1, 10000);
-        this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new H2BatchDAO.H2BatchConsumer(this));
+        this.maxBatchSqlSize = maxBatchSqlSize;
+        this.dataCarrier = new DataCarrier<>(name, asyncBatchPersistentPoolSize, 10000);
+        this.dataCarrier.consume(new H2BatchDAO.H2BatchConsumer(this), asyncBatchPersistentPoolSize, 20);
     }
 
     @Override
     public void flush(List<PrepareRequest> prepareRequests) {
         if (CollectionUtils.isEmpty(prepareRequests)) {
             return;
         }
-
         if (log.isDebugEnabled()) {
-            log.debug("batch sql statements execute, data size: {}", prepareRequests.size());
+            log.debug("to execute sql statements execute, data size: {}, maxBatchSqlSize: {}", prepareRequests.size(), maxBatchSqlSize);
+        }
+        Map<Integer, List<PrepareRequest>> batchRequestMap = new HashMap<>();
+        for (PrepareRequest prepareRequest : prepareRequests) {
+            SQLExecutor sqlExecutor = (SQLExecutor) prepareRequest;
+            if (batchRequestMap.containsKey(sqlExecutor.hashCode())) {
+                batchRequestMap.get(sqlExecutor.hashCode()).add(prepareRequest);
+            } else {
+                List<PrepareRequest> prepareRequestList = new ArrayList<>();
+                prepareRequestList.add(sqlExecutor);
+                batchRequestMap.put(sqlExecutor.hashCode(), prepareRequestList);
+            }

Review comment:
       OK, done.

##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/BatchSQLExecutor.java
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.jdbc;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
+import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
+import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.List;
+
+/**
+ * A Batch SQL executor.
+ */
+@Slf4j
+@RequiredArgsConstructor
+public class BatchSQLExecutor implements InsertRequest, UpdateRequest {
+
+    private final List<PrepareRequest> prepareRequests;
+
+    public void invoke(Connection connection, int maxBatchSqlSize) throws SQLException {
+        if (log.isDebugEnabled()) {
+            log.debug("execute sql batch. sql by key size: {}", prepareRequests.size());
+        }
+        if (prepareRequests.size() == 0) {
+            return;
+        }
+        String sql = prepareRequests.get(0).toString();
+        try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
+            int pendingCount = 0;
+            for (int k = 0; k < prepareRequests.size(); k++) {
+                SQLExecutor sqlExecutor = (SQLExecutor) prepareRequests.get(k);
+                sqlExecutor.setParameters(preparedStatement);
+                preparedStatement.addBatch();
+                if (k > 0 && k % maxBatchSqlSize == 0) {
+                    executeBatch(preparedStatement, maxBatchSqlSize, sql);
+                    pendingCount = 0;
+                } else {
+                    pendingCount++;
+                }
+            }
+            if (pendingCount > 0) {
+                executeBatch(preparedStatement, pendingCount, sql);
+            }
+        } catch (SQLException e) {
+            throw e;
+        }
+    }
+
+    private void executeBatch(PreparedStatement preparedStatement, int pendingCount, String sql) throws SQLException {
+        long start = System.currentTimeMillis();
+        preparedStatement.executeBatch();
+        long end = System.currentTimeMillis();
+        long cost = end - start;
+        if (log.isDebugEnabled()) {
+            log.debug("execute batch sql, batch size: {}, cost:{}ms, sql: {}", pendingCount, cost, sql);
+        }
+    }

Review comment:
       OK, done.







[GitHub] [skywalking] chenyi19851209 commented on a change in pull request #7691: tidb write by batch sql ,improve write speed.

Posted by GitBox <gi...@apache.org>.
chenyi19851209 commented on a change in pull request #7691:
URL: https://github.com/apache/skywalking/pull/7691#discussion_r708428738



##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/SQLExecutor.java
##########
@@ -54,4 +54,12 @@ public void invoke(Connection connection) throws SQLException {
         }
         preparedStatement.execute();
     }
+
+    public String getSql() {
+        return sql;
+    }

Review comment:
       OK, done.







[GitHub] [skywalking] wu-sheng commented on a change in pull request #7691: tidb write by batch sql ,improve write speed.

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on a change in pull request #7691:
URL: https://github.com/apache/skywalking/pull/7691#discussion_r709152501



##########
File path: oap-server/server-starter/src/main/resources/application.yml
##########
@@ -170,9 +172,11 @@ storage:
     metadataQueryMaxSize: ${SW_STORAGE_MYSQL_QUERY_MAX_SIZE:5000}
     maxSizeOfArrayColumn: ${SW_STORAGE_MAX_SIZE_OF_ARRAY_COLUMN:20}
     numOfSearchableValuesPerTag: ${SW_STORAGE_NUM_OF_SEARCHABLE_VALUES_PER_TAG:2}
+    maxSizeOfBatchSql: ${SW_STORAGE_MAX_SIZE_OF_BATCH_SQL:2000}
+    asyncBatchPersistentPoolSize: ${SW_STORAGE_ASYNC_BATCH_PERSISTENT_POOL_SIZE:4}
   tidb:
     properties:
-      jdbcUrl: ${SW_JDBC_URL:"jdbc:mysql://localhost:4000/tidbswtest"}
+      jdbcUrl: ${SW_JDBC_URL:"jdbc:mysql://localhost:4000/tidbswtest?rewriteBatchedStatements=true"}

Review comment:
       @chenyi19851209 Please follow the above recommendation.







[GitHub] [skywalking] chenyi19851209 commented on pull request #7691: Improve the speed of writing TiDB by batching the SQL execution

Posted by GitBox <gi...@apache.org>.
chenyi19851209 commented on pull request #7691:
URL: https://github.com/apache/skywalking/pull/7691#issuecomment-921003780


   > One UT fails, please fix
   
   OK, done.





[GitHub] [skywalking] chenyi19851209 commented on a change in pull request #7691: tidb write by batch sql ,improve write speed.

Posted by GitBox <gi...@apache.org>.
chenyi19851209 commented on a change in pull request #7691:
URL: https://github.com/apache/skywalking/pull/7691#discussion_r708450655



##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/BatchSQLExecutor.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.jdbc;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
+import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
+import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.List;
+
+/**
+ * A Batch SQL executor.
+ */
+@Slf4j
+public class BatchSQLExecutor implements InsertRequest, UpdateRequest {
+
+    private String sql;
+    private List<PrepareRequest> prepareRequests;
+
+    public BatchSQLExecutor(String sql, List<PrepareRequest> prepareRequests) {
+        this.sql = sql;
+        this.prepareRequests = prepareRequests;
+    }
+
+    public void invoke(Connection connection, int maxBatchSqlSize) throws SQLException {
+        if (log.isDebugEnabled()) {
+            log.debug("execute sql batch.sql by key size: {},sql:{}", prepareRequests.size(), sql);
+        }
+        PreparedStatement preparedStatement = connection.prepareStatement(sql);
+        int pendingCount = 0;
+        for (int k = 0; k < prepareRequests.size(); k++) {
+            SQLExecutor sqlExecutor = (SQLExecutor) prepareRequests.get(k);
+            for (int i = 0; i < sqlExecutor.getParam().size(); i++) {
+                preparedStatement.setObject(i + 1, sqlExecutor.getParam().get(i));
+            }
+            preparedStatement.addBatch();
+            if (k > 0 && k % maxBatchSqlSize == 0) {
+                long start = System.currentTimeMillis();
+                preparedStatement.executeBatch();
+                long end = System.currentTimeMillis();
+                long cost = end - start;
+                if (log.isDebugEnabled()) {
+                    log.debug("execute batch sql,batch size: {}, cost:{},sql: {}", maxBatchSqlSize, cost, sql);
+                }
+                pendingCount = 0;
+            } else {
+                pendingCount++;
+            }
+        }
+        if (pendingCount > 0) {
+            long start = System.currentTimeMillis();
+            preparedStatement.executeBatch();
+            long end = System.currentTimeMillis();
+            long cost = end - start;
+            if (log.isDebugEnabled()) {
+                log.debug("execute batch sql,batch size: {}, cost:{},sql: {}", pendingCount, cost, sql);
+            }

Review comment:
       OK, done.

##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/BatchSQLExecutor.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.jdbc;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
+import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
+import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.List;
+
+/**
+ * A Batch SQL executor.
+ */
+@Slf4j
+public class BatchSQLExecutor implements InsertRequest, UpdateRequest {
+
+    private String sql;
+    private List<PrepareRequest> prepareRequests;
+
+    public BatchSQLExecutor(String sql, List<PrepareRequest> prepareRequests) {
+        this.sql = sql;
+        this.prepareRequests = prepareRequests;
+    }
+
+    public void invoke(Connection connection, int maxBatchSqlSize) throws SQLException {
+        if (log.isDebugEnabled()) {
+            log.debug("execute sql batch.sql by key size: {},sql:{}", prepareRequests.size(), sql);
+        }
+        PreparedStatement preparedStatement = connection.prepareStatement(sql);
+        int pendingCount = 0;
+        for (int k = 0; k < prepareRequests.size(); k++) {
+            SQLExecutor sqlExecutor = (SQLExecutor) prepareRequests.get(k);
+            for (int i = 0; i < sqlExecutor.getParam().size(); i++) {
+                preparedStatement.setObject(i + 1, sqlExecutor.getParam().get(i));
+            }
+            preparedStatement.addBatch();
+            if (k > 0 && k % maxBatchSqlSize == 0) {
+                long start = System.currentTimeMillis();
+                preparedStatement.executeBatch();
+                long end = System.currentTimeMillis();
+                long cost = end - start;
+                if (log.isDebugEnabled()) {
+                    log.debug("execute batch sql,batch size: {}, cost:{},sql: {}", maxBatchSqlSize, cost, sql);
+                }
+                pendingCount = 0;
+            } else {
+                pendingCount++;
+            }
+        }
+        if (pendingCount > 0) {
+            long start = System.currentTimeMillis();
+            preparedStatement.executeBatch();
+            long end = System.currentTimeMillis();
+            long cost = end - start;
+            if (log.isDebugEnabled()) {
+                log.debug("execute batch sql,batch size: {}, cost:{},sql: {}", pendingCount, cost, sql);

Review comment:
       OK, done.
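
On the flush condition in the loop above: `k > 0 && k % maxBatchSqlSize == 0` first becomes true at `k == maxBatchSqlSize`, after `addBatch()` has already been called `maxBatchSqlSize + 1` times. The first executed batch therefore appears to be one statement larger than the configured limit, and larger than the size the debug log reports. A minimal sketch of the counting logic alone, assuming the intent is that no batch exceeds the limit (`BatchChunkSketch` and `batchSizes` are hypothetical names, not part of the PR):

```java
import java.util.ArrayList;
import java.util.List;

public class BatchChunkSketch {

    /**
     * Returns the sizes of the batches that would be executed for the given
     * number of requests. No JDBC calls are made; the comments mark where
     * addBatch() and executeBatch() would sit in the real loop.
     */
    static List<Integer> batchSizes(int totalRequests, int maxBatchSize) {
        List<Integer> sizes = new ArrayList<>();
        int pending = 0;
        for (int k = 0; k < totalRequests; k++) {
            pending++;                     // stands in for preparedStatement.addBatch()
            if (pending == maxBatchSize) { // flush exactly when the batch is full
                sizes.add(pending);        // stands in for preparedStatement.executeBatch()
                pending = 0;
            }
        }
        if (pending > 0) {
            sizes.add(pending);            // final partial batch
        }
        return sizes;
    }
}
```

With 5 requests and a limit of 2, this yields batches of 2, 2 and 1. Counting the pending statements directly also lets the log statement report `pending` for every flush.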

##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/BatchSQLExecutor.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.jdbc;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
+import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
+import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.List;
+
+/**
+ * A Batch SQL executor.
+ */
+@Slf4j
+public class BatchSQLExecutor implements InsertRequest, UpdateRequest {
+
+    private String sql;
+    private List<PrepareRequest> prepareRequests;
+
+    public BatchSQLExecutor(String sql, List<PrepareRequest> prepareRequests) {
+        this.sql = sql;
+        this.prepareRequests = prepareRequests;
+    }
+
+    public void invoke(Connection connection, int maxBatchSqlSize) throws SQLException {
+        if (log.isDebugEnabled()) {
+            log.debug("execute sql batch.sql by key size: {},sql:{}", prepareRequests.size(), sql);
+        }
+        PreparedStatement preparedStatement = connection.prepareStatement(sql);
+        int pendingCount = 0;
+        for (int k = 0; k < prepareRequests.size(); k++) {
+            SQLExecutor sqlExecutor = (SQLExecutor) prepareRequests.get(k);
+            for (int i = 0; i < sqlExecutor.getParam().size(); i++) {
+                preparedStatement.setObject(i + 1, sqlExecutor.getParam().get(i));
+            }
+            preparedStatement.addBatch();
+            if (k > 0 && k % maxBatchSqlSize == 0) {
+                long start = System.currentTimeMillis();
+                preparedStatement.executeBatch();
+                long end = System.currentTimeMillis();
+                long cost = end - start;
+                if (log.isDebugEnabled()) {
+                    log.debug("execute batch sql,batch size: {}, cost:{},sql: {}", maxBatchSqlSize, cost, sql);

Review comment:
       OK, done.

##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/BatchSQLExecutor.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.jdbc;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
+import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
+import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.List;
+
+/**
+ * A Batch SQL executor.
+ */
+@Slf4j
+public class BatchSQLExecutor implements InsertRequest, UpdateRequest {
+
+    private String sql;
+    private List<PrepareRequest> prepareRequests;
+
+    public BatchSQLExecutor(String sql, List<PrepareRequest> prepareRequests) {
+        this.sql = sql;
+        this.prepareRequests = prepareRequests;
+    }
+
+    public void invoke(Connection connection, int maxBatchSqlSize) throws SQLException {
+        if (log.isDebugEnabled()) {
+            log.debug("execute sql batch.sql by key size: {},sql:{}", prepareRequests.size(), sql);
+        }
+        PreparedStatement preparedStatement = connection.prepareStatement(sql);

Review comment:
       OK, done.
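
The visible part of this hunk creates the `PreparedStatement` but does not show it being closed. A minimal sketch of the same parameter-binding loop wrapped in try-with-resources, so the statement is released even if `executeBatch()` throws (`ClosingBatchSketch`, `executeBatch` and the `rows` parameter are hypothetical and only stand in for the PR's `SQLExecutor.getParam()` lists):

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

public class ClosingBatchSketch {

    /**
     * Binds each row's parameters, adds it to the batch, executes the batch
     * once, and closes the statement when the block exits.
     */
    static void executeBatch(Connection connection, String sql, List<List<Object>> rows)
            throws SQLException {
        try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
            for (List<Object> row : rows) {
                for (int i = 0; i < row.size(); i++) {
                    // JDBC parameter indexes start at 1.
                    preparedStatement.setObject(i + 1, row.get(i));
                }
                preparedStatement.addBatch();
            }
            preparedStatement.executeBatch();
        }
    }
}
```

The connection itself is left open here because, in the PR, the caller already owns it through its own try-with-resources block.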

##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/BatchSQLExecutor.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.jdbc;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
+import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
+import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.List;
+
+/**
+ * A Batch SQL executor.
+ */
+@Slf4j
+public class BatchSQLExecutor implements InsertRequest, UpdateRequest {
+
+    private String sql;
+    private List<PrepareRequest> prepareRequests;
+
+    public BatchSQLExecutor(String sql, List<PrepareRequest> prepareRequests) {
+        this.sql = sql;
+        this.prepareRequests = prepareRequests;
+    }

Review comment:
       OK, done.

##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java
##########
@@ -22,61 +22,85 @@
 import java.sql.SQLException;
 import java.util.List;
 import java.util.Properties;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.Iterator;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.BulkConsumePool;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.ConsumerPoolFactory;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
-import org.apache.skywalking.oap.server.core.UnexpectedException;
 import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
 import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
 import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
 import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
 import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
 import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+import org.apache.skywalking.oap.server.storage.plugin.jdbc.BatchSQLExecutor;
 import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLExecutor;
 
 @Slf4j
 public class H2BatchDAO implements IBatchDAO {
     private JDBCHikariCPClient h2Client;
     private final DataCarrier<PrepareRequest> dataCarrier;
+    private int maxBatchSqlSize = 2000;
+    private int asyncBatchPersistentPoolSize = 4;
 
-    public H2BatchDAO(JDBCHikariCPClient h2Client) {
+    public H2BatchDAO(JDBCHikariCPClient h2Client, int batchSqlSize, int asyncBatchPersistentPoolSize) {
         this.h2Client = h2Client;
-
         String name = "H2_ASYNCHRONOUS_BATCH_PERSISTENT";
-        BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, 1, 20);
-        try {
-            ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
-        } catch (Exception e) {
-            throw new UnexpectedException(e.getMessage(), e);
+        if (log.isDebugEnabled()) {
+            log.debug("H2_ASYNCHRONOUS_BATCH_PERSISTENT poolSize: {},maxBatchSqlSize:{}", asyncBatchPersistentPoolSize, batchSqlSize);
         }
-
-        this.dataCarrier = new DataCarrier<>(1, 10000);
-        this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new H2BatchDAO.H2BatchConsumer(this));
+        this.dataCarrier = new DataCarrier<>(name, asyncBatchPersistentPoolSize, 10000);
+        this.dataCarrier.consume(new H2BatchDAO.H2BatchConsumer(this), asyncBatchPersistentPoolSize, 20);
     }
 
     @Override
     public void flush(List<PrepareRequest> prepareRequests) {
         if (CollectionUtils.isEmpty(prepareRequests)) {
             return;
         }
-
         if (log.isDebugEnabled()) {
-            log.debug("batch sql statements execute, data size: {}", prepareRequests.size());
+            log.debug("to execute sql statements execute, data size: {}, maxBatchSqlSize: {}", prepareRequests.size(), maxBatchSqlSize);
+        }
+        executeSql(prepareRequests, maxBatchSqlSize);
+        if (log.isDebugEnabled()) {
+            log.debug("execute sql statements done, data size: {}, maxBatchSqlSize: {}", prepareRequests.size(), maxBatchSqlSize);
         }
+    }
 
+    private void executeSql(List<PrepareRequest> prepareRequests, int maxBatchSqlSize) {
+        if (log.isDebugEnabled()) {
+            log.debug("execute sql batch. data size:{}", prepareRequests.size());
+        }
+        Map<String, List<PrepareRequest>> batchRequestMap = new HashMap<>();
+        for (PrepareRequest prepareRequest : prepareRequests) {
+            SQLExecutor sqlExecutor = (SQLExecutor) prepareRequest;
+            if (batchRequestMap.containsKey(sqlExecutor.getSql())) {
+                batchRequestMap.get(sqlExecutor.getSql()).add(prepareRequest);
+            } else {
+                List<PrepareRequest> prepareRequestList = new ArrayList<>();
+                prepareRequestList.add(sqlExecutor);
+                batchRequestMap.put(sqlExecutor.getSql(), prepareRequestList);
+            }
+        }
         try (Connection connection = h2Client.getConnection()) {
-            for (PrepareRequest prepareRequest : prepareRequests) {
-                try {
-                    SQLExecutor sqlExecutor = (SQLExecutor) prepareRequest;
-                    sqlExecutor.invoke(connection);
-                } catch (SQLException e) {
-                    // Just avoid one execution failure makes the rest of batch failure.
-                    log.error(e.getMessage(), e);
+            try {
+                Set<Map.Entry<String, List<PrepareRequest>>> entrySet = batchRequestMap.entrySet();
+                Iterator<Map.Entry<String, List<PrepareRequest>>> iterator = entrySet.iterator();
+                while (iterator.hasNext()) {

Review comment:
       OK, done.

##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java
##########
@@ -22,61 +22,85 @@
 import java.sql.SQLException;
 import java.util.List;
 import java.util.Properties;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.Iterator;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.BulkConsumePool;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.ConsumerPoolFactory;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
-import org.apache.skywalking.oap.server.core.UnexpectedException;
 import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
 import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
 import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
 import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
 import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
 import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+import org.apache.skywalking.oap.server.storage.plugin.jdbc.BatchSQLExecutor;
 import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLExecutor;
 
 @Slf4j
 public class H2BatchDAO implements IBatchDAO {
     private JDBCHikariCPClient h2Client;
     private final DataCarrier<PrepareRequest> dataCarrier;
+    private int maxBatchSqlSize = 2000;
+    private int asyncBatchPersistentPoolSize = 4;
 
-    public H2BatchDAO(JDBCHikariCPClient h2Client) {
+    public H2BatchDAO(JDBCHikariCPClient h2Client, int batchSqlSize, int asyncBatchPersistentPoolSize) {
         this.h2Client = h2Client;
-
         String name = "H2_ASYNCHRONOUS_BATCH_PERSISTENT";
-        BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, 1, 20);
-        try {
-            ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
-        } catch (Exception e) {
-            throw new UnexpectedException(e.getMessage(), e);
+        if (log.isDebugEnabled()) {
+            log.debug("H2_ASYNCHRONOUS_BATCH_PERSISTENT poolSize: {},maxBatchSqlSize:{}", asyncBatchPersistentPoolSize, batchSqlSize);
         }
-
-        this.dataCarrier = new DataCarrier<>(1, 10000);
-        this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new H2BatchDAO.H2BatchConsumer(this));
+        this.dataCarrier = new DataCarrier<>(name, asyncBatchPersistentPoolSize, 10000);
+        this.dataCarrier.consume(new H2BatchDAO.H2BatchConsumer(this), asyncBatchPersistentPoolSize, 20);
     }
 
     @Override
     public void flush(List<PrepareRequest> prepareRequests) {
         if (CollectionUtils.isEmpty(prepareRequests)) {
             return;
         }
-
         if (log.isDebugEnabled()) {
-            log.debug("batch sql statements execute, data size: {}", prepareRequests.size());
+            log.debug("to execute sql statements execute, data size: {}, maxBatchSqlSize: {}", prepareRequests.size(), maxBatchSqlSize);
+        }
+        executeSql(prepareRequests, maxBatchSqlSize);
+        if (log.isDebugEnabled()) {
+            log.debug("execute sql statements done, data size: {}, maxBatchSqlSize: {}", prepareRequests.size(), maxBatchSqlSize);
         }
+    }
 
+    private void executeSql(List<PrepareRequest> prepareRequests, int maxBatchSqlSize) {
+        if (log.isDebugEnabled()) {
+            log.debug("execute sql batch. data size:{}", prepareRequests.size());
+        }
+        Map<String, List<PrepareRequest>> batchRequestMap = new HashMap<>();
+        for (PrepareRequest prepareRequest : prepareRequests) {
+            SQLExecutor sqlExecutor = (SQLExecutor) prepareRequest;
+            if (batchRequestMap.containsKey(sqlExecutor.getSql())) {
+                batchRequestMap.get(sqlExecutor.getSql()).add(prepareRequest);
+            } else {
+                List<PrepareRequest> prepareRequestList = new ArrayList<>();
+                prepareRequestList.add(sqlExecutor);
+                batchRequestMap.put(sqlExecutor.getSql(), prepareRequestList);
+            }
+        }
         try (Connection connection = h2Client.getConnection()) {
-            for (PrepareRequest prepareRequest : prepareRequests) {
-                try {
-                    SQLExecutor sqlExecutor = (SQLExecutor) prepareRequest;
-                    sqlExecutor.invoke(connection);
-                } catch (SQLException e) {
-                    // Just avoid one execution failure makes the rest of batch failure.
-                    log.error(e.getMessage(), e);
+            try {
+                Set<Map.Entry<String, List<PrepareRequest>>> entrySet = batchRequestMap.entrySet();
+                Iterator<Map.Entry<String, List<PrepareRequest>>> iterator = entrySet.iterator();
+                while (iterator.hasNext()) {
+                    Map.Entry<String, List<PrepareRequest>> next = iterator.next();
+                    BatchSQLExecutor batchSQLExecutor = new BatchSQLExecutor(next.getKey(), next.getValue());
+                    batchSQLExecutor.invoke(connection, maxBatchSqlSize);
                 }
+            } catch (SQLException e) {
+                // Just avoid one execution failure makes the rest of batch failure.

Review comment:
       OK, done.
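
The grouping step in `executeSql` above (`containsKey` / `get` / `put`) can be written more compactly with `Map.computeIfAbsent`. A minimal sketch, assuming only that each request exposes its SQL text; `GroupBySqlSketch`, `groupBySql` and the `sqlOf` accessor are hypothetical names standing in for the cast to `SQLExecutor` and its `getSql()` call:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class GroupBySqlSketch {

    /**
     * Groups requests by their SQL text so that each group can be sent as a
     * single JDBC batch. A LinkedHashMap keeps groups in first-seen order.
     */
    static <T> Map<String, List<T>> groupBySql(List<T> requests, Function<T, String> sqlOf) {
        Map<String, List<T>> grouped = new LinkedHashMap<>();
        for (T request : requests) {
            // Create the group's list on first sight of this SQL, then append.
            grouped.computeIfAbsent(sqlOf.apply(request), sql -> new ArrayList<>()).add(request);
        }
        return grouped;
    }
}
```

Using `LinkedHashMap` rather than `HashMap` is a choice made for this sketch: it makes the execution order of the groups deterministic, which the PR does not require.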




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [skywalking] chenyi19851209 commented on pull request #7691: tidb write by batch sql ,improve write speed.

Posted by GitBox <gi...@apache.org>.
chenyi19851209 commented on pull request #7691:
URL: https://github.com/apache/skywalking/pull/7691#issuecomment-920059896


   > Please update `changes.md` in the root for changelog.
   
   OK, I added a changelog entry to `changes.md`.





[GitHub] [skywalking] wu-sheng commented on pull request #7691: tidb write by batch sql ,improve write speed.

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on pull request #7691:
URL: https://github.com/apache/skywalking/pull/7691#issuecomment-919982396


   @chenyi19851209 We still need you to polish a little more.





[GitHub] [skywalking] chenyi19851209 commented on a change in pull request #7691: tidb write by batch sql ,improve write speed.

Posted by GitBox <gi...@apache.org>.
chenyi19851209 commented on a change in pull request #7691:
URL: https://github.com/apache/skywalking/pull/7691#discussion_r706852791



##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageConfig.java
##########
@@ -39,5 +39,18 @@
      * @since 8.2.0
      */
     private int numOfSearchableValuesPerTag = 2;
+    /**
+     * max size per batch execute sql
+     */
+    private int maxSizeOfBatchSql = 2000;
+    /**
+     * async batch execute pool size
+     */
+    private int h2AsyncBatchPersistentPoolSize = 4;
+    /**
+     * async batch execute channel size
+     */
+    private int h2AsyncBatchPersistentChannelSize = 4;
+

Review comment:
       OK, I now use `asyncBatchPersistentPoolSize` and have removed `h2AsyncBatchPersistentChannelSize`.







[GitHub] [skywalking] kezhenxu94 commented on a change in pull request #7691: tidb write by batch sql ,improve write speed.

Posted by GitBox <gi...@apache.org>.
kezhenxu94 commented on a change in pull request #7691:
URL: https://github.com/apache/skywalking/pull/7691#discussion_r708129440



##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java
##########
@@ -22,61 +22,85 @@
 import java.sql.SQLException;
 import java.util.List;
 import java.util.Properties;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.Iterator;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.BulkConsumePool;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.ConsumerPoolFactory;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
-import org.apache.skywalking.oap.server.core.UnexpectedException;
 import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
 import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
 import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
 import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
 import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
 import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+import org.apache.skywalking.oap.server.storage.plugin.jdbc.BatchSQLExecutor;
 import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLExecutor;
 
 @Slf4j
 public class H2BatchDAO implements IBatchDAO {
     private JDBCHikariCPClient h2Client;
     private final DataCarrier<PrepareRequest> dataCarrier;
+    private int maxBatchSqlSize = 2000;
+    private int asyncBatchPersistentPoolSize = 4;
 
-    public H2BatchDAO(JDBCHikariCPClient h2Client) {
+    public H2BatchDAO(JDBCHikariCPClient h2Client, int batchSqlSize, int asyncBatchPersistentPoolSize) {
         this.h2Client = h2Client;
-
         String name = "H2_ASYNCHRONOUS_BATCH_PERSISTENT";
-        BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, 1, 20);
-        try {
-            ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
-        } catch (Exception e) {
-            throw new UnexpectedException(e.getMessage(), e);
+        if (log.isDebugEnabled()) {
+            log.debug("H2_ASYNCHRONOUS_BATCH_PERSISTENT poolSize: {},maxBatchSqlSize:{}", asyncBatchPersistentPoolSize, batchSqlSize);
         }
-
-        this.dataCarrier = new DataCarrier<>(1, 10000);
-        this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new H2BatchDAO.H2BatchConsumer(this));
+        this.dataCarrier = new DataCarrier<>(name, asyncBatchPersistentPoolSize, 10000);
+        this.dataCarrier.consume(new H2BatchDAO.H2BatchConsumer(this), asyncBatchPersistentPoolSize, 20);
     }
 
     @Override
     public void flush(List<PrepareRequest> prepareRequests) {
         if (CollectionUtils.isEmpty(prepareRequests)) {
             return;
         }
-
         if (log.isDebugEnabled()) {
-            log.debug("batch sql statements execute, data size: {}", prepareRequests.size());
+            log.debug("to execute sql statements execute, data size: {}, maxBatchSqlSize: {}", prepareRequests.size(), maxBatchSqlSize);
+        }
+        executeSql(prepareRequests, maxBatchSqlSize);
+        if (log.isDebugEnabled()) {
+            log.debug("execute sql statements done, data size: {}, maxBatchSqlSize: {}", prepareRequests.size(), maxBatchSqlSize);
         }
+    }
 
+    private void executeSql(List<PrepareRequest> prepareRequests, int maxBatchSqlSize) {
+        if (log.isDebugEnabled()) {
+            log.debug("execute sql batch. data size:{}", prepareRequests.size());
+        }
+        Map<String, List<PrepareRequest>> batchRequestMap = new HashMap<>();
+        for (PrepareRequest prepareRequest : prepareRequests) {
+            SQLExecutor sqlExecutor = (SQLExecutor) prepareRequest;
+            if (batchRequestMap.containsKey(sqlExecutor.getSql())) {
+                batchRequestMap.get(sqlExecutor.getSql()).add(prepareRequest);
+            } else {
+                List<PrepareRequest> prepareRequestList = new ArrayList<>();
+                prepareRequestList.add(sqlExecutor);
+                batchRequestMap.put(sqlExecutor.getSql(), prepareRequestList);
+            }
+        }
         try (Connection connection = h2Client.getConnection()) {
-            for (PrepareRequest prepareRequest : prepareRequests) {
-                try {
-                    SQLExecutor sqlExecutor = (SQLExecutor) prepareRequest;
-                    sqlExecutor.invoke(connection);
-                } catch (SQLException e) {
-                    // Just avoid one execution failure makes the rest of batch failure.
-                    log.error(e.getMessage(), e);
+            try {
+                Set<Map.Entry<String, List<PrepareRequest>>> entrySet = batchRequestMap.entrySet();
+                Iterator<Map.Entry<String, List<PrepareRequest>>> iterator = entrySet.iterator();
+                while (iterator.hasNext()) {
+                    Map.Entry<String, List<PrepareRequest>> next = iterator.next();
+                    BatchSQLExecutor batchSQLExecutor = new BatchSQLExecutor(next.getKey(), next.getValue());
+                    batchSQLExecutor.invoke(connection, maxBatchSqlSize);
                 }
+            } catch (SQLException e) {
+                // Just avoid one execution failure makes the rest of batch failure.
+                log.error(e.getMessage(), e);
             }
+
         } catch (SQLException | JDBCClientException e) {
+            log.warn("execute sql failed, discard data size: {}", prepareRequests.size());

Review comment:
       ```suggestion
               log.warn("execute sql failed, discard data size: {}", prepareRequests.size(), e);
   ```
   
   Also remove the following line, `log.error(e.getMessage(), e);`, so the exception is not logged twice.

##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java
##########
@@ -22,61 +22,85 @@
 import java.sql.SQLException;
 import java.util.List;
 import java.util.Properties;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.Iterator;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.BulkConsumePool;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.ConsumerPoolFactory;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
-import org.apache.skywalking.oap.server.core.UnexpectedException;
 import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
 import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
 import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
 import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
 import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
 import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+import org.apache.skywalking.oap.server.storage.plugin.jdbc.BatchSQLExecutor;
 import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLExecutor;
 
 @Slf4j
 public class H2BatchDAO implements IBatchDAO {
     private JDBCHikariCPClient h2Client;
     private final DataCarrier<PrepareRequest> dataCarrier;
+    private int maxBatchSqlSize = 2000;
+    private int asyncBatchPersistentPoolSize = 4;
 
-    public H2BatchDAO(JDBCHikariCPClient h2Client) {
+    public H2BatchDAO(JDBCHikariCPClient h2Client, int batchSqlSize, int asyncBatchPersistentPoolSize) {
         this.h2Client = h2Client;
-
         String name = "H2_ASYNCHRONOUS_BATCH_PERSISTENT";
-        BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, 1, 20);
-        try {
-            ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
-        } catch (Exception e) {
-            throw new UnexpectedException(e.getMessage(), e);
+        if (log.isDebugEnabled()) {
+            log.debug("H2_ASYNCHRONOUS_BATCH_PERSISTENT poolSize: {},maxBatchSqlSize:{}", asyncBatchPersistentPoolSize, batchSqlSize);
         }
-
-        this.dataCarrier = new DataCarrier<>(1, 10000);
-        this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new H2BatchDAO.H2BatchConsumer(this));
+        this.dataCarrier = new DataCarrier<>(name, asyncBatchPersistentPoolSize, 10000);
+        this.dataCarrier.consume(new H2BatchDAO.H2BatchConsumer(this), asyncBatchPersistentPoolSize, 20);
     }
 
     @Override
     public void flush(List<PrepareRequest> prepareRequests) {
         if (CollectionUtils.isEmpty(prepareRequests)) {
             return;
         }
-
         if (log.isDebugEnabled()) {
-            log.debug("batch sql statements execute, data size: {}", prepareRequests.size());
+            log.debug("to execute sql statements execute, data size: {}, maxBatchSqlSize: {}", prepareRequests.size(), maxBatchSqlSize);
+        }
+        executeSql(prepareRequests, maxBatchSqlSize);
+        if (log.isDebugEnabled()) {
+            log.debug("execute sql statements done, data size: {}, maxBatchSqlSize: {}", prepareRequests.size(), maxBatchSqlSize);
         }
+    }
 
+    private void executeSql(List<PrepareRequest> prepareRequests, int maxBatchSqlSize) {
+        if (log.isDebugEnabled()) {
+            log.debug("execute sql batch. data size:{}", prepareRequests.size());
+        }
+        Map<String, List<PrepareRequest>> batchRequestMap = new HashMap<>();
+        for (PrepareRequest prepareRequest : prepareRequests) {
+            SQLExecutor sqlExecutor = (SQLExecutor) prepareRequest;
+            if (batchRequestMap.containsKey(sqlExecutor.getSql())) {
+                batchRequestMap.get(sqlExecutor.getSql()).add(prepareRequest);
+            } else {
+                List<PrepareRequest> prepareRequestList = new ArrayList<>();
+                prepareRequestList.add(sqlExecutor);
+                batchRequestMap.put(sqlExecutor.getSql(), prepareRequestList);
+            }
+        }
         try (Connection connection = h2Client.getConnection()) {
-            for (PrepareRequest prepareRequest : prepareRequests) {
-                try {
-                    SQLExecutor sqlExecutor = (SQLExecutor) prepareRequest;
-                    sqlExecutor.invoke(connection);
-                } catch (SQLException e) {
-                    // Just avoid one execution failure makes the rest of batch failure.
-                    log.error(e.getMessage(), e);
+            try {
+                Set<Map.Entry<String, List<PrepareRequest>>> entrySet = batchRequestMap.entrySet();
+                Iterator<Map.Entry<String, List<PrepareRequest>>> iterator = entrySet.iterator();
+                while (iterator.hasNext()) {

Review comment:
       Please also replace the iterator loop with an enhanced `for` loop or `Map.forEach`:
   
   
   ```java
   
               batchRequestMap.forEach((sql, requests) -> {
                   try {
                       BatchSQLExecutor batchSQLExecutor =
                           new BatchSQLExecutor(sql, requests);
                       batchSQLExecutor.invoke(connection, maxBatchSqlSize);
                   } catch (SQLException e) {
                       // Just avoid one execution failure makes the rest of batch failure.
                       log.error(e.getMessage(), e);
                   }
               });
   ```
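
The snippet above uses `Map.forEach`; an enhanced `for` over `entrySet()` gives the same per-group isolation. The following is only a minimal sketch: `GroupedFlushSketch` and `GroupExecutor` are hypothetical names standing in for `H2BatchDAO` and `BatchSQLExecutor#invoke`, and returning the failed keys is an assumption made for illustration (the PR only logs the error).

```java
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class GroupedFlushSketch {
    /** Hypothetical stand-in for BatchSQLExecutor#invoke; not a SkyWalking API. */
    interface GroupExecutor<T> {
        void execute(String sql, List<T> requests) throws SQLException;
    }

    /** Runs every SQL group and returns the SQL keys whose execution failed. */
    static <T> List<String> flushGroups(Map<String, List<T>> groups, GroupExecutor<T> executor) {
        List<String> failed = new ArrayList<>();
        for (Map.Entry<String, List<T>> entry : groups.entrySet()) {
            try {
                executor.execute(entry.getKey(), entry.getValue());
            } catch (SQLException e) {
                // The try-catch sits inside the loop, so one failing group
                // does not stop the remaining groups from being executed.
                failed.add(entry.getKey());
            }
        }
        return failed;
    }
}
```

Because the `catch` is scoped to a single iteration, the remaining groups still run after a failure.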

##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/BatchSQLExecutor.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.jdbc;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
+import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
+import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.List;
+
+/**
+ * A Batch SQL executor.
+ */
+@Slf4j
+public class BatchSQLExecutor implements InsertRequest, UpdateRequest {
+
+    private String sql;
+    private List<PrepareRequest> prepareRequests;
+
+    public BatchSQLExecutor(String sql, List<PrepareRequest> prepareRequests) {
+        this.sql = sql;
+        this.prepareRequests = prepareRequests;
+    }
+
+    public void invoke(Connection connection, int maxBatchSqlSize) throws SQLException {
+        if (log.isDebugEnabled()) {
+            log.debug("execute sql batch.sql by key size: {},sql:{}", prepareRequests.size(), sql);
+        }
+        PreparedStatement preparedStatement = connection.prepareStatement(sql);
+        int pendingCount = 0;
+        for (int k = 0; k < prepareRequests.size(); k++) {
+            SQLExecutor sqlExecutor = (SQLExecutor) prepareRequests.get(k);
+            for (int i = 0; i < sqlExecutor.getParam().size(); i++) {
+                preparedStatement.setObject(i + 1, sqlExecutor.getParam().get(i));
+            }
+            preparedStatement.addBatch();
+            if (k > 0 && k % maxBatchSqlSize == 0) {
+                long start = System.currentTimeMillis();
+                preparedStatement.executeBatch();
+                long end = System.currentTimeMillis();
+                long cost = end - start;
+                if (log.isDebugEnabled()) {
+                    log.debug("execute batch sql,batch size: {}, cost:{},sql: {}", maxBatchSqlSize, cost, sql);
+                }
+                pendingCount = 0;
+            } else {
+                pendingCount++;
+            }
+        }
+        if (pendingCount > 0) {
+            long start = System.currentTimeMillis();
+            preparedStatement.executeBatch();
+            long end = System.currentTimeMillis();
+            long cost = end - start;
+            if (log.isDebugEnabled()) {
+                log.debug("execute batch sql,batch size: {}, cost:{},sql: {}", pendingCount, cost, sql);
+            }

Review comment:
       This duplicated part can be extracted as a private method.
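
A minimal sketch of that extraction, under stated assumptions: `BatchFlushSketch`, `ParamBinder` and `addAndFlush` are hypothetical names, the returned flush count exists only to make the behaviour observable, and debug logging is left out to keep the block self-contained.

```java
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

final class BatchFlushSketch {
    /** Hypothetical binder standing in for SQLExecutor's parameter-setting logic. */
    interface ParamBinder<T> {
        void bind(PreparedStatement statement, T request) throws SQLException;
    }

    /**
     * Adds every request to the batch and flushes whenever maxBatchSqlSize
     * rows are pending. Returns the number of executeBatch() calls made.
     */
    static <T> int addAndFlush(PreparedStatement statement, List<T> requests,
                               ParamBinder<T> binder, int maxBatchSqlSize) throws SQLException {
        int pendingCount = 0;
        int flushes = 0;
        for (T request : requests) {
            binder.bind(statement, request);
            statement.addBatch();
            pendingCount++;
            if (pendingCount >= maxBatchSqlSize) {
                executeBatch(statement);
                pendingCount = 0;
                flushes++;
            }
        }
        if (pendingCount > 0) {
            // Flush the tail that did not fill a whole batch.
            executeBatch(statement);
            flushes++;
        }
        return flushes;
    }

    /** The formerly duplicated block: run the batch and return the cost in ms. */
    private static long executeBatch(PreparedStatement statement) throws SQLException {
        long start = System.currentTimeMillis();
        statement.executeBatch();
        return System.currentTimeMillis() - start;
    }
}
```

Counting pending rows directly also sidesteps a quirk of the loop under review: the check `k > 0 && k % maxBatchSqlSize == 0` first fires at `k == maxBatchSqlSize`, that is after `maxBatchSqlSize + 1` calls to `addBatch()`, so the first batch is one row larger than configured.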

##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/BatchSQLExecutor.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.jdbc;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
+import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
+import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.List;
+
+/**
+ * A Batch SQL executor.
+ */
+@Slf4j
+public class BatchSQLExecutor implements InsertRequest, UpdateRequest {
+
+    private String sql;
+    private List<PrepareRequest> prepareRequests;
+
+    public BatchSQLExecutor(String sql, List<PrepareRequest> prepareRequests) {
+        this.sql = sql;
+        this.prepareRequests = prepareRequests;
+    }
+
+    public void invoke(Connection connection, int maxBatchSqlSize) throws SQLException {
+        if (log.isDebugEnabled()) {
+            log.debug("execute sql batch.sql by key size: {},sql:{}", prepareRequests.size(), sql);
+        }
+        PreparedStatement preparedStatement = connection.prepareStatement(sql);

Review comment:
       `preparedStatement` is not closed
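
One way to address this is try-with-resources, which closes the statement even when `executeBatch()` throws. This is a minimal sketch: `ClosingStatementSketch` and `executeAll` are hypothetical names, and rows are passed as plain parameter lists instead of `SQLExecutor` instances.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

final class ClosingStatementSketch {
    /** Binds each row, batches it, and executes; the statement is always closed. */
    static int[] executeAll(Connection connection, String sql,
                            List<List<Object>> rows) throws SQLException {
        try (PreparedStatement statement = connection.prepareStatement(sql)) {
            for (List<Object> row : rows) {
                for (int i = 0; i < row.size(); i++) {
                    // JDBC parameter indexes start at 1.
                    statement.setObject(i + 1, row.get(i));
                }
                statement.addBatch();
            }
            return statement.executeBatch();
        }
    }
}
```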

##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/BatchSQLExecutor.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.jdbc;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
+import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
+import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.List;
+
+/**
+ * A Batch SQL executor.
+ */
+@Slf4j
+public class BatchSQLExecutor implements InsertRequest, UpdateRequest {
+
+    private String sql;
+    private List<PrepareRequest> prepareRequests;
+
+    public BatchSQLExecutor(String sql, List<PrepareRequest> prepareRequests) {
+        this.sql = sql;
+        this.prepareRequests = prepareRequests;
+    }
+
+    public void invoke(Connection connection, int maxBatchSqlSize) throws SQLException {
+        if (log.isDebugEnabled()) {
+            log.debug("execute sql batch.sql by key size: {},sql:{}", prepareRequests.size(), sql);
+        }
+        PreparedStatement preparedStatement = connection.prepareStatement(sql);
+        int pendingCount = 0;
+        for (int k = 0; k < prepareRequests.size(); k++) {
+            SQLExecutor sqlExecutor = (SQLExecutor) prepareRequests.get(k);
+            for (int i = 0; i < sqlExecutor.getParam().size(); i++) {
+                preparedStatement.setObject(i + 1, sqlExecutor.getParam().get(i));
+            }
+            preparedStatement.addBatch();
+            if (k > 0 && k % maxBatchSqlSize == 0) {
+                long start = System.currentTimeMillis();
+                preparedStatement.executeBatch();
+                long end = System.currentTimeMillis();
+                long cost = end - start;
+                if (log.isDebugEnabled()) {
+                    log.debug("execute batch sql,batch size: {}, cost:{},sql: {}", maxBatchSqlSize, cost, sql);
+                }
+                pendingCount = 0;
+            } else {
+                pendingCount++;
+            }
+        }
+        if (pendingCount > 0) {
+            long start = System.currentTimeMillis();
+            preparedStatement.executeBatch();
+            long end = System.currentTimeMillis();
+            long cost = end - start;
+            if (log.isDebugEnabled()) {
+                log.debug("execute batch sql,batch size: {}, cost:{},sql: {}", pendingCount, cost, sql);

Review comment:
       ```suggestion
                   log.debug("execute batch sql, batch size: {}, cost: {}ms, sql: {}", pendingCount, cost, sql);
   ```

##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/BatchSQLExecutor.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.jdbc;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
+import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
+import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.List;
+
+/**
+ * A Batch SQL executor.
+ */
+@Slf4j
+public class BatchSQLExecutor implements InsertRequest, UpdateRequest {
+
+    private String sql;
+    private List<PrepareRequest> prepareRequests;
+
+    public BatchSQLExecutor(String sql, List<PrepareRequest> prepareRequests) {
+        this.sql = sql;
+        this.prepareRequests = prepareRequests;
+    }
+
+    public void invoke(Connection connection, int maxBatchSqlSize) throws SQLException {
+        if (log.isDebugEnabled()) {
+            log.debug("execute sql batch.sql by key size: {},sql:{}", prepareRequests.size(), sql);
+        }
+        PreparedStatement preparedStatement = connection.prepareStatement(sql);
+        int pendingCount = 0;
+        for (int k = 0; k < prepareRequests.size(); k++) {
+            SQLExecutor sqlExecutor = (SQLExecutor) prepareRequests.get(k);
+            for (int i = 0; i < sqlExecutor.getParam().size(); i++) {
+                preparedStatement.setObject(i + 1, sqlExecutor.getParam().get(i));
+            }
+            preparedStatement.addBatch();
+            if (k > 0 && k % maxBatchSqlSize == 0) {
+                long start = System.currentTimeMillis();
+                preparedStatement.executeBatch();
+                long end = System.currentTimeMillis();
+                long cost = end - start;
+                if (log.isDebugEnabled()) {
+                    log.debug("execute batch sql,batch size: {}, cost:{},sql: {}", maxBatchSqlSize, cost, sql);

Review comment:
       Please keep the code style consistent, even in the logs
   
   ```suggestion
                       log.debug("execute batch sql, batch size: {}, cost: {}ms, sql: {}", maxBatchSqlSize, cost, sql);
   ```

##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/BatchSQLExecutor.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.jdbc;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
+import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
+import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.List;
+
+/**
+ * A Batch SQL executor.
+ */
+@Slf4j
+public class BatchSQLExecutor implements InsertRequest, UpdateRequest {
+
+    private String sql;
+    private List<PrepareRequest> prepareRequests;
+
+    public BatchSQLExecutor(String sql, List<PrepareRequest> prepareRequests) {
+        this.sql = sql;
+        this.prepareRequests = prepareRequests;
+    }

Review comment:
       Make the 2 fields `final` and use `@RequiredArgsConstructor`.
   
   ```java
   @Slf4j
   @RequiredArgsConstructor
   public class BatchSQLExecutor implements InsertRequest, UpdateRequest {
   
       private final String sql;
       private final List<PrepareRequest> prepareRequests;
   ```

##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java
##########
@@ -22,61 +22,85 @@
 import java.sql.SQLException;
 import java.util.List;
 import java.util.Properties;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.Iterator;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.BulkConsumePool;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.ConsumerPoolFactory;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
-import org.apache.skywalking.oap.server.core.UnexpectedException;
 import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
 import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
 import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
 import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
 import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
 import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+import org.apache.skywalking.oap.server.storage.plugin.jdbc.BatchSQLExecutor;
 import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLExecutor;
 
 @Slf4j
 public class H2BatchDAO implements IBatchDAO {
     private JDBCHikariCPClient h2Client;
     private final DataCarrier<PrepareRequest> dataCarrier;
+    private int maxBatchSqlSize = 2000;
+    private int asyncBatchPersistentPoolSize = 4;
 
-    public H2BatchDAO(JDBCHikariCPClient h2Client) {
+    public H2BatchDAO(JDBCHikariCPClient h2Client, int batchSqlSize, int asyncBatchPersistentPoolSize) {
         this.h2Client = h2Client;
-
         String name = "H2_ASYNCHRONOUS_BATCH_PERSISTENT";
-        BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, 1, 20);
-        try {
-            ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
-        } catch (Exception e) {
-            throw new UnexpectedException(e.getMessage(), e);
+        if (log.isDebugEnabled()) {
+            log.debug("H2_ASYNCHRONOUS_BATCH_PERSISTENT poolSize: {},maxBatchSqlSize:{}", asyncBatchPersistentPoolSize, batchSqlSize);
         }
-
-        this.dataCarrier = new DataCarrier<>(1, 10000);
-        this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new H2BatchDAO.H2BatchConsumer(this));
+        this.dataCarrier = new DataCarrier<>(name, asyncBatchPersistentPoolSize, 10000);
+        this.dataCarrier.consume(new H2BatchDAO.H2BatchConsumer(this), asyncBatchPersistentPoolSize, 20);
     }
 
     @Override
     public void flush(List<PrepareRequest> prepareRequests) {
         if (CollectionUtils.isEmpty(prepareRequests)) {
             return;
         }
-
         if (log.isDebugEnabled()) {
-            log.debug("batch sql statements execute, data size: {}", prepareRequests.size());
+            log.debug("to execute sql statements execute, data size: {}, maxBatchSqlSize: {}", prepareRequests.size(), maxBatchSqlSize);
+        }
+        executeSql(prepareRequests, maxBatchSqlSize);
+        if (log.isDebugEnabled()) {
+            log.debug("execute sql statements done, data size: {}, maxBatchSqlSize: {}", prepareRequests.size(), maxBatchSqlSize);
         }
+    }
 
+    private void executeSql(List<PrepareRequest> prepareRequests, int maxBatchSqlSize) {
+        if (log.isDebugEnabled()) {
+            log.debug("execute sql batch. data size:{}", prepareRequests.size());
+        }
+        Map<String, List<PrepareRequest>> batchRequestMap = new HashMap<>();
+        for (PrepareRequest prepareRequest : prepareRequests) {
+            SQLExecutor sqlExecutor = (SQLExecutor) prepareRequest;
+            if (batchRequestMap.containsKey(sqlExecutor.getSql())) {
+                batchRequestMap.get(sqlExecutor.getSql()).add(prepareRequest);
+            } else {
+                List<PrepareRequest> prepareRequestList = new ArrayList<>();
+                prepareRequestList.add(sqlExecutor);
+                batchRequestMap.put(sqlExecutor.getSql(), prepareRequestList);
+            }
+        }
         try (Connection connection = h2Client.getConnection()) {
-            for (PrepareRequest prepareRequest : prepareRequests) {
-                try {
-                    SQLExecutor sqlExecutor = (SQLExecutor) prepareRequest;
-                    sqlExecutor.invoke(connection);
-                } catch (SQLException e) {
-                    // Just avoid one execution failure makes the rest of batch failure.
-                    log.error(e.getMessage(), e);
+            try {
+                Set<Map.Entry<String, List<PrepareRequest>>> entrySet = batchRequestMap.entrySet();
+                Iterator<Map.Entry<String, List<PrepareRequest>>> iterator = entrySet.iterator();
+                while (iterator.hasNext()) {
+                    Map.Entry<String, List<PrepareRequest>> next = iterator.next();
+                    BatchSQLExecutor batchSQLExecutor = new BatchSQLExecutor(next.getKey(), next.getValue());
+                    batchSQLExecutor.invoke(connection, maxBatchSqlSize);
                 }
+            } catch (SQLException e) {
+                // Just avoid one execution failure makes the rest of batch failure.

Review comment:
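       The comment `// Just avoid one execution failure makes the rest of batch failure.` doesn't make sense here: the `try-catch` wraps the whole loop, so the first failure aborts the remaining groups. If you want to isolate failures, please `try-catch` inside the while loop.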
   
   
   ```java
               while (iterator.hasNext()) {
                   try {
                       Map.Entry<String, List<PrepareRequest>> next = iterator.next();
                       BatchSQLExecutor batchSQLExecutor =
                           new BatchSQLExecutor(next.getKey(), next.getValue());
                       batchSQLExecutor.invoke(connection, maxBatchSqlSize);
                   } catch (SQLException e) {
                       // Just avoid one execution failure makes the rest of batch failure.
                       log.error(e.getMessage(), e);
                   }
               }
   ```

##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java
##########
@@ -22,61 +22,85 @@
 import java.sql.SQLException;
 import java.util.List;
 import java.util.Properties;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.Iterator;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.BulkConsumePool;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.ConsumerPoolFactory;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
-import org.apache.skywalking.oap.server.core.UnexpectedException;
 import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
 import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
 import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
 import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
 import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
 import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+import org.apache.skywalking.oap.server.storage.plugin.jdbc.BatchSQLExecutor;
 import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLExecutor;
 
 @Slf4j
 public class H2BatchDAO implements IBatchDAO {
     private JDBCHikariCPClient h2Client;
     private final DataCarrier<PrepareRequest> dataCarrier;
+    private int maxBatchSqlSize = 2000;
+    private int asyncBatchPersistentPoolSize = 4;

Review comment:
       Unused
   
   ```suggestion
   ```

##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java
##########
@@ -22,61 +22,85 @@
 import java.sql.SQLException;
 import java.util.List;
 import java.util.Properties;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.Iterator;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.BulkConsumePool;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.ConsumerPoolFactory;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
-import org.apache.skywalking.oap.server.core.UnexpectedException;
 import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
 import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
 import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
 import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
 import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
 import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+import org.apache.skywalking.oap.server.storage.plugin.jdbc.BatchSQLExecutor;
 import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLExecutor;
 
 @Slf4j
 public class H2BatchDAO implements IBatchDAO {
     private JDBCHikariCPClient h2Client;
     private final DataCarrier<PrepareRequest> dataCarrier;
+    private int maxBatchSqlSize = 2000;
+    private int asyncBatchPersistentPoolSize = 4;
 
-    public H2BatchDAO(JDBCHikariCPClient h2Client) {
+    public H2BatchDAO(JDBCHikariCPClient h2Client, int batchSqlSize, int asyncBatchPersistentPoolSize) {

Review comment:
       You need to assign `batchSqlSize` to the field `maxBatchSqlSize`; the parameter is unused now, so the field always keeps its default of 2000.
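
A minimal sketch of the fix, assuming a trimmed-down, hypothetical stand-in for `H2BatchDAO` (`BatchDaoSizeSketch` and its getter are not part of the PR):

```java
// Hypothetical stand-in for H2BatchDAO; only the batch-size handling is shown.
final class BatchDaoSizeSketch {
    private final int maxBatchSqlSize;

    BatchDaoSizeSketch(int batchSqlSize) {
        // Without this assignment the configured value is silently ignored.
        this.maxBatchSqlSize = batchSqlSize;
    }

    int getMaxBatchSqlSize() {
        return maxBatchSqlSize;
    }
}
```

Making the field `final` lets the compiler reject a constructor that forgets the assignment.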




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [skywalking] wu-sheng commented on a change in pull request #7691: tidb write by batch sql ,improve write speed.

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on a change in pull request #7691:
URL: https://github.com/apache/skywalking/pull/7691#discussion_r706947361



##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/BatchSQLExecutor.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.jdbc;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
+import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
+import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.List;
+
+/**
+ * A Batch SQL executor.
+ */
+@Slf4j
+public class BatchSQLExecutor implements InsertRequest, UpdateRequest {
+
+    private String sql;
+    private List<PrepareRequest> prepareRequests;
+
+    public BatchSQLExecutor(String sql, List<PrepareRequest> prepareRequests) {
+        this.sql = sql;
+        this.prepareRequests = prepareRequests;
+    }
+
+    public void invoke(Connection connection, int maxBatchSqlSize) throws SQLException {
+        if (log.isDebugEnabled()) {
+            log.debug("execute sql batch.sql by key size: {},sql:{}", prepareRequests.size(), sql);
+        }
+        PreparedStatement preparedStatement = connection.prepareStatement(sql);
+        int pendingCount = 0;
+        for (int k = 0; k < prepareRequests.size(); k++) {
+            SQLExecutor sqlExecutor = (SQLExecutor) prepareRequests.get(k);
+            for (int i = 0; i < sqlExecutor.getParam().size(); i++) {
+                preparedStatement.setObject(i + 1, sqlExecutor.getParam().get(i));
+            }

Review comment:
       This code block should be moved into `sqlExecutor#setParameters`, rather than using `getParam` to expose the logic. Note that `SQLExecutor` is the owner of this logic.
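
A minimal sketch of that shape, under stated assumptions: `SqlExecutorSketch` is a hypothetical stand-in for `SQLExecutor` that holds only the parameter list, `setParameters` is the method name proposed in this comment rather than an existing API, and `BatchCallerSketch` plays the role of the loop in `BatchSQLExecutor`.

```java
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

// Hypothetical stand-in for SQLExecutor; only parameter binding is shown.
final class SqlExecutorSketch {
    private final List<Object> param;

    SqlExecutorSketch(List<Object> param) {
        this.param = param;
    }

    // The executor binds its own parameters, so callers never see the list.
    void setParameters(PreparedStatement preparedStatement) throws SQLException {
        for (int i = 0; i < param.size(); i++) {
            // JDBC parameter indexes start at 1.
            preparedStatement.setObject(i + 1, param.get(i));
        }
    }
}

// Hypothetical caller, playing the role of BatchSQLExecutor's inner loop.
final class BatchCallerSketch {
    static void addAll(PreparedStatement preparedStatement,
                       List<SqlExecutorSketch> executors) throws SQLException {
        for (SqlExecutorSketch executor : executors) {
            executor.setParameters(preparedStatement);
            preparedStatement.addBatch();
        }
    }
}
```

With this split the batch executor no longer needs a `getParam()` accessor.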







[GitHub] [skywalking] wu-sheng commented on a change in pull request #7691: tidb write by batch sql ,improve write speed.

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on a change in pull request #7691:
URL: https://github.com/apache/skywalking/pull/7691#discussion_r706947924



##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/SQLExecutor.java
##########
@@ -54,4 +54,12 @@ public void invoke(Connection connection) throws SQLException {
         }
         preparedStatement.execute();
     }
+
+    public String getSql() {
+        return sql;
+    }
+
+    public List<Object> getParam() {
+        return param;
+    }

Review comment:
       This should be replaced by https://github.com/apache/skywalking/pull/7691/files#r706947361







[GitHub] [skywalking] wu-sheng commented on a change in pull request #7691: tidb write by batch sql ,improve write speed.

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on a change in pull request #7691:
URL: https://github.com/apache/skywalking/pull/7691#discussion_r706946504



##########
File path: oap-server/server-starter/src/main/resources/application.yml
##########
@@ -170,9 +172,11 @@ storage:
     metadataQueryMaxSize: ${SW_STORAGE_MYSQL_QUERY_MAX_SIZE:5000}
     maxSizeOfArrayColumn: ${SW_STORAGE_MAX_SIZE_OF_ARRAY_COLUMN:20}
     numOfSearchableValuesPerTag: ${SW_STORAGE_NUM_OF_SEARCHABLE_VALUES_PER_TAG:2}
+    maxSizeOfBatchSql: ${SW_STORAGE_MAX_SIZE_OF_BATCH_SQL:2000}
+    asyncBatchPersistentPoolSize: ${SW_STORAGE_ASYNC_BATCH_PERSISTENT_POOL_SIZE:4}
   tidb:
     properties:
-      jdbcUrl: ${SW_JDBC_URL:"jdbc:mysql://localhost:4000/tidbswtest"}
+      jdbcUrl: ${SW_JDBC_URL:"jdbc:mysql://localhost:4000/tidbswtest?rewriteBatchedStatements=true"}

Review comment:
       What does `rewriteBatchedStatements=true` mean? It also doesn't appear in your doc change. Please make sure they are consistent.







[GitHub] [skywalking] wu-sheng commented on a change in pull request #7691: tidb write by batch sql ,improve write speed.

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on a change in pull request #7691:
URL: https://github.com/apache/skywalking/pull/7691#discussion_r706946577



##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageConfig.java
##########
@@ -59,4 +59,12 @@
      * @since 8.2.0
      */
     private int numOfSearchableValuesPerTag = 2;
+    /**
+     * max size per batch execute sql
+     */

Review comment:
       The `@since` tag is still missing.







[GitHub] [skywalking] sonatype-lift[bot] commented on a change in pull request #7691: tidb write by batch sql ,improve write speed.

Posted by GitBox <gi...@apache.org>.
sonatype-lift[bot] commented on a change in pull request #7691:
URL: https://github.com/apache/skywalking/pull/7691#discussion_r708534785



##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/SQLExecutor.java
##########
@@ -54,4 +54,26 @@ public void invoke(Connection connection) throws SQLException {
         }
         preparedStatement.execute();
     }
+
+    public void setParameters(PreparedStatement preparedStatement) throws SQLException {
+        for (int i = 0; i < param.size(); i++) {
+            preparedStatement.setObject(i + 1, param.get(i));
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        SQLExecutor s = (SQLExecutor) o;
+        return sql == s.sql;

Review comment:
       *opt.semgrep.java.lang.correctness.no-string-eqeq.no-string-eqeq:*  Strings should not be compared with `==`.
   This is a reference comparison operator.
   Use `.equals()` instead.
   
   (at-me [in a reply](https://help.sonatype.com/lift/talking-to-lift) with `help` or `ignore`)
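
To illustrate the finding, here is a small sketch of a value-based `equals`, assuming a hypothetical wrapper class `SqlKey` (not part of the pull request). `==` on `String` compares object identity, so two equal SQL texts held in different `String` objects would compare as different. The sketch also checks the argument's type before casting and overrides `hashCode`, which the `equals` contract requires whenever `equals` is overridden.

```java
import java.util.Objects;

// Hypothetical class used only to show the pattern.
final class SqlKey {
    private final String sql;

    SqlKey(String sql) {
        this.sql = sql;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        // Guard against null and foreign types instead of casting blindly.
        if (!(o instanceof SqlKey)) {
            return false;
        }
        // Compare contents, not references; Objects.equals is null-safe.
        return Objects.equals(sql, ((SqlKey) o).sql);
    }

    @Override
    public int hashCode() {
        return Objects.hashCode(sql);
    }
}
```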







[GitHub] [skywalking] chenyi19851209 commented on pull request #7691: tidb write by batch sql ,improve write speed.

Posted by GitBox <gi...@apache.org>.
chenyi19851209 commented on pull request #7691:
URL: https://github.com/apache/skywalking/pull/7691#issuecomment-919377599


   > @chenyi19851209 Could you polish this in time? We are close to the 8.8.0 release.
   
   OK. I will try to finish the remaining changes in time. Thanks for the review and advice.





[GitHub] [skywalking] chenyi19851209 commented on a change in pull request #7691: tidb write by batch sql ,improve write speed.

Posted by GitBox <gi...@apache.org>.
chenyi19851209 commented on a change in pull request #7691:
URL: https://github.com/apache/skywalking/pull/7691#discussion_r708439830



##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java
##########
@@ -22,61 +22,85 @@
 import java.sql.SQLException;
 import java.util.List;
 import java.util.Properties;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.Iterator;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.BulkConsumePool;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.ConsumerPoolFactory;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
-import org.apache.skywalking.oap.server.core.UnexpectedException;
 import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
 import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
 import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
 import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
 import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
 import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+import org.apache.skywalking.oap.server.storage.plugin.jdbc.BatchSQLExecutor;
 import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLExecutor;
 
 @Slf4j
 public class H2BatchDAO implements IBatchDAO {
     private JDBCHikariCPClient h2Client;
     private final DataCarrier<PrepareRequest> dataCarrier;
+    private int maxBatchSqlSize = 2000;
+    private int asyncBatchPersistentPoolSize = 4;
 
-    public H2BatchDAO(JDBCHikariCPClient h2Client) {
+    public H2BatchDAO(JDBCHikariCPClient h2Client, int batchSqlSize, int asyncBatchPersistentPoolSize) {
         this.h2Client = h2Client;
-
         String name = "H2_ASYNCHRONOUS_BATCH_PERSISTENT";
-        BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, 1, 20);
-        try {
-            ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
-        } catch (Exception e) {
-            throw new UnexpectedException(e.getMessage(), e);
+        if (log.isDebugEnabled()) {
+            log.debug("H2_ASYNCHRONOUS_BATCH_PERSISTENT poolSize: {},maxBatchSqlSize:{}", asyncBatchPersistentPoolSize, batchSqlSize);
         }
-
-        this.dataCarrier = new DataCarrier<>(1, 10000);
-        this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new H2BatchDAO.H2BatchConsumer(this));
+        this.dataCarrier = new DataCarrier<>(name, asyncBatchPersistentPoolSize, 10000);
+        this.dataCarrier.consume(new H2BatchDAO.H2BatchConsumer(this), asyncBatchPersistentPoolSize, 20);
     }
 
     @Override
     public void flush(List<PrepareRequest> prepareRequests) {
         if (CollectionUtils.isEmpty(prepareRequests)) {
             return;
         }
-
         if (log.isDebugEnabled()) {
-            log.debug("batch sql statements execute, data size: {}", prepareRequests.size());
+            log.debug("to execute sql statements execute, data size: {}, maxBatchSqlSize: {}", prepareRequests.size(), maxBatchSqlSize);
+        }
+        executeSql(prepareRequests, maxBatchSqlSize);
+        if (log.isDebugEnabled()) {
+            log.debug("execute sql statements done, data size: {}, maxBatchSqlSize: {}", prepareRequests.size(), maxBatchSqlSize);
         }
+    }
 
+    private void executeSql(List<PrepareRequest> prepareRequests, int maxBatchSqlSize) {
+        if (log.isDebugEnabled()) {
+            log.debug("execute sql batch. data size:{}", prepareRequests.size());
+        }
+        Map<String, List<PrepareRequest>> batchRequestMap = new HashMap<>();
+        for (PrepareRequest prepareRequest : prepareRequests) {
+            SQLExecutor sqlExecutor = (SQLExecutor) prepareRequest;
+            if (batchRequestMap.containsKey(sqlExecutor.getSql())) {
+                batchRequestMap.get(sqlExecutor.getSql()).add(prepareRequest);
+            } else {
+                List<PrepareRequest> prepareRequestList = new ArrayList<>();
+                prepareRequestList.add(sqlExecutor);
+                batchRequestMap.put(sqlExecutor.getSql(), prepareRequestList);
+            }
+        }
         try (Connection connection = h2Client.getConnection()) {
-            for (PrepareRequest prepareRequest : prepareRequests) {
-                try {
-                    SQLExecutor sqlExecutor = (SQLExecutor) prepareRequest;
-                    sqlExecutor.invoke(connection);
-                } catch (SQLException e) {
-                    // Just avoid one execution failure makes the rest of batch failure.
-                    log.error(e.getMessage(), e);
+            try {
+                Set<Map.Entry<String, List<PrepareRequest>>> entrySet = batchRequestMap.entrySet();
+                Iterator<Map.Entry<String, List<PrepareRequest>>> iterator = entrySet.iterator();
+                while (iterator.hasNext()) {
+                    Map.Entry<String, List<PrepareRequest>> next = iterator.next();
+                    BatchSQLExecutor batchSQLExecutor = new BatchSQLExecutor(next.getKey(), next.getValue());
+                    batchSQLExecutor.invoke(connection, maxBatchSqlSize);
                 }
+            } catch (SQLException e) {
+                // Just avoid one execution failure makes the rest of batch failure.
+                log.error(e.getMessage(), e);
             }
+
         } catch (SQLException | JDBCClientException e) {
+            log.warn("execute sql failed, discard data size: {}", prepareRequests.size());

Review comment:
       OK, done.
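
The grouping step in the quoted `executeSql` (collect requests that share the same SQL text so each group can reuse one prepared statement) can be written more compactly with `Map#computeIfAbsent`. This is only a sketch under stated assumptions: `GroupBySqlSketch`, `groupBySql`, and the `sqlOf` extractor are hypothetical names, and a `LinkedHashMap` is used here (instead of the `HashMap` in the diff) purely to keep the groups in first-seen order.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class GroupBySqlSketch {
    // Groups requests by the SQL text returned by sqlOf, preserving first-seen order.
    static <T> Map<String, List<T>> groupBySql(List<T> requests, Function<? super T, String> sqlOf) {
        Map<String, List<T>> grouped = new LinkedHashMap<>();
        for (T request : requests) {
            // Creates the list on first sight of a key, then appends to it.
            grouped.computeIfAbsent(sqlOf.apply(request), key -> new ArrayList<>()).add(request);
        }
        return grouped;
    }
}
```

In the pull request's terms, `sqlOf` would correspond to something like `SQLExecutor#getSql`, and each map entry would feed one batch executor.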







[GitHub] [skywalking] chenyi19851209 commented on a change in pull request #7691: tidb write by batch sql ,improve write speed.

Posted by GitBox <gi...@apache.org>.
chenyi19851209 commented on a change in pull request #7691:
URL: https://github.com/apache/skywalking/pull/7691#discussion_r709293914



##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/BatchSQLExecutor.java
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.jdbc;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
+import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
+import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.List;
+
+/**
+ * A Batch SQL executor.
+ */
+@Slf4j
+@RequiredArgsConstructor
+public class BatchSQLExecutor implements InsertRequest, UpdateRequest {
+
+    private final List<PrepareRequest> prepareRequests;
+
+    public void invoke(Connection connection, int maxBatchSqlSize) throws SQLException {
+        if (log.isDebugEnabled()) {
+            log.debug("execute sql batch. sql by key size: {}", prepareRequests.size());
+        }
+        if (prepareRequests.size() == 0) {
+            return;
+        }
+        String sql = prepareRequests.get(0).toString();
+        try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
+            int pendingCount = 0;
+            for (int k = 0; k < prepareRequests.size(); k++) {
+                SQLExecutor sqlExecutor = (SQLExecutor) prepareRequests.get(k);
+                sqlExecutor.setParameters(preparedStatement);
+                preparedStatement.addBatch();
+                if (k > 0 && k % maxBatchSqlSize == 0) {
+                    executeBatch(preparedStatement, maxBatchSqlSize, sql);
+                    pendingCount = 0;
+                } else {
+                    pendingCount++;
+                }
+            }
+            if (pendingCount > 0) {
+                executeBatch(preparedStatement, pendingCount, sql);
+            }
+        } catch (SQLException e) {
+            throw e;

Review comment:
       OK, done.
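
A sketch of the flush-every-N pattern used in the quoted loop, written without JDBC so the counting is easy to check. All names here (`ChunkedFlushSketch`, `forEachChunk`, `flush`) are hypothetical. In the quoted code the condition `k > 0 && k % maxBatchSqlSize == 0` appears to fire only after the element at index `maxBatchSqlSize` has been added, so the first executed batch would hold `maxBatchSqlSize + 1` rows; counting pending items directly, as below, avoids that.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class ChunkedFlushSketch {
    // Hands items to flush in chunks of at most maxBatchSize, in order.
    static <T> void forEachChunk(List<T> items, int maxBatchSize, Consumer<List<T>> flush) {
        if (maxBatchSize <= 0) {
            throw new IllegalArgumentException("maxBatchSize must be positive");
        }
        List<T> pending = new ArrayList<>();
        for (T item : items) {
            pending.add(item); // plays the role of addBatch()
            if (pending.size() == maxBatchSize) {
                flush.accept(new ArrayList<>(pending)); // plays the role of executeBatch()
                pending.clear();
            }
        }
        // Flush the remainder that did not fill a whole chunk.
        if (!pending.isEmpty()) {
            flush.accept(new ArrayList<>(pending));
        }
    }
}
```

With five items and a limit of two, `flush` receives chunks of sizes 2, 2, and 1.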

##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/SQLExecutor.java
##########
@@ -54,4 +54,26 @@ public void invoke(Connection connection) throws SQLException {
         }
         preparedStatement.execute();
     }
+
+    public void setParameters(PreparedStatement preparedStatement) throws SQLException {

Review comment:
       OK, done.







[GitHub] [skywalking] chenyi19851209 commented on a change in pull request #7691: tidb write by batch sql ,improve write speed.

Posted by GitBox <gi...@apache.org>.
chenyi19851209 commented on a change in pull request #7691:
URL: https://github.com/apache/skywalking/pull/7691#discussion_r708381962



##########
File path: docs/en/setup/backend/configuration-vocabulary.md
##########
@@ -114,16 +114,22 @@ core|default|role|Option values: `Mixed/Receiver/Aggregator`. **Receiver** mode
 | - | - | metadataQueryMaxSize | The maximum size of metadata per query. | SW_STORAGE_H2_QUERY_MAX_SIZE | 5000 |
 | - | - | maxSizeOfArrayColumn | Some entities (e.g. trace segments) include the logic column with multiple values. In H2, we use multiple physical columns to host the values: e.g. change column_a with values [1,2,3,4,5] to `column_a_0 = 1, column_a_1 = 2, column_a_2 = 3 , column_a_3 = 4, column_a_4 = 5`. | SW_STORAGE_MAX_SIZE_OF_ARRAY_COLUMN | 20 |
 | - | - | numOfSearchableValuesPerTag | In a trace segment, this includes multiple spans with multiple tags. Different spans may have the same tag key, e.g. multiple HTTP exit spans all have their own `http.method` tags. This configuration sets the limit on the maximum number of values for the same tag key. | SW_STORAGE_NUM_OF_SEARCHABLE_VALUES_PER_TAG | 2 |
+| - | - | maxSizeOfBatchSql | The maximum size of execute batch sql per execute sql | SW_STORAGE_MAX_SIZE_OF_BATCH_SQL | 100 |

Review comment:
       OK, done.







[GitHub] [skywalking] chenyi19851209 commented on a change in pull request #7691: tidb write by batch sql ,improve write speed.

Posted by GitBox <gi...@apache.org>.
chenyi19851209 commented on a change in pull request #7691:
URL: https://github.com/apache/skywalking/pull/7691#discussion_r709228612



##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/SQLExecutor.java
##########
@@ -54,4 +54,26 @@ public void invoke(Connection connection) throws SQLException {
         }
         preparedStatement.execute();
     }
+
+    public void setParameters(PreparedStatement preparedStatement) throws SQLException {
+        for (int i = 0; i < param.size(); i++) {
+            preparedStatement.setObject(i + 1, param.get(i));
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        SQLExecutor s = (SQLExecutor) o;
+        return sql == s.sql;

Review comment:
       OK, done.







[GitHub] [skywalking] bb7133 commented on a change in pull request #7691: tidb write by batch sql ,improve write speed.

Posted by GitBox <gi...@apache.org>.
bb7133 commented on a change in pull request #7691:
URL: https://github.com/apache/skywalking/pull/7691#discussion_r709149524



##########
File path: oap-server/server-starter/src/main/resources/application.yml
##########
@@ -170,9 +172,11 @@ storage:
     metadataQueryMaxSize: ${SW_STORAGE_MYSQL_QUERY_MAX_SIZE:5000}
     maxSizeOfArrayColumn: ${SW_STORAGE_MAX_SIZE_OF_ARRAY_COLUMN:20}
     numOfSearchableValuesPerTag: ${SW_STORAGE_NUM_OF_SEARCHABLE_VALUES_PER_TAG:2}
+    maxSizeOfBatchSql: ${SW_STORAGE_MAX_SIZE_OF_BATCH_SQL:2000}
+    asyncBatchPersistentPoolSize: ${SW_STORAGE_ASYNC_BATCH_PERSISTENT_POOL_SIZE:4}
   tidb:
     properties:
-      jdbcUrl: ${SW_JDBC_URL:"jdbc:mysql://localhost:4000/tidbswtest"}
+      jdbcUrl: ${SW_JDBC_URL:"jdbc:mysql://localhost:4000/tidbswtest?rewriteBatchedStatements=true"}

Review comment:
       > Then, whether we should add them to MySQL too at least
   
   This applies to both MySQL and TiDB (and other databases, of course). You can also check the ['best practice' part](https://docs.pingcap.com/tidb/stable/java-app-best-practices#use-batch-api) of the TiDB website.
   
   I would recommend setting it for MySQL, too.
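
For context, `rewriteBatchedStatements` is a MySQL Connector/J connection property: when it is `true`, the driver may rewrite statements queued with `addBatch()` into fewer, larger statements (for example a multi-row `INSERT`) when `executeBatch()` is called, which reduces round trips. The sketch below only shows one way to add the property to a JDBC URL; `JdbcUrlSketch` and `withRewriteBatchedStatements` are hypothetical helpers, not part of SkyWalking or the driver.

```java
final class JdbcUrlSketch {
    // Appends rewriteBatchedStatements=true unless the URL already sets the property.
    static String withRewriteBatchedStatements(String jdbcUrl) {
        if (jdbcUrl.contains("rewriteBatchedStatements=")) {
            return jdbcUrl;
        }
        // Use '&' when the URL already carries a query string, '?' otherwise.
        String separator = jdbcUrl.contains("?") ? "&" : "?";
        return jdbcUrl + separator + "rewriteBatchedStatements=true";
    }
}
```

Applied to the default from the diff, `jdbc:mysql://localhost:4000/tidbswtest` becomes `jdbc:mysql://localhost:4000/tidbswtest?rewriteBatchedStatements=true`.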







[GitHub] [skywalking] wu-sheng commented on pull request #7691: tidb write by batch sql ,improve write speed.

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on pull request #7691:
URL: https://github.com/apache/skywalking/pull/7691#issuecomment-918970619


   @chenyi19851209 Could you polish this in time? We are close to the 8.8.0 release.





[GitHub] [skywalking] chenyi19851209 commented on a change in pull request #7691: tidb write by batch sql ,improve write speed.

Posted by GitBox <gi...@apache.org>.
chenyi19851209 commented on a change in pull request #7691:
URL: https://github.com/apache/skywalking/pull/7691#discussion_r708388754



##########
File path: oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2BatchDAO.java
##########
@@ -22,61 +22,85 @@
 import java.sql.SQLException;
 import java.util.List;
 import java.util.Properties;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.Iterator;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.BulkConsumePool;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.ConsumerPoolFactory;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
-import org.apache.skywalking.oap.server.core.UnexpectedException;
 import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
 import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
 import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
 import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
 import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
 import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+import org.apache.skywalking.oap.server.storage.plugin.jdbc.BatchSQLExecutor;
 import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLExecutor;
 
 @Slf4j
 public class H2BatchDAO implements IBatchDAO {
     private JDBCHikariCPClient h2Client;
     private final DataCarrier<PrepareRequest> dataCarrier;
+    private int maxBatchSqlSize = 2000;
+    private int asyncBatchPersistentPoolSize = 4;
 
-    public H2BatchDAO(JDBCHikariCPClient h2Client) {
+    public H2BatchDAO(JDBCHikariCPClient h2Client, int batchSqlSize, int asyncBatchPersistentPoolSize) {
         this.h2Client = h2Client;
-
         String name = "H2_ASYNCHRONOUS_BATCH_PERSISTENT";
-        BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, 1, 20);
-        try {
-            ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
-        } catch (Exception e) {
-            throw new UnexpectedException(e.getMessage(), e);
+        if (log.isDebugEnabled()) {
+            log.debug("H2_ASYNCHRONOUS_BATCH_PERSISTENT poolSize: {},maxBatchSqlSize:{}", asyncBatchPersistentPoolSize, batchSqlSize);
         }
-
-        this.dataCarrier = new DataCarrier<>(1, 10000);
-        this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new H2BatchDAO.H2BatchConsumer(this));
+        this.dataCarrier = new DataCarrier<>(name, asyncBatchPersistentPoolSize, 10000);
+        this.dataCarrier.consume(new H2BatchDAO.H2BatchConsumer(this), asyncBatchPersistentPoolSize, 20);
     }
 
     @Override
     public void flush(List<PrepareRequest> prepareRequests) {
         if (CollectionUtils.isEmpty(prepareRequests)) {
             return;
         }
-
         if (log.isDebugEnabled()) {
-            log.debug("batch sql statements execute, data size: {}", prepareRequests.size());
+            log.debug("to execute sql statements execute, data size: {}, maxBatchSqlSize: {}", prepareRequests.size(), maxBatchSqlSize);
+        }
+        executeSql(prepareRequests, maxBatchSqlSize);
+        if (log.isDebugEnabled()) {
+            log.debug("execute sql statements done, data size: {}, maxBatchSqlSize: {}", prepareRequests.size(), maxBatchSqlSize);
         }
+    }
 
+    private void executeSql(List<PrepareRequest> prepareRequests, int maxBatchSqlSize) {

Review comment:
       OK. I removed the `executeSql` method; only the `flush` method remains.



