Posted to notifications@skywalking.apache.org by GitBox <gi...@apache.org> on 2022/04/18 09:10:49 UTC
[GitHub] [skywalking] yangyiweigege commented on issue #8897: [Bug] ES BulkProcessor in BatchProcessEsDAO Initialize multiple times
yangyiweigege commented on issue #8897:
URL: https://github.com/apache/skywalking/issues/8897#issuecomment-1101245769
The code in `org.apache.skywalking.oap.server.core.storage.PersistenceTimer#extractDataAndSave`:
```java
final CompletableFuture<Void> future =
    CompletableFuture.allOf(workers.stream().map(worker -> {
        return CompletableFuture.runAsync(() -> {
            List<PrepareRequest> innerPrepareRequests;
            // Prepare stage
            try (HistogramMetrics.Timer ignored = prepareLatency.createTimer()) {
                if (log.isDebugEnabled()) {
                    log.debug(
                        "extract {} worker data and save",
                        worker.getClass().getName()
                    );
                }
                innerPrepareRequests = worker.buildBatchRequests();
                worker.endOfRound();
            }
            if (CollectionUtils.isEmpty(innerPrepareRequests)) {
                return;
            }
            // Execution stage
            HistogramMetrics.Timer executeLatencyTimer = executeLatency.createTimer();
            batchDAO.flush(innerPrepareRequests)
                    .whenComplete(($1, $2) -> executeLatencyTimer.close());
        }, prepareExecutorService);
    }).toArray(CompletableFuture[]::new));
```
There is only one `batchDAO`, but there are many workers, and each worker invokes `batchDAO.flush` on a different thread. `batchDAO` holds only one `BulkProcessor`. This means:

- The `if (BulkProcessor == null)` branch can be entered many times.
- `batchDAO` still ends up with only one `BulkProcessor`.
- Multiple scheduled tasks are initialized, but in reality only one is useful. For the rest, the `ArrayBlockingQueue<Holder> requests` queue always has size 0.
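
One possible way to avoid the repeated initialization is to guard the lazy check, for example with double-checked locking on a `volatile` field. The sketch below is only an illustration, not the actual SkyWalking code: `FakeProcessor`, `SafeDao`, and `countInitializations` are hypothetical stand-ins, and it assumes the null check inside `flush` is currently unguarded.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class LazyInitSketch {
    /** Hypothetical stand-in for the ES BulkProcessor; counts how many instances get built. */
    static final class FakeProcessor {
        static final AtomicInteger CREATED = new AtomicInteger();

        FakeProcessor() {
            CREATED.incrementAndGet();
        }
    }

    /** Hypothetical stand-in for the batch DAO, with a guarded lazy initialization. */
    static final class SafeDao {
        // volatile so that a fully constructed processor is visible to all threads
        private volatile FakeProcessor processor;

        void flush() {
            if (processor == null) {              // fast path, no lock once initialized
                synchronized (this) {
                    if (processor == null) {      // re-check while holding the lock
                        processor = new FakeProcessor();
                    }
                }
            }
            // Real code would hand the prepared requests to the processor here.
        }
    }

    /** Calls flush() from many threads at once and returns how many processors were created. */
    static int countInitializations(int threads) throws InterruptedException {
        FakeProcessor.CREATED.set(0);
        SafeDao dao = new SafeDao();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch start = new CountDownLatch(1);
        for (int i = 0; i < threads; i++) {
            pool.execute(() -> {
                try {
                    start.await();                // release all workers together
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                dao.flush();
            });
        }
        start.countDown();
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return FakeProcessor.CREATED.get();
    }
}
```

With the guard in place, `LazyInitSketch.countInitializations(16)` should report a single created processor regardless of how many workers call `flush` concurrently. Initializing the processor eagerly, before any worker runs, would be an alternative that avoids the check entirely.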