You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by lu...@apache.org on 2022/04/30 12:56:27 UTC

[skywalking] 01/25: start from new code base and copy all previous stream impl

This is an automated email from the ASF dual-hosted git repository.

lujiajing pushed a commit to branch banyandb-integration-stream
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit bb953530e682346c1469117178bff71e5a397cb5
Author: Megrez Lu <lu...@gmail.com>
AuthorDate: Sun Nov 28 23:08:31 2021 +0800

    start from new code base and copy all previous stream impl
---
 oap-server/server-storage-plugin/pom.xml           |   1 +
 .../{ => storage-banyandb-plugin}/pom.xml          |  40 +++--
 .../storage/plugin/banyandb/BanyanDBSchema.java    |  66 ++++++++
 .../plugin/banyandb/BanyanDBStorageClient.java     |  55 ++++++
 .../plugin/banyandb/BanyanDBStorageConfig.java     |  47 ++++++
 .../plugin/banyandb/BanyanDBStorageProvider.java   |  89 ++++++++++
 .../banyandb/stream/BanyanDBAlarmQueryDAO.java     |  19 +++
 .../plugin/banyandb/stream/BanyanDBBatchDAO.java   |  75 +++++++++
 .../stream/BanyanDBBrowserLogQueryDAO.java         |  17 ++
 .../banyandb/stream/BanyanDBEventQueryDAO.java     |  23 +++
 .../banyandb/stream/BanyanDBLogQueryDAO.java       |  20 +++
 .../banyandb/stream/BanyanDBMetadataQueryDAO.java  |  55 ++++++
 .../stream/BanyanDBNetworkAddressAliasDAO.java     |  17 ++
 .../stream/BanyanDBProfileTaskLogQueryDAO.java     |  18 ++
 .../stream/BanyanDBProfileTaskQueryDAO.java        |  23 +++
 .../BanyanDBProfileThreadSnapshotQueryDAO.java     |  40 +++++
 .../plugin/banyandb/stream/BanyanDBRecordDAO.java  |  79 +++++++++
 .../stream/BanyanDBSegmentRecordBuilder.java       |  58 +++++++
 .../plugin/banyandb/stream/BanyanDBStorageDAO.java |  85 ++++++++++
 .../stream/BanyanDBStreamInsertRequest.java        |  31 ++++
 .../banyandb/stream/BanyanDBTraceQueryDAO.java     | 186 +++++++++++++++++++++
 .../stream/BanyanDBUITemplateManagementDAO.java    |  35 ++++
 ...alking.oap.server.library.module.ModuleProvider |  19 +++
 23 files changed, 1086 insertions(+), 12 deletions(-)

diff --git a/oap-server/server-storage-plugin/pom.xml b/oap-server/server-storage-plugin/pom.xml
index fc7f896467..82e3353f4f 100644
--- a/oap-server/server-storage-plugin/pom.xml
+++ b/oap-server/server-storage-plugin/pom.xml
@@ -34,5 +34,6 @@
         <module>storage-influxdb-plugin</module>
         <module>storage-tidb-plugin</module>
         <module>storage-iotdb-plugin</module>
+        <module>storage-banyandb-plugin</module>
     </modules>
 </project>
diff --git a/oap-server/server-storage-plugin/pom.xml b/oap-server/server-storage-plugin/storage-banyandb-plugin/pom.xml
similarity index 53%
copy from oap-server/server-storage-plugin/pom.xml
copy to oap-server/server-storage-plugin/storage-banyandb-plugin/pom.xml
index fc7f896467..76848c1b4b 100644
--- a/oap-server/server-storage-plugin/pom.xml
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/pom.xml
@@ -19,20 +19,36 @@
 
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
-        <artifactId>oap-server</artifactId>
+        <artifactId>server-storage-plugin</artifactId>
         <groupId>org.apache.skywalking</groupId>
-        <version>9.1.0-SNAPSHOT</version>
+        <version>8.9.0-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
-    <artifactId>server-storage-plugin</artifactId>
-    <packaging>pom</packaging>
-    <modules>
-        <module>storage-jdbc-hikaricp-plugin</module>
-        <module>storage-elasticsearch-plugin</module>
-        <module>storage-zipkin-elasticsearch-plugin</module>
-        <module>storage-influxdb-plugin</module>
-        <module>storage-tidb-plugin</module>
-        <module>storage-iotdb-plugin</module>
-    </modules>
+    <artifactId>storage-banyandb-plugin</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.skywalking</groupId>
+            <artifactId>server-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.skywalking</groupId>
+            <artifactId>library-client</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.skywalking</groupId>
+            <artifactId>library-datacarrier-queue</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.skywalking</groupId>
+            <artifactId>banyandb-java-client</artifactId>
+            <version>0.1.0-SNAPSHOT</version>
+        </dependency>
+    </dependencies>
+
 </project>
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBSchema.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBSchema.java
new file mode 100644
index 0000000000..3ebf87776b
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBSchema.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.banyandb;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import lombok.Getter;
+
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+
+public class BanyanDBSchema {
+    public static final String NAME = "sw";
+    public static final String GROUP = "default";
+    public static final List<String> FIELD_NAMES;
+
+    public static final Set<String> INDEX_FIELDS = ImmutableSet.of("http.method", "status_code", "db.type",
+            "db.instance", "mq.queue", "mq.topic", "mq.broker");
+
+    static {
+        Set<String> fields = new LinkedHashSet<>();
+        fields.add("trace_id");
+        fields.add("state");
+        fields.add("service_id");
+        fields.add("service_instance_id");
+        fields.add("endpoint_id");
+        fields.add("duration");
+        fields.add("start_time");
+        fields.add("http.method");
+        fields.add("status_code");
+        fields.add("db.type");
+        fields.add("db.instance");
+        fields.add("mq.queue");
+        fields.add("mq.topic");
+        fields.add("mq.broker");
+        FIELD_NAMES = ImmutableList.copyOf(fields);
+    }
+
+    public enum TraceState {
+        ALL(0), SUCCESS(1), ERROR(2);
+
+        @Getter
+        private final int state;
+
+        TraceState(int state) {
+            this.state = state;
+        }
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
new file mode 100644
index 0000000000..eed4386e57
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
@@ -0,0 +1,55 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb;
+
+import org.apache.skywalking.banyandb.v1.client.BanyanDBClient;
+import org.apache.skywalking.banyandb.v1.client.StreamBulkWriteProcessor;
+import org.apache.skywalking.banyandb.v1.client.StreamQuery;
+import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
+import org.apache.skywalking.oap.server.library.client.Client;
+import org.apache.skywalking.oap.server.library.client.healthcheck.DelegatedHealthChecker;
+import org.apache.skywalking.oap.server.library.client.healthcheck.HealthCheckable;
+import org.apache.skywalking.oap.server.library.util.HealthChecker;
+
+import java.io.IOException;
+
+/**
+ * A thin wrapper around the underlying {@link BanyanDBClient} that implements
+ * {@link Client} and {@link HealthCheckable}.
+ *
+ * <p>Connection lifecycle calls are forwarded to the wrapped client, and the outcome of
+ * every stream query is reported to the registered health checker.
+ */
+public class BanyanDBStorageClient implements Client, HealthCheckable {
+    private final DelegatedHealthChecker healthChecker = new DelegatedHealthChecker();
+    private final BanyanDBClient client;
+
+    /**
+     * Creates the wrapped {@link BanyanDBClient} from the given arguments; no connection is opened here.
+     *
+     * @param host  host handed to the underlying client
+     * @param port  port handed to the underlying client
+     * @param group group handed to the underlying client
+     */
+    public BanyanDBStorageClient(String host, int port, String group) {
+        client = new BanyanDBClient(host, port, group);
+    }
+
+    /**
+     * Delegates to {@link BanyanDBClient#connect()}.
+     */
+    @Override
+    public void connect() throws Exception {
+        client.connect();
+    }
+
+    /**
+     * Delegates to {@link BanyanDBClient#close()}.
+     */
+    @Override
+    public void shutdown() throws IOException {
+        client.close();
+    }
+
+    /**
+     * Runs a stream query through the wrapped client and records the outcome on the health checker.
+     *
+     * @param streamQuery the query to execute
+     * @return the response returned by the wrapped client
+     */
+    public StreamQueryResponse query(StreamQuery streamQuery) {
+        try {
+            final StreamQueryResponse result = client.queryStreams(streamQuery);
+            healthChecker.health();
+            return result;
+        } catch (Throwable failure) {
+            // Report the failure first, then let the caller see the very same throwable.
+            healthChecker.unHealth(failure);
+            throw failure;
+        }
+    }
+
+    /**
+     * Builds a bulk write processor by delegating to the wrapped client.
+     *
+     * @param maxBulkSize   forwarded unchanged to the wrapped client
+     * @param flushInterval forwarded unchanged to the wrapped client
+     * @param concurrency   forwarded unchanged to the wrapped client
+     * @return the processor created by the wrapped client
+     */
+    public StreamBulkWriteProcessor createBulkProcessor(int maxBulkSize, int flushInterval, int concurrency) {
+        return client.buildStreamWriteProcessor(maxBulkSize, flushInterval, concurrency);
+    }
+
+    /**
+     * Registers the checker that receives the health reports produced by {@link #query(StreamQuery)}.
+     */
+    @Override
+    public void registerChecker(HealthChecker healthChecker) {
+        this.healthChecker.register(healthChecker);
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageConfig.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageConfig.java
new file mode 100644
index 0000000000..c37aed1162
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageConfig.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.banyandb;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.skywalking.oap.server.library.module.ModuleConfig;
+
+/**
+ * Settings of the BanyanDB storage module. Accessors are generated by Lombok;
+ * every field carries the default shown in its initializer.
+ */
+@Getter
+@Setter
+public class BanyanDBStorageConfig extends ModuleConfig {
+    /**
+     * Host handed to the BanyanDB client.
+     */
+    private String host = "127.0.0.1";
+
+    /**
+     * Port handed to the BanyanDB client.
+     */
+    private int port = 17912;
+
+    /**
+     * Group of the schema in BanyanDB.
+     */
+    private String group = "default";
+
+    /**
+     * Upper bound on the number of entities written in one batch write call.
+     */
+    private int maxBulkSize = 5000;
+
+    /**
+     * Flush interval, expressed in seconds.
+     */
+    private int flushInterval = 15;
+
+    /**
+     * Number of concurrent consumer threads used for batch writing.
+     */
+    private int concurrentWriteThreads = 2;
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java
new file mode 100644
index 0000000000..c4aa1016ca
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.banyandb;
+
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.config.ConfigService;
+import org.apache.skywalking.oap.server.core.storage.StorageBuilderFactory;
+import org.apache.skywalking.oap.server.core.storage.StorageModule;
+import org.apache.skywalking.oap.server.library.module.*;
+import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
+import org.apache.skywalking.oap.server.telemetry.api.HealthCheckMetrics;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
+
+/**
+ * Module provider named {@code banyandb} for the {@link StorageModule}.
+ *
+ * <p>{@link #prepare()} registers the storage builder factory and creates the storage client;
+ * {@link #start()} attaches a health-check gauge to the client and connects it.
+ */
+public class BanyanDBStorageProvider extends ModuleProvider {
+    private BanyanDBStorageConfig config;
+    private BanyanDBStorageClient client;
+
+    public BanyanDBStorageProvider() {
+        config = new BanyanDBStorageConfig();
+    }
+
+    @Override
+    public String name() {
+        return "banyandb";
+    }
+
+    @Override
+    public Class<? extends ModuleDefine> module() {
+        return StorageModule.class;
+    }
+
+    @Override
+    public ModuleConfig createConfigBeanIfAbsent() {
+        return config;
+    }
+
+    /**
+     * Registers the default {@link StorageBuilderFactory} and builds the client from the
+     * configured host, port and group. The client is not connected at this stage.
+     */
+    @Override
+    public void prepare() throws ServiceNotProvidedException, ModuleStartException {
+        final StorageBuilderFactory builderFactory = new StorageBuilderFactory.Default();
+        registerServiceImplementation(StorageBuilderFactory.class, builderFactory);
+
+        client = new BanyanDBStorageClient(config.getHost(), config.getPort(), config.getGroup());
+    }
+
+    /**
+     * Wires a {@code storage_banyandb} health-check gauge into the client and opens the connection.
+     *
+     * @throws ModuleStartException wrapping any exception raised while connecting
+     */
+    @Override
+    public void start() throws ServiceNotProvidedException, ModuleStartException {
+        // Looked up from the core module but not used any further in this method.
+        final ConfigService configService = getManager().find(CoreModule.NAME)
+                .provider()
+                .getService(ConfigService.class);
+
+        final MetricsCreator metricsCreator = getManager().find(TelemetryModule.NAME)
+                .provider()
+                .getService(MetricsCreator.class);
+        final HealthCheckMetrics gauge = metricsCreator.createHealthCheckerGauge(
+                "storage_banyandb", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
+        client.registerChecker(gauge);
+
+        try {
+            client.connect();
+        } catch (Exception connectFailure) {
+            throw new ModuleStartException(connectFailure.getMessage(), connectFailure);
+        }
+    }
+
+    /**
+     * Intentionally empty: nothing is done after all modules have started.
+     */
+    @Override
+    public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {
+
+    }
+
+    @Override
+    public String[] requiredModules() {
+        return new String[]{CoreModule.NAME};
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java
new file mode 100644
index 0000000000..c7bc9cc13d
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java
@@ -0,0 +1,19 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
+
+import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
+import org.apache.skywalking.oap.server.core.query.type.Alarms;
+import org.apache.skywalking.oap.server.core.storage.query.IAlarmQueryDAO;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * {@link org.apache.skywalking.oap.server.core.alarm.AlarmRecord} is a stream,
+ * which can be used to build a {@link org.apache.skywalking.oap.server.core.query.type.AlarmMessage}
+ */
+public class BanyanDBAlarmQueryDAO implements IAlarmQueryDAO {
+    @Override
+    public Alarms getAlarm(Integer scopeId, String keyword, int limit, int from, long startTB, long endTB, List<Tag> tags) throws IOException {
+        return new Alarms();
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBatchDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBatchDAO.java
new file mode 100644
index 0000000000..9404996887
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBatchDAO.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
+
+import org.apache.skywalking.banyandb.v1.client.StreamBulkWriteProcessor;
+import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
+import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
+import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
+import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
+import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Batch DAO that forwards stream insert requests to a {@link StreamBulkWriteProcessor}.
+ *
+ * <p>The processor is created on first use from the sizing values given to the constructor.
+ * Only {@link BanyanDBStreamInsertRequest} instances are written; any other request type is skipped.
+ */
+public class BanyanDBBatchDAO extends AbstractDAO<BanyanDBStorageClient> implements IBatchDAO {
+    private StreamBulkWriteProcessor bulkProcessor;
+
+    private final int maxBulkSize;
+    private final int flushInterval;
+    private final int concurrency;
+
+    /**
+     * @param client        storage client used to create the bulk processor
+     * @param maxBulkSize   forwarded to {@link BanyanDBStorageClient#createBulkProcessor(int, int, int)}
+     * @param flushInterval forwarded to {@link BanyanDBStorageClient#createBulkProcessor(int, int, int)}
+     * @param concurrency   forwarded to {@link BanyanDBStorageClient#createBulkProcessor(int, int, int)}
+     */
+    public BanyanDBBatchDAO(BanyanDBStorageClient client, int maxBulkSize, int flushInterval, int concurrency) {
+        super(client);
+        this.maxBulkSize = maxBulkSize;
+        this.flushInterval = flushInterval;
+        this.concurrency = concurrency;
+    }
+
+    /**
+     * Adds a single request to the bulk processor, creating the processor if needed.
+     *
+     * @param insertRequest request to write; ignored unless it is a {@link BanyanDBStreamInsertRequest}
+     */
+    @Override
+    public void insert(InsertRequest insertRequest) {
+        ensureBulkProcessor();
+        addIfStreamInsert(insertRequest);
+    }
+
+    /**
+     * Adds every stream insert request of the given list to the bulk processor.
+     *
+     * @param prepareRequests requests to write; may be {@code null} or empty
+     * @return an already completed future, since the requests are only handed over to the processor
+     */
+    @Override
+    public CompletableFuture<Void> flush(List<PrepareRequest> prepareRequests) {
+        ensureBulkProcessor();
+
+        if (CollectionUtils.isNotEmpty(prepareRequests)) {
+            // TODO: return CompletableFuture<Void>
+            for (final PrepareRequest prepareRequest : prepareRequests) {
+                addIfStreamInsert(prepareRequest);
+            }
+        }
+
+        return CompletableFuture.completedFuture(null);
+    }
+
+    /**
+     * Creates the bulk processor on first use.
+     */
+    private void ensureBulkProcessor() {
+        if (bulkProcessor == null) {
+            bulkProcessor = getClient().createBulkProcessor(maxBulkSize, flushInterval, concurrency);
+        }
+    }
+
+    /**
+     * Hands the request to the bulk processor when it is a {@link BanyanDBStreamInsertRequest}.
+     * Checking the concrete type (rather than {@link InsertRequest}) keeps the cast below safe.
+     */
+    private void addIfStreamInsert(Object request) {
+        if (request instanceof BanyanDBStreamInsertRequest) {
+            bulkProcessor.add(((BanyanDBStreamInsertRequest) request).getStreamWrite());
+        }
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java
new file mode 100644
index 0000000000..7607668eba
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java
@@ -0,0 +1,17 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
+
+import org.apache.skywalking.oap.server.core.browser.source.BrowserErrorCategory;
+import org.apache.skywalking.oap.server.core.query.type.BrowserErrorLogs;
+import org.apache.skywalking.oap.server.core.storage.query.IBrowserLogQueryDAO;
+
+import java.io.IOException;
+
+/**
+ * {@link org.apache.skywalking.oap.server.core.browser.manual.errorlog.BrowserErrorLogRecord} is a stream
+ */
+public class BanyanDBBrowserLogQueryDAO implements IBrowserLogQueryDAO {
+    @Override
+    public BrowserErrorLogs queryBrowserErrorLogs(String serviceId, String serviceVersionId, String pagePathId, BrowserErrorCategory category, long startSecondTB, long endSecondTB, int limit, int from) throws IOException {
+        return new BrowserErrorLogs();
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBEventQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBEventQueryDAO.java
new file mode 100644
index 0000000000..5ab67d95c6
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBEventQueryDAO.java
@@ -0,0 +1,23 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
+
+import org.apache.skywalking.oap.server.core.query.type.event.EventQueryCondition;
+import org.apache.skywalking.oap.server.core.query.type.event.Events;
+import org.apache.skywalking.oap.server.core.storage.query.IEventQueryDAO;
+
+import java.util.List;
+
+/**
+ * Event query DAO for BanyanDB.
+ *
+ * <p>{@link org.apache.skywalking.oap.server.core.source.Event} is a stream.
+ */
+public class BanyanDBEventQueryDAO implements IEventQueryDAO {
+    /**
+     * Placeholder implementation: the condition is not consulted and a freshly
+     * constructed {@link Events} is returned.
+     */
+    @Override
+    public Events queryEvents(EventQueryCondition condition) throws Exception {
+        final Events events = new Events();
+        return events;
+    }
+
+    /**
+     * Placeholder implementation: the conditions are not consulted and a freshly
+     * constructed {@link Events} is returned.
+     */
+    @Override
+    public Events queryEvents(List<EventQueryCondition> conditionList) throws Exception {
+        final Events events = new Events();
+        return events;
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java
new file mode 100644
index 0000000000..baba93b886
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java
@@ -0,0 +1,20 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
+
+import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
+import org.apache.skywalking.oap.server.core.query.enumeration.Order;
+import org.apache.skywalking.oap.server.core.query.input.TraceScopeCondition;
+import org.apache.skywalking.oap.server.core.query.type.Logs;
+import org.apache.skywalking.oap.server.core.storage.query.ILogQueryDAO;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * {@link org.apache.skywalking.oap.server.core.analysis.manual.log.LogRecord} is a stream
+ */
+public class BanyanDBLogQueryDAO implements ILogQueryDAO {
+    @Override
+    public Logs queryLogs(String serviceId, String serviceInstanceId, String endpointId, TraceScopeCondition relatedTrace, Order queryOrder, int from, int limit, long startTB, long endTB, List<Tag> tags, List<String> keywordsOfContent, List<String> excludingKeywordsOfContent) throws IOException {
+        return new Logs();
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBMetadataQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBMetadataQueryDAO.java
new file mode 100644
index 0000000000..678b981f16
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBMetadataQueryDAO.java
@@ -0,0 +1,55 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
+
+import org.apache.skywalking.oap.server.core.analysis.NodeType;
+import org.apache.skywalking.oap.server.core.query.type.Database;
+import org.apache.skywalking.oap.server.core.query.type.Endpoint;
+import org.apache.skywalking.oap.server.core.query.type.Service;
+import org.apache.skywalking.oap.server.core.query.type.ServiceInstance;
+import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Metadata query DAO for BanyanDB.
+ *
+ * <p>{@link org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraffic},
+ * {@link org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic} and
+ * {@link org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic}
+ * are all streams.
+ *
+ * <p>Every method is currently a placeholder that ignores its arguments. List-returning
+ * methods consistently return an empty list rather than {@code null}.
+ */
+public class BanyanDBMetadataQueryDAO implements IMetadataQueryDAO {
+    /**
+     * @return always an empty list
+     */
+    @Override
+    public List<Service> getAllServices(String group) throws IOException {
+        return Collections.emptyList();
+    }
+
+    /**
+     * @return always an empty list
+     */
+    @Override
+    public List<Service> getAllBrowserServices() throws IOException {
+        return Collections.emptyList();
+    }
+
+    /**
+     * @return always an empty list
+     */
+    @Override
+    public List<Database> getAllDatabases() throws IOException {
+        return Collections.emptyList();
+    }
+
+    /**
+     * @return always an empty list, never {@code null}, matching the other list-returning
+     *         methods of this class so that callers can iterate the result without a null check
+     */
+    @Override
+    public List<Service> searchServices(NodeType nodeType, String keyword) throws IOException {
+        return Collections.emptyList();
+    }
+
+    /**
+     * @return always {@code null}, as no lookup is performed yet
+     */
+    @Override
+    public Service searchService(NodeType nodeType, String serviceCode) throws IOException {
+        return null;
+    }
+
+    /**
+     * @return always an empty list
+     */
+    @Override
+    public List<Endpoint> searchEndpoint(String keyword, String serviceId, int limit) throws IOException {
+        return Collections.emptyList();
+    }
+
+    /**
+     * @return always an empty list
+     */
+    @Override
+    public List<ServiceInstance> getServiceInstances(long startTimestamp, long endTimestamp, String serviceId) throws IOException {
+        return Collections.emptyList();
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBNetworkAddressAliasDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBNetworkAddressAliasDAO.java
new file mode 100644
index 0000000000..10675a49a1
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBNetworkAddressAliasDAO.java
@@ -0,0 +1,17 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
+
+import org.apache.skywalking.oap.server.core.analysis.manual.networkalias.NetworkAddressAlias;
+import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Network address alias DAO for BanyanDB.
+ *
+ * <p>{@link NetworkAddressAlias} is a stream.
+ */
+public class BanyanDBNetworkAddressAliasDAO implements INetworkAddressAliasDAO {
+    /**
+     * Placeholder implementation: the time bucket is not consulted.
+     *
+     * @return always an empty list
+     */
+    @Override
+    public List<NetworkAddressAlias> loadLastUpdate(long timeBucket) {
+        final List<NetworkAddressAlias> none = Collections.emptyList();
+        return none;
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java
new file mode 100644
index 0000000000..2b6290e2d2
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java
@@ -0,0 +1,18 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
+
+import org.apache.skywalking.oap.server.core.query.type.ProfileTaskLog;
+import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Profile task log query DAO for BanyanDB.
+ *
+ * <p>{@link org.apache.skywalking.oap.server.core.profile.ProfileTaskLogRecord} is a stream.
+ */
+public class BanyanDBProfileTaskLogQueryDAO implements IProfileTaskLogQueryDAO {
+    /**
+     * Placeholder implementation that performs no query.
+     *
+     * @return always an empty list
+     */
+    @Override
+    public List<ProfileTaskLog> getTaskLogList() throws IOException {
+        final List<ProfileTaskLog> none = Collections.emptyList();
+        return none;
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java
new file mode 100644
index 0000000000..c99d267cbe
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java
@@ -0,0 +1,23 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
+
+import org.apache.skywalking.oap.server.core.query.type.ProfileTask;
+import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Query DAO stub for profile tasks; {@link org.apache.skywalking.oap.server.core.profile.ProfileTaskRecord} is a stream.
+ */
+public class BanyanDBProfileTaskQueryDAO implements IProfileTaskQueryDAO {
+    @Override
+    public List<ProfileTask> getTaskList(final String serviceId, final String endpointName, final Long startTimeBucket, final Long endTimeBucket, final Integer limit) throws IOException {
+        return Collections.<ProfileTask>emptyList(); // every filter argument is ignored: the result is always empty
+    }
+
+    @Override
+    public ProfileTask getById(final String id) throws IOException {
+        return null; // no lookup is performed: callers always receive null
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java
new file mode 100644
index 0000000000..8a27e885fe
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java
@@ -0,0 +1,40 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
+
+import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
+import org.apache.skywalking.oap.server.core.profile.ProfileThreadSnapshotRecord;
+import org.apache.skywalking.oap.server.core.query.type.BasicTrace;
+import org.apache.skywalking.oap.server.core.storage.profile.IProfileThreadSnapshotQueryDAO;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Query DAO stub for profile thread snapshots; {@link ProfileThreadSnapshotRecord} is a stream.
+ */
+public class BanyanDBProfileThreadSnapshotQueryDAO implements IProfileThreadSnapshotQueryDAO {
+    @Override
+    public List<BasicTrace> queryProfiledSegments(final String taskId) throws IOException {
+        return Collections.<BasicTrace>emptyList(); // taskId is ignored: the result is always empty
+    }
+
+    @Override
+    public int queryMinSequence(final String segmentId, final long start, final long end) throws IOException {
+        return 0; // fixed value: no snapshot is inspected
+    }
+
+    @Override
+    public int queryMaxSequence(final String segmentId, final long start, final long end) throws IOException {
+        return 0; // fixed value: no snapshot is inspected
+    }
+
+    @Override
+    public List<ProfileThreadSnapshotRecord> queryRecords(final String segmentId, final int minSequence, final int maxSequence) throws IOException {
+        return Collections.<ProfileThreadSnapshotRecord>emptyList(); // all arguments are ignored: the result is always empty
+    }
+
+    @Override
+    public SegmentRecord getProfiledSegment(final String segmentId) throws IOException {
+        return null; // no lookup is performed: callers always receive null
+    }
+}
\ No newline at end of file
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java
new file mode 100644
index 0000000000..f59bc25f9e
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.skywalking.banyandb.v1.Banyandb;
+import org.apache.skywalking.banyandb.v1.client.SerializableTag;
+import org.apache.skywalking.banyandb.v1.client.StreamWrite;
+import org.apache.skywalking.banyandb.v1.client.Tag;
+import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
+import org.apache.skywalking.oap.server.core.analysis.record.Record;
+import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
+import org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder;
+import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBSchema;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+@RequiredArgsConstructor
+public class BanyanDBRecordDAO implements IRecordDAO {
+    private final StorageHashMapBuilder<Record> storageBuilder;
+
+    /** Builds a stream write for segment records; every other model gets an empty {@link InsertRequest}. */
+    @Override
+    public InsertRequest prepareBatchInsert(Model model, Record record) throws IOException {
+        if (!SegmentRecord.INDEX_NAME.equals(model.getName())) {
+            // TODO: support other stream types
+            return new InsertRequest() { };
+        }
+        final SegmentRecord segment = (SegmentRecord) record;
+        return new BanyanDBStreamInsertRequest(StreamWrite.builder()
+                .name(BanyanDBSchema.NAME)
+                .binary(segment.getDataBinary())
+                .timestamp(segment.getStartTime())
+                .elementId(segment.getSegmentId())
+                .tags(buildFieldObjects(this.storageBuilder.entity2Storage(segment)))
+                .build());
+    }
+
+    /**
+     * Picks the value of every name in {@code BanyanDBSchema.FIELD_NAMES} out of the storage map, in iteration order.
+     *
+     * @param segmentRecordMap storage map which comes from {@link SegmentRecord}
+     * @return one {@link SerializableTag} per field name; an absent or null entry becomes {@code Tag.nullField()}
+     */
+    static List<SerializableTag<Banyandb.TagValue>> buildFieldObjects(Map<String, Object> segmentRecordMap) {
+        final List<SerializableTag<Banyandb.TagValue>> tags = new ArrayList<>(BanyanDBSchema.FIELD_NAMES.size());
+        for (final String name : BanyanDBSchema.FIELD_NAMES) {
+            final Object value = segmentRecordMap.get(name);
+            if (value != null) {
+                // unchecked cast: assumes the builder stored SerializableTag values in the map — TODO confirm
+                tags.add((SerializableTag<Banyandb.TagValue>) value);
+                continue;
+            }
+            tags.add(Tag.nullField());
+        }
+        return tags;
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBSegmentRecordBuilder.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBSegmentRecordBuilder.java
new file mode 100644
index 0000000000..b8c4744380
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBSegmentRecordBuilder.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
+
+import org.apache.skywalking.banyandb.v1.client.TagAndValue;
+import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
+import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
+import org.apache.skywalking.oap.server.core.analysis.record.Record;
+import org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class BanyanDBSegmentRecordBuilder implements StorageHashMapBuilder<Record> {
+    @Override
+    public SegmentRecord storage2Entity(Map<String, Object> dbMap) {
+        return new SegmentRecord(); // not implemented: dbMap is ignored and an empty record is returned
+    }
+
+    /**
+     * Map SegmentRecord to Skywalking-BanyanDB compatible Map with indexed tags and without binaryData, entityId.
+     * Search-tag keys are lower-cased with Locale.ROOT so the stored key never depends on the JVM default locale.
+     */
+    @Override
+    public Map<String, Object> entity2Storage(Record record) {
+        final SegmentRecord segmentRecord = (SegmentRecord) record;
+        final Map<String, Object> map = new HashMap<>();
+        map.put(SegmentRecord.TRACE_ID, TagAndValue.stringField(segmentRecord.getTraceId()));
+        map.put(SegmentRecord.SERVICE_ID, TagAndValue.stringField(segmentRecord.getServiceId()));
+        map.put(SegmentRecord.SERVICE_INSTANCE_ID, TagAndValue.stringField(segmentRecord.getServiceInstanceId()));
+        map.put(SegmentRecord.ENDPOINT_ID, TagAndValue.stringField(segmentRecord.getEndpointId()));
+        map.put(SegmentRecord.START_TIME, TagAndValue.longField(segmentRecord.getStartTime()));
+        map.put("duration", TagAndValue.longField(segmentRecord.getLatency()));
+        map.put("state", TagAndValue.longField(segmentRecord.getIsError()));
+        if (segmentRecord.getTagsRawData() != null) {
+            for (final Tag tag : segmentRecord.getTagsRawData()) {
+                map.put(tag.getKey().toLowerCase(java.util.Locale.ROOT), TagAndValue.stringField(tag.getValue()));
+            }
+        }
+        return map;
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStorageDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStorageDAO.java
new file mode 100644
index 0000000000..f5ee5fb255
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStorageDAO.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
+import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
+import org.apache.skywalking.oap.server.core.storage.*;
+import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
+import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
+import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+@Slf4j
+public class BanyanDBStorageDAO implements StorageDAO {
+    @Override
+    public IMetricsDAO newMetricsDao(StorageBuilder storageBuilder) {
+        return new IMetricsDAO() {
+            @Override
+            public List<Metrics> multiGet(Model model, List<Metrics> metrics) throws IOException {
+                return Collections.emptyList();
+            }
+
+            @Override
+            public InsertRequest prepareBatchInsert(Model model, Metrics metrics) throws IOException {
+                return new InsertRequest() {
+                };
+            }
+
+            @Override
+            public UpdateRequest prepareBatchUpdate(Model model, Metrics metrics) throws IOException {
+                return new UpdateRequest() {
+                };
+            }
+        };
+    }
+
+    @Override
+    public IRecordDAO newRecordDao(StorageBuilder storageBuilder) {
+        try {
+            if (SegmentRecord.class.equals(storageBuilder.getClass().getMethod("storage2Entity", Map.class).getReturnType())) {
+                return new BanyanDBRecordDAO(new BanyanDBSegmentRecordBuilder());
+            } else {
+                return (model, record) -> new InsertRequest() {
+                };
+            }
+        } catch (NoSuchMethodException noSuchMethodException) {
+            log.error("cannot find method storage2Entity", noSuchMethodException);
+            throw new RuntimeException("cannot find method storage2Entity");
+        }
+    }
+
+    @Override
+    public INoneStreamDAO newNoneStreamDao(StorageBuilder storageBuilder) {
+        return (model, noneStream) -> {
+        };
+    }
+
+    @Override
+    public IManagementDAO newManagementDao(StorageBuilder storageBuilder) {
+        return (model, storageData) -> {
+        };
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStreamInsertRequest.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStreamInsertRequest.java
new file mode 100644
index 0000000000..aef8ec94e7
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStreamInsertRequest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.skywalking.banyandb.v1.client.StreamWrite;
+import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
+
+/** {@link InsertRequest} that carries one BanyanDB client {@link StreamWrite}. */
+@Getter
+@RequiredArgsConstructor
+public class BanyanDBStreamInsertRequest implements InsertRequest {
+    private final StreamWrite streamWrite;
+}
\ No newline at end of file
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java
new file mode 100644
index 0000000000..9fc403afc6
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import org.apache.skywalking.banyandb.v1.client.*;
+import org.apache.skywalking.oap.server.core.analysis.IDManager;
+import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
+import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
+import org.apache.skywalking.oap.server.core.query.type.*;
+import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
+import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
+import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBSchema;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
+import org.joda.time.DateTimeZone;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class BanyanDBTraceQueryDAO extends AbstractDAO<BanyanDBStorageClient> implements ITraceQueryDAO {
+    // Pinned to UTC once here, so every time bucket is parsed in the same zone.
+    private static final DateTimeFormatter YYYYMMDDHHMMSS = DateTimeFormat.forPattern("yyyyMMddHHmmss").withZone(DateTimeZone.UTC);
+
+    private static final List<String> BASIC_QUERY_PROJ = ImmutableList.of("trace_id", "state", "endpoint_id", "duration", "start_time");
+    private static final List<String> TRACE_ID_QUERY_PROJ = ImmutableList.of("trace_id", "state", "service_id", "service_instance_id", "endpoint_id", "duration", "start_time");
+
+    public BanyanDBTraceQueryDAO(BanyanDBStorageClient client) {
+        super(client);
+    }
+
+    /**
+     * Queries the stream with the given filters and maps every returned element to a {@link BasicTrace}.
+     * A time range is applied only when both buckets are non-zero; zero durations and null/empty ids add no condition.
+     */
+    @Override
+    public TraceBrief queryBasicTraces(long startSecondTB, long endSecondTB, long minDuration, long maxDuration, String serviceId, String serviceInstanceId, String endpointId, String traceId, int limit, int from, TraceState traceState, QueryOrder queryOrder, List<Tag> tags) throws IOException {
+        final StreamQuery query;
+        if (startSecondTB != 0 && endSecondTB != 0) {
+            query = new StreamQuery(BanyanDBSchema.NAME, new TimestampRange(parseMillisFromStartSecondTB(startSecondTB), parseMillisFromEndSecondTB(endSecondTB)), BASIC_QUERY_PROJ);
+        } else {
+            query = new StreamQuery(BanyanDBSchema.NAME, BASIC_QUERY_PROJ);
+        }
+        if (minDuration != 0) {
+            // duration >= minDuration
+            query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", "duration", minDuration));
+        }
+        if (maxDuration != 0) {
+            // duration <= maxDuration
+            query.appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", "duration", maxDuration));
+        }
+        if (!Strings.isNullOrEmpty(serviceId)) {
+            query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", "service_id", serviceId));
+        }
+        if (!Strings.isNullOrEmpty(serviceInstanceId)) {
+            query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", "service_instance_id", serviceInstanceId));
+        }
+        if (!Strings.isNullOrEmpty(endpointId)) {
+            query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", "endpoint_id", endpointId));
+        }
+        if (!Strings.isNullOrEmpty(traceId)) {
+            // the trace id is a filter like the ids above; it is matched the same way queryByTraceId matches it
+            query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", "trace_id", traceId));
+        }
+
+        switch (traceState) {
+            case ERROR:
+                query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", "state", (long) BanyanDBSchema.TraceState.ERROR.getState()));
+                break;
+            case SUCCESS:
+                query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", "state", (long) BanyanDBSchema.TraceState.SUCCESS.getState()));
+                break;
+            default: // NOTE(review): this filters on state == ALL rather than adding no state condition — confirm intent
+                query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", "state", (long) BanyanDBSchema.TraceState.ALL.getState()));
+                break;
+        }
+
+        switch (queryOrder) {
+            case BY_START_TIME:
+                query.setOrderBy(new StreamQuery.OrderBy("start_time", StreamQuery.OrderBy.Type.DESC));
+                break;
+            case BY_DURATION:
+                query.setOrderBy(new StreamQuery.OrderBy("duration", StreamQuery.OrderBy.Type.DESC));
+                break;
+            default:
+                // any other order leaves the query without an explicit ordering
+                break;
+        }
+
+        if (CollectionUtils.isNotEmpty(tags)) {
+            for (final Tag tag : tags) {
+                if (BanyanDBSchema.INDEX_FIELDS.contains(tag.getKey())) {
+                    query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", tag.getKey(), tag.getValue()));
+                }
+            }
+        }
+
+        query.setLimit(limit);
+        query.setOffset(from);
+        // execute the query and convert each element into a BasicTrace
+        final StreamQueryResponse response = this.getClient().query(query);
+        final TraceBrief brief = new TraceBrief();
+        brief.setTotal(response.size());
+        brief.getTraces().addAll(response.getElements().stream().map(elem -> {
+            BasicTrace trace = new BasicTrace();
+            trace.setSegmentId(elem.getId());
+            // the indexes below assume the tags come back in BASIC_QUERY_PROJ order — TODO confirm against the client
+            final List<TagAndValue<?>> searchable = elem.getTagFamilies().get(0);
+            trace.getTraceIds().add((String) searchable.get(0).getValue());
+            trace.setError(((Number) searchable.get(1).getValue()).intValue() == 1);
+            trace.getEndpointNames().add(IDManager.EndpointID.analysisId(
+                    (String) searchable.get(2).getValue()
+            ).getEndpointName());
+            trace.setDuration(((Number) searchable.get(3).getValue()).intValue());
+            trace.setStart(String.valueOf(searchable.get(4).getValue()));
+            return trace;
+        }).collect(Collectors.toList()));
+        return brief;
+    }
+
+    @Override
+    public List<SegmentRecord> queryByTraceId(String traceId) throws IOException {
+        StreamQuery query = new StreamQuery(BanyanDBSchema.NAME, TRACE_ID_QUERY_PROJ);
+        query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", "trace_id", traceId));
+        query.setDataBinary(true);
+        StreamQueryResponse response = this.getClient().query(query);
+        return response.getElements().stream().map(elem -> {
+            SegmentRecord record = new SegmentRecord();
+            final List<TagAndValue<?>> searchable = elem.getTagFamilies().get(0);
+            record.setSegmentId(elem.getId());
+            record.setTraceId((String) searchable.get(0).getValue());
+            record.setIsError(((Number) searchable.get(1).getValue()).intValue());
+            record.setServiceId((String) searchable.get(2).getValue());
+            record.setServiceInstanceId((String) searchable.get(3).getValue());
+            record.setEndpointId((String) searchable.get(4).getValue());
+            record.setLatency(((Number) searchable.get(5).getValue()).intValue());
+            record.setStartTime(((Number) searchable.get(6).getValue()).longValue());
+            final List<TagAndValue<?>> data = elem.getTagFamilies().get(1);
+            // TODO: support binary data in the client SDK
+            record.setDataBinary((byte[]) data.get(0).getValue());
+            return record;
+        }).collect(Collectors.toList());
+    }
+
+    @Override
+    public List<Span> doFlexibleTraceQuery(String traceId) throws IOException {
+        return Collections.emptyList();
+    }
+
+    /** Parses a {@code yyyyMMddHHmmss} time bucket in UTC and returns the instant in milliseconds. */
+    static long parseMillisFromStartSecondTB(long startSecondTB) {
+        return YYYYMMDDHHMMSS.parseMillis(String.valueOf(startSecondTB));
+    }
+
+    /** Like the start variant, but an hour above 23 or a minute/second above 59 is replaced by 0 before parsing. */
+    static long parseMillisFromEndSecondTB(long endSecondTB) {
+        // NOTE(review): out-of-range parts are reset to 0, not clamped to their maximum — confirm this is intended
+        final long second = endSecondTB % 100;
+        final long minute = endSecondTB / 100 % 100;
+        final long hour = endSecondTB / 10000 % 100;
+        final long date = endSecondTB / 1000000;
+        final long normalized = ((date * 100 + (hour > 23 ? 0 : hour)) * 100 + (minute > 59 ? 0 : minute)) * 100 + (second > 59 ? 0 : second);
+        return YYYYMMDDHHMMSS.parseMillis(String.valueOf(normalized));
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java
new file mode 100644
index 0000000000..ad8fed926e
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java
@@ -0,0 +1,35 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
+
+import org.apache.skywalking.oap.server.core.query.input.DashboardSetting;
+import org.apache.skywalking.oap.server.core.query.type.DashboardConfiguration;
+import org.apache.skywalking.oap.server.core.query.type.TemplateChangeStatus;
+import org.apache.skywalking.oap.server.core.storage.management.UITemplateManagementDAO;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Management DAO stub for UI templates: lists nothing and rejects every change with {@code status(false)}.
+ */
+public class BanyanDBUITemplateManagementDAO implements UITemplateManagementDAO {
+    @Override
+    public List<DashboardConfiguration> getAllTemplates(Boolean includingDisabled) throws IOException {
+        return Collections.emptyList(); // includingDisabled is ignored: the result is always empty
+    }
+
+    @Override
+    public TemplateChangeStatus addTemplate(DashboardSetting setting) throws IOException {
+        return TemplateChangeStatus.builder().status(false).message("Can't add a new template").build();
+    }
+
+    @Override
+    public TemplateChangeStatus changeTemplate(DashboardSetting setting) throws IOException {
+        return TemplateChangeStatus.builder().status(false).message("Can't add/update the template").build();
+    }
+
+    @Override
+    public TemplateChangeStatus disableTemplate(String name) throws IOException {
+        return TemplateChangeStatus.builder().status(false).message("Can't disable the template").build(); // message now names the rejected operation
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
new file mode 100644
index 0000000000..6b9674593d
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+
+org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageProvider