You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by GitBox <gi...@apache.org> on 2022/04/18 09:10:49 UTC

[GitHub] [skywalking] yangyiweigege commented on issue #8897: [Bug] ES BulkProcessor in BatchProcessEsDAO Initialize multiple times

yangyiweigege commented on issue #8897:
URL: https://github.com/apache/skywalking/issues/8897#issuecomment-1101245769

   code in org.apache.skywalking.oap.server.core.storage.PersistenceTimer#extractDataAndSave:
   ```java
    final CompletableFuture<Void> future =
               CompletableFuture.allOf(workers.stream().map(worker -> {
                   return CompletableFuture.runAsync(() -> {
                       List<PrepareRequest> innerPrepareRequests;
                       // Prepare stage
                       try (HistogramMetrics.Timer ignored = prepareLatency.createTimer()) {
                           if (log.isDebugEnabled()) {
                               log.debug(
                                   "extract {} worker data and save",
                                   worker.getClass().getName()
                               );
                           }
   
                           innerPrepareRequests = worker.buildBatchRequests();
   
                           worker.endOfRound();
                       }
   
                       if (CollectionUtils.isEmpty(innerPrepareRequests)) {
                           return;
                       }
   
                       // Execution stage
                       HistogramMetrics.Timer executeLatencyTimer = executeLatency.createTimer();
                       batchDAO.flush(innerPrepareRequests)
                               .whenComplete(($1, $2) -> executeLatencyTimer.close());
                   }, prepareExecutorService);
               }).toArray(CompletableFuture[]::new));
   ```
   batchDAO is only one, but works have many and each work will invoke  batchDAO.flush in different threads, and batchDAO only have one BulkProcessor.   it means:
   if (BulkProcessor == null ) can invoked many time。
   batchDAO have only one BulkProcessor. 
   Multiple scheduled tasks are initialized, but in reality, only one is useful. For the rest, ArrayBlockingQueue<Holder> requests  always  0 size.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org