You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2022/07/20 07:23:35 UTC
[skywalking] branch master updated: Fixes Banynadb race condition (#9360)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new fd6d0ee743 Fixes Banynadb race condition (#9360)
fd6d0ee743 is described below
commit fd6d0ee7439bbeaad8d03a095dd2774bf1f01fbd
Author: Brandon Fergerson <bf...@apache.org>
AuthorDate: Wed Jul 20 11:23:23 2022 +0400
Fixes Banynadb race condition (#9360)
---
docs/en/changes/changes.md | 1 +
.../storage/plugin/banyandb/BanyanDBBatchDAO.java | 46 ++++++++++++++--------
2 files changed, 30 insertions(+), 17 deletions(-)
diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index ef5a6e5a1d..b13d97360e 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -25,6 +25,7 @@
* Support the `NETWORK` type of eBPF Profiling task.
* Support `sumHistogram` in `MAL`.
* [Breaking Change] Make the eBPF Profiling task support to the service instance level, index/table `ebpf_profiling_task` is required to be re-created when bump up from previous releases.
+* Fix race condition in Banyandb storage
#### UI
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBBatchDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBBatchDAO.java
index fa9e161b17..0e6d761aa9 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBBatchDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBBatchDAO.java
@@ -31,9 +31,10 @@ import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBS
import java.util.List;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicBoolean;
public class BanyanDBBatchDAO extends AbstractDAO<BanyanDBStorageClient> implements IBatchDAO {
+ private static final Object STREAM_SYNCHRONIZER = new Object();
+ private static final Object MEASURE_SYNCHRONIZER = new Object();
private StreamBulkWriteProcessor streamBulkWriteProcessor;
private MeasureBulkWriteProcessor measureBulkWriteProcessor;
@@ -44,8 +45,6 @@ public class BanyanDBBatchDAO extends AbstractDAO<BanyanDBStorageClient> impleme
private final int concurrency;
- private final AtomicBoolean initialized = new AtomicBoolean(false);
-
public BanyanDBBatchDAO(BanyanDBStorageClient client, int maxBulkSize, int flushInterval, int concurrency) {
super(client);
this.maxBulkSize = maxBulkSize;
@@ -55,37 +54,50 @@ public class BanyanDBBatchDAO extends AbstractDAO<BanyanDBStorageClient> impleme
@Override
public void insert(InsertRequest insertRequest) {
- if (initialized.compareAndSet(false, true)) {
- this.streamBulkWriteProcessor = getClient().createStreamBulkProcessor(maxBulkSize, flushInterval, concurrency);
- this.measureBulkWriteProcessor = getClient().createMeasureBulkProcessor(maxBulkSize, flushInterval, concurrency);
- }
if (insertRequest instanceof BanyanDBStreamInsertRequest) {
- this.streamBulkWriteProcessor.add(((BanyanDBStreamInsertRequest) insertRequest).getStreamWrite());
+ getStreamBulkWriteProcessor().add(((BanyanDBStreamInsertRequest) insertRequest).getStreamWrite());
} else if (insertRequest instanceof BanyanDBMeasureInsertRequest) {
- this.measureBulkWriteProcessor.add(((BanyanDBMeasureInsertRequest) insertRequest).getMeasureWrite());
+ getMeasureBulkWriteProcessor().add(((BanyanDBMeasureInsertRequest) insertRequest).getMeasureWrite());
}
}
@Override
public CompletableFuture<Void> flush(List<PrepareRequest> prepareRequests) {
- if (initialized.compareAndSet(false, true)) {
- this.streamBulkWriteProcessor = getClient().createStreamBulkProcessor(maxBulkSize, flushInterval, concurrency);
- this.measureBulkWriteProcessor = getClient().createMeasureBulkProcessor(maxBulkSize, flushInterval, concurrency);
- }
-
if (CollectionUtils.isNotEmpty(prepareRequests)) {
for (final PrepareRequest r : prepareRequests) {
if (r instanceof BanyanDBStreamInsertRequest) {
// TODO: return CompletableFuture<Void>
- this.streamBulkWriteProcessor.add(((BanyanDBStreamInsertRequest) r).getStreamWrite());
+ getStreamBulkWriteProcessor().add(((BanyanDBStreamInsertRequest) r).getStreamWrite());
} else if (r instanceof BanyanDBMeasureInsertRequest) {
- this.measureBulkWriteProcessor.add(((BanyanDBMeasureInsertRequest) r).getMeasureWrite());
+ getMeasureBulkWriteProcessor().add(((BanyanDBMeasureInsertRequest) r).getMeasureWrite());
} else if (r instanceof BanyanDBMeasureUpdateRequest) {
- this.measureBulkWriteProcessor.add(((BanyanDBMeasureUpdateRequest) r).getMeasureWrite());
+ getMeasureBulkWriteProcessor().add(((BanyanDBMeasureUpdateRequest) r).getMeasureWrite());
}
}
}
return CompletableFuture.completedFuture(null);
}
+
+ private StreamBulkWriteProcessor getStreamBulkWriteProcessor() {
+ if (streamBulkWriteProcessor == null) {
+ synchronized (STREAM_SYNCHRONIZER) {
+ if (streamBulkWriteProcessor == null) {
+ this.streamBulkWriteProcessor = getClient().createStreamBulkProcessor(maxBulkSize, flushInterval, concurrency);
+ }
+ }
+ }
+ return streamBulkWriteProcessor;
+ }
+
+ private MeasureBulkWriteProcessor getMeasureBulkWriteProcessor() {
+ if (measureBulkWriteProcessor == null) {
+ synchronized (MEASURE_SYNCHRONIZER) {
+ if (measureBulkWriteProcessor == null) {
+ this.measureBulkWriteProcessor = getClient().createMeasureBulkProcessor(maxBulkSize, flushInterval, concurrency);
+ }
+ }
+ }
+ return measureBulkWriteProcessor;
+ }
}