You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by lu...@apache.org on 2022/04/30 12:56:27 UTC
[skywalking] 01/25: start from new code base and copy all previous stream impl
This is an automated email from the ASF dual-hosted git repository.
lujiajing pushed a commit to branch banyandb-integration-stream
in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit bb953530e682346c1469117178bff71e5a397cb5
Author: Megrez Lu <lu...@gmail.com>
AuthorDate: Sun Nov 28 23:08:31 2021 +0800
start from new code base and copy all previous stream impl
---
oap-server/server-storage-plugin/pom.xml | 1 +
.../{ => storage-banyandb-plugin}/pom.xml | 40 +++--
.../storage/plugin/banyandb/BanyanDBSchema.java | 66 ++++++++
.../plugin/banyandb/BanyanDBStorageClient.java | 55 ++++++
.../plugin/banyandb/BanyanDBStorageConfig.java | 47 ++++++
.../plugin/banyandb/BanyanDBStorageProvider.java | 89 ++++++++++
.../banyandb/stream/BanyanDBAlarmQueryDAO.java | 19 +++
.../plugin/banyandb/stream/BanyanDBBatchDAO.java | 75 +++++++++
.../stream/BanyanDBBrowserLogQueryDAO.java | 17 ++
.../banyandb/stream/BanyanDBEventQueryDAO.java | 23 +++
.../banyandb/stream/BanyanDBLogQueryDAO.java | 20 +++
.../banyandb/stream/BanyanDBMetadataQueryDAO.java | 55 ++++++
.../stream/BanyanDBNetworkAddressAliasDAO.java | 17 ++
.../stream/BanyanDBProfileTaskLogQueryDAO.java | 18 ++
.../stream/BanyanDBProfileTaskQueryDAO.java | 23 +++
.../BanyanDBProfileThreadSnapshotQueryDAO.java | 40 +++++
.../plugin/banyandb/stream/BanyanDBRecordDAO.java | 79 +++++++++
.../stream/BanyanDBSegmentRecordBuilder.java | 58 +++++++
.../plugin/banyandb/stream/BanyanDBStorageDAO.java | 85 ++++++++++
.../stream/BanyanDBStreamInsertRequest.java | 31 ++++
.../banyandb/stream/BanyanDBTraceQueryDAO.java | 186 +++++++++++++++++++++
.../stream/BanyanDBUITemplateManagementDAO.java | 35 ++++
...alking.oap.server.library.module.ModuleProvider | 19 +++
23 files changed, 1086 insertions(+), 12 deletions(-)
diff --git a/oap-server/server-storage-plugin/pom.xml b/oap-server/server-storage-plugin/pom.xml
index fc7f896467..82e3353f4f 100644
--- a/oap-server/server-storage-plugin/pom.xml
+++ b/oap-server/server-storage-plugin/pom.xml
@@ -34,5 +34,6 @@
<module>storage-influxdb-plugin</module>
<module>storage-tidb-plugin</module>
<module>storage-iotdb-plugin</module>
+ <module>storage-banyandb-plugin</module>
</modules>
</project>
diff --git a/oap-server/server-storage-plugin/pom.xml b/oap-server/server-storage-plugin/storage-banyandb-plugin/pom.xml
similarity index 53%
copy from oap-server/server-storage-plugin/pom.xml
copy to oap-server/server-storage-plugin/storage-banyandb-plugin/pom.xml
index fc7f896467..76848c1b4b 100644
--- a/oap-server/server-storage-plugin/pom.xml
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/pom.xml
@@ -19,20 +19,36 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
- <artifactId>oap-server</artifactId>
+ <artifactId>server-storage-plugin</artifactId>
<groupId>org.apache.skywalking</groupId>
- <version>9.1.0-SNAPSHOT</version>
+ <version>8.9.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>server-storage-plugin</artifactId>
- <packaging>pom</packaging>
- <modules>
- <module>storage-jdbc-hikaricp-plugin</module>
- <module>storage-elasticsearch-plugin</module>
- <module>storage-zipkin-elasticsearch-plugin</module>
- <module>storage-influxdb-plugin</module>
- <module>storage-tidb-plugin</module>
- <module>storage-iotdb-plugin</module>
- </modules>
+ <artifactId>storage-banyandb-plugin</artifactId>
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.skywalking</groupId>
+ <artifactId>server-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.skywalking</groupId>
+ <artifactId>library-client</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.skywalking</groupId>
+ <artifactId>library-datacarrier-queue</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.skywalking</groupId>
+ <artifactId>banyandb-java-client</artifactId>
+ <version>0.1.0-SNAPSHOT</version>
+ </dependency>
+ </dependencies>
+
</project>
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBSchema.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBSchema.java
new file mode 100644
index 0000000000..3ebf87776b
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBSchema.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.banyandb;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import lombok.Getter;
+
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+
+public class BanyanDBSchema {
+ public static final String NAME = "sw";
+ public static final String GROUP = "default";
+ public static final List<String> FIELD_NAMES;
+
+ public static final Set<String> INDEX_FIELDS = ImmutableSet.of("http.method", "status_code", "db.type",
+ "db.instance", "mq.queue", "mq.topic", "mq.broker");
+
+ static {
+ Set<String> fields = new LinkedHashSet<>();
+ fields.add("trace_id");
+ fields.add("state");
+ fields.add("service_id");
+ fields.add("service_instance_id");
+ fields.add("endpoint_id");
+ fields.add("duration");
+ fields.add("start_time");
+ fields.add("http.method");
+ fields.add("status_code");
+ fields.add("db.type");
+ fields.add("db.instance");
+ fields.add("mq.queue");
+ fields.add("mq.topic");
+ fields.add("mq.broker");
+ FIELD_NAMES = ImmutableList.copyOf(fields);
+ }
+
+ public enum TraceState {
+ ALL(0), SUCCESS(1), ERROR(2);
+
+ @Getter
+ private final int state;
+
+ TraceState(int state) {
+ this.state = state;
+ }
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
new file mode 100644
index 0000000000..eed4386e57
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
@@ -0,0 +1,55 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb;
+
+import org.apache.skywalking.banyandb.v1.client.BanyanDBClient;
+import org.apache.skywalking.banyandb.v1.client.StreamBulkWriteProcessor;
+import org.apache.skywalking.banyandb.v1.client.StreamQuery;
+import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
+import org.apache.skywalking.oap.server.library.client.Client;
+import org.apache.skywalking.oap.server.library.client.healthcheck.DelegatedHealthChecker;
+import org.apache.skywalking.oap.server.library.client.healthcheck.HealthCheckable;
+import org.apache.skywalking.oap.server.library.util.HealthChecker;
+
+import java.io.IOException;
+
+/**
+ * BanyanDBStorageClient is a simple wrapper for the underlying {@link BanyanDBClient},
+ * which implements {@link Client} and {@link HealthCheckable}.
+ */
+public class BanyanDBStorageClient implements Client, HealthCheckable {
+ private final BanyanDBClient client;
+ private final DelegatedHealthChecker healthChecker = new DelegatedHealthChecker();
+
+ public BanyanDBStorageClient(String host, int port, String group) {
+ this.client = new BanyanDBClient(host, port, group);
+ }
+
+ @Override
+ public void connect() throws Exception {
+ this.client.connect();
+ }
+
+ @Override
+ public void shutdown() throws IOException {
+ this.client.close();
+ }
+
+ public StreamQueryResponse query(StreamQuery streamQuery) {
+ try {
+ StreamQueryResponse response = this.client.queryStreams(streamQuery);
+ this.healthChecker.health();
+ return response;
+ } catch (Throwable t) {
+ healthChecker.unHealth(t);
+ throw t;
+ }
+ }
+
+ public StreamBulkWriteProcessor createBulkProcessor(int maxBulkSize, int flushInterval, int concurrency) {
+ return this.client.buildStreamWriteProcessor(maxBulkSize, flushInterval, concurrency);
+ }
+
+ @Override
+ public void registerChecker(HealthChecker healthChecker) {
+ this.healthChecker.register(healthChecker);
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageConfig.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageConfig.java
new file mode 100644
index 0000000000..c37aed1162
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageConfig.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.banyandb;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.skywalking.oap.server.library.module.ModuleConfig;
+
+@Getter
+@Setter
+public class BanyanDBStorageConfig extends ModuleConfig {
+ private String host = "127.0.0.1";
+ private int port = 17912;
+ /**
+ * Group of the schema in BanyanDB
+ */
+ private String group = "default";
+
+ /**
+ * The maximum size of write entities in a single batch write call.
+ */
+ private int maxBulkSize = 5000;
+ /**
+ * Interval between two flushes of buffered writes, in seconds.
+ */
+ private int flushInterval = 15;
+ /**
+ * Concurrent consumer threads for batch writing.
+ */
+ private int concurrentWriteThreads = 2;
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java
new file mode 100644
index 0000000000..c4aa1016ca
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.banyandb;
+
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.config.ConfigService;
+import org.apache.skywalking.oap.server.core.storage.StorageBuilderFactory;
+import org.apache.skywalking.oap.server.core.storage.StorageModule;
+import org.apache.skywalking.oap.server.library.module.*;
+import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
+import org.apache.skywalking.oap.server.telemetry.api.HealthCheckMetrics;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
+
+public class BanyanDBStorageProvider extends ModuleProvider {
+ private BanyanDBStorageConfig config;
+ private BanyanDBStorageClient client;
+
+ public BanyanDBStorageProvider() {
+ this.config = new BanyanDBStorageConfig();
+ }
+
+ @Override
+ public String name() {
+ return "banyandb";
+ }
+
+ @Override
+ public Class<? extends ModuleDefine> module() {
+ return StorageModule.class;
+ }
+
+ @Override
+ public ModuleConfig createConfigBeanIfAbsent() {
+ return config;
+ }
+
+ @Override
+ public void prepare() throws ServiceNotProvidedException, ModuleStartException {
+ this.registerServiceImplementation(StorageBuilderFactory.class, new StorageBuilderFactory.Default());
+
+ this.client = new BanyanDBStorageClient(config.getHost(), config.getPort(), config.getGroup());
+ }
+
+ /**
+ * Registers the {@code storage_banyandb} health-check gauge on the storage client, then connects the client.
+ * Any exception raised while connecting is rethrown as a {@link ModuleStartException} that keeps the cause.
+ */
+ @Override
+ public void start() throws ServiceNotProvidedException, ModuleStartException {
+ MetricsCreator metricCreator = getManager().find(TelemetryModule.NAME)
+ .provider()
+ .getService(MetricsCreator.class);
+ HealthCheckMetrics healthChecker = metricCreator.createHealthCheckerGauge(
+ "storage_banyandb", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
+ this.client.registerChecker(healthChecker);
+ try {
+ this.client.connect();
+ } catch (Exception e) {
+ throw new ModuleStartException(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {
+
+ }
+
+ @Override
+ public String[] requiredModules() {
+ return new String[]{CoreModule.NAME};
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java
new file mode 100644
index 0000000000..c7bc9cc13d
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java
@@ -0,0 +1,19 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
+
+import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
+import org.apache.skywalking.oap.server.core.query.type.Alarms;
+import org.apache.skywalking.oap.server.core.storage.query.IAlarmQueryDAO;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * {@link org.apache.skywalking.oap.server.core.alarm.AlarmRecord} is a stream,
+ * which can be used to build a {@link org.apache.skywalking.oap.server.core.query.type.AlarmMessage}
+ */
+public class BanyanDBAlarmQueryDAO implements IAlarmQueryDAO {
+ @Override
+ public Alarms getAlarm(Integer scopeId, String keyword, int limit, int from, long startTB, long endTB, List<Tag> tags) throws IOException {
+ return new Alarms();
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBatchDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBatchDAO.java
new file mode 100644
index 0000000000..9404996887
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBatchDAO.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
+
+import org.apache.skywalking.banyandb.v1.client.StreamBulkWriteProcessor;
+import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
+import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
+import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
+import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
+import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+public class BanyanDBBatchDAO extends AbstractDAO<BanyanDBStorageClient> implements IBatchDAO {
+ private StreamBulkWriteProcessor bulkProcessor;
+
+ private final int maxBulkSize;
+ private final int flushInterval;
+ private final int concurrency;
+
+ public BanyanDBBatchDAO(BanyanDBStorageClient client, int maxBulkSize, int flushInterval, int concurrency) {
+ super(client);
+ this.maxBulkSize = maxBulkSize;
+ this.flushInterval = flushInterval;
+ this.concurrency = concurrency;
+ }
+
+ @Override
+ public void insert(InsertRequest insertRequest) {
+ if (bulkProcessor == null) {
+ this.bulkProcessor = getClient().createBulkProcessor(maxBulkSize, flushInterval, concurrency);
+ }
+
+ if (insertRequest instanceof BanyanDBStreamInsertRequest) {
+ this.bulkProcessor.add(((BanyanDBStreamInsertRequest) insertRequest).getStreamWrite());
+ }
+ }
+
+ /** Queues every BanyanDB stream insert found in {@code prepareRequests} on the bulk processor. */
+ @Override
+ public CompletableFuture<Void> flush(List<PrepareRequest> prepareRequests) {
+ if (bulkProcessor == null) { // created lazily on first use, exactly as insert() does
+ this.bulkProcessor = getClient().createBulkProcessor(maxBulkSize, flushInterval, concurrency);
+ }
+ if (CollectionUtils.isNotEmpty(prepareRequests)) {
+ return CompletableFuture.allOf(prepareRequests.stream().map(prepareRequest -> {
+ // Same type guard as insert(): a request of any other type is skipped instead of being blindly cast.
+ // TODO: return CompletableFuture<Void>
+ if (prepareRequest instanceof BanyanDBStreamInsertRequest) {
+ this.bulkProcessor.add(((BanyanDBStreamInsertRequest) prepareRequest).getStreamWrite());
+ }
+ return CompletableFuture.completedFuture(null);
+ }).toArray(CompletableFuture[]::new));
+ }
+ return CompletableFuture.completedFuture(null);
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java
new file mode 100644
index 0000000000..7607668eba
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java
@@ -0,0 +1,17 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
+
+import org.apache.skywalking.oap.server.core.browser.source.BrowserErrorCategory;
+import org.apache.skywalking.oap.server.core.query.type.BrowserErrorLogs;
+import org.apache.skywalking.oap.server.core.storage.query.IBrowserLogQueryDAO;
+
+import java.io.IOException;
+
+/**
+ * {@link org.apache.skywalking.oap.server.core.browser.manual.errorlog.BrowserErrorLogRecord} is a stream
+ */
+public class BanyanDBBrowserLogQueryDAO implements IBrowserLogQueryDAO {
+ @Override
+ public BrowserErrorLogs queryBrowserErrorLogs(String serviceId, String serviceVersionId, String pagePathId, BrowserErrorCategory category, long startSecondTB, long endSecondTB, int limit, int from) throws IOException {
+ return new BrowserErrorLogs();
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBEventQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBEventQueryDAO.java
new file mode 100644
index 0000000000..5ab67d95c6
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBEventQueryDAO.java
@@ -0,0 +1,23 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
+
+import org.apache.skywalking.oap.server.core.query.type.event.EventQueryCondition;
+import org.apache.skywalking.oap.server.core.query.type.event.Events;
+import org.apache.skywalking.oap.server.core.storage.query.IEventQueryDAO;
+
+import java.util.List;
+
+/**
+ * Query DAO for events; both lookups below currently return an empty {@link Events} result.
+ * {@link org.apache.skywalking.oap.server.core.source.Event} is a stream
+ */
+public class BanyanDBEventQueryDAO implements IEventQueryDAO {
+ @Override
+ public Events queryEvents(EventQueryCondition condition) throws Exception {
+ return new Events();
+ }
+
+ @Override
+ public Events queryEvents(List<EventQueryCondition> conditionList) throws Exception {
+ return new Events();
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java
new file mode 100644
index 0000000000..baba93b886
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java
@@ -0,0 +1,20 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
+
+import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
+import org.apache.skywalking.oap.server.core.query.enumeration.Order;
+import org.apache.skywalking.oap.server.core.query.input.TraceScopeCondition;
+import org.apache.skywalking.oap.server.core.query.type.Logs;
+import org.apache.skywalking.oap.server.core.storage.query.ILogQueryDAO;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * {@link org.apache.skywalking.oap.server.core.analysis.manual.log.LogRecord} is a stream
+ */
+public class BanyanDBLogQueryDAO implements ILogQueryDAO {
+ @Override
+ public Logs queryLogs(String serviceId, String serviceInstanceId, String endpointId, TraceScopeCondition relatedTrace, Order queryOrder, int from, int limit, long startTB, long endTB, List<Tag> tags, List<String> keywordsOfContent, List<String> excludingKeywordsOfContent) throws IOException {
+ return new Logs();
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBMetadataQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBMetadataQueryDAO.java
new file mode 100644
index 0000000000..678b981f16
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBMetadataQueryDAO.java
@@ -0,0 +1,55 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
+
+import org.apache.skywalking.oap.server.core.analysis.NodeType;
+import org.apache.skywalking.oap.server.core.query.type.Database;
+import org.apache.skywalking.oap.server.core.query.type.Endpoint;
+import org.apache.skywalking.oap.server.core.query.type.Service;
+import org.apache.skywalking.oap.server.core.query.type.ServiceInstance;
+import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * {@link org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraffic},
+ * {@link org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic},
+ * and {@link org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic}
+ * are all streams.
+ */
+public class BanyanDBMetadataQueryDAO implements IMetadataQueryDAO {
+ @Override
+ public List<Service> getAllServices(String group) throws IOException {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public List<Service> getAllBrowserServices() throws IOException {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public List<Database> getAllDatabases() throws IOException {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public List<Service> searchServices(NodeType nodeType, String keyword) throws IOException {
+ return Collections.emptyList(); // no lookup is performed here; empty list rather than null, like the other list queries
+ }
+
+ @Override
+ public Service searchService(NodeType nodeType, String serviceCode) throws IOException {
+ return null;
+ }
+
+ @Override
+ public List<Endpoint> searchEndpoint(String keyword, String serviceId, int limit) throws IOException {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public List<ServiceInstance> getServiceInstances(long startTimestamp, long endTimestamp, String serviceId) throws IOException {
+ return Collections.emptyList();
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBNetworkAddressAliasDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBNetworkAddressAliasDAO.java
new file mode 100644
index 0000000000..10675a49a1
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBNetworkAddressAliasDAO.java
@@ -0,0 +1,17 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
+
+import org.apache.skywalking.oap.server.core.analysis.manual.networkalias.NetworkAddressAlias;
+import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * {@link NetworkAddressAlias} is a stream
+ */
+public class BanyanDBNetworkAddressAliasDAO implements INetworkAddressAliasDAO {
+ @Override
+ public List<NetworkAddressAlias> loadLastUpdate(long timeBucket) {
+ return Collections.emptyList();
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java
new file mode 100644
index 0000000000..2b6290e2d2
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java
@@ -0,0 +1,18 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
+
+import org.apache.skywalking.oap.server.core.query.type.ProfileTaskLog;
+import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * {@link org.apache.skywalking.oap.server.core.profile.ProfileTaskLogRecord} is a stream
+ */
+public class BanyanDBProfileTaskLogQueryDAO implements IProfileTaskLogQueryDAO {
+ @Override
+ public List<ProfileTaskLog> getTaskLogList() throws IOException {
+ return Collections.emptyList();
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java
new file mode 100644
index 0000000000..c99d267cbe
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java
@@ -0,0 +1,23 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
+
+import org.apache.skywalking.oap.server.core.query.type.ProfileTask;
+import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * {@link org.apache.skywalking.oap.server.core.profile.ProfileTaskRecord} is a stream
+ */
+public class BanyanDBProfileTaskQueryDAO implements IProfileTaskQueryDAO {
+ @Override
+ public List<ProfileTask> getTaskList(String serviceId, String endpointName, Long startTimeBucket, Long endTimeBucket, Integer limit) throws IOException {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public ProfileTask getById(String id) throws IOException {
+ return null;
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java
new file mode 100644
index 0000000000..8a27e885fe
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java
@@ -0,0 +1,40 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
+
+import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
+import org.apache.skywalking.oap.server.core.profile.ProfileThreadSnapshotRecord;
+import org.apache.skywalking.oap.server.core.query.type.BasicTrace;
+import org.apache.skywalking.oap.server.core.storage.profile.IProfileThreadSnapshotQueryDAO;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * {@link ProfileThreadSnapshotRecord} is a stream
+ */
+public class BanyanDBProfileThreadSnapshotQueryDAO implements IProfileThreadSnapshotQueryDAO {
+ @Override
+ public List<BasicTrace> queryProfiledSegments(String taskId) throws IOException {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public int queryMinSequence(String segmentId, long start, long end) throws IOException {
+ return 0;
+ }
+
+ @Override
+ public int queryMaxSequence(String segmentId, long start, long end) throws IOException {
+ return 0;
+ }
+
+ @Override
+ public List<ProfileThreadSnapshotRecord> queryRecords(String segmentId, int minSequence, int maxSequence) throws IOException {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public SegmentRecord getProfiledSegment(String segmentId) throws IOException {
+ return null;
+ }
+}
\ No newline at end of file
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java
new file mode 100644
index 0000000000..f59bc25f9e
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.skywalking.banyandb.v1.Banyandb;
+import org.apache.skywalking.banyandb.v1.client.SerializableTag;
+import org.apache.skywalking.banyandb.v1.client.StreamWrite;
+import org.apache.skywalking.banyandb.v1.client.Tag;
+import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
+import org.apache.skywalking.oap.server.core.analysis.record.Record;
+import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
+import org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder;
+import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBSchema;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * {@link IRecordDAO} that turns a {@link SegmentRecord} into a BanyanDB {@link StreamWrite}.
+ * Records of any other model produce an empty {@link InsertRequest}.
+ */
+@RequiredArgsConstructor
+public class BanyanDBRecordDAO implements IRecordDAO {
+    private final StorageHashMapBuilder<Record> storageBuilder;
+
+    /**
+     * Builds the insert request for one record.
+     *
+     * @param model  model the record belongs to; only {@link SegmentRecord#INDEX_NAME} is handled
+     * @param record the record to write; cast to {@link SegmentRecord} when the model name matches
+     * @return a {@link BanyanDBStreamInsertRequest} for segments, otherwise an empty {@link InsertRequest}
+     */
+    @Override
+    public InsertRequest prepareBatchInsert(Model model, Record record) throws IOException {
+        if (!SegmentRecord.INDEX_NAME.equals(model.getName())) {
+            // TODO: support other stream types
+            return new InsertRequest() {
+            };
+        }
+
+        final SegmentRecord segment = (SegmentRecord) record;
+        final StreamWrite write = StreamWrite.builder()
+                                             .name(BanyanDBSchema.NAME)
+                                             .binary(segment.getDataBinary())
+                                             .timestamp(segment.getStartTime())
+                                             .elementId(segment.getSegmentId())
+                                             .tags(buildFieldObjects(this.storageBuilder.entity2Storage(segment)))
+                                             .build();
+        return new BanyanDBStreamInsertRequest(write);
+    }
+
+    /**
+     * Convert storageEntity in Map to an ordered list of {@link SerializableTag}.
+     * The result has one entry per name in {@link BanyanDBSchema#FIELD_NAMES}, in that iteration order;
+     * a name that is absent from the map (or mapped to {@code null}) yields {@link Tag#nullField()}.
+     *
+     * @param segmentRecordMap which comes from {@link SegmentRecord}
+     * @return an ordered list of {@link SerializableTag} which is accepted by BanyanDB Client
+     */
+    static List<SerializableTag<Banyandb.TagValue>> buildFieldObjects(Map<String, Object> segmentRecordMap) {
+        final List<SerializableTag<Banyandb.TagValue>> orderedTags = new ArrayList<>(BanyanDBSchema.FIELD_NAMES.size());
+        for (final String name : BanyanDBSchema.FIELD_NAMES) {
+            final Object stored = segmentRecordMap.get(name);
+            if (stored == null) {
+                orderedTags.add(Tag.nullField());
+                continue;
+            }
+            // Values are expected to be the tags put into the map by the storage builder.
+            orderedTags.add((SerializableTag<Banyandb.TagValue>) stored);
+        }
+        return orderedTags;
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBSegmentRecordBuilder.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBSegmentRecordBuilder.java
new file mode 100644
index 0000000000..b8c4744380
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBSegmentRecordBuilder.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
+
+import org.apache.skywalking.banyandb.v1.client.TagAndValue;
+import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
+import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
+import org.apache.skywalking.oap.server.core.analysis.record.Record;
+import org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * {@link StorageHashMapBuilder} that converts a {@link SegmentRecord} into the tag map written to BanyanDB.
+ */
+public class BanyanDBSegmentRecordBuilder implements StorageHashMapBuilder<Record> {
+    /**
+     * Not implemented yet: the given map is ignored.
+     *
+     * @param dbMap stored values (unused)
+     * @return a new, unpopulated {@link SegmentRecord}
+     */
+    @Override
+    public SegmentRecord storage2Entity(Map<String, Object> dbMap) {
+        return new SegmentRecord();
+    }
+
+    /**
+     * Map SegmentRecord to Skywalking-BanyanDB compatible Map with indexed tags and
+     * without binaryData, entityId.
+     *
+     * @param record must be a {@link SegmentRecord}; it is cast unconditionally
+     * @return a map from tag name to {@link TagAndValue}; raw search tags are added under their
+     *         lower-cased key
+     */
+    @Override
+    public Map<String, Object> entity2Storage(Record record) {
+        final SegmentRecord segmentRecord = (SegmentRecord) record;
+        final Map<String, Object> map = new HashMap<>();
+        map.put(SegmentRecord.TRACE_ID, TagAndValue.stringField(segmentRecord.getTraceId()));
+        map.put(SegmentRecord.SERVICE_ID, TagAndValue.stringField(segmentRecord.getServiceId()));
+        map.put(SegmentRecord.SERVICE_INSTANCE_ID, TagAndValue.stringField(segmentRecord.getServiceInstanceId()));
+        map.put(SegmentRecord.ENDPOINT_ID, TagAndValue.stringField(segmentRecord.getEndpointId()));
+        map.put(SegmentRecord.START_TIME, TagAndValue.longField(segmentRecord.getStartTime()));
+        map.put("duration", TagAndValue.longField(segmentRecord.getLatency()));
+        map.put("state", TagAndValue.longField(segmentRecord.getIsError()));
+        if (segmentRecord.getTagsRawData() != null) {
+            for (final Tag tag : segmentRecord.getTagsRawData()) {
+                // Locale.ROOT keeps the key stable regardless of the JVM default locale
+                // (e.g. a Turkish locale would otherwise lower-case 'I' to a dotless 'ı').
+                map.put(tag.getKey().toLowerCase(java.util.Locale.ROOT), TagAndValue.stringField(tag.getValue()));
+            }
+        }
+        return map;
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStorageDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStorageDAO.java
new file mode 100644
index 0000000000..f5ee5fb255
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStorageDAO.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
+import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
+import org.apache.skywalking.oap.server.core.storage.*;
+import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
+import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
+import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+@Slf4j
+public class BanyanDBStorageDAO implements StorageDAO {
+ @Override
+ public IMetricsDAO newMetricsDao(StorageBuilder storageBuilder) {
+ return new IMetricsDAO() {
+ @Override
+ public List<Metrics> multiGet(Model model, List<Metrics> metrics) throws IOException {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public InsertRequest prepareBatchInsert(Model model, Metrics metrics) throws IOException {
+ return new InsertRequest() {
+ };
+ }
+
+ @Override
+ public UpdateRequest prepareBatchUpdate(Model model, Metrics metrics) throws IOException {
+ return new UpdateRequest() {
+ };
+ }
+ };
+ }
+
+ @Override
+ public IRecordDAO newRecordDao(StorageBuilder storageBuilder) {
+ try {
+ if (SegmentRecord.class.equals(storageBuilder.getClass().getMethod("storage2Entity", Map.class).getReturnType())) {
+ return new BanyanDBRecordDAO(new BanyanDBSegmentRecordBuilder());
+ } else {
+ return (model, record) -> new InsertRequest() {
+ };
+ }
+ } catch (NoSuchMethodException noSuchMethodException) {
+ log.error("cannot find method storage2Entity", noSuchMethodException);
+ throw new RuntimeException("cannot find method storage2Entity");
+ }
+ }
+
+ @Override
+ public INoneStreamDAO newNoneStreamDao(StorageBuilder storageBuilder) {
+ return (model, noneStream) -> {
+ };
+ }
+
+ @Override
+ public IManagementDAO newManagementDao(StorageBuilder storageBuilder) {
+ return (model, storageData) -> {
+ };
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStreamInsertRequest.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStreamInsertRequest.java
new file mode 100644
index 0000000000..aef8ec94e7
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStreamInsertRequest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.skywalking.banyandb.v1.client.StreamWrite;
+import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
+
+/**
+ * {@link InsertRequest} that carries a single BanyanDB {@link StreamWrite}.
+ * The constructor and the {@code getStreamWrite()} accessor are generated by Lombok.
+ */
+@Getter
+@RequiredArgsConstructor
+public class BanyanDBStreamInsertRequest implements InsertRequest {
+    /** The write operation wrapped by this request. */
+    private final StreamWrite streamWrite;
+}
\ No newline at end of file
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java
new file mode 100644
index 0000000000..9fc403afc6
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import org.apache.skywalking.banyandb.v1.client.*;
+import org.apache.skywalking.oap.server.core.analysis.IDManager;
+import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
+import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
+import org.apache.skywalking.oap.server.core.query.type.*;
+import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
+import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
+import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBSchema;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
+import org.joda.time.DateTimeZone;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class BanyanDBTraceQueryDAO extends AbstractDAO<BanyanDBStorageClient> implements ITraceQueryDAO {
+ private static final DateTimeFormatter YYYYMMDDHHMMSS = DateTimeFormat.forPattern("yyyyMMddHHmmss");
+
+ private static final List<String> BASIC_QUERY_PROJ = ImmutableList.of("trace_id", "state", "endpoint_id", "duration", "start_time");
+ private static final List<String> TRACE_ID_QUERY_PROJ = ImmutableList.of("trace_id", "state", "service_id", "service_instance_id", "endpoint_id", "duration", "start_time");
+
+ public BanyanDBTraceQueryDAO(BanyanDBStorageClient client) {
+ super(client);
+ }
+
+ @Override
+ public TraceBrief queryBasicTraces(long startSecondTB, long endSecondTB, long minDuration, long maxDuration, String serviceId, String serviceInstanceId, String endpointId, String traceId, int limit, int from, TraceState traceState, QueryOrder queryOrder, List<Tag> tags) throws IOException {
+ StreamQuery query;
+ if (startSecondTB != 0 && endSecondTB != 0) {
+ query = new StreamQuery(BanyanDBSchema.NAME, new TimestampRange(parseMillisFromStartSecondTB(startSecondTB), parseMillisFromEndSecondTB(endSecondTB)), BASIC_QUERY_PROJ);
+ } else {
+ query = new StreamQuery(BanyanDBSchema.NAME, BASIC_QUERY_PROJ);
+ }
+ if (minDuration != 0) {
+ // duration >= minDuration
+ query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", "duration", minDuration));
+ }
+ if (maxDuration != 0) {
+ // duration <= maxDuration
+ query.appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", "duration", maxDuration));
+ }
+
+ if (!Strings.isNullOrEmpty(serviceId)) {
+ query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", "service_id", serviceId));
+ }
+
+ if (!Strings.isNullOrEmpty(serviceInstanceId)) {
+ query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", "service_instance_id", serviceInstanceId));
+ }
+
+ if (!Strings.isNullOrEmpty(endpointId)) {
+ query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", "endpoint_id", endpointId));
+ }
+
+ switch (traceState) {
+ case ERROR:
+ query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", "state", (long) BanyanDBSchema.TraceState.ERROR.getState()));
+ break;
+ case SUCCESS:
+ query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", "state", (long) BanyanDBSchema.TraceState.SUCCESS.getState()));
+ break;
+ default:
+ query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", "state", (long) BanyanDBSchema.TraceState.ALL.getState()));
+ break;
+ }
+
+ switch (queryOrder) {
+ case BY_START_TIME:
+ query.setOrderBy(new StreamQuery.OrderBy("start_time", StreamQuery.OrderBy.Type.DESC));
+ break;
+ case BY_DURATION:
+ query.setOrderBy(new StreamQuery.OrderBy("duration", StreamQuery.OrderBy.Type.DESC));
+ break;
+ }
+
+ if (CollectionUtils.isNotEmpty(tags)) {
+ for (final Tag tag : tags) {
+ if (BanyanDBSchema.INDEX_FIELDS.contains(tag.getKey())) {
+ query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", tag.getKey(), tag.getValue()));
+ }
+ }
+ }
+
+ query.setLimit(limit);
+ query.setOffset(from);
+
+ // build request
+ StreamQueryResponse response = this.getClient().query(query);
+ TraceBrief brief = new TraceBrief();
+ brief.setTotal(response.size());
+ brief.getTraces().addAll(response.getElements().stream().map(elem -> {
+ BasicTrace trace = new BasicTrace();
+ trace.setSegmentId(elem.getId());
+ final List<TagAndValue<?>> searchable = elem.getTagFamilies().get(0);
+ trace.getTraceIds().add((String) searchable.get(0).getValue());
+ trace.setError(((Long) searchable.get(1).getValue()).intValue() == 1);
+ trace.getEndpointNames().add(IDManager.EndpointID.analysisId(
+ (String) searchable.get(2).getValue()
+ ).getEndpointName());
+ trace.setDuration(((Long) searchable.get(3).getValue()).intValue());
+ trace.setStart(String.valueOf(searchable.get(4).getValue()));
+ return trace;
+ }).collect(Collectors.toList()));
+ return brief;
+ }
+
+ @Override
+ public List<SegmentRecord> queryByTraceId(String traceId) throws IOException {
+ StreamQuery query = new StreamQuery(BanyanDBSchema.NAME, TRACE_ID_QUERY_PROJ);
+ query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", "trace_id", traceId));
+ query.setDataBinary(true);
+ StreamQueryResponse response = this.getClient().query(query);
+ return response.getElements().stream().map(elem -> {
+ SegmentRecord record = new SegmentRecord();
+ final List<TagAndValue<?>> searchable = elem.getTagFamilies().get(0);
+ record.setSegmentId(elem.getId());
+ record.setTraceId((String) searchable.get(0).getValue());
+ record.setIsError(((Number) searchable.get(1).getValue()).intValue());
+ record.setServiceId((String) searchable.get(2).getValue());
+ record.setServiceInstanceId((String) searchable.get(3).getValue());
+ record.setEndpointId((String) searchable.get(4).getValue());
+ record.setLatency(((Number) searchable.get(5).getValue()).intValue());
+ record.setStartTime(((Number) searchable.get(6).getValue()).longValue());
+ final List<TagAndValue<?>> data = elem.getTagFamilies().get(1);
+ // TODO: support binary data in the client SDK
+ record.setDataBinary((byte[]) data.get(0).getValue());
+ return record;
+ }).collect(Collectors.toList());
+ }
+
+ @Override
+ public List<Span> doFlexibleTraceQuery(String traceId) throws IOException {
+ return Collections.emptyList();
+ }
+
+ static long parseMillisFromStartSecondTB(long startSecondTB) {
+ return YYYYMMDDHHMMSS.withZone(DateTimeZone.UTC).parseMillis(String.valueOf(startSecondTB));
+ }
+
+ static long parseMillisFromEndSecondTB(long endSecondTB) {
+ long t = endSecondTB;
+ long second = t % 100;
+ if (second > 59) {
+ second = 0;
+ }
+ t = t / 100;
+ long minute = t % 100;
+ if (minute > 59) {
+ minute = 0;
+ }
+ t = t / 100;
+ long hour = t % 100;
+ if (hour > 23) {
+ hour = 0;
+ }
+ t = t / 100;
+ return YYYYMMDDHHMMSS.withZone(DateTimeZone.UTC)
+ .parseMillis(String.valueOf(((t * 100 + hour) * 100 + minute) * 100 + second));
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java
new file mode 100644
index 0000000000..ad8fed926e
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java
@@ -0,0 +1,35 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
+
+import org.apache.skywalking.oap.server.core.query.input.DashboardSetting;
+import org.apache.skywalking.oap.server.core.query.type.DashboardConfiguration;
+import org.apache.skywalking.oap.server.core.query.type.TemplateChangeStatus;
+import org.apache.skywalking.oap.server.core.storage.management.UITemplateManagementDAO;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Stub {@link UITemplateManagementDAO} of the BanyanDB storage plugin.
+ * {@link org.apache.skywalking.oap.server.core.management.ui.template.UITemplate} is a stream.
+ *
+ * <p>No template is stored or read: listing returns nothing and every change is rejected with a
+ * failed {@link TemplateChangeStatus}.
+ */
+public class BanyanDBUITemplateManagementDAO implements UITemplateManagementDAO {
+    /**
+     * @return always an empty list; {@code includingDisabled} is ignored
+     */
+    @Override
+    public List<DashboardConfiguration> getAllTemplates(Boolean includingDisabled) throws IOException {
+        return Collections.emptyList();
+    }
+
+    /**
+     * @return always a failed status; the setting is ignored
+     */
+    @Override
+    public TemplateChangeStatus addTemplate(DashboardSetting setting) throws IOException {
+        return TemplateChangeStatus.builder().status(false).message("Can't add a new template").build();
+    }
+
+    /**
+     * @return always a failed status; the setting is ignored
+     */
+    @Override
+    public TemplateChangeStatus changeTemplate(DashboardSetting setting) throws IOException {
+        return TemplateChangeStatus.builder().status(false).message("Can't add/update the template").build();
+    }
+
+    /**
+     * @return always a failed status; the name is ignored
+     */
+    @Override
+    public TemplateChangeStatus disableTemplate(String name) throws IOException {
+        // The message used to be copied from changeTemplate and reported an add/update failure.
+        return TemplateChangeStatus.builder().status(false).message("Can't disable the template").build();
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
new file mode 100644
index 0000000000..6b9674593d
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+
+org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageProvider