You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2022/04/20 05:06:50 UTC
[skywalking] branch master updated: Fix bulk processor initialize multiple times (#8913)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new d3f484e77c Fix bulk processor initialize multiple times (#8913)
d3f484e77c is described below
commit d3f484e77c3613514638f585ecd27ab4dba2791f
Author: yangyiweigege <28...@qq.com>
AuthorDate: Wed Apr 20 13:06:35 2022 +0800
Fix bulk processor initialize multiple times (#8913)
---
docs/en/changes/changes.md | 1 +
.../plugin/elasticsearch/base/BatchProcessEsDAO.java | 14 +++++++++++---
2 files changed, 12 insertions(+), 3 deletions(-)
diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index 11847baff3..0a5e3f6387 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -21,6 +21,7 @@
* Webapp module (for UI) enabled compression.
* [Breaking Change] Add layer field to event, report an event without layer is not allowed.
* Fix ES flush thread stops when flush schedule task throws exception, such as ElasticSearch flush failed.
+* Fix ES BulkProcessor in BatchProcessEsDAO was initialized multiple times and created multiple ES flush schedule tasks.
#### UI
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java
index ef8ec0062f..39b6a81fc6 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java
@@ -32,7 +32,7 @@ import org.apache.skywalking.oap.server.library.util.CollectionUtils;
@Slf4j
public class BatchProcessEsDAO extends EsDAO implements IBatchDAO {
- private BulkProcessor bulkProcessor;
+ private volatile BulkProcessor bulkProcessor;
private final int bulkActions;
private final int flushInterval;
private final int concurrentRequests;
@@ -50,7 +50,11 @@ public class BatchProcessEsDAO extends EsDAO implements IBatchDAO {
@Override
public void insert(InsertRequest insertRequest) {
if (bulkProcessor == null) {
- this.bulkProcessor = getClient().createBulkProcessor(bulkActions, flushInterval, concurrentRequests);
+ synchronized (this) {
+ if (bulkProcessor == null) {
+ this.bulkProcessor = getClient().createBulkProcessor(bulkActions, flushInterval, concurrentRequests);
+ }
+ }
}
this.bulkProcessor.add(((IndexRequestWrapper) insertRequest).getRequest());
@@ -59,7 +63,11 @@ public class BatchProcessEsDAO extends EsDAO implements IBatchDAO {
@Override
public CompletableFuture<Void> flush(List<PrepareRequest> prepareRequests) {
if (bulkProcessor == null) {
- this.bulkProcessor = getClient().createBulkProcessor(bulkActions, flushInterval, concurrentRequests);
+ synchronized (this) {
+ if (bulkProcessor == null) {
+ this.bulkProcessor = getClient().createBulkProcessor(bulkActions, flushInterval, concurrentRequests);
+ }
+ }
}
if (CollectionUtils.isNotEmpty(prepareRequests)) {