You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2022/07/20 07:23:35 UTC

[skywalking] branch master updated: Fixes BanyanDB race condition (#9360)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new fd6d0ee743 Fixes BanyanDB race condition (#9360)
fd6d0ee743 is described below

commit fd6d0ee7439bbeaad8d03a095dd2774bf1f01fbd
Author: Brandon Fergerson <bf...@apache.org>
AuthorDate: Wed Jul 20 11:23:23 2022 +0400

    Fixes BanyanDB race condition (#9360)
---
 docs/en/changes/changes.md                         |  1 +
 .../storage/plugin/banyandb/BanyanDBBatchDAO.java  | 46 ++++++++++++++--------
 2 files changed, 30 insertions(+), 17 deletions(-)

diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index ef5a6e5a1d..b13d97360e 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -25,6 +25,7 @@
 * Support the `NETWORK` type of eBPF Profiling task.
 * Support `sumHistogram` in `MAL`.
 * [Breaking Change] Make the eBPF Profiling task support to the service instance level, index/table `ebpf_profiling_task` is required to be re-created when bump up from previous releases.
+* Fix race condition in BanyanDB storage
 
 #### UI
 
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBBatchDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBBatchDAO.java
index fa9e161b17..0e6d761aa9 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBBatchDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBBatchDAO.java
@@ -31,9 +31,10 @@ import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBS
 
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 public class BanyanDBBatchDAO extends AbstractDAO<BanyanDBStorageClient> implements IBatchDAO {
+    private static final Object STREAM_SYNCHRONIZER = new Object();
+    private static final Object MEASURE_SYNCHRONIZER = new Object();
     private StreamBulkWriteProcessor streamBulkWriteProcessor;
 
     private MeasureBulkWriteProcessor measureBulkWriteProcessor;
@@ -44,8 +45,6 @@ public class BanyanDBBatchDAO extends AbstractDAO<BanyanDBStorageClient> impleme
 
     private final int concurrency;
 
-    private final AtomicBoolean initialized = new AtomicBoolean(false);
-
     public BanyanDBBatchDAO(BanyanDBStorageClient client, int maxBulkSize, int flushInterval, int concurrency) {
         super(client);
         this.maxBulkSize = maxBulkSize;
@@ -55,37 +54,50 @@ public class BanyanDBBatchDAO extends AbstractDAO<BanyanDBStorageClient> impleme
 
     @Override
     public void insert(InsertRequest insertRequest) {
-        if (initialized.compareAndSet(false, true)) {
-            this.streamBulkWriteProcessor = getClient().createStreamBulkProcessor(maxBulkSize, flushInterval, concurrency);
-            this.measureBulkWriteProcessor = getClient().createMeasureBulkProcessor(maxBulkSize, flushInterval, concurrency);
-        }
         if (insertRequest instanceof BanyanDBStreamInsertRequest) {
-            this.streamBulkWriteProcessor.add(((BanyanDBStreamInsertRequest) insertRequest).getStreamWrite());
+            getStreamBulkWriteProcessor().add(((BanyanDBStreamInsertRequest) insertRequest).getStreamWrite());
         } else if (insertRequest instanceof BanyanDBMeasureInsertRequest) {
-            this.measureBulkWriteProcessor.add(((BanyanDBMeasureInsertRequest) insertRequest).getMeasureWrite());
+            getMeasureBulkWriteProcessor().add(((BanyanDBMeasureInsertRequest) insertRequest).getMeasureWrite());
         }
     }
 
     @Override
     public CompletableFuture<Void> flush(List<PrepareRequest> prepareRequests) {
-        if (initialized.compareAndSet(false, true)) {
-            this.streamBulkWriteProcessor = getClient().createStreamBulkProcessor(maxBulkSize, flushInterval, concurrency);
-            this.measureBulkWriteProcessor = getClient().createMeasureBulkProcessor(maxBulkSize, flushInterval, concurrency);
-        }
-
         if (CollectionUtils.isNotEmpty(prepareRequests)) {
             for (final PrepareRequest r : prepareRequests) {
                 if (r instanceof BanyanDBStreamInsertRequest) {
                     // TODO: return CompletableFuture<Void>
-                    this.streamBulkWriteProcessor.add(((BanyanDBStreamInsertRequest) r).getStreamWrite());
+                    getStreamBulkWriteProcessor().add(((BanyanDBStreamInsertRequest) r).getStreamWrite());
                 } else if (r instanceof BanyanDBMeasureInsertRequest) {
-                    this.measureBulkWriteProcessor.add(((BanyanDBMeasureInsertRequest) r).getMeasureWrite());
+                    getMeasureBulkWriteProcessor().add(((BanyanDBMeasureInsertRequest) r).getMeasureWrite());
                 } else if (r instanceof BanyanDBMeasureUpdateRequest) {
-                    this.measureBulkWriteProcessor.add(((BanyanDBMeasureUpdateRequest) r).getMeasureWrite());
+                    getMeasureBulkWriteProcessor().add(((BanyanDBMeasureUpdateRequest) r).getMeasureWrite());
                 }
             }
         }
 
         return CompletableFuture.completedFuture(null);
     }
+
+    private StreamBulkWriteProcessor getStreamBulkWriteProcessor() {
+        if (streamBulkWriteProcessor == null) {
+            synchronized (STREAM_SYNCHRONIZER) {
+                if (streamBulkWriteProcessor == null) {
+                    this.streamBulkWriteProcessor = getClient().createStreamBulkProcessor(maxBulkSize, flushInterval, concurrency);
+                }
+            }
+        }
+        return streamBulkWriteProcessor;
+    }
+
+    private MeasureBulkWriteProcessor getMeasureBulkWriteProcessor() {
+        if (measureBulkWriteProcessor == null) {
+            synchronized (MEASURE_SYNCHRONIZER) {
+                if (measureBulkWriteProcessor == null) {
+                    this.measureBulkWriteProcessor = getClient().createMeasureBulkProcessor(maxBulkSize, flushInterval, concurrency);
+                }
+            }
+        }
+        return measureBulkWriteProcessor;
+    }
 }