You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2022/04/20 05:06:50 UTC

[skywalking] branch master updated: Fix bulk processor initialize multiple times (#8913)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new d3f484e77c Fix bulk processor initialize multiple times (#8913)
d3f484e77c is described below

commit d3f484e77c3613514638f585ecd27ab4dba2791f
Author: yangyiweigege <28...@qq.com>
AuthorDate: Wed Apr 20 13:06:35 2022 +0800

    Fix bulk processor initialize multiple times (#8913)
---
 docs/en/changes/changes.md                                 |  1 +
 .../plugin/elasticsearch/base/BatchProcessEsDAO.java       | 14 +++++++++++---
 2 files changed, 12 insertions(+), 3 deletions(-)

diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index 11847baff3..0a5e3f6387 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -21,6 +21,7 @@
 * Webapp module (for UI) enabled compression.
 * [Breaking Change] Add layer field to event, report an event without layer is not allowed.
 * Fix ES flush thread stops when flush schedule task throws exception, such as ElasticSearch flush failed.
+* Fix ES BulkProcessor in BatchProcessEsDAO was initialized multiple times and created multiple ES flush schedule tasks.
 
 #### UI
 
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java
index ef8ec0062f..39b6a81fc6 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java
@@ -32,7 +32,7 @@ import org.apache.skywalking.oap.server.library.util.CollectionUtils;
 
 @Slf4j
 public class BatchProcessEsDAO extends EsDAO implements IBatchDAO {
-    private BulkProcessor bulkProcessor;
+    private volatile BulkProcessor bulkProcessor;
     private final int bulkActions;
     private final int flushInterval;
     private final int concurrentRequests;
@@ -50,7 +50,11 @@ public class BatchProcessEsDAO extends EsDAO implements IBatchDAO {
     @Override
     public void insert(InsertRequest insertRequest) {
         if (bulkProcessor == null) {
-            this.bulkProcessor = getClient().createBulkProcessor(bulkActions, flushInterval, concurrentRequests);
+            synchronized (this) {
+                if (bulkProcessor == null) {
+                    this.bulkProcessor = getClient().createBulkProcessor(bulkActions, flushInterval, concurrentRequests);
+                }
+            }
         }
 
         this.bulkProcessor.add(((IndexRequestWrapper) insertRequest).getRequest());
@@ -59,7 +63,11 @@ public class BatchProcessEsDAO extends EsDAO implements IBatchDAO {
     @Override
     public CompletableFuture<Void> flush(List<PrepareRequest> prepareRequests) {
         if (bulkProcessor == null) {
-            this.bulkProcessor = getClient().createBulkProcessor(bulkActions, flushInterval, concurrentRequests);
+            synchronized (this) {
+                if (bulkProcessor == null) {
+                    this.bulkProcessor = getClient().createBulkProcessor(bulkActions, flushInterval, concurrentRequests);
+                }
+            }
         }
 
         if (CollectionUtils.isNotEmpty(prepareRequests)) {