You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by lu...@apache.org on 2022/04/22 03:24:41 UTC

[skywalking] 20/22: remove all hardcoded builder

This is an automated email from the ASF dual-hosted git repository.

lujiajing pushed a commit to branch banyandb-integration-stream
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit aff43b1be297526f609ce289a9d95946ecd3f21c
Author: Megrez Lu <lu...@gmail.com>
AuthorDate: Mon Mar 21 23:39:23 2022 +0800

    remove all hardcoded builder
---
 .../storage/plugin/banyandb/BanyanDBConverter.java | 139 +++++++++++
 .../plugin/banyandb/BanyanDBIndexInstaller.java    |  28 ++-
 .../plugin/banyandb/BanyanDBManagementDAO.java     |  20 ++
 .../plugin/banyandb/BanyanDBMetricsDAO.java        |  36 +++
 .../{stream => }/BanyanDBNoneStreamDAO.java        |  32 +--
 .../banyandb/BanyanDBStorageBuilderFactory.java    | 112 ---------
 .../plugin/banyandb/BanyanDBStorageClient.java     |  24 +-
 .../plugin/banyandb/BanyanDBStorageProvider.java   |  31 +--
 .../storage/plugin/banyandb/MetadataRegistry.java  | 212 ++++++++++++++++
 .../storage/plugin/banyandb/StreamMetaInfo.java    | 140 -----------
 .../storage/plugin/banyandb/StreamMetadata.java    |  71 ++++++
 .../BanyanDBEventQueryDAO.java}                    |  27 +-
 .../banyandb/measure/BanyanDBMetadataQueryDAO.java |  66 +++++
 .../BanyanDBNetworkAddressAliasDAO.java}           |  22 +-
 .../BanyanDBProfileTaskQueryDAO.java}              |  28 ++-
 .../banyandb/measure/BanyanDBTopologyQueryDAO.java |  63 +++++
 .../measure/BanyanDBUITemplateManagementDAO.java   |  55 ++++
 .../plugin/banyandb/schema/AlarmRecordBuilder.java |  55 ----
 .../schema/BanyanDBStorageDataBuilder.java         |  88 -------
 .../schema/BrowserErrorLogRecordBuilder.java       |  46 ----
 .../EndpointRelationServerSideMetricsBuilder.java  |  48 ----
 .../plugin/banyandb/schema/EventBuilder.java       |  52 ----
 .../plugin/banyandb/schema/LogRecordBuilder.java   |  58 -----
 .../storage/plugin/banyandb/schema/Metadata.java   | 100 --------
 .../schema/NetworkAddressAliasBuilder.java         |  49 ----
 .../schema/ProfileTaskLogRecordBuilder.java        |  45 ----
 .../banyandb/schema/ProfileTaskRecordBuilder.java  |  53 ----
 .../schema/ProfileThreadSnapshotRecordBuilder.java |  45 ----
 .../banyandb/schema/SegmentRecordBuilder.java      |  68 -----
 ...ceInstanceRelationClientSideMetricsBuilder.java |  47 ----
 ...ceInstanceRelationServerSideMetricsBuilder.java |  47 ----
 .../ServiceRelationClientSideMetricsBuilder.java   |  44 ----
 .../ServiceRelationServerSideMetricsBuilder.java   |  44 ----
 .../plugin/banyandb/schema/UITemplateBuilder.java  |  47 ----
 .../banyandb/stream/AbstractBanyanDBDAO.java       |  25 +-
 .../banyandb/stream/BanyanDBAlarmQueryDAO.java     |  68 ++---
 .../stream/BanyanDBBrowserLogQueryDAO.java         |  77 +++---
 .../banyandb/stream/BanyanDBEventQueryDAO.java     | 148 -----------
 .../banyandb/stream/BanyanDBLogQueryDAO.java       |  83 ++++---
 .../banyandb/stream/BanyanDBManagementDAO.java     |  70 ------
 .../banyandb/stream/BanyanDBMetadataQueryDAO.java  | 214 ----------------
 .../plugin/banyandb/stream/BanyanDBMetricsDAO.java |  57 -----
 .../stream/BanyanDBNetworkAddressAliasDAO.java     |  76 ------
 .../stream/BanyanDBProfileTaskLogQueryDAO.java     |  49 ++--
 .../stream/BanyanDBProfileTaskQueryDAO.java        | 120 ---------
 .../BanyanDBProfileThreadSnapshotQueryDAO.java     | 146 ++++++-----
 .../plugin/banyandb/stream/BanyanDBRecordDAO.java  |  27 +-
 .../plugin/banyandb/stream/BanyanDBStorageDAO.java |  69 +-----
 .../stream/BanyanDBStreamInsertRequest.java        |   2 +-
 .../banyandb/stream/BanyanDBTopologyQueryDAO.java  | 276 ---------------------
 .../banyandb/stream/BanyanDBTraceQueryDAO.java     | 105 ++++----
 .../stream/BanyanDBUITemplateManagementDAO.java    | 119 ---------
 52 files changed, 1073 insertions(+), 2700 deletions(-)

diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java
new file mode 100644
index 0000000000..565245ad55
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.banyandb;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
+import org.apache.skywalking.banyandb.v1.client.RowEntity;
+import org.apache.skywalking.banyandb.v1.client.SerializableTag;
+import org.apache.skywalking.banyandb.v1.client.StreamWrite;
+import org.apache.skywalking.banyandb.v1.client.TagAndValue;
+import org.apache.skywalking.oap.server.core.analysis.record.Record;
+import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity;
+import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
+import org.apache.skywalking.oap.server.library.util.StringUtil;
+
+import java.util.List;
+import java.util.function.Function;
+
+public class BanyanDBConverter {
+    @RequiredArgsConstructor
+    public static class StreamToEntity implements Convert2Entity {
+        private final StreamMetadata metadata;
+        private final RowEntity rowEntity;
+
+        @Override
+        public Object get(String fieldName) {
+            final StreamMetadata.TagMetadata metadata = this.metadata.getTagDefinition().get(fieldName);
+            if (metadata == null) {
+                return null;
+            }
+            return rowEntity.getValue(metadata.getTagFamilyName(), metadata.getTagSpec().getTagName());
+        }
+
+        @Override
+        public <T, R> R getWith(String fieldName, Function<T, R> typeDecoder) {
+            return (R) this.get(fieldName); // NOTE(review): typeDecoder is never applied; the raw tag value is returned through an unchecked cast - confirm this is intended
+        }
+    }
+
+    @RequiredArgsConstructor
+    public static class StreamToStorage implements Convert2Storage<StreamWrite> {
+        private final StreamMetadata metadata;
+        private final StreamWrite streamWrite;
+
+        @Override
+        public void accept(String fieldName, Object fieldValue) {
+            // skip "time_bucket"
+            if (Record.TIME_BUCKET.equals(fieldName)) {
+                return;
+            }
+            final StreamMetadata.TagMetadata metadata = this.metadata.getTagDefinition().get(fieldName);
+            if (metadata == null) {
+                return;
+            }
+            switch (metadata.getTagFamilyName()) {
+                case StreamMetadata.TAG_FAMILY_DATA:
+                    this.streamWrite.dataTag(metadata.getTagIndex(), buildTag(fieldValue));
+                    break;
+                case StreamMetadata.TAG_FAMILY_SEARCHABLE:
+                    this.streamWrite.searchableTag(metadata.getTagIndex(), buildTag(fieldValue));
+                    break;
+                default:
+                    throw new IllegalStateException("tag family is not supported");
+            }
+        }
+
+        private SerializableTag<BanyandbModel.TagValue> buildTag(Object value) { // maps Integer/Long to a long tag and String to a string tag
+            if (value instanceof Integer || value instanceof Long) {
+                return TagAndValue.longField(((Number) value).longValue()); // a direct (long) cast of a boxed Integer throws ClassCastException
+            } else if (value instanceof String) {
+                return TagAndValue.stringField((String) value);
+            }
+            throw new IllegalStateException((value == null ? "null" : value.getClass()) + " is not supported"); // null is reported here rather than as an NPE
+        }
+
+        @Override
+        public void accept(String fieldName, byte[] fieldValue) {
+            final StreamMetadata.TagMetadata metadata = this.metadata.getTagDefinition().get(fieldName);
+            if (metadata == null) {
+                return;
+            }
+            if (StreamMetadata.TAG_FAMILY_SEARCHABLE.equals(metadata.getTagFamilyName())) {
+                this.streamWrite.searchableTag(metadata.getTagIndex(), TagAndValue.binaryField((fieldValue)));
+            } else {
+                throw new IllegalStateException("binary tag should not be store in the `data` family");
+            }
+        }
+
+        @Override
+        public void accept(String fieldName, List<String> fieldValue) {
+            for (final String tagKeyAndValue : fieldValue) {
+                if (StringUtil.isEmpty(tagKeyAndValue)) {
+                    continue;
+                }
+                int pos = tagKeyAndValue.indexOf("=");
+                if (pos == -1) {
+                    continue;
+                }
+                String key = tagKeyAndValue.substring(0, pos);
+                String value = tagKeyAndValue.substring(pos + 1);
+                this.accept(key, value);
+            }
+        }
+
+        @Override
+        public Object get(String fieldName) {
+            final StreamMetadata.TagMetadata metadata = this.metadata.getTagDefinition().get(fieldName);
+            if (metadata == null) {
+                return null;
+            }
+            // TODO: get an unmodifiable view of tag
+            return null;
+        }
+
+        @Override
+        public StreamWrite obtain() {
+            if (metadata.isUseIdAsEntity()) {
+                this.accept(StreamMetadata.ID, this.streamWrite.getElementID());
+            }
+            return this.streamWrite;
+        }
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java
index c9566b1549..f4483b0a14 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java
@@ -19,6 +19,11 @@
 package org.apache.skywalking.oap.server.storage.plugin.banyandb;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.banyandb.v1.client.metadata.Catalog;
+import org.apache.skywalking.banyandb.v1.client.metadata.Duration;
+import org.apache.skywalking.banyandb.v1.client.metadata.Group;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.config.ConfigService;
 import org.apache.skywalking.oap.server.core.storage.StorageException;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
 import org.apache.skywalking.oap.server.core.storage.model.ModelInstaller;
@@ -27,21 +32,36 @@ import org.apache.skywalking.oap.server.library.module.ModuleManager;
 
 @Slf4j
 public class BanyanDBIndexInstaller extends ModelInstaller {
+    private final ConfigService configService;
+
     public BanyanDBIndexInstaller(Client client, ModuleManager moduleManager) {
         super(client, moduleManager);
+        this.configService = moduleManager.find(CoreModule.NAME)
+                .provider()
+                .getService(ConfigService.class);
     }
 
     @Override
     protected boolean isExists(Model model) throws StorageException {
+        // TODO: get from BanyanDB and make a diff?
         return false;
     }
 
     @Override
     protected void createTable(Model model) throws StorageException {
-        StreamMetaInfo metaInfo = StreamMetaInfo.addModel(model);
-        if (metaInfo != null) {
-            log.info("install index {}", model.getName());
-            ((BanyanDBStorageClient) client).createStream(metaInfo);
+        if (model.isTimeSeries() && model.isRecord()) { // stream
+            StreamMetadata metaInfo = MetadataRegistry.INSTANCE.registerModel(model, this.configService);
+            if (metaInfo != null) {
+                log.info("install index {}", model.getName());
+                ((BanyanDBStorageClient) client).define(
+                        new Group(metaInfo.getGroup(), Catalog.STREAM, 2, 10, Duration.ofDays(7))
+                );
+                ((BanyanDBStorageClient) client).define(metaInfo);
+            }
+        } else if (model.isTimeSeries() && !model.isRecord()) { // measure
+            log.info("skip measure index {}", model.getName());
+        } else if (!model.isTimeSeries()) { // UITemplate
+            log.info("skip property index {}", model.getName());
         }
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBManagementDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBManagementDAO.java
new file mode 100644
index 0000000000..080bf4577a
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBManagementDAO.java
@@ -0,0 +1,20 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.skywalking.oap.server.core.analysis.management.ManagementData;
+import org.apache.skywalking.oap.server.core.storage.IManagementDAO;
+import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
+
+import java.io.IOException;
+
+@RequiredArgsConstructor
+public class BanyanDBManagementDAO implements IManagementDAO {
+    private final BanyanDBStorageClient client;
+    private final StorageBuilder<ManagementData> storageBuilder;
+
+    @Override
+    public void insert(Model model, ManagementData storageData) throws IOException {
+
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBMetricsDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBMetricsDAO.java
new file mode 100644
index 0000000000..af54dd350f
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBMetricsDAO.java
@@ -0,0 +1,36 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
+import org.apache.skywalking.oap.server.core.analysis.record.Record;
+import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
+import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
+import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
+import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+@RequiredArgsConstructor
+public class BanyanDBMetricsDAO implements IMetricsDAO {
+    private final StorageBuilder<Metrics> storageBuilder;
+
+    @Override
+    public List<Metrics> multiGet(Model model, List<Metrics> metrics) throws IOException {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public InsertRequest prepareBatchInsert(Model model, Metrics metrics) throws IOException {
+        return new InsertRequest() {
+        };
+    }
+
+    @Override
+    public UpdateRequest prepareBatchUpdate(Model model, Metrics metrics) throws IOException {
+        return new UpdateRequest() {
+        };
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBNoneStreamDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBNoneStreamDAO.java
similarity index 51%
rename from oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBNoneStreamDAO.java
rename to oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBNoneStreamDAO.java
index ae5c766897..bd4a5b0926 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBNoneStreamDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBNoneStreamDAO.java
@@ -16,36 +16,26 @@
  *
  */
 
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
+package org.apache.skywalking.oap.server.storage.plugin.banyandb;
 
-import lombok.RequiredArgsConstructor;
-import org.apache.skywalking.banyandb.v1.client.StreamWrite;
-import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
 import org.apache.skywalking.oap.server.core.analysis.config.NoneStream;
+import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
 import org.apache.skywalking.oap.server.core.storage.INoneStreamDAO;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.BanyanDBStorageDataBuilder;
+import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
 
 import java.io.IOException;
 
-/**
- * DAO for NoneStream, specifically ProfileTaskRecord
- *
- * @param <T> For NoneStream, we only have {@link org.apache.skywalking.oap.server.core.profile.ProfileTaskRecord}
- */
-@RequiredArgsConstructor
-public class BanyanDBNoneStreamDAO<T extends NoneStream> implements INoneStreamDAO {
-    private final BanyanDBStorageClient client;
-    private final BanyanDBStorageDataBuilder<T> storageBuilder;
+public class BanyanDBNoneStreamDAO extends AbstractDAO<BanyanDBStorageClient> implements INoneStreamDAO {
+    private final StorageBuilder<NoneStream> storageBuilder;
+
+    public BanyanDBNoneStreamDAO(BanyanDBStorageClient client, StorageBuilder<NoneStream> storageBuilder) {
+        super(client);
+        this.storageBuilder = storageBuilder;
+    }
 
     @Override
     public void insert(Model model, NoneStream noneStream) throws IOException {
-        final long timestamp = TimeBucket.getTimestamp(noneStream.getTimeBucket(), model.getDownsampling());
-        StreamWrite.StreamWriteBuilder builder =
-                this.storageBuilder.entity2Storage((T) noneStream)
-                        .name(model.getName())
-                        .timestamp(timestamp);
-        this.client.write(builder.build());
+
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageBuilderFactory.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageBuilderFactory.java
deleted file mode 100644
index 0cb10763eb..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageBuilderFactory.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.skywalking.oap.server.core.alarm.AlarmRecord;
-import org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic;
-import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
-import org.apache.skywalking.oap.server.core.analysis.manual.log.LogRecord;
-import org.apache.skywalking.oap.server.core.analysis.manual.networkalias.NetworkAddressAlias;
-import org.apache.skywalking.oap.server.core.analysis.manual.relation.endpoint.EndpointRelationServerSideMetrics;
-import org.apache.skywalking.oap.server.core.analysis.manual.relation.instance.ServiceInstanceRelationClientSideMetrics;
-import org.apache.skywalking.oap.server.core.analysis.manual.relation.instance.ServiceInstanceRelationServerSideMetrics;
-import org.apache.skywalking.oap.server.core.analysis.manual.relation.service.ServiceRelationClientSideMetrics;
-import org.apache.skywalking.oap.server.core.analysis.manual.relation.service.ServiceRelationServerSideMetrics;
-import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
-import org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraffic;
-import org.apache.skywalking.oap.server.core.browser.manual.errorlog.BrowserErrorLogRecord;
-import org.apache.skywalking.oap.server.core.management.ui.template.UITemplate;
-import org.apache.skywalking.oap.server.core.profile.ProfileTaskLogRecord;
-import org.apache.skywalking.oap.server.core.profile.ProfileTaskRecord;
-import org.apache.skywalking.oap.server.core.profile.ProfileThreadSnapshotRecord;
-import org.apache.skywalking.oap.server.core.source.Event;
-import org.apache.skywalking.oap.server.core.storage.StorageBuilderFactory;
-import org.apache.skywalking.oap.server.core.storage.StorageData;
-import org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder;
-import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.AlarmRecordBuilder;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.BrowserErrorLogRecordBuilder;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.EndpointRelationServerSideMetricsBuilder;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.EventBuilder;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.LogRecordBuilder;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.Metadata;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.NetworkAddressAliasBuilder;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.ProfileTaskLogRecordBuilder;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.ProfileTaskRecordBuilder;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.ProfileThreadSnapshotRecordBuilder;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.SegmentRecordBuilder;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.ServiceInstanceRelationClientSideMetricsBuilder;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.ServiceInstanceRelationServerSideMetricsBuilder;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.ServiceRelationClientSideMetricsBuilder;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.ServiceRelationServerSideMetricsBuilder;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.UITemplateBuilder;
-
-@Slf4j
-public class BanyanDBStorageBuilderFactory implements StorageBuilderFactory {
-    private static final StorageBuilderFactory FALLBACK = new StorageBuilderFactory.Default();
-
-    @Override
-    public BuilderTemplateDefinition builderTemplate() {
-        return new BuilderTemplateDefinition(StorageHashMapBuilder.class.getName(), "metrics-builder");
-    }
-
-    @Override
-    public Class<? extends StorageBuilder> builderOf(Class<? extends StorageData> dataType, Class<? extends StorageBuilder> defaultBuilder) {
-        if (SegmentRecord.class.equals(dataType)) {
-            return SegmentRecordBuilder.class;
-        } else if (AlarmRecord.class.equals(dataType)) {
-            return AlarmRecordBuilder.class;
-        } else if (BrowserErrorLogRecord.class.equals(dataType)) {
-            return BrowserErrorLogRecordBuilder.class;
-        } else if (LogRecord.class.equals(dataType)) {
-            return LogRecordBuilder.class;
-        } else if (ProfileTaskLogRecord.class.equals(dataType)) {
-            return ProfileTaskLogRecordBuilder.class;
-        } else if (ProfileThreadSnapshotRecord.class.equals(dataType)) {
-            return ProfileThreadSnapshotRecordBuilder.class;
-        } else if (ProfileTaskRecord.class.equals(dataType)) {
-            return ProfileTaskRecordBuilder.class;
-        } else if (UITemplate.class.equals(dataType)) {
-            return UITemplateBuilder.class;
-        } else if (Event.class.equals(dataType)) {
-            return EventBuilder.class;
-        } else if (ServiceTraffic.class.equals(dataType)) {
-            return Metadata.ServiceTrafficBuilder.class;
-        } else if (InstanceTraffic.class.equals(dataType)) {
-            return Metadata.InstanceTrafficBuilder.class;
-        } else if (EndpointTraffic.class.equals(dataType)) {
-            return Metadata.EndpointTrafficBuilder.class;
-        } else if (NetworkAddressAlias.class.equals(dataType)) {
-            return NetworkAddressAliasBuilder.class;
-        } else if (EndpointRelationServerSideMetrics.class.equals(dataType)) {
-            return EndpointRelationServerSideMetricsBuilder.class;
-        } else if (ServiceRelationServerSideMetrics.class.equals(dataType)) {
-            return ServiceRelationServerSideMetricsBuilder.class;
-        } else if (ServiceRelationClientSideMetrics.class.equals(dataType)) {
-            return ServiceRelationClientSideMetricsBuilder.class;
-        } else if (ServiceInstanceRelationServerSideMetrics.class.equals(dataType)) {
-            return ServiceInstanceRelationServerSideMetricsBuilder.class;
-        } else if (ServiceInstanceRelationClientSideMetrics.class.equals(dataType)) {
-            return ServiceInstanceRelationClientSideMetricsBuilder.class;
-        }
-
-        return FALLBACK.builderOf(dataType, defaultBuilder);
-    }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
index 6b4780cad7..b1ca206110 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
@@ -19,7 +19,6 @@
 package org.apache.skywalking.oap.server.storage.plugin.banyandb;
 
 import org.apache.skywalking.banyandb.v1.client.BanyanDBClient;
-import org.apache.skywalking.banyandb.v1.client.GroupedBanyanDBClient;
 import org.apache.skywalking.banyandb.v1.client.StreamBulkWriteProcessor;
 import org.apache.skywalking.banyandb.v1.client.StreamQuery;
 import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
@@ -33,6 +32,8 @@ import org.apache.skywalking.oap.server.library.client.healthcheck.HealthCheckab
 import org.apache.skywalking.oap.server.library.util.HealthChecker;
 
 import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * BanyanDBStorageClient is a simple wrapper for the underlying {@link BanyanDBClient},
@@ -40,15 +41,16 @@ import java.io.IOException;
  */
 public class BanyanDBStorageClient implements Client, HealthCheckable {
     private final BanyanDBClient client;
-    private GroupedBanyanDBClient streamClient;
+    private final Map<String, Group> groupMap;
     private final DelegatedHealthChecker healthChecker = new DelegatedHealthChecker();
 
     public BanyanDBStorageClient(String host, int port) {
         this.client = new BanyanDBClient(host, port);
+        this.groupMap = new ConcurrentHashMap<>();
     }
 
-    public void defineStreamGroup(Group group) {
-        this.streamClient = this.client.attachGroup(group);
+    public Group define(Group group) {
+        return groupMap.computeIfAbsent(group.getName(), s -> client.define(group));
     }
 
     @Override
@@ -63,7 +65,7 @@ public class BanyanDBStorageClient implements Client, HealthCheckable {
 
     public StreamQueryResponse query(StreamQuery streamQuery) {
         try {
-            StreamQueryResponse response = this.streamClient.queryStreams(streamQuery);
+            StreamQueryResponse response = this.client.queryStreams(streamQuery);
             this.healthChecker.health();
             return response;
         } catch (Throwable t) {
@@ -72,19 +74,19 @@ public class BanyanDBStorageClient implements Client, HealthCheckable {
         }
     }
 
-    public void createStream(StreamMetaInfo streamMetaInfo) {
-        Stream stm = this.streamClient.define(streamMetaInfo.getStream());
-        if (stm != null) {
-            this.streamClient.defineIndexRules(stm, streamMetaInfo.getIndexRules().toArray(new IndexRule[]{}));
+    public void define(StreamMetadata streamMetadata) {
+        Stream stream = this.client.define(streamMetadata.getStream());
+        if (stream != null) {
+            this.client.defineIndexRules(stream, streamMetadata.getIndexRules().toArray(new IndexRule[]{}));
         }
     }
 
     public void write(StreamWrite streamWrite) {
-        this.streamClient.write(streamWrite);
+        this.client.write(streamWrite);
     }
 
     public StreamBulkWriteProcessor createBulkProcessor(int maxBulkSize, int flushInterval, int concurrency) {
-        return this.streamClient.buildStreamWriteProcessor(maxBulkSize, flushInterval, concurrency);
+        return this.client.buildStreamWriteProcessor(maxBulkSize, flushInterval, concurrency);
     }
 
     @Override
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java
index b938ab1159..922bf2296a 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java
@@ -18,8 +18,6 @@
 
 package org.apache.skywalking.oap.server.storage.plugin.banyandb;
 
-import org.apache.skywalking.banyandb.v1.client.metadata.Catalog;
-import org.apache.skywalking.banyandb.v1.client.metadata.Group;
 import org.apache.skywalking.oap.server.core.CoreModule;
 import org.apache.skywalking.oap.server.core.config.ConfigService;
 import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
@@ -48,21 +46,21 @@ import org.apache.skywalking.oap.server.library.module.ModuleDefine;
 import org.apache.skywalking.oap.server.library.module.ModuleProvider;
 import org.apache.skywalking.oap.server.library.module.ModuleStartException;
 import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBEventQueryDAO;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBMetadataQueryDAO;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBNetworkAddressAliasDAO;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBProfileTaskQueryDAO;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBTopologyQueryDAO;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBUITemplateManagementDAO;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBAlarmQueryDAO;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBBatchDAO;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBBrowserLogQueryDAO;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBEventQueryDAO;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBHistoryDeleteDAO;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBLogQueryDAO;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBMetadataQueryDAO;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBNetworkAddressAliasDAO;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBProfileTaskLogQueryDAO;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBProfileTaskQueryDAO;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBProfileThreadSnapshotQueryDAO;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBStorageDAO;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBTopologyQueryDAO;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBTraceQueryDAO;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBUITemplateManagementDAO;
 import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
 import org.apache.skywalking.oap.server.telemetry.api.HealthCheckMetrics;
 import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
@@ -93,26 +91,26 @@ public class BanyanDBStorageProvider extends ModuleProvider {
 
     @Override
     public void prepare() throws ServiceNotProvidedException, ModuleStartException {
-        this.registerServiceImplementation(StorageBuilderFactory.class, new BanyanDBStorageBuilderFactory());
+        this.registerServiceImplementation(StorageBuilderFactory.class, new StorageBuilderFactory.Default());
 
         this.client = new BanyanDBStorageClient(config.getHost(), config.getPort());
 
         // Stream
         this.registerServiceImplementation(IBatchDAO.class, new BanyanDBBatchDAO(client, config.getMaxBulkSize(), config.getFlushInterval(), config.getConcurrentWriteThreads()));
         this.registerServiceImplementation(StorageDAO.class, new BanyanDBStorageDAO(client));
-        this.registerServiceImplementation(INetworkAddressAliasDAO.class, new BanyanDBNetworkAddressAliasDAO(client));
+        this.registerServiceImplementation(INetworkAddressAliasDAO.class, new BanyanDBNetworkAddressAliasDAO());
         this.registerServiceImplementation(ITraceQueryDAO.class, new BanyanDBTraceQueryDAO(client));
         this.registerServiceImplementation(IBrowserLogQueryDAO.class, new BanyanDBBrowserLogQueryDAO(client));
-        this.registerServiceImplementation(IMetadataQueryDAO.class, new BanyanDBMetadataQueryDAO(client));
+        this.registerServiceImplementation(IMetadataQueryDAO.class, new BanyanDBMetadataQueryDAO());
         this.registerServiceImplementation(IAlarmQueryDAO.class, new BanyanDBAlarmQueryDAO(client));
         this.registerServiceImplementation(ILogQueryDAO.class, new BanyanDBLogQueryDAO(client));
-        this.registerServiceImplementation(IProfileTaskQueryDAO.class, new BanyanDBProfileTaskQueryDAO(client));
+        this.registerServiceImplementation(IProfileTaskQueryDAO.class, new BanyanDBProfileTaskQueryDAO());
         this.registerServiceImplementation(IProfileTaskLogQueryDAO.class, new BanyanDBProfileTaskLogQueryDAO(client, this.config.getFetchTaskLogMaxSize()));
         this.registerServiceImplementation(
                 IProfileThreadSnapshotQueryDAO.class, new BanyanDBProfileThreadSnapshotQueryDAO(client));
-        this.registerServiceImplementation(UITemplateManagementDAO.class, new BanyanDBUITemplateManagementDAO(client));
-        this.registerServiceImplementation(IEventQueryDAO.class, new BanyanDBEventQueryDAO(client));
-        this.registerServiceImplementation(ITopologyQueryDAO.class, new BanyanDBTopologyQueryDAO(client));
+        this.registerServiceImplementation(UITemplateManagementDAO.class, new BanyanDBUITemplateManagementDAO());
+        this.registerServiceImplementation(IEventQueryDAO.class, new BanyanDBEventQueryDAO());
+        this.registerServiceImplementation(ITopologyQueryDAO.class, new BanyanDBTopologyQueryDAO());
 
         // TODO: metrics
         this.registerServiceImplementation(IHistoryDeleteDAO.class, new BanyanDBHistoryDeleteDAO());
@@ -135,9 +133,6 @@ public class BanyanDBStorageProvider extends ModuleProvider {
         this.client.registerChecker(healthChecker);
         try {
             this.client.connect();
-            // create stream group
-            final Group streamGroup = new Group("default-stream", Catalog.STREAM, 2);
-            this.client.defineStreamGroup(streamGroup);
             BanyanDBIndexInstaller installer = new BanyanDBIndexInstaller(client, getManager());
             getManager().find(CoreModule.NAME).provider().getService(ModelCreator.class).addModelListener(installer);
         } catch (Exception e) {
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
new file mode 100644
index 0000000000..83af6da867
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.banyandb;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
+import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase;
+import org.apache.skywalking.banyandb.v1.client.metadata.IndexRule;
+import org.apache.skywalking.banyandb.v1.client.metadata.Stream;
+import org.apache.skywalking.banyandb.v1.client.metadata.TagFamilySpec;
+import org.apache.skywalking.oap.server.core.alarm.AlarmRecord;
+import org.apache.skywalking.oap.server.core.analysis.manual.log.LogRecord;
+import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
+import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
+import org.apache.skywalking.oap.server.core.config.ConfigService;
+import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.core.storage.model.ModelColumn;
+import org.apache.skywalking.oap.server.library.util.StringUtil;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Singleton registry of the BanyanDB stream schemas derived from SkyWalking storage {@link Model}s.
+ *
+ * <p>{@link #registerModel(Model, ConfigService)} converts a model into a {@link StreamMetadata}
+ * (stream spec, index rules and a per-tag lookup table) and caches it under the model name;
+ * {@link #findStreamMetadata(String)} reads that cache.
+ */
+public enum MetadataRegistry {
+    INSTANCE;
+
+    // NOTE(review): plain HashMap without synchronization; presumably every model is registered
+    // before lookups run concurrently -- confirm against the callers.
+    private final Map<String, StreamMetadata> streams = new HashMap<>();
+
+    /**
+     * Builds the stream metadata for the given model and caches it under {@code model.getName()},
+     * replacing any entry previously stored under that name.
+     *
+     * @param model         storage model to translate into a BanyanDB stream
+     * @param configService supplies the user-configured searchable tags for the segment, log and alarm models
+     * @return the metadata that was built and cached
+     * @throws IllegalStateException if a column of the model has a type that cannot be mapped to a tag type
+     */
+    public StreamMetadata registerModel(Model model, ConfigService configService) {
+        BanyandbDatabase.Stream pbStream = parseStreamFromModel(model, configService);
+
+        // true when the entity consists solely of the synthetic "id" tag, i.e. the model declared no sharding keys
+        final boolean useIdAsEntity = pbStream.getEntity().getTagNamesCount() == 1 &&
+                StreamMetadata.ID.equals(pbStream.getEntity().getTagNames(0));
+
+        final Stream stream = new Stream(pbStream.getMetadata().getGroup(), pbStream.getMetadata().getName());
+
+        List<IndexRule> indexRules = new ArrayList<>();
+        Set<String> entityNameSet = ImmutableSet.copyOf(pbStream.getEntity().getTagNamesList());
+        stream.setEntityTagNames(pbStream.getEntity().getTagNamesList());
+
+        Map<String, StreamMetadata.TagMetadata> tagDefinition = new HashMap<>();
+
+        for (BanyandbDatabase.TagFamilySpec pbTagFamilySpec : pbStream.getTagFamiliesList()) {
+            final TagFamilySpec tagFamilySpec = TagFamilySpec.fromProtobuf(pbTagFamilySpec);
+            stream.addTagFamilySpec(tagFamilySpec);
+
+            // position of the tag inside its own family; restarts at 0 for every family
+            int tagIndex = 0;
+            for (final TagFamilySpec.TagSpec tagSpec : tagFamilySpec.getTagSpecs()) {
+                // register the tag so it can be looked up by name later
+                tagDefinition.put(tagSpec.getTagName(), new StreamMetadata.TagMetadata(tagFamilySpec.getTagFamilyName(), tagSpec, tagIndex++));
+
+                // only tags of the "searchable" family get index rules
+                if (tagFamilySpec.getTagFamilyName().equals(StreamMetadata.TAG_FAMILY_SEARCHABLE)) {
+                    // tags that are part of the entity are not given an index rule
+                    if (entityNameSet.contains(tagSpec.getTagName())) {
+                        continue;
+                    }
+                    BanyandbDatabase.IndexRule pbIndexRule = parseIndexRuleFromTagSpec(pbStream.getMetadata(), tagSpec);
+                    IndexRule indexRule = IndexRule.fromProtobuf(pbIndexRule);
+                    indexRules.add(indexRule);
+                }
+            }
+        }
+
+        StreamMetadata streamMetadata = StreamMetadata.builder().model(model).stream(stream)
+                .tagDefinition(tagDefinition)
+                .indexRules(indexRules)
+                .group(pbStream.getMetadata().getGroup())
+                .useIdAsEntity(useIdAsEntity)
+                .build();
+        streams.put(model.getName(), streamMetadata);
+        return streamMetadata;
+    }
+
+    /**
+     * Looks up the metadata cached by {@link #registerModel(Model, ConfigService)}.
+     *
+     * @param name model name used at registration time
+     * @return the cached metadata, or {@code null} if no model was registered under that name
+     */
+    public StreamMetadata findStreamMetadata(final String name) {
+        return this.streams.get(name);
+    }
+
+    /**
+     * Translates a storage model into the protobuf definition of a BanyanDB stream.
+     *
+     * <p>Storage-only columns become tags of the "data" family, index-only columns are skipped and
+     * all remaining columns become tags of the "searchable" family. The entity is made of the
+     * sharding-key columns in ascending sharding-key index order; if there are none, a synthetic
+     * "id" tag is appended to the searchable family and used as the entity instead.
+     *
+     * @param model         storage model to translate
+     * @param configService supplies the user-configured searchable tags
+     * @return the protobuf stream definition
+     */
+    private BanyandbDatabase.Stream parseStreamFromModel(Model model, ConfigService configService) {
+        List<ModelColumn> shardingColumns = new ArrayList<>();
+
+        List<BanyandbDatabase.TagSpec> searchableTagsSpecs = new ArrayList<>();
+        List<BanyandbDatabase.TagSpec> dataTagsSpecs = new ArrayList<>();
+        for (final ModelColumn modelColumn : model.getColumns()) {
+            if (modelColumn.getShardingKeyIdx() > -1) {
+                shardingColumns.add(modelColumn);
+            }
+            if (modelColumn.isIndexOnly()) {
+                // no tag spec is generated for index-only columns
+                continue;
+            }
+            if (modelColumn.isStorageOnly()) {
+                dataTagsSpecs.add(parseTagSpecFromModelColumn(modelColumn));
+            } else {
+                searchableTagsSpecs.add(parseTagSpecFromModelColumn(modelColumn));
+            }
+        }
+
+        // Collect into a List: the entity tag names are positional, and Collectors.toSet() gives no
+        // guarantee about iteration order, which would throw away the sort by sharding key index.
+        // distinct() drops repeated names while keeping the encounter order.
+        List<String> entities = shardingColumns.stream()
+                .sorted(Comparator.comparingInt(ModelColumn::getShardingKeyIdx))
+                .map(modelColumn -> modelColumn.getColumnName().getStorageName())
+                .distinct()
+                .collect(Collectors.toList());
+
+        if (entities.isEmpty()) {
+            // if sharding keys are not defined, we have to use ID
+            entities = Collections.singletonList(StreamMetadata.ID);
+            // append ID
+            searchableTagsSpecs.add(BanyandbDatabase.TagSpec.newBuilder()
+                    .setName(StreamMetadata.ID)
+                    .setType(BanyandbDatabase.TagType.TAG_TYPE_STRING).build());
+        }
+
+        // add all user-defined indexed tags to the end of the "searchable" family
+        if (SegmentRecord.INDEX_NAME.equals(model.getName())) {
+            searchableTagsSpecs.addAll(parseTagSpecsFromConfiguration(configService.getSearchableTracesTags()));
+        } else if (LogRecord.INDEX_NAME.equals(model.getName())) {
+            searchableTagsSpecs.addAll(parseTagSpecsFromConfiguration(configService.getSearchableLogsTags()));
+        } else if (AlarmRecord.INDEX_NAME.equals(model.getName())) {
+            searchableTagsSpecs.addAll(parseTagSpecsFromConfiguration(configService.getSearchableAlarmTags()));
+        }
+
+        String group = "default-stream";
+        if (model.isSuperDataset()) {
+            // for superDataset, we should use separate group
+            group = model.getName() + "-stream";
+        }
+
+        return BanyandbDatabase.Stream.newBuilder()
+                .addTagFamilies(BanyandbDatabase.TagFamilySpec.newBuilder()
+                        .setName(StreamMetadata.TAG_FAMILY_DATA)
+                        .addAllTags(dataTagsSpecs)
+                        .build())
+                .addTagFamilies(BanyandbDatabase.TagFamilySpec.newBuilder()
+                        .setName(StreamMetadata.TAG_FAMILY_SEARCHABLE)
+                        .addAllTags(searchableTagsSpecs)
+                        .build())
+                .setEntity(BanyandbDatabase.Entity.newBuilder()
+                        .addAllTagNames(entities)
+                        .build())
+                .setMetadata(BanyandbCommon.Metadata.newBuilder()
+                        .setGroup(group)
+                        .setName(model.getName())
+                        .build())
+                .build();
+    }
+
+    /**
+     * Maps a model column to a tag spec named after the column's storage name.
+     *
+     * <p>{@code String} maps to a string tag, {@code int}/{@code long} to an int tag and
+     * {@code byte[]}/{@link DataTable} to a binary tag.
+     *
+     * @param modelColumn column to convert
+     * @return the tag spec for the column
+     * @throws IllegalStateException if the column type is none of the types listed above
+     */
+    private BanyandbDatabase.TagSpec parseTagSpecFromModelColumn(ModelColumn modelColumn) {
+        final Class<?> clazz = modelColumn.getType();
+        final BanyandbDatabase.TagType tagType;
+        if (String.class.equals(clazz)) {
+            tagType = BanyandbDatabase.TagType.TAG_TYPE_STRING;
+        } else if (int.class.equals(clazz) || long.class.equals(clazz)) {
+            tagType = BanyandbDatabase.TagType.TAG_TYPE_INT;
+        } else if (byte[].class.equals(clazz) || DataTable.class.equals(clazz)) {
+            tagType = BanyandbDatabase.TagType.TAG_TYPE_DATA_BINARY;
+        } else {
+            throw new IllegalStateException("type " + modelColumn.getType().toString() + " is not supported");
+        }
+        return BanyandbDatabase.TagSpec.newBuilder()
+                .setName(modelColumn.getColumnName().getStorageName())
+                .setType(tagType)
+                .build();
+    }
+
+    /**
+     * Turns a comma-separated list of tag names into string-typed tag specs, one per name.
+     * Names are used exactly as they appear between the commas.
+     *
+     * @param tags comma-separated tag names; may be {@code null} or empty
+     * @return the tag specs in the order the names were listed; empty if there are no names
+     */
+    private List<BanyandbDatabase.TagSpec> parseTagSpecsFromConfiguration(String tags) {
+        if (StringUtil.isEmpty(tags)) {
+            return Collections.emptyList();
+        }
+        String[] tagsArray = tags.split(",");
+        if (tagsArray.length == 0) {
+            return Collections.emptyList();
+        }
+        return Arrays.stream(tagsArray)
+                .map(tagName -> BanyandbDatabase.TagSpec.newBuilder().setName(tagName)
+                        .setType(BanyandbDatabase.TagType.TAG_TYPE_STRING).build())
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Builds an inverted index rule for a single tag. The rule is named after the tag and placed
+     * in the same group as the stream.
+     *
+     * @param metadata metadata of the owning stream; only its group is read
+     * @param tagSpec  tag to index
+     * @return the protobuf index rule
+     */
+    private BanyandbDatabase.IndexRule parseIndexRuleFromTagSpec(BanyandbCommon.Metadata metadata, TagFamilySpec.TagSpec tagSpec) {
+        // In SkyWalking, only "trace_id" should be stored as a global index
+        BanyandbDatabase.IndexRule.Location loc = "trace_id".equals(tagSpec.getTagName()) ?
+                BanyandbDatabase.IndexRule.Location.LOCATION_GLOBAL :
+                BanyandbDatabase.IndexRule.Location.LOCATION_SERIES;
+
+        return BanyandbDatabase.IndexRule.newBuilder()
+                .setMetadata(BanyandbCommon.Metadata.newBuilder()
+                        .setName(tagSpec.getTagName()).setGroup(metadata.getGroup()))
+                .setLocation(loc)
+                .addTags(tagSpec.getTagName())
+                // TODO: support TYPE_TREE
+                .setType(BanyandbDatabase.IndexRule.Type.TYPE_INVERTED)
+                .build();
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/StreamMetaInfo.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/StreamMetaInfo.java
deleted file mode 100644
index 0a2af94339..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/StreamMetaInfo.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb;
-
-import com.google.common.base.Charsets;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.io.CharStreams;
-import com.google.protobuf.util.JsonFormat;
-import lombok.Builder;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase;
-import org.apache.skywalking.banyandb.v1.client.metadata.IndexRule;
-import org.apache.skywalking.banyandb.v1.client.metadata.Stream;
-import org.apache.skywalking.banyandb.v1.client.metadata.TagFamilySpec;
-import org.apache.skywalking.oap.server.core.storage.model.Model;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-@Getter
-@Builder
-@Slf4j
-public class StreamMetaInfo {
-    public static final String TAG_FAMILY_SEARCHABLE = "searchable";
-    public static final String TAG_FAMILY_DATA = "data";
-
-    public static final String ID = "id";
-
-    private static final Map<String, StreamMetaInfo> STREAMS = new HashMap<>();
-
-    private final Model model;
-
-    /**
-     * stream is the metadata to be used for schema creation,
-     * 1. Read json from resources/metadata/{model.name}.json and deserialize to protobuf,
-     * 2. Iterate over tag families,
-     * 3. Iterate over tags in each tag family
-     * 4.
-     */
-    private final Stream stream;
-
-    private final List<IndexRule> indexRules;
-
-    public static StreamMetaInfo addModel(Model model) {
-        BanyandbDatabase.Stream pbStream = parseStreamFromJSON(model.getName());
-        if (pbStream == null) {
-            log.warn("fail to find stream schema {}", model.getName());
-            return null;
-        }
-        final Stream stream = new Stream(pbStream.getMetadata().getName());
-
-        List<IndexRule> indexRules = new ArrayList<>();
-
-        Set<String> entityNameSet = ImmutableSet.copyOf(pbStream.getEntity().getTagNamesList());
-
-        stream.setEntityTagNames(pbStream.getEntity().getTagNamesList());
-
-        for (BanyandbDatabase.TagFamilySpec pbTagFamilySpec : pbStream.getTagFamiliesList()) {
-            final TagFamilySpec tagFamilySpec = TagFamilySpec.fromProtobuf(pbTagFamilySpec);
-            stream.addTagFamilySpec(tagFamilySpec);
-
-            // if the tag family equals to "searchable", build index rules
-            if (tagFamilySpec.getTagFamilyName().equals(TAG_FAMILY_SEARCHABLE)) {
-                for (final TagFamilySpec.TagSpec tagSpec : tagFamilySpec.getTagSpecs()) {
-                    // check if this spec exists in the entity names
-                    if (entityNameSet.contains(tagSpec.getTagName())) {
-                        continue;
-                    }
-                    BanyandbDatabase.IndexRule pbIndexRule = parseIndexRulesFromJSON(model.getName(), tagSpec.getTagName());
-                    if (pbIndexRule == null) {
-                        log.warn("fail to find the index rule for {}", tagSpec.getTagName());
-                        continue;
-                    }
-                    IndexRule indexRule = IndexRule.fromProtobuf(pbIndexRule);
-                    indexRules.add(indexRule);
-                }
-            }
-        }
-
-        return StreamMetaInfo.builder().model(model).stream(stream).indexRules(indexRules).build();
-    }
-
-    private static BanyandbDatabase.Stream parseStreamFromJSON(String name) {
-        try {
-            InputStream is = StreamMetaInfo.class.getClassLoader().getResourceAsStream("metadata/" + name + ".json");
-            if (is == null) {
-                log.warn("fail to find definition for {}", name);
-                return null;
-            }
-            String result = CharStreams.toString(new InputStreamReader(is, Charsets.UTF_8));
-            BanyandbDatabase.Stream.Builder b = BanyandbDatabase.Stream.newBuilder();
-            JsonFormat.parser().merge(result, b);
-            return b.build();
-        } catch (IOException ioEx) {
-            log.error("fail to read json", ioEx);
-            return null;
-        }
-    }
-
-    private static BanyandbDatabase.IndexRule parseIndexRulesFromJSON(String streamName, String name) {
-        try {
-            InputStream is = StreamMetaInfo.class.getClassLoader().getResourceAsStream(String.join("/",
-                    new String[]{"metadata", "index_rules", name + ".json"}));
-            if (is == null) {
-                log.warn("fail to find index rules for {}", streamName);
-                return null;
-            }
-            String result = CharStreams.toString(new InputStreamReader(is, Charsets.UTF_8));
-            BanyandbDatabase.IndexRule.Builder b = BanyandbDatabase.IndexRule.newBuilder();
-            JsonFormat.parser().merge(result, b);
-            return b.build();
-        } catch (IOException ioEx) {
-            log.error("fail to read json", ioEx);
-            return null;
-        }
-    }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/StreamMetadata.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/StreamMetadata.java
new file mode 100644
index 0000000000..814f5f62e2
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/StreamMetadata.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.banyandb;
+
+import lombok.Builder;
+import lombok.Data;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.banyandb.v1.client.metadata.IndexRule;
+import org.apache.skywalking.banyandb.v1.client.metadata.Stream;
+import org.apache.skywalking.banyandb.v1.client.metadata.TagFamilySpec;
+import org.apache.skywalking.oap.server.core.storage.model.Model;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Immutable description of one BanyanDB stream derived from a SkyWalking storage {@link Model}.
+ * Instances are created through the Lombok-generated builder and read through the generated getters.
+ */
+@Getter
+@Builder
+@Slf4j
+public class StreamMetadata {
+    /**
+     * Name of the tag family whose tags are candidates for index rules.
+     */
+    public static final String TAG_FAMILY_SEARCHABLE = "searchable";
+    /**
+     * Name of the tag family that holds storage-only columns.
+     */
+    public static final String TAG_FAMILY_DATA = "data";
+
+    /**
+     * Name of the synthetic tag used as the entity when a model defines no sharding keys.
+     */
+    public static final String ID = "id";
+
+    /**
+     * Storage model this stream was derived from
+     */
+    private final Model model;
+
+    /**
+     * Tag lookup table keyed by tag name
+     */
+    private final Map<String, TagMetadata> tagDefinition;
+
+    /**
+     * Group of the stream
+     */
+    private final String group;
+    /**
+     * Spec of the stream
+     */
+    private final Stream stream;
+    /**
+     * Index rules attached to the stream
+     */
+    private final List<IndexRule> indexRules;
+
+    // NOTE(review): presumably the number of tags in the "data" / "searchable" families.
+    // MetadataRegistry#registerModel does not set either value on the builder, so both are 0
+    // for metadata built there -- confirm the intended use.
+    private final int dataFamilySize;
+    private final int searchableFamilySize;
+
+    /**
+     * Whether the entity of the stream consists solely of the {@link #ID} tag
+     */
+    private final boolean useIdAsEntity;
+
+    /**
+     * Location of a single tag inside the stream: the family it belongs to, its spec and its
+     * position within that family. {@code @Data} already generates the getters, so no separate
+     * {@code @Getter} is needed here.
+     */
+    @Data
+    public static class TagMetadata {
+        private final String tagFamilyName;
+        private final TagFamilySpec.TagSpec tagSpec;
+        private final int tagIndex;
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStreamInsertRequest.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBEventQueryDAO.java
similarity index 59%
copy from oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStreamInsertRequest.java
copy to oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBEventQueryDAO.java
index f8287973bc..f06de0cff5 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStreamInsertRequest.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBEventQueryDAO.java
@@ -16,15 +16,22 @@
  *
  */
 
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure;
 
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import org.apache.skywalking.banyandb.v1.client.StreamWrite;
-import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
+import org.apache.skywalking.oap.server.core.query.type.event.EventQueryCondition;
+import org.apache.skywalking.oap.server.core.query.type.event.Events;
+import org.apache.skywalking.oap.server.core.storage.query.IEventQueryDAO;
 
-@RequiredArgsConstructor
-public class BanyanDBStreamInsertRequest implements InsertRequest {
-    @Getter
-    private final StreamWrite streamWrite;
-}
\ No newline at end of file
+import java.util.List;
+
+/**
+ * {@link IEventQueryDAO} of the BanyanDB storage plugin that performs no lookup: both query
+ * variants ignore their conditions and answer with a new {@link Events} that nothing was added to.
+ */
+public class BanyanDBEventQueryDAO implements IEventQueryDAO {
+    /**
+     * @param condition ignored
+     * @return a newly created {@link Events}
+     */
+    @Override
+    public Events queryEvents(EventQueryCondition condition) throws Exception {
+        return emptyResult();
+    }
+
+    /**
+     * @param conditionList ignored
+     * @return a newly created {@link Events}
+     */
+    @Override
+    public Events queryEvents(List<EventQueryCondition> conditionList) throws Exception {
+        return emptyResult();
+    }
+
+    /**
+     * Creates the result shared by both query variants; a new instance per call.
+     */
+    private static Events emptyResult() {
+        return new Events();
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java
new file mode 100644
index 0000000000..5833c98c56
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure;
+
+import org.apache.skywalking.oap.server.core.query.type.Endpoint;
+import org.apache.skywalking.oap.server.core.query.type.Process;
+import org.apache.skywalking.oap.server.core.query.type.Service;
+import org.apache.skywalking.oap.server.core.query.type.ServiceInstance;
+import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * {@link IMetadataQueryDAO} of the BanyanDB storage plugin that performs no lookup: list queries
+ * answer with an empty list and single-item queries answer with {@code null}. All arguments are
+ * ignored.
+ */
+public class BanyanDBMetadataQueryDAO implements IMetadataQueryDAO {
+    /**
+     * @return always an empty list
+     */
+    @Override
+    public List<Service> listServices(String layer, String group) throws IOException {
+        return nothing();
+    }
+
+    /**
+     * @return always an empty list
+     */
+    @Override
+    public List<Service> getServices(String serviceId) throws IOException {
+        return nothing();
+    }
+
+    /**
+     * @return always an empty list
+     */
+    @Override
+    public List<ServiceInstance> listInstances(long startTimestamp, long endTimestamp, String serviceId) throws IOException {
+        return nothing();
+    }
+
+    /**
+     * @return always {@code null}
+     */
+    @Override
+    public ServiceInstance getInstance(String instanceId) throws IOException {
+        return null;
+    }
+
+    /**
+     * @return always an empty list
+     */
+    @Override
+    public List<Endpoint> findEndpoint(String keyword, String serviceId, int limit) throws IOException {
+        return nothing();
+    }
+
+    /**
+     * @return always an empty list
+     */
+    @Override
+    public List<Process> listProcesses(String serviceId, String instanceId) throws IOException {
+        return nothing();
+    }
+
+    /**
+     * @return always {@code null}
+     */
+    @Override
+    public Process getProcess(String processId) throws IOException {
+        return null;
+    }
+
+    /**
+     * Shared result of every list query: the immutable empty list.
+     */
+    private static <T> List<T> nothing() {
+        return Collections.emptyList();
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStreamInsertRequest.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBNetworkAddressAliasDAO.java
similarity index 65%
copy from oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStreamInsertRequest.java
copy to oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBNetworkAddressAliasDAO.java
index f8287973bc..685d7b8083 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStreamInsertRequest.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBNetworkAddressAliasDAO.java
@@ -16,15 +16,17 @@
  *
  */
 
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure;
 
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import org.apache.skywalking.banyandb.v1.client.StreamWrite;
-import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
+import org.apache.skywalking.oap.server.core.analysis.manual.networkalias.NetworkAddressAlias;
+import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO;
 
-@RequiredArgsConstructor
-public class BanyanDBStreamInsertRequest implements InsertRequest {
-    @Getter
-    private final StreamWrite streamWrite;
-}
\ No newline at end of file
+import java.util.Collections;
+import java.util.List;
+
+public class BanyanDBNetworkAddressAliasDAO implements INetworkAddressAliasDAO { // holds no fields and no storage client
+    @Override
+    public List<NetworkAddressAlias> loadLastUpdate(long timeBucket) {
+        return Collections.emptyList(); // always empty: timeBucket is ignored; NOTE(review): presumably a placeholder — confirm
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStreamInsertRequest.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBProfileTaskQueryDAO.java
similarity index 57%
copy from oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStreamInsertRequest.java
copy to oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBProfileTaskQueryDAO.java
index f8287973bc..0c6e1ebfd1 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStreamInsertRequest.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBProfileTaskQueryDAO.java
@@ -16,15 +16,23 @@
  *
  */
 
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure;
 
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import org.apache.skywalking.banyandb.v1.client.StreamWrite;
-import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
+import org.apache.skywalking.oap.server.core.query.type.ProfileTask;
+import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO;
 
-@RequiredArgsConstructor
-public class BanyanDBStreamInsertRequest implements InsertRequest {
-    @Getter
-    private final StreamWrite streamWrite;
-}
\ No newline at end of file
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+public class BanyanDBProfileTaskQueryDAO implements IProfileTaskQueryDAO { // holds no fields; neither method performs a query
+    @Override
+    public List<ProfileTask> getTaskList(String serviceId, String endpointName, Long startTimeBucket, Long endTimeBucket, Integer limit) throws IOException {
+        return Collections.emptyList(); // always empty: every filter argument, including limit, is ignored
+    }
+
+    @Override
+    public ProfileTask getById(String id) throws IOException {
+        return null; // always null regardless of id; NOTE(review): presumably a placeholder — confirm callers tolerate null
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBTopologyQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBTopologyQueryDAO.java
new file mode 100644
index 0000000000..9ce2282f56
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBTopologyQueryDAO.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure;
+
+import org.apache.skywalking.oap.server.core.query.type.Call;
+import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+public class BanyanDBTopologyQueryDAO implements ITopologyQueryDAO { // holds no fields; every method below returns an empty list
+    @Override
+    public List<Call.CallDetail> loadServiceRelationsDetectedAtServerSide(long startTB, long endTB, List<String> serviceIds) throws IOException {
+        return Collections.emptyList(); // always empty: the time-bucket range and serviceIds are ignored
+    }
+
+    @Override
+    public List<Call.CallDetail> loadServiceRelationDetectedAtClientSide(long startTB, long endTB, List<String> serviceIds) throws IOException {
+        return Collections.emptyList(); // always empty: the time-bucket range and serviceIds are ignored
+    }
+
+    @Override
+    public List<Call.CallDetail> loadServiceRelationsDetectedAtServerSide(long startTB, long endTB) throws IOException {
+        return Collections.emptyList(); // always empty: the time-bucket range is ignored
+    }
+
+    @Override
+    public List<Call.CallDetail> loadServiceRelationDetectedAtClientSide(long startTB, long endTB) throws IOException {
+        return Collections.emptyList(); // always empty: the time-bucket range is ignored
+    }
+
+    @Override
+    public List<Call.CallDetail> loadInstanceRelationDetectedAtServerSide(String clientServiceId, String serverServiceId, long startTB, long endTB) throws IOException {
+        return Collections.emptyList(); // always empty: both service ids and the time-bucket range are ignored
+    }
+
+    @Override
+    public List<Call.CallDetail> loadInstanceRelationDetectedAtClientSide(String clientServiceId, String serverServiceId, long startTB, long endTB) throws IOException {
+        return Collections.emptyList(); // always empty: both service ids and the time-bucket range are ignored
+    }
+
+    @Override
+    public List<Call.CallDetail> loadEndpointRelation(long startTB, long endTB, String destEndpointId) throws IOException {
+        return Collections.emptyList(); // always empty: the time-bucket range and destEndpointId are ignored; NOTE(review): presumably placeholders throughout — confirm
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBUITemplateManagementDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBUITemplateManagementDAO.java
new file mode 100644
index 0000000000..efbd9ce106
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBUITemplateManagementDAO.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure;
+
+import org.apache.skywalking.oap.server.core.query.input.DashboardSetting;
+import org.apache.skywalking.oap.server.core.query.type.DashboardConfiguration;
+import org.apache.skywalking.oap.server.core.query.type.TemplateChangeStatus;
+import org.apache.skywalking.oap.server.core.storage.management.UITemplateManagementDAO;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+public class BanyanDBUITemplateManagementDAO implements UITemplateManagementDAO { // holds no fields; no method reads or writes storage
+    @Override
+    public DashboardConfiguration getTemplate(String id) throws IOException {
+        return null; // always null regardless of id; NOTE(review): presumably a placeholder — confirm callers tolerate null
+    }
+
+    @Override
+    public List<DashboardConfiguration> getAllTemplates(Boolean includingDisabled) throws IOException {
+        return Collections.emptyList(); // always empty: includingDisabled is ignored
+    }
+
+    @Override
+    public TemplateChangeStatus addTemplate(DashboardSetting setting) throws IOException {
+        return null; // setting is not persisted and no status object is built; the result is always null
+    }
+
+    @Override
+    public TemplateChangeStatus changeTemplate(DashboardSetting setting) throws IOException {
+        return null; // setting is not applied and no status object is built; the result is always null
+    }
+
+    @Override
+    public TemplateChangeStatus disableTemplate(String id) throws IOException {
+        return null; // nothing is disabled and no status object is built; the result is always null
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/AlarmRecordBuilder.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/AlarmRecordBuilder.java
deleted file mode 100644
index 260fc17664..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/AlarmRecordBuilder.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.schema;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
-import org.apache.skywalking.banyandb.v1.client.SerializableTag;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.alarm.AlarmRecord;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class AlarmRecordBuilder extends BanyanDBStorageDataBuilder<AlarmRecord> {
-    public static final List<String> INDEXED_TAGS = ImmutableList.of(
-            "level"
-    );
-
-    @Override
-    protected List<SerializableTag<BanyandbModel.TagValue>> searchableTags(AlarmRecord entity) {
-        List<SerializableTag<BanyandbModel.TagValue>> searchable = new ArrayList<>(2);
-        searchable.add(TagAndValue.longField(entity.getScope()));
-        searchable.add(TagAndValue.longField(entity.getStartTime()));
-        searchable.addAll(filterSearchableTags(entity.getTags(), INDEXED_TAGS));
-        return searchable;
-    }
-
-    @Override
-    protected List<SerializableTag<BanyandbModel.TagValue>> dataTags(AlarmRecord entity) {
-        List<SerializableTag<BanyandbModel.TagValue>> data = new ArrayList<>(6);
-        data.add(TagAndValue.stringField(entity.getName()));
-        data.add(TagAndValue.stringField(entity.getId0()));
-        data.add(TagAndValue.stringField(entity.getId1()));
-        data.add(TagAndValue.stringField(entity.getAlarmMessage()));
-        data.add(TagAndValue.stringField(entity.getRuleName()));
-        data.add(TagAndValue.binaryField(entity.getTagsRawData()));
-        return data;
-    }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/BanyanDBStorageDataBuilder.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/BanyanDBStorageDataBuilder.java
deleted file mode 100644
index 814f3149f7..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/BanyanDBStorageDataBuilder.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.schema;
-
-import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
-import org.apache.skywalking.banyandb.v1.client.SerializableTag;
-import org.apache.skywalking.banyandb.v1.client.StreamWrite;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
-import org.apache.skywalking.oap.server.core.storage.StorageData;
-import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public abstract class BanyanDBStorageDataBuilder<T extends StorageData> implements StorageBuilder<T, StreamWrite.StreamWriteBuilder> {
-    @Override
-    public T storage2Entity(StreamWrite.StreamWriteBuilder storageData) {
-        return null;
-    }
-
-    @Override
-    public StreamWrite.StreamWriteBuilder entity2Storage(T entity) {
-        StreamWrite.StreamWriteBuilder b = StreamWrite.builder()
-                .elementId(this.elementID(entity))
-                .searchableTags(this.searchableTags(entity))
-                .dataTags(this.dataTags(entity));
-        Long ts = this.extractTimestamp(entity);
-        if (ts != null) {
-            b.timestamp(ts);
-        }
-        return b;
-    }
-
-    protected Long extractTimestamp(T entity) {
-        return null;
-    }
-
-    protected List<SerializableTag<BanyandbModel.TagValue>> filterSearchableTags(List<Tag> rawTags, List<String> indexTags) {
-        if (rawTags == null) {
-            return Collections.emptyList();
-        }
-        Map<String, SerializableTag<BanyandbModel.TagValue>> map = new HashMap<>();
-        for (final Tag tag : rawTags) {
-            map.put(tag.getKey().toLowerCase(), TagAndValue.stringField(tag.getValue()));
-        }
-        final List<SerializableTag<BanyandbModel.TagValue>> tags = new ArrayList<>();
-        for (String indexedTag : indexTags) {
-            SerializableTag<BanyandbModel.TagValue> tag = map.get(indexedTag);
-            if (tag == null) {
-                tags.add(TagAndValue.nullField());
-            } else {
-                tags.add(tag);
-            }
-        }
-
-        return tags;
-    }
-
-    protected String elementID(T entity) {
-        return entity.id();
-    }
-
-    abstract protected List<SerializableTag<BanyandbModel.TagValue>> searchableTags(T entity);
-
-    protected List<SerializableTag<BanyandbModel.TagValue>> dataTags(T entity) {
-        return Collections.emptyList();
-    }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/BrowserErrorLogRecordBuilder.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/BrowserErrorLogRecordBuilder.java
deleted file mode 100644
index 034565249b..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/BrowserErrorLogRecordBuilder.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.schema;
-
-import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
-import org.apache.skywalking.banyandb.v1.client.SerializableTag;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.browser.manual.errorlog.BrowserErrorLogRecord;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-public class BrowserErrorLogRecordBuilder extends BanyanDBStorageDataBuilder<BrowserErrorLogRecord> {
-    @Override
-    protected List<SerializableTag<BanyandbModel.TagValue>> searchableTags(BrowserErrorLogRecord entity) {
-        List<SerializableTag<BanyandbModel.TagValue>> searchable = new ArrayList<>();
-        searchable.add(TagAndValue.stringField(entity.getUniqueId()));
-        searchable.add(TagAndValue.stringField(entity.getServiceId()));
-        searchable.add(TagAndValue.stringField(entity.getServiceVersionId()));
-        searchable.add(TagAndValue.stringField(entity.getPagePathId()));
-        searchable.add(TagAndValue.longField(entity.getErrorCategory()));
-        return searchable;
-    }
-
-    @Override
-    protected List<SerializableTag<BanyandbModel.TagValue>> dataTags(BrowserErrorLogRecord entity) {
-        return Collections.singletonList(TagAndValue.binaryField(entity.getDataBinary()));
-    }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/EndpointRelationServerSideMetricsBuilder.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/EndpointRelationServerSideMetricsBuilder.java
deleted file mode 100644
index c6cef854e4..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/EndpointRelationServerSideMetricsBuilder.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.schema;
-
-import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
-import org.apache.skywalking.banyandb.v1.client.SerializableTag;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.analysis.manual.relation.endpoint.EndpointRelationServerSideMetrics;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-public class EndpointRelationServerSideMetricsBuilder extends BanyanDBStorageDataBuilder<EndpointRelationServerSideMetrics> {
-    @Override
-    protected List<SerializableTag<BanyandbModel.TagValue>> searchableTags(EndpointRelationServerSideMetrics entity) {
-        List<SerializableTag<BanyandbModel.TagValue>> searchable = new ArrayList<>();
-        // 0 - source_endpoint
-        searchable.add(TagAndValue.stringField(entity.getSourceEndpoint()));
-        // 1 - dest_endpoint
-        searchable.add(TagAndValue.stringField(entity.getDestEndpoint()));
-        // 2 - entity_id
-        searchable.add(TagAndValue.stringField(entity.getEntityId()));
-        return searchable;
-    }
-
-    @Override
-    protected List<SerializableTag<BanyandbModel.TagValue>> dataTags(EndpointRelationServerSideMetrics entity) {
-        // 0 - component_id
-        return Collections.singletonList(TagAndValue.longField(entity.getComponentId()));
-    }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/EventBuilder.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/EventBuilder.java
deleted file mode 100644
index 512fe2aa5c..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/EventBuilder.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.schema;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
-import org.apache.skywalking.banyandb.v1.client.SerializableTag;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.source.Event;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class EventBuilder extends BanyanDBStorageDataBuilder<Event> {
-    @Override
-    protected List<SerializableTag<BanyandbModel.TagValue>> searchableTags(Event entity) {
-        List<SerializableTag<BanyandbModel.TagValue>> searchable = new ArrayList<>(8);
-        searchable.add(TagAndValue.stringField(entity.getUuid()));
-        searchable.add(TagAndValue.stringField(entity.getService()));
-        searchable.add(TagAndValue.stringField(entity.getServiceInstance()));
-        searchable.add(TagAndValue.stringField(entity.getEndpoint()));
-        searchable.add(TagAndValue.stringField(entity.getName()));
-        searchable.add(TagAndValue.stringField(entity.getType()));
-        searchable.add(TagAndValue.longField(entity.getStartTime()));
-        searchable.add(TagAndValue.longField(entity.getEndTime()));
-        return searchable;
-    }
-
-    @Override
-    protected List<SerializableTag<BanyandbModel.TagValue>> dataTags(Event entity) {
-        return ImmutableList.of(
-                TagAndValue.stringField(entity.getMessage()),
-                TagAndValue.stringField(entity.getParameters())
-        );
-    }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/LogRecordBuilder.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/LogRecordBuilder.java
deleted file mode 100644
index 8fddd927a4..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/LogRecordBuilder.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.schema;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
-import org.apache.skywalking.banyandb.v1.client.SerializableTag;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.analysis.manual.log.LogRecord;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class LogRecordBuilder extends BanyanDBStorageDataBuilder<LogRecord> {
-    public static final List<String> INDEXED_TAGS = ImmutableList.of(
-            "level"
-    );
-
-    @Override
-    protected List<SerializableTag<BanyandbModel.TagValue>> searchableTags(LogRecord entity) {
-        List<SerializableTag<BanyandbModel.TagValue>> searchable = new ArrayList<>();
-        searchable.add(TagAndValue.stringField(entity.getUniqueId()));
-        searchable.add(TagAndValue.stringField(entity.getServiceId()));
-        searchable.add(TagAndValue.stringField(entity.getServiceInstanceId()));
-        searchable.add(TagAndValue.stringField(entity.getServiceId()));
-        searchable.add(TagAndValue.stringField(entity.getEndpointId()));
-        searchable.add(TagAndValue.stringField(entity.getTraceId()));
-        searchable.add(TagAndValue.stringField(entity.getTraceSegmentId()));
-        searchable.add(TagAndValue.longField(entity.getSpanId()));
-        searchable.addAll(filterSearchableTags(entity.getTags(), INDEXED_TAGS));
-        return searchable;
-    }
-
-    @Override
-    protected List<SerializableTag<BanyandbModel.TagValue>> dataTags(LogRecord entity) {
-        return ImmutableList.of(
-                TagAndValue.stringField(entity.getContent()),
-                TagAndValue.longField(entity.getContentType()),
-                TagAndValue.binaryField(entity.getTagsRawData())
-        );
-    }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/Metadata.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/Metadata.java
deleted file mode 100644
index 9f0fda0ac4..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/Metadata.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.schema;
-
-import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
-import org.apache.skywalking.banyandb.v1.client.SerializableTag;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.analysis.Layer;
-import org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic;
-import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
-import org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraffic;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import static org.apache.skywalking.oap.server.core.Const.DOUBLE_COLONS_SPLIT;
-
-public class Metadata {
-    public static class ServiceTrafficBuilder extends BanyanDBStorageDataBuilder<ServiceTraffic> {
-        @Override
-        protected List<SerializableTag<BanyandbModel.TagValue>> searchableTags(ServiceTraffic entity) {
-            final String serviceName = entity.getName();
-            entity.setShortName(serviceName);
-            if (entity.isNormal()) {
-                int groupIdx = serviceName.indexOf(DOUBLE_COLONS_SPLIT);
-                if (groupIdx > 0) {
-                    entity.setGroup(serviceName.substring(0, groupIdx));
-                    entity.setShortName(serviceName.substring(groupIdx + 2));
-                }
-            }
-            List<SerializableTag<BanyandbModel.TagValue>> searchable = new ArrayList<>(4);
-            // 0 - serviceName
-            searchable.add(TagAndValue.stringField(serviceName));
-            // 1 - serviceID
-            searchable.add(TagAndValue.stringField(entity.getServiceId()));
-            // 2 - group
-            searchable.add(TagAndValue.stringField(entity.getGroup()));
-            // 3 - layer
-            Layer layer = entity.getLayer();
-            searchable.add(TagAndValue.longField(layer != null ? layer.value() : Layer.UNDEFINED.value()));
-            return searchable;
-        }
-
-        @Override
-        protected List<SerializableTag<BanyandbModel.TagValue>> dataTags(ServiceTraffic entity) {
-            // 0 - shortName
-            return Collections.singletonList(TagAndValue.stringField(entity.getShortName()));
-        }
-    }
-
-    public static class EndpointTrafficBuilder extends BanyanDBStorageDataBuilder<EndpointTraffic> {
-        @Override
-        protected List<SerializableTag<BanyandbModel.TagValue>> searchableTags(EndpointTraffic entity) {
-            List<SerializableTag<BanyandbModel.TagValue>> searchable = new ArrayList<>(2);
-            // 0 - serviceID
-            searchable.add(TagAndValue.stringField(entity.getServiceId()));
-            // 1 - name
-            searchable.add(TagAndValue.stringField(entity.getName()));
-            return searchable;
-        }
-    }
-
-    public static class InstanceTrafficBuilder extends BanyanDBStorageDataBuilder<InstanceTraffic> {
-        @Override
-        protected List<SerializableTag<BanyandbModel.TagValue>> searchableTags(InstanceTraffic entity) {
-            List<SerializableTag<BanyandbModel.TagValue>> searchable = new ArrayList<>(3);
-            // serviceID
-            searchable.add(TagAndValue.stringField(entity.getServiceId()));
-            // lastPingTimestamp
-            searchable.add(TagAndValue.longField(entity.getLastPingTimestamp()));
-            // ID: we have to duplicate "ID" here for query
-            searchable.add(TagAndValue.stringField(entity.id()));
-            return searchable;
-        }
-
-        @Override
-        protected List<SerializableTag<BanyandbModel.TagValue>> dataTags(InstanceTraffic entity) {
-            return Collections.singletonList(TagAndValue.binaryField(
-                    entity.serialize().build().toByteArray()
-            ));
-        }
-    }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/NetworkAddressAliasBuilder.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/NetworkAddressAliasBuilder.java
deleted file mode 100644
index e429f80b9c..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/NetworkAddressAliasBuilder.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.schema;
-
-import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
-import org.apache.skywalking.banyandb.v1.client.SerializableTag;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.analysis.manual.networkalias.NetworkAddressAlias;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-public class NetworkAddressAliasBuilder extends BanyanDBStorageDataBuilder<NetworkAddressAlias> {
-    @Override
-    protected List<SerializableTag<BanyandbModel.TagValue>> searchableTags(NetworkAddressAlias entity) {
-        return Collections.singletonList(TagAndValue.longField(entity.getLastUpdateTimeBucket()));
-    }
-
-    @Override
-    protected List<SerializableTag<BanyandbModel.TagValue>> dataTags(NetworkAddressAlias entity) {
-        List<SerializableTag<BanyandbModel.TagValue>> data = new ArrayList<>(4);
-        // 0 - time_bucket
-        data.add(TagAndValue.longField(entity.getTimeBucket()));
-        // 1 - address
-        data.add(TagAndValue.stringField(entity.getAddress()));
-        // 2 - represent_service_id
-        data.add(TagAndValue.stringField(entity.getRepresentServiceId()));
-        // 3 - represent_service_instance_id
-        data.add(TagAndValue.stringField(entity.getRepresentServiceInstanceId()));
-        return data;
-    }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/ProfileTaskLogRecordBuilder.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/ProfileTaskLogRecordBuilder.java
deleted file mode 100644
index 1f861ab4cd..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/ProfileTaskLogRecordBuilder.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.schema;
-
-import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
-import org.apache.skywalking.banyandb.v1.client.SerializableTag;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.profile.ProfileTaskLogRecord;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class ProfileTaskLogRecordBuilder extends BanyanDBStorageDataBuilder<ProfileTaskLogRecord> {
-    @Override
-    protected List<SerializableTag<BanyandbModel.TagValue>> searchableTags(ProfileTaskLogRecord entity) {
-        List<SerializableTag<BanyandbModel.TagValue>> searchable = new ArrayList<>(2);
-        searchable.add(TagAndValue.longField(entity.getOperationTime()));
-        searchable.add(TagAndValue.stringField(entity.getInstanceId()));
-        return searchable;
-    }
-
-    @Override
-    protected List<SerializableTag<BanyandbModel.TagValue>> dataTags(ProfileTaskLogRecord entity) {
-        List<SerializableTag<BanyandbModel.TagValue>> data = new ArrayList<>(2);
-        data.add(TagAndValue.stringField(entity.getTaskId()));
-        data.add(TagAndValue.longField(entity.getOperationType()));
-        return data;
-    }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/ProfileTaskRecordBuilder.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/ProfileTaskRecordBuilder.java
deleted file mode 100644
index 23a5bf8fdf..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/ProfileTaskRecordBuilder.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.schema;
-
-import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
-import org.apache.skywalking.banyandb.v1.client.SerializableTag;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.profile.ProfileTaskRecord;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class ProfileTaskRecordBuilder extends BanyanDBStorageDataBuilder<ProfileTaskRecord> {
-    @Override
-    protected List<SerializableTag<BanyandbModel.TagValue>> searchableTags(ProfileTaskRecord entity) {
-        List<SerializableTag<BanyandbModel.TagValue>> searchable = new ArrayList<>(9);
-        // 0 - id
-        searchable.add(TagAndValue.stringField(entity.id()));
-        // 1 - service_id
-        searchable.add(TagAndValue.stringField(entity.getServiceId()));
-        // 2 - endpoint_name
-        searchable.add(TagAndValue.stringField(entity.getEndpointName()));
-        // 3 - start_time
-        searchable.add(TagAndValue.longField(entity.getStartTime()));
-        // 4 - duration
-        searchable.add(TagAndValue.longField(entity.getDuration()));
-        // 5 - min_duration_threshold
-        searchable.add(TagAndValue.longField(entity.getMinDurationThreshold()));
-        // 6 - dump_period
-        searchable.add(TagAndValue.longField(entity.getDumpPeriod()));
-        // 7 - create_time
-        searchable.add(TagAndValue.longField(entity.getCreateTime()));
-        // 8 - max_sampling_count
-        searchable.add(TagAndValue.longField(entity.getMaxSamplingCount()));
-        return searchable;
-    }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/ProfileThreadSnapshotRecordBuilder.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/ProfileThreadSnapshotRecordBuilder.java
deleted file mode 100644
index 28c0ba5f38..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/ProfileThreadSnapshotRecordBuilder.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.schema;
-
-import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
-import org.apache.skywalking.banyandb.v1.client.SerializableTag;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.profile.ProfileThreadSnapshotRecord;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-public class ProfileThreadSnapshotRecordBuilder extends BanyanDBStorageDataBuilder<ProfileThreadSnapshotRecord> {
-    @Override
-    protected List<SerializableTag<BanyandbModel.TagValue>> searchableTags(ProfileThreadSnapshotRecord entity) {
-        List<SerializableTag<BanyandbModel.TagValue>> searchable = new ArrayList<>(4);
-        searchable.add(TagAndValue.stringField(entity.getTaskId()));
-        searchable.add(TagAndValue.stringField(entity.getSegmentId()));
-        searchable.add(TagAndValue.longField(entity.getDumpTime()));
-        searchable.add(TagAndValue.longField(entity.getSequence()));
-        return searchable;
-    }
-
-    @Override
-    protected List<SerializableTag<BanyandbModel.TagValue>> dataTags(ProfileThreadSnapshotRecord entity) {
-        return Collections.singletonList(TagAndValue.binaryField(entity.getStackBinary()));
-    }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/SegmentRecordBuilder.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/SegmentRecordBuilder.java
deleted file mode 100644
index 729ad90e15..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/SegmentRecordBuilder.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.schema;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
-import org.apache.skywalking.banyandb.v1.client.SerializableTag;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-public class SegmentRecordBuilder extends BanyanDBStorageDataBuilder<SegmentRecord> {
-    public static final List<String> INDEXED_TAGS = ImmutableList.of(
-            "http.method",
-            "status_code",
-            "db.type",
-            "db.instance",
-            "mq.queue",
-            "mq.topic",
-            "mq.broker"
-    );
-
-    @Override
-    protected List<SerializableTag<BanyandbModel.TagValue>> searchableTags(SegmentRecord segmentRecord) {
-        List<SerializableTag<BanyandbModel.TagValue>> searchable = new ArrayList<>(10);
-        // 0 - trace_id
-        searchable.add(TagAndValue.stringField(segmentRecord.getTraceId()));
-        // 1 - is_error
-        searchable.add(TagAndValue.longField(segmentRecord.getIsError()));
-        // 2 - service_id
-        searchable.add(TagAndValue.stringField(segmentRecord.getServiceId()));
-        // 3 - service_instance_id
-        searchable.add(TagAndValue.stringField(segmentRecord.getServiceInstanceId()));
-        // 4 - endpoint_id
-        searchable.add(TagAndValue.stringField(segmentRecord.getEndpointId()));
-        // 5 - latency
-        searchable.add(TagAndValue.longField(segmentRecord.getLatency()));
-        // 6 - start_time
-        searchable.add(TagAndValue.longField(segmentRecord.getStartTime()));
-        // 7 ~ 13: indexed tags
-        searchable.addAll(filterSearchableTags(segmentRecord.getTagsRawData(), INDEXED_TAGS));
-        return searchable;
-    }
-
-    @Override
-    protected List<SerializableTag<BanyandbModel.TagValue>> dataTags(SegmentRecord entity) {
-        return Collections.singletonList(TagAndValue.binaryField(entity.getDataBinary()));
-    }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/ServiceInstanceRelationClientSideMetricsBuilder.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/ServiceInstanceRelationClientSideMetricsBuilder.java
deleted file mode 100644
index dc4de6ddcb..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/ServiceInstanceRelationClientSideMetricsBuilder.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.schema;
-
-import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
-import org.apache.skywalking.banyandb.v1.client.SerializableTag;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.analysis.manual.relation.instance.ServiceInstanceRelationClientSideMetrics;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class ServiceInstanceRelationClientSideMetricsBuilder extends BanyanDBStorageDataBuilder<ServiceInstanceRelationClientSideMetrics> {
-    @Override
-    protected List<SerializableTag<BanyandbModel.TagValue>> searchableTags(ServiceInstanceRelationClientSideMetrics entity) {
-        List<SerializableTag<BanyandbModel.TagValue>> searchable = new ArrayList<>(3);
-        searchable.add(TagAndValue.stringField(entity.getSourceServiceId()));
-        searchable.add(TagAndValue.stringField(entity.getDestServiceId()));
-        searchable.add(TagAndValue.stringField(entity.getEntityId()));
-        return searchable;
-    }
-
-    @Override
-    protected List<SerializableTag<BanyandbModel.TagValue>> dataTags(ServiceInstanceRelationClientSideMetrics entity) {
-        List<SerializableTag<BanyandbModel.TagValue>> data = new ArrayList<>(3);
-        data.add(TagAndValue.longField(entity.getComponentId()));
-        data.add(TagAndValue.stringField(entity.getSourceServiceInstanceId()));
-        data.add(TagAndValue.stringField(entity.getDestServiceInstanceId()));
-        return data;
-    }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/ServiceInstanceRelationServerSideMetricsBuilder.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/ServiceInstanceRelationServerSideMetricsBuilder.java
deleted file mode 100644
index 7796265f8d..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/ServiceInstanceRelationServerSideMetricsBuilder.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.schema;
-
-import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
-import org.apache.skywalking.banyandb.v1.client.SerializableTag;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.analysis.manual.relation.instance.ServiceInstanceRelationServerSideMetrics;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class ServiceInstanceRelationServerSideMetricsBuilder extends BanyanDBStorageDataBuilder<ServiceInstanceRelationServerSideMetrics> {
-    @Override
-    protected List<SerializableTag<BanyandbModel.TagValue>> searchableTags(ServiceInstanceRelationServerSideMetrics entity) {
-        List<SerializableTag<BanyandbModel.TagValue>> searchable = new ArrayList<>();
-        searchable.add(TagAndValue.stringField(entity.getSourceServiceId()));
-        searchable.add(TagAndValue.stringField(entity.getDestServiceId()));
-        searchable.add(TagAndValue.stringField(entity.getEntityId()));
-        return searchable;
-    }
-
-    @Override
-    protected List<SerializableTag<BanyandbModel.TagValue>> dataTags(ServiceInstanceRelationServerSideMetrics entity) {
-        List<SerializableTag<BanyandbModel.TagValue>> data = new ArrayList<>();
-        data.add(TagAndValue.longField(entity.getComponentId()));
-        data.add(TagAndValue.stringField(entity.getSourceServiceInstanceId()));
-        data.add(TagAndValue.stringField(entity.getDestServiceInstanceId()));
-        return data;
-    }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/ServiceRelationClientSideMetricsBuilder.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/ServiceRelationClientSideMetricsBuilder.java
deleted file mode 100644
index cf8ae73fdf..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/ServiceRelationClientSideMetricsBuilder.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.schema;
-
-import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
-import org.apache.skywalking.banyandb.v1.client.SerializableTag;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.analysis.manual.relation.service.ServiceRelationClientSideMetrics;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-public class ServiceRelationClientSideMetricsBuilder extends BanyanDBStorageDataBuilder<ServiceRelationClientSideMetrics> {
-    @Override
-    protected List<SerializableTag<BanyandbModel.TagValue>> searchableTags(ServiceRelationClientSideMetrics entity) {
-        List<SerializableTag<BanyandbModel.TagValue>> searchable = new ArrayList<>(3);
-        searchable.add(TagAndValue.stringField(entity.getSourceServiceId()));
-        searchable.add(TagAndValue.stringField(entity.getDestServiceId()));
-        searchable.add(TagAndValue.stringField(entity.getEntityId()));
-        return searchable;
-    }
-
-    @Override
-    protected List<SerializableTag<BanyandbModel.TagValue>> dataTags(ServiceRelationClientSideMetrics entity) {
-        return Collections.singletonList(TagAndValue.longField(entity.getComponentId()));
-    }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/ServiceRelationServerSideMetricsBuilder.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/ServiceRelationServerSideMetricsBuilder.java
deleted file mode 100644
index 92a9a13fb9..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/ServiceRelationServerSideMetricsBuilder.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.schema;
-
-import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
-import org.apache.skywalking.banyandb.v1.client.SerializableTag;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.analysis.manual.relation.service.ServiceRelationServerSideMetrics;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-public class ServiceRelationServerSideMetricsBuilder extends BanyanDBStorageDataBuilder<ServiceRelationServerSideMetrics> {
-    @Override
-    protected List<SerializableTag<BanyandbModel.TagValue>> searchableTags(ServiceRelationServerSideMetrics entity) {
-        List<SerializableTag<BanyandbModel.TagValue>> searchable = new ArrayList<>();
-        searchable.add(TagAndValue.stringField(entity.getSourceServiceId()));
-        searchable.add(TagAndValue.stringField(entity.getDestServiceId()));
-        searchable.add(TagAndValue.stringField(entity.getEntityId()));
-        return searchable;
-    }
-
-    @Override
-    protected List<SerializableTag<BanyandbModel.TagValue>> dataTags(ServiceRelationServerSideMetrics entity) {
-        return Collections.singletonList(TagAndValue.longField(entity.getComponentId()));
-    }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/UITemplateBuilder.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/UITemplateBuilder.java
deleted file mode 100644
index 6b53a63174..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/UITemplateBuilder.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.schema;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
-import org.apache.skywalking.banyandb.v1.client.SerializableTag;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.management.ui.template.UITemplate;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class UITemplateBuilder extends BanyanDBStorageDataBuilder<UITemplate> {
-    @Override
-    protected List<SerializableTag<BanyandbModel.TagValue>> searchableTags(UITemplate entity) {
-        return ImmutableList.of(
-                TagAndValue.stringField(entity.getName()),
-                TagAndValue.longField(entity.getDisabled())
-        );
-    }
-
-    @Override
-    protected List<SerializableTag<BanyandbModel.TagValue>> dataTags(UITemplate entity) {
-        List<SerializableTag<BanyandbModel.TagValue>> data = new ArrayList<>();
-        data.add(TagAndValue.stringField(entity.getType()));
-        data.add(TagAndValue.stringField(entity.getConfiguration()));
-        data.add(TagAndValue.longField(entity.getActivated()));
-        return data;
-    }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java
index 04690a30cb..0f093e396b 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java
@@ -19,17 +19,15 @@
 package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
 
 import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
-import org.apache.skywalking.banyandb.v1.client.RowEntity;
 import org.apache.skywalking.banyandb.v1.client.StreamQuery;
 import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
 import org.apache.skywalking.banyandb.v1.client.TimestampRange;
 import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.StreamMetaInfo;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.StreamMetadata;
 
 import java.time.Instant;
 import java.util.List;
-import java.util.function.Function;
 
 public abstract class AbstractBanyanDBDAO extends AbstractDAO<BanyanDBStorageClient> {
     private static final Instant UPPER_BOUND = Instant.ofEpochSecond(0, Long.MAX_VALUE);
@@ -40,17 +38,17 @@ public abstract class AbstractBanyanDBDAO extends AbstractDAO<BanyanDBStorageCli
         super(client);
     }
 
-    protected StreamQueryResponse query(String indexName, List<String> searchableTags, QueryBuilder builder) {
-        return this.query(indexName, searchableTags, null, builder);
+    protected StreamQueryResponse query(StreamMetadata metadata, List<String> searchableTags, QueryBuilder builder) {
+        return this.query(metadata, searchableTags, null, builder);
     }
 
-    protected StreamQueryResponse query(String indexName, List<String> searchableTags, TimestampRange timestampRange,
+    protected StreamQueryResponse query(StreamMetadata metadata, List<String> searchableTags, TimestampRange timestampRange,
                                         QueryBuilder builder) {
         final StreamQuery query;
         if (timestampRange == null) {
-            query = new StreamQuery(indexName, LARGEST_TIME_RANGE, searchableTags);
+            query = new StreamQuery(metadata.getGroup(), metadata.getModel().getName(), LARGEST_TIME_RANGE, searchableTags);
         } else {
-            query = new StreamQuery(indexName, timestampRange, searchableTags);
+            query = new StreamQuery(metadata.getGroup(), metadata.getModel().getName(), timestampRange, searchableTags);
         }
 
         builder.apply(query);
@@ -62,22 +60,19 @@ public abstract class AbstractBanyanDBDAO extends AbstractDAO<BanyanDBStorageCli
         abstract void apply(final StreamQuery query);
 
         protected PairQueryCondition<Long> eq(String name, long value) {
-            return PairQueryCondition.LongQueryCondition.eq(StreamMetaInfo.TAG_FAMILY_SEARCHABLE, name, value);
+            return PairQueryCondition.LongQueryCondition.eq(StreamMetadata.TAG_FAMILY_SEARCHABLE, name, value);
         }
 
         protected PairQueryCondition<Long> lte(String name, long value) {
-            return PairQueryCondition.LongQueryCondition.le(StreamMetaInfo.TAG_FAMILY_SEARCHABLE, name, value);
+            return PairQueryCondition.LongQueryCondition.le(StreamMetadata.TAG_FAMILY_SEARCHABLE, name, value);
         }
 
         protected PairQueryCondition<Long> gte(String name, long value) {
-            return PairQueryCondition.LongQueryCondition.ge(StreamMetaInfo.TAG_FAMILY_SEARCHABLE, name, value);
+            return PairQueryCondition.LongQueryCondition.ge(StreamMetadata.TAG_FAMILY_SEARCHABLE, name, value);
         }
 
         protected PairQueryCondition<String> eq(String name, String value) {
-            return PairQueryCondition.StringQueryCondition.eq(StreamMetaInfo.TAG_FAMILY_SEARCHABLE, name, value);
+            return PairQueryCondition.StringQueryCondition.eq(StreamMetadata.TAG_FAMILY_SEARCHABLE, name, value);
         }
     }
-
-    interface RowEntityDeserializer<T> extends Function<RowEntity, T> {
-    }
 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java
index 08225e51e1..0b7693d16c 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java
@@ -18,14 +18,10 @@
 
 package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
 
-import com.google.common.base.Charsets;
 import com.google.common.collect.ImmutableList;
-import com.google.gson.reflect.TypeToken;
-import com.google.protobuf.ByteString;
 import org.apache.skywalking.banyandb.v1.client.RowEntity;
 import org.apache.skywalking.banyandb.v1.client.StreamQuery;
 import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
 import org.apache.skywalking.banyandb.v1.client.TimestampRange;
 import org.apache.skywalking.oap.server.core.alarm.AlarmRecord;
 import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
@@ -33,22 +29,25 @@ import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
 import org.apache.skywalking.oap.server.core.query.enumeration.Scope;
 import org.apache.skywalking.oap.server.core.query.type.AlarmMessage;
 import org.apache.skywalking.oap.server.core.query.type.Alarms;
-import org.apache.skywalking.oap.server.core.query.type.KeyValue;
 import org.apache.skywalking.oap.server.core.storage.query.IAlarmQueryDAO;
 import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBConverter;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.AlarmRecordBuilder;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.StreamMetadata;
 
 import java.io.IOException;
 import java.util.List;
 import java.util.Objects;
-import java.util.stream.Collectors;
 
 /**
  * {@link org.apache.skywalking.oap.server.core.alarm.AlarmRecord} is a stream,
  * which can be used to build a {@link org.apache.skywalking.oap.server.core.query.type.AlarmMessage}
  */
 public class BanyanDBAlarmQueryDAO extends AbstractBanyanDBDAO implements IAlarmQueryDAO {
+    private final StreamMetadata alarmRecordMetadata =
+            MetadataRegistry.INSTANCE.findStreamMetadata(AlarmRecord.INDEX_NAME);
+
     public BanyanDBAlarmQueryDAO(BanyanDBStorageClient client) {
         super(client);
     }
@@ -60,7 +59,7 @@ public class BanyanDBAlarmQueryDAO extends AbstractBanyanDBDAO implements IAlarm
             tsRange = new TimestampRange(TimeBucket.getTimestamp(startTB), TimeBucket.getTimestamp(endTB));
         }
 
-        StreamQueryResponse resp = query(AlarmRecord.INDEX_NAME,
+        StreamQueryResponse resp = query(alarmRecordMetadata,
                 ImmutableList.of(AlarmRecord.SCOPE, AlarmRecord.START_TIME),
                 tsRange,
                 new QueryBuilder() {
@@ -76,9 +75,8 @@ public class BanyanDBAlarmQueryDAO extends AbstractBanyanDBDAO implements IAlarm
 
                         if (CollectionUtils.isNotEmpty(tags)) {
                             for (final Tag tag : tags) {
-                                if (AlarmRecordBuilder.INDEXED_TAGS.contains(tag.getKey())) {
-                                    query.appendCondition(eq(tag.getKey(), tag.getValue()));
-                                }
+                                // TODO: check whether tags in the alarm are indexed
+                                query.appendCondition(eq(tag.getKey(), tag.getValue()));
                             }
                         }
                         query.setLimit(limit);
@@ -86,39 +84,27 @@ public class BanyanDBAlarmQueryDAO extends AbstractBanyanDBDAO implements IAlarm
                     }
                 });
 
-        List<AlarmMessage> messages = resp.getElements().stream().map(new AlarmMessageDeserializer())
-                .collect(Collectors.toList());
-
         Alarms alarms = new Alarms();
-        alarms.setTotal(messages.size());
-        alarms.getMsgs().addAll(messages);
-        return alarms;
-    }
+        alarms.setTotal(resp.size());
 
-    public static class AlarmMessageDeserializer implements RowEntityDeserializer<AlarmMessage> {
-        @Override
-        public AlarmMessage apply(RowEntity row) {
-            AlarmMessage alarmMessage = new AlarmMessage();
-            final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
-            int scopeID = ((Number) searchable.get(0).getValue()).intValue();
-            alarmMessage.setScopeId(scopeID);
-            alarmMessage.setScope(Scope.Finder.valueOf(scopeID));
-            alarmMessage.setStartTime(((Number) searchable.get(1).getValue()).longValue());
-            final List<TagAndValue<?>> data = row.getTagFamilies().get(1);
-            alarmMessage.setId((String) data.get(0).getValue());
-            alarmMessage.setId1((String) data.get(1).getValue());
-            alarmMessage.setMessage((String) data.get(2).getValue());
-            Object o = data.get(3).getValue();
-            if (o instanceof ByteString && !((ByteString) o).isEmpty()) {
-                this.parseDataBinary(((ByteString) o).toByteArray(), alarmMessage.getTags());
-            }
-            return alarmMessage;
-        }
+        for (final RowEntity rowEntity : resp.getElements()) {
+            AlarmRecord.Builder builder = new AlarmRecord.Builder();
+            AlarmRecord alarmRecord = builder.storage2Entity(
+                    new BanyanDBConverter.StreamToEntity(this.alarmRecordMetadata, rowEntity)
+            );
 
-        void parseDataBinary(byte[] dataBinary, List<KeyValue> tags) {
-            List<Tag> tagList = GSON.fromJson(new String(dataBinary, Charsets.UTF_8), new TypeToken<List<Tag>>() {
-            }.getType());
-            tagList.forEach(pair -> tags.add(new KeyValue(pair.getKey(), pair.getValue())));
+            AlarmMessage message = new AlarmMessage();
+            message.setId(String.valueOf(alarmRecord.getId0()));
+            message.setId1(String.valueOf(alarmRecord.getId1()));
+            message.setMessage(alarmRecord.getAlarmMessage());
+            message.setStartTime(alarmRecord.getStartTime());
+            message.setScope(Scope.Finder.valueOf(alarmRecord.getScope()));
+            message.setScopeId(alarmRecord.getScope());
+            if (!CollectionUtils.isEmpty(alarmRecord.getTagsRawData())) {
+                parserDataBinary(alarmRecord.getTagsRawData(), message.getTags());
+            }
+            alarms.getMsgs().add(message);
         }
+        return alarms;
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java
index c1bbc16941..d384210cd9 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java
@@ -19,12 +19,10 @@
 package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
 
 import com.google.common.collect.ImmutableList;
-import com.google.protobuf.ByteString;
 import com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.skywalking.banyandb.v1.client.RowEntity;
 import org.apache.skywalking.banyandb.v1.client.StreamQuery;
 import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
 import org.apache.skywalking.banyandb.v1.client.TimestampRange;
 import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
 import org.apache.skywalking.oap.server.core.browser.manual.errorlog.BrowserErrorLogRecord;
@@ -35,17 +33,20 @@ import org.apache.skywalking.oap.server.core.query.type.ErrorCategory;
 import org.apache.skywalking.oap.server.core.storage.query.IBrowserLogQueryDAO;
 import org.apache.skywalking.oap.server.library.util.StringUtil;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.StreamMetadata;
 
 import java.io.IOException;
 import java.util.Collections;
-import java.util.List;
 import java.util.Objects;
-import java.util.stream.Collectors;
 
 /**
  * {@link org.apache.skywalking.oap.server.core.browser.manual.errorlog.BrowserErrorLogRecord} is a stream
  */
 public class BanyanDBBrowserLogQueryDAO extends AbstractBanyanDBDAO implements IBrowserLogQueryDAO {
+    private final StreamMetadata browserErrorLogRecordMetadata =
+            MetadataRegistry.INSTANCE.findStreamMetadata(BrowserErrorLogRecord.INDEX_NAME);
+
     public BanyanDBBrowserLogQueryDAO(BanyanDBStorageClient client) {
         super(client);
     }
@@ -57,8 +58,7 @@ public class BanyanDBBrowserLogQueryDAO extends AbstractBanyanDBDAO implements I
             tsRange = new TimestampRange(TimeBucket.getTimestamp(startSecondTB), TimeBucket.getTimestamp(endSecondTB));
         }
 
-        final BrowserErrorLogs logs = new BrowserErrorLogs();
-        StreamQueryResponse resp = query(BrowserErrorLogRecord.INDEX_NAME, ImmutableList.of(BrowserErrorLogRecord.SERVICE_ID,
+        StreamQueryResponse resp = query(browserErrorLogRecordMetadata, ImmutableList.of(BrowserErrorLogRecord.SERVICE_ID,
                 BrowserErrorLogRecord.SERVICE_VERSION_ID,
                 BrowserErrorLogRecord.PAGE_PATH_ID,
                 BrowserErrorLogRecord.ERROR_CATEGORY), tsRange, new QueryBuilder() {
@@ -83,40 +83,47 @@ public class BanyanDBBrowserLogQueryDAO extends AbstractBanyanDBDAO implements I
                 query.setLimit(limit);
             }
         });
-        logs.getLogs().addAll(resp.getElements().stream().map(new BrowserErrorLogDeserializer()).collect(Collectors.toList()));
-        logs.setTotal(logs.getLogs().size());
+
+        BrowserErrorLogs logs = new BrowserErrorLogs();
+        logs.setTotal(resp.size());
+
+        for (final RowEntity rowEntity : resp.getElements()) {
+            final byte[] dataBinary =
+                    rowEntity.getValue(StreamMetadata.TAG_FAMILY_DATA, BrowserErrorLogRecord.DATA_BINARY);
+            if (dataBinary != null && dataBinary.length > 0) {
+                BrowserErrorLog log = parserDataBinary(dataBinary);
+                logs.getLogs().add(log);
+            }
+        }
         return logs;
     }
 
-    public static class BrowserErrorLogDeserializer implements RowEntityDeserializer<BrowserErrorLog> {
-        @Override
-        public BrowserErrorLog apply(RowEntity row) {
-            // FIXME: use protobuf directly
+    /**
+     * TODO: merge with the default method in the interface
+     */
+    private BrowserErrorLog parserDataBinary(
+            byte[] dataBinary) {
+        try {
             BrowserErrorLog log = new BrowserErrorLog();
-            final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
-            log.setService((String) searchable.get(0).getValue());
-            log.setServiceVersion((String) searchable.get(1).getValue());
-            log.setPagePath((String) searchable.get(2).getValue());
-            log.setCategory(ErrorCategory.valueOf((String) searchable.get(3).getValue()));
-            log.setTime(row.getTimestamp());
-            final List<TagAndValue<?>> data = row.getTagFamilies().get(1);
-            Object o = data.get(0).getValue();
-            if (o instanceof ByteString && !((ByteString) o).isEmpty()) {
-                try {
-                    org.apache.skywalking.apm.network.language.agent.v3.BrowserErrorLog browserErrorLog = org.apache.skywalking.apm.network.language.agent.v3.BrowserErrorLog
-                            .parseFrom((ByteString) o);
-                    log.setGrade(browserErrorLog.getGrade());
-                    log.setCol(browserErrorLog.getCol());
-                    log.setLine(browserErrorLog.getLine());
-                    log.setMessage(browserErrorLog.getMessage());
-                    log.setErrorUrl(browserErrorLog.getErrorUrl());
-                    log.setStack(browserErrorLog.getStack());
-                    log.setFirstReportedError(browserErrorLog.getFirstReportedError());
-                } catch (InvalidProtocolBufferException ex) {
-                    throw new RuntimeException("fail to parse proto buffer", ex);
-                }
-            }
+            org.apache.skywalking.apm.network.language.agent.v3.BrowserErrorLog browserErrorLog = org.apache.skywalking.apm.network.language.agent.v3.BrowserErrorLog
+                    .parseFrom(dataBinary);
+
+            log.setService(browserErrorLog.getService());
+            log.setServiceVersion(browserErrorLog.getServiceVersion());
+            log.setTime(browserErrorLog.getTime());
+            log.setPagePath(browserErrorLog.getPagePath());
+            log.setCategory(ErrorCategory.valueOf(browserErrorLog.getCategory().name().toUpperCase()));
+            log.setGrade(browserErrorLog.getGrade());
+            log.setMessage(browserErrorLog.getMessage());
+            log.setLine(browserErrorLog.getLine());
+            log.setCol(browserErrorLog.getCol());
+            log.setStack(browserErrorLog.getStack());
+            log.setErrorUrl(browserErrorLog.getErrorUrl());
+            log.setFirstReportedError(browserErrorLog.getFirstReportedError());
+
             return log;
+        } catch (InvalidProtocolBufferException e) {
+            throw new RuntimeException(e);
         }
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBEventQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBEventQueryDAO.java
deleted file mode 100644
index 44299c38e9..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBEventQueryDAO.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
-
-import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableList;
-import org.apache.skywalking.banyandb.v1.client.RowEntity;
-import org.apache.skywalking.banyandb.v1.client.StreamQuery;
-import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.query.PaginationUtils;
-import org.apache.skywalking.oap.server.core.query.input.Duration;
-import org.apache.skywalking.oap.server.core.query.type.event.EventQueryCondition;
-import org.apache.skywalking.oap.server.core.query.type.event.EventType;
-import org.apache.skywalking.oap.server.core.query.type.event.Events;
-import org.apache.skywalking.oap.server.core.query.type.event.Source;
-import org.apache.skywalking.oap.server.core.source.Event;
-import org.apache.skywalking.oap.server.core.storage.query.IEventQueryDAO;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
-
-import java.util.List;
-import java.util.stream.Collectors;
-
-/**
- * {@link org.apache.skywalking.oap.server.core.source.Event} is a stream
- */
-public class BanyanDBEventQueryDAO extends AbstractBanyanDBDAO implements IEventQueryDAO {
-    public BanyanDBEventQueryDAO(BanyanDBStorageClient client) {
-        super(client);
-    }
-
-    @Override
-    public Events queryEvents(EventQueryCondition condition) throws Exception {
-        StreamQueryResponse resp = query(Event.INDEX_NAME,
-                ImmutableList.of(Event.UUID, Event.SERVICE, Event.SERVICE_INSTANCE, Event.ENDPOINT, Event.NAME, Event.TYPE, Event.START_TIME, Event.END_TIME),
-                new QueryBuilder() {
-                    @Override
-                    public void apply(StreamQuery query) {
-                        query.setDataProjections(ImmutableList.of(Event.MESSAGE, Event.PARAMETERS));
-
-                        buildConditions(condition, query);
-
-                        PaginationUtils.Page page = PaginationUtils.INSTANCE.exchange(condition.getPaging());
-                        query.setLimit(page.getLimit());
-                        query.setOffset(page.getFrom());
-                        switch (condition.getOrder()) {
-                            case ASC:
-                                query.setOrderBy(new StreamQuery.OrderBy("start_time", StreamQuery.OrderBy.Type.ASC));
-                                break;
-                            case DES:
-                                query.setOrderBy(new StreamQuery.OrderBy("start_time", StreamQuery.OrderBy.Type.DESC));
-                        }
-                    }
-
-                    private void buildConditions(EventQueryCondition condition, final StreamQuery query) {
-                        if (!Strings.isNullOrEmpty(condition.getUuid())) {
-                            query.appendCondition(eq(Event.UUID, condition.getUuid()));
-                        }
-                        final Source source = condition.getSource();
-                        if (source != null) {
-                            if (!Strings.isNullOrEmpty(source.getService())) {
-                                query.appendCondition(eq(Event.SERVICE, source.getService()));
-                            }
-                            if (!Strings.isNullOrEmpty(source.getServiceInstance())) {
-                                query.appendCondition(eq(Event.SERVICE_INSTANCE, source.getServiceInstance()));
-                            }
-                            if (!Strings.isNullOrEmpty(source.getEndpoint())) {
-                                query.appendCondition(eq(Event.ENDPOINT, source.getEndpoint()));
-                            }
-                        }
-                        if (!Strings.isNullOrEmpty(condition.getName())) {
-                            query.appendCondition(eq(Event.NAME, condition.getName()));
-                        }
-                        if (condition.getType() != null) {
-                            query.appendCondition(eq(Event.TYPE, condition.getType().name()));
-                        }
-                        final Duration time = condition.getTime();
-                        if (time != null) {
-                            if (time.getStartTimestamp() > 0) {
-                                query.appendCondition(gte(Event.START_TIME, time.getStartTimestamp()));
-                            }
-                            if (time.getEndTimestamp() > 0) {
-                                query.appendCondition(lte(Event.END_TIME, time.getEndTimestamp()));
-                            }
-                        }
-                    }
-                });
-
-        List<org.apache.skywalking.oap.server.core.query.type.event.Event> eventList = resp.getElements().stream().map(new EventDeserializer()).collect(Collectors.toList());
-
-        Events events = new Events();
-        events.setEvents(eventList);
-        events.setTotal(eventList.size());
-        return events;
-    }
-
-    @Override
-    public Events queryEvents(List<EventQueryCondition> conditionList) throws Exception {
-        Events events = new Events();
-        for (final EventQueryCondition condition : conditionList) {
-            Events subEvents = this.queryEvents(condition);
-            if (subEvents.getEvents().size() == 0) {
-                continue;
-            }
-
-            events.getEvents().addAll(subEvents.getEvents());
-            events.setTotal(events.getTotal() + subEvents.getTotal());
-        }
-
-        return events;
-    }
-
-    public static class EventDeserializer implements RowEntityDeserializer<org.apache.skywalking.oap.server.core.query.type.event.Event> {
-        @Override
-        public org.apache.skywalking.oap.server.core.query.type.event.Event apply(RowEntity row) {
-            final org.apache.skywalking.oap.server.core.query.type.event.Event resultEvent = new org.apache.skywalking.oap.server.core.query.type.event.Event();
-            // searchable
-            final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
-            resultEvent.setUuid((String) searchable.get(0).getValue());
-            resultEvent.setSource(new Source((String) searchable.get(1).getValue(), (String) searchable.get(2).getValue(), (String) searchable.get(3).getValue()));
-            resultEvent.setName((String) searchable.get(4).getValue());
-            resultEvent.setType(EventType.parse((String) searchable.get(5).getValue()));
-            resultEvent.setStartTime(((Number) searchable.get(6).getValue()).longValue());
-            resultEvent.setEndTime(((Number) searchable.get(7).getValue()).longValue());
-            // data
-            final List<TagAndValue<?>> data = row.getTagFamilies().get(1);
-            resultEvent.setMessage((String) data.get(0).getValue());
-            resultEvent.setParameters((String) data.get(1).getValue());
-            return resultEvent;
-        }
-    }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java
index 6bce5767e7..5cef08a726 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java
@@ -19,21 +19,20 @@
 package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
 
 import com.google.common.collect.ImmutableList;
-import com.google.protobuf.ByteString;
 import com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.skywalking.apm.network.common.v3.KeyStringValuePair;
 import org.apache.skywalking.apm.network.logging.v3.LogTags;
 import org.apache.skywalking.banyandb.v1.client.RowEntity;
 import org.apache.skywalking.banyandb.v1.client.StreamQuery;
 import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
 import org.apache.skywalking.banyandb.v1.client.TimestampRange;
+import org.apache.skywalking.oap.server.core.analysis.IDManager;
 import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
 import org.apache.skywalking.oap.server.core.analysis.manual.log.AbstractLogRecord;
 import org.apache.skywalking.oap.server.core.analysis.manual.log.LogRecord;
 import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
 import org.apache.skywalking.oap.server.core.query.enumeration.Order;
 import org.apache.skywalking.oap.server.core.query.input.TraceScopeCondition;
+import org.apache.skywalking.oap.server.core.query.type.ContentType;
 import org.apache.skywalking.oap.server.core.query.type.KeyValue;
 import org.apache.skywalking.oap.server.core.query.type.Log;
 import org.apache.skywalking.oap.server.core.query.type.Logs;
@@ -41,17 +40,20 @@ import org.apache.skywalking.oap.server.core.storage.query.ILogQueryDAO;
 import org.apache.skywalking.oap.server.library.util.CollectionUtils;
 import org.apache.skywalking.oap.server.library.util.StringUtil;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.LogRecordBuilder;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.StreamMetadata;
 
 import java.io.IOException;
 import java.util.List;
 import java.util.Objects;
-import java.util.stream.Collectors;
 
 /**
  * {@link org.apache.skywalking.oap.server.core.analysis.manual.log.LogRecord} is a stream
  */
 public class BanyanDBLogQueryDAO extends AbstractBanyanDBDAO implements ILogQueryDAO {
+    private final StreamMetadata logRecordMetadata =
+            MetadataRegistry.INSTANCE.findStreamMetadata(LogRecord.INDEX_NAME);
+
     public BanyanDBLogQueryDAO(BanyanDBStorageClient client) {
         super(client);
     }
@@ -90,9 +92,8 @@ public class BanyanDBLogQueryDAO extends AbstractBanyanDBDAO implements ILogQuer
 
                 if (CollectionUtils.isNotEmpty(tags)) {
                     for (final Tag tag : tags) {
-                        if (LogRecordBuilder.INDEXED_TAGS.contains(tag.getKey())) {
-                            query.appendCondition(eq(tag.getKey(), tag.getValue()));
-                        }
+                        // TODO: check log indexed tags
+                        query.appendCondition(eq(tag.getKey(), tag.getValue()));
                     }
                 }
             }
@@ -103,45 +104,51 @@ public class BanyanDBLogQueryDAO extends AbstractBanyanDBDAO implements ILogQuer
             tsRange = new TimestampRange(TimeBucket.getTimestamp(startTB), TimeBucket.getTimestamp(endTB));
         }
 
-        StreamQueryResponse resp = query(LogRecord.INDEX_NAME,
+        StreamQueryResponse resp = query(logRecordMetadata,
                 ImmutableList.of(AbstractLogRecord.SERVICE_ID, AbstractLogRecord.SERVICE_INSTANCE_ID,
                         AbstractLogRecord.ENDPOINT_ID, AbstractLogRecord.TRACE_ID, AbstractLogRecord.TRACE_SEGMENT_ID,
                         AbstractLogRecord.SPAN_ID), tsRange, query);
 
-        List<Log> logEntities = resp.getElements().stream().map(new LogDeserializer()).collect(Collectors.toList());
-
         Logs logs = new Logs();
-        logs.getLogs().addAll(logEntities);
-        logs.setTotal(logEntities.size());
+        logs.setTotal(resp.size());
 
+        for (final RowEntity rowEntity : resp.getElements()) {
+            Log log = new Log();
+            log.setServiceId(rowEntity.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, AbstractLogRecord.SERVICE_ID));
+            log.setServiceInstanceId(
+                    rowEntity.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, AbstractLogRecord.SERVICE_INSTANCE_ID));
+            log.setEndpointId(
+                    rowEntity.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, AbstractLogRecord.ENDPOINT_ID));
+            if (log.getEndpointId() != null) {
+                log.setEndpointName(
+                        IDManager.EndpointID.analysisId(log.getEndpointId()).getEndpointName());
+            }
+            log.setTraceId(rowEntity.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, AbstractLogRecord.TRACE_ID));
+            log.setTimestamp(((Number) rowEntity.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE,
+                    AbstractLogRecord.TIMESTAMP)).longValue());
+            log.setContentType(ContentType.instanceOf(
+                    ((Number) rowEntity.getValue(StreamMetadata.TAG_FAMILY_DATA,
+                            AbstractLogRecord.CONTENT_TYPE)).intValue()));
+            log.setContent(rowEntity.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, AbstractLogRecord.CONTENT));
+            byte[] dataBinary = rowEntity.getValue(StreamMetadata.TAG_FAMILY_DATA, AbstractLogRecord.TAGS_RAW_DATA);
+            if (dataBinary != null && dataBinary.length > 0) {
+                parserDataBinary(dataBinary, log.getTags());
+            }
+            logs.getLogs().add(log);
+        }
         return logs;
     }
 
-    public static class LogDeserializer implements RowEntityDeserializer<Log> {
-        @Override
-        public Log apply(RowEntity row) {
-            Log log = new Log();
-            final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
-            log.setServiceId((String) searchable.get(0).getValue());
-            log.setServiceInstanceId((String) searchable.get(1).getValue());
-            log.setEndpointId((String) searchable.get(2).getValue());
-            log.setTraceId((String) searchable.get(3).getValue());
-            log.setTimestamp(row.getTimestamp());
-            final List<TagAndValue<?>> data = row.getTagFamilies().get(1);
-            if (data.get(2).getValue() == null || ((ByteString) data.get(2).getValue()).isEmpty()) {
-                log.setContent("");
-            } else {
-                try {
-                    // Don't read the tags as they have been in the data binary already.
-                    LogTags logTags = LogTags.parseFrom((ByteString) data.get(2).getValue());
-                    for (final KeyStringValuePair pair : logTags.getDataList()) {
-                        log.getTags().add(new KeyValue(pair.getKey(), pair.getValue()));
-                    }
-                } catch (InvalidProtocolBufferException e) {
-                    throw new RuntimeException(e);
-                }
-            }
-            return log;
+    /**
+     * Parse the raw tags.
+     * TODO: merge default method
+     */
+    private void parserDataBinary(byte[] dataBinary, List<KeyValue> tags) {
+        try {
+            LogTags logTags = LogTags.parseFrom(dataBinary);
+            logTags.getDataList().forEach(pair -> tags.add(new KeyValue(pair.getKey(), pair.getValue())));
+        } catch (InvalidProtocolBufferException e) {
+            throw new RuntimeException(e);
         }
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBManagementDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBManagementDAO.java
deleted file mode 100644
index b719090919..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBManagementDAO.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
-
-import org.apache.skywalking.banyandb.v1.client.StreamQuery;
-import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
-import org.apache.skywalking.banyandb.v1.client.StreamWrite;
-import org.apache.skywalking.oap.server.core.analysis.management.ManagementData;
-import org.apache.skywalking.oap.server.core.management.ui.template.UITemplate;
-import org.apache.skywalking.oap.server.core.storage.IManagementDAO;
-import org.apache.skywalking.oap.server.core.storage.model.Model;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.BanyanDBStorageDataBuilder;
-
-import java.io.IOException;
-import java.time.Instant;
-import java.util.Collections;
-
-/**
- * UITemplate insertion DAO
- *
- * @param <T> The only ManagementData we have now is {@link UITemplate}
- */
-public class BanyanDBManagementDAO<T extends ManagementData> extends AbstractBanyanDBDAO implements IManagementDAO {
-    private final BanyanDBStorageDataBuilder<T> storageBuilder;
-
-    public BanyanDBManagementDAO(BanyanDBStorageClient client, BanyanDBStorageDataBuilder<T> storageBuilder) {
-        super(client);
-        this.storageBuilder = storageBuilder;
-    }
-
-    @Override
-    public void insert(Model model, ManagementData storageData) throws IOException {
-        // ensure only insert once
-        StreamQueryResponse resp = query(UITemplate.INDEX_NAME,
-                Collections.singletonList(UITemplate.NAME),
-                new QueryBuilder() {
-                    @Override
-                    public void apply(StreamQuery query) {
-                        query.appendCondition(eq(UITemplate.NAME, storageData.id()));
-                    }
-                });
-
-        if (resp != null && resp.getElements().size() > 0) {
-            return;
-        }
-
-        StreamWrite.StreamWriteBuilder streamWrite = this.storageBuilder
-                .entity2Storage((T) storageData)
-                .name(model.getName())
-                .timestamp(Instant.now().toEpochMilli());
-        getClient().write(streamWrite.build());
-    }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBMetadataQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBMetadataQueryDAO.java
deleted file mode 100644
index 85ea280b72..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBMetadataQueryDAO.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
-
-import com.google.common.collect.ImmutableList;
-import com.google.gson.JsonElement;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.skywalking.banyandb.v1.client.RowEntity;
-import org.apache.skywalking.banyandb.v1.client.StreamQuery;
-import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.analysis.Layer;
-import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
-import org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic;
-import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
-import org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraffic;
-import org.apache.skywalking.oap.server.core.query.enumeration.Language;
-import org.apache.skywalking.oap.server.core.query.type.Attribute;
-import org.apache.skywalking.oap.server.core.query.type.Endpoint;
-import org.apache.skywalking.oap.server.core.query.type.Service;
-import org.apache.skywalking.oap.server.core.query.type.ServiceInstance;
-import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
-import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
-import org.apache.skywalking.oap.server.library.util.StringUtil;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.StreamMetaInfo;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-/**
- * {@link org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraffic},
- * {@link org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic}
- * {@link org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic}
- * are all streams.
- */
-public class BanyanDBMetadataQueryDAO extends AbstractBanyanDBDAO implements IMetadataQueryDAO {
-    public BanyanDBMetadataQueryDAO(BanyanDBStorageClient client) {
-        super(client);
-    }
-
-    @Override
-    public List<Service> listServices(final String layer, final String group) throws IOException {
-        StreamQueryResponse resp = query(ServiceTraffic.INDEX_NAME,
-                ImmutableList.of(ServiceTraffic.NAME, ServiceTraffic.SERVICE_ID, ServiceTraffic.GROUP, ServiceTraffic.LAYER),
-                new QueryBuilder() {
-                    @Override
-                    public void apply(StreamQuery query) {
-                        query.setDataProjections(Collections.singletonList(ServiceTraffic.SHORT_NAME));
-                        if (StringUtil.isNotEmpty(layer)) {
-                            query.appendCondition(eq(ServiceTraffic.LAYER, Layer.valueOf(layer).value()));
-                        }
-                        if (StringUtil.isNotEmpty(group)) {
-                            query.appendCondition(eq(ServiceTraffic.GROUP, group));
-                        }
-                    }
-                });
-
-        return resp.getElements().stream().map(new ServiceDeserializer()).collect(Collectors.toList());
-    }
-
-    @Override
-    public List<Service> getServices(String serviceId) throws IOException {
-        StreamQueryResponse resp = query(ServiceTraffic.INDEX_NAME,
-                ImmutableList.of(ServiceTraffic.NAME, ServiceTraffic.SERVICE_ID, ServiceTraffic.GROUP, ServiceTraffic.LAYER),
-                new QueryBuilder() {
-                    @Override
-                    public void apply(StreamQuery query) {
-                        query.setDataProjections(Collections.singletonList(ServiceTraffic.SHORT_NAME));
-
-                        query.appendCondition(eq(ServiceTraffic.SERVICE_ID, serviceId));
-                    }
-                });
-
-        return resp.getElements().stream().map(new ServiceDeserializer()).collect(Collectors.toList());
-    }
-
-    @Override
-    public List<ServiceInstance> listInstances(long startTimestamp, long endTimestamp, String serviceId) throws IOException {
-        StreamQueryResponse resp = query(InstanceTraffic.INDEX_NAME,
-                ImmutableList.of(InstanceTraffic.SERVICE_ID, InstanceTraffic.LAST_PING_TIME_BUCKET),
-                new QueryBuilder() {
-                    @Override
-                    public void apply(StreamQuery query) {
-                        query.setDataProjections(Collections.singletonList("data_binary"));
-
-                        final long startMinuteTimeBucket = TimeBucket.getMinuteTimeBucket(startTimestamp);
-                        final long endMinuteTimeBucket = TimeBucket.getMinuteTimeBucket(endTimestamp);
-
-                        query.appendCondition(gte(InstanceTraffic.LAST_PING_TIME_BUCKET, startMinuteTimeBucket));
-                        query.appendCondition(lte(InstanceTraffic.LAST_PING_TIME_BUCKET, endMinuteTimeBucket));
-                        query.appendCondition(eq(InstanceTraffic.SERVICE_ID, serviceId));
-                    }
-                });
-
-        return resp.getElements().stream().map(new ServiceInstanceDeserializer()).collect(Collectors.toList());
-    }
-
-    @Override
-    public ServiceInstance getInstance(String instanceId) throws IOException {
-        StreamQueryResponse resp = query(InstanceTraffic.INDEX_NAME,
-                ImmutableList.of(StreamMetaInfo.ID), new QueryBuilder() {
-                    @Override
-                    public void apply(StreamQuery query) {
-                        query.setDataProjections(Collections.singletonList("data_binary"));
-
-                        query.appendCondition(eq(StreamMetaInfo.ID, instanceId));
-                    }
-                });
-
-        return resp.getElements().stream().map(new ServiceInstanceDeserializer()).findFirst().orElse(null);
-    }
-
-    @Override
-    public List<Endpoint> findEndpoint(String keyword, String serviceId, int limit) throws IOException {
-        StreamQueryResponse resp = query(EndpointTraffic.INDEX_NAME,
-                ImmutableList.of(EndpointTraffic.NAME, EndpointTraffic.SERVICE_ID), new QueryBuilder() {
-                    @Override
-                    public void apply(StreamQuery query) {
-                        query.appendCondition(eq(EndpointTraffic.SERVICE_ID, serviceId));
-                    }
-                });
-
-        return resp.getElements().stream().map(new EndpointDeserializer()).filter(e -> e.getName().contains(keyword))
-                .limit(limit).collect(Collectors.toList());
-    }
-
-    public static class EndpointDeserializer implements RowEntityDeserializer<Endpoint> {
-        @Override
-        public Endpoint apply(RowEntity row) {
-            Endpoint endpoint = new Endpoint();
-            final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
-            endpoint.setId(row.getId());
-            endpoint.setName((String) searchable.get(0).getValue()); // 0 - name
-            return endpoint;
-        }
-    }
-
-    public static class ServiceDeserializer implements RowEntityDeserializer<Service> {
-        @Override
-        public Service apply(RowEntity row) {
-            Service service = new Service();
-            final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
-            service.setName((String) searchable.get(0).getValue());
-            service.setId((String) searchable.get(1).getValue());
-            service.setGroup((String) searchable.get(2).getValue());
-            String layerName = Layer.valueOf(((Number) searchable.get(3).getValue()).intValue()).name();
-            service.getLayers().add(layerName);
-            final List<TagAndValue<?>> data = row.getTagFamilies().get(1);
-            service.setShortName((String) data.get(0).getValue());
-            return service;
-        }
-    }
-
-    public static class ServiceInstanceDeserializer implements RowEntityDeserializer<ServiceInstance> {
-        @Override
-        public ServiceInstance apply(RowEntity row) {
-            InstanceTraffic instanceTraffic = new InstanceTraffic();
-            final List<TagAndValue<?>> data = row.getTagFamilies().get(1);
-            Object o = data.get(0).getValue();
-            ServiceInstance serviceInstance = new ServiceInstance();
-            if (o instanceof ByteString && !((ByteString) o).isEmpty()) {
-                try {
-                    RemoteData remoteData = RemoteData.parseFrom((ByteString) o);
-                    instanceTraffic.deserialize(remoteData);
-                    serviceInstance.setName(instanceTraffic.getName());
-                    serviceInstance.setId(row.getId());
-                    serviceInstance.setInstanceUUID(serviceInstance.getId());
-                    serviceInstance.setLayer(instanceTraffic.getLayer().name());
-
-                    if (instanceTraffic.getProperties() != null) {
-                        for (Map.Entry<String, JsonElement> property : instanceTraffic.getProperties().entrySet()) {
-                            String key = property.getKey();
-                            String value = property.getValue().getAsString();
-                            if (key.equals(InstanceTraffic.PropertyUtil.LANGUAGE)) {
-                                serviceInstance.setLanguage(Language.value(value));
-                            } else {
-                                serviceInstance.getAttributes().add(new Attribute(key, value));
-                            }
-                        }
-                    } else {
-                        serviceInstance.setLanguage(Language.UNKNOWN);
-                    }
-                } catch (InvalidProtocolBufferException ex) {
-                    throw new RuntimeException("fail to parse remote data", ex);
-                }
-            } else {
-                throw new RuntimeException("unable to parse binary data");
-            }
-
-            return serviceInstance;
-        }
-    }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBMetricsDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBMetricsDAO.java
deleted file mode 100644
index f1ed5b8d61..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBMetricsDAO.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
-
-import lombok.RequiredArgsConstructor;
-import org.apache.skywalking.banyandb.v1.client.StreamWrite;
-import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
-import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
-import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
-import org.apache.skywalking.oap.server.core.storage.model.Model;
-import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
-import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.BanyanDBStorageDataBuilder;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-
-@RequiredArgsConstructor
-public class BanyanDBMetricsDAO<T extends Metrics> implements IMetricsDAO {
-    private final BanyanDBStorageDataBuilder<T> storageBuilder;
-
-    @Override
-    public List<Metrics> multiGet(Model model, List<Metrics> metrics) throws IOException {
-        return Collections.emptyList();
-    }
-
-    @Override
-    public InsertRequest prepareBatchInsert(Model model, Metrics metrics) throws IOException {
-        StreamWrite.StreamWriteBuilder builder = this.storageBuilder.entity2Storage((T) metrics)
-                .name(model.getName())
-                .timestamp(TimeBucket.getTimestamp(metrics.getTimeBucket(), model.getDownsampling()));
-        return new BanyanDBStreamInsertRequest(builder.build());
-    }
-
-    @Override
-    public UpdateRequest prepareBatchUpdate(Model model, Metrics metrics) throws IOException {
-        return new UpdateRequest() {
-        };
-    }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBNetworkAddressAliasDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBNetworkAddressAliasDAO.java
deleted file mode 100644
index d5006ea689..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBNetworkAddressAliasDAO.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.skywalking.banyandb.v1.client.RowEntity;
-import org.apache.skywalking.banyandb.v1.client.StreamQuery;
-import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.analysis.manual.networkalias.NetworkAddressAlias;
-import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
-import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
-
-import java.util.List;
-import java.util.stream.Collectors;
-
-/**
- * {@link NetworkAddressAlias} is a stream
- */
-public class BanyanDBNetworkAddressAliasDAO extends AbstractBanyanDBDAO implements INetworkAddressAliasDAO {
-    public BanyanDBNetworkAddressAliasDAO(BanyanDBStorageClient client) {
-        super(client);
-    }
-
-    @Override
-    public List<NetworkAddressAlias> loadLastUpdate(long timeBucket) {
-        StreamQueryResponse resp = query(NetworkAddressAlias.INDEX_NAME,
-                ImmutableList.of(NetworkAddressAlias.LAST_UPDATE_TIME_BUCKET),
-                new QueryBuilder() {
-                    @Override
-                    public void apply(StreamQuery query) {
-                        query.setDataProjections(ImmutableList.of(Metrics.TIME_BUCKET, "address", "represent_service_id", "represent_service_instance_id"));
-                        query.appendCondition(gte(NetworkAddressAlias.LAST_UPDATE_TIME_BUCKET, timeBucket));
-                    }
-                });
-
-        return resp.getElements().stream().map(new NetworkAddressAliasDeserializer()).collect(Collectors.toList());
-    }
-
-    public static class NetworkAddressAliasDeserializer implements RowEntityDeserializer<NetworkAddressAlias> {
-        @Override
-        public NetworkAddressAlias apply(RowEntity row) {
-            NetworkAddressAlias model = new NetworkAddressAlias();
-            final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
-            // searchable - last_update_time_bucket
-            model.setLastUpdateTimeBucket(((Number) searchable.get(0).getValue()).longValue());
-            final List<TagAndValue<?>> data = row.getTagFamilies().get(1);
-            // data 0 - time_bucket
-            model.setTimeBucket(((Number) data.get(0).getValue()).longValue());
-            // data 1 - address
-            model.setAddress((String) data.get(1).getValue());
-            // data 2 - represent_service_id
-            model.setRepresentServiceId((String) data.get(2).getValue());
-            // data 3 - represent_service_instance_id
-            model.setRepresentServiceInstanceId((String) data.get(3).getValue());
-            return model;
-        }
-    }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java
index 1da585bb4b..4411957f69 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java
@@ -22,22 +22,25 @@ import com.google.common.collect.ImmutableList;
 import org.apache.skywalking.banyandb.v1.client.RowEntity;
 import org.apache.skywalking.banyandb.v1.client.StreamQuery;
 import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
 import org.apache.skywalking.oap.server.core.profile.ProfileTaskLogRecord;
 import org.apache.skywalking.oap.server.core.query.type.ProfileTaskLog;
 import org.apache.skywalking.oap.server.core.query.type.ProfileTaskLogOperationType;
 import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.StreamMetadata;
 
 import java.io.IOException;
-import java.util.Comparator;
+import java.util.LinkedList;
 import java.util.List;
-import java.util.stream.Collectors;
 
 /**
  * {@link org.apache.skywalking.oap.server.core.profile.ProfileTaskLogRecord} is a stream
  */
 public class BanyanDBProfileTaskLogQueryDAO extends AbstractBanyanDBDAO implements IProfileTaskLogQueryDAO {
+    private final StreamMetadata profileTaskLogRecord =
+            MetadataRegistry.INSTANCE.findStreamMetadata(ProfileTaskLogRecord.INDEX_NAME);
+
     private final int queryMaxSize;
 
     public BanyanDBProfileTaskLogQueryDAO(BanyanDBStorageClient client, int queryMaxSize) {
@@ -47,7 +50,7 @@ public class BanyanDBProfileTaskLogQueryDAO extends AbstractBanyanDBDAO implemen
 
     @Override
     public List<ProfileTaskLog> getTaskLogList() throws IOException {
-        StreamQueryResponse resp = query(ProfileTaskLogRecord.INDEX_NAME,
+        StreamQueryResponse resp = query(profileTaskLogRecord,
                 ImmutableList.of(ProfileTaskLogRecord.OPERATION_TIME, ProfileTaskLogRecord.INSTANCE_ID),
                 new QueryBuilder() {
                     @Override
@@ -58,26 +61,26 @@ public class BanyanDBProfileTaskLogQueryDAO extends AbstractBanyanDBDAO implemen
                     }
                 });
 
-        return resp.getElements().stream().map(new ProfileTaskLogDeserializer())
-                .sorted(Comparator.comparingLong(ProfileTaskLog::getOperationTime))
-                .collect(Collectors.toList());
+        final LinkedList<ProfileTaskLog> tasks = new LinkedList<>();
+        for (final RowEntity rowEntity : resp.getElements()) {
+            tasks.add(parseTaskLog(rowEntity));
+        }
+
+        return tasks;
     }
 
-    public static class ProfileTaskLogDeserializer implements RowEntityDeserializer<ProfileTaskLog> {
-        @Override
-        public ProfileTaskLog apply(RowEntity row) {
-            ProfileTaskLog profileTaskLog = new ProfileTaskLog();
-            final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
-            // searchable - operation_time
-            profileTaskLog.setOperationTime(((Number) searchable.get(0).getValue()).longValue());
-            // searchable - instance_id
-            profileTaskLog.setInstanceId((String) searchable.get(1).getValue());
-            final List<TagAndValue<?>> data = row.getTagFamilies().get(1);
-            // data - task_id
-            profileTaskLog.setTaskId((String) data.get(0).getValue());
-            // data - operation_type
-            profileTaskLog.setOperationType(ProfileTaskLogOperationType.parse(((Number) data.get(1).getValue()).intValue()));
-            return profileTaskLog;
-        }
+    private ProfileTaskLog parseTaskLog(RowEntity data) {
+        return ProfileTaskLog.builder()
+                .id(data.getId())
+                .taskId(data.getValue(StreamMetadata.TAG_FAMILY_DATA, ProfileTaskLogRecord.TASK_ID))
+                .instanceId(
+                        data.getValue(StreamMetadata.TAG_FAMILY_DATA, ProfileTaskLogRecord.INSTANCE_ID))
+                .operationType(ProfileTaskLogOperationType.parse(
+                        ((Number) data.getValue(StreamMetadata.TAG_FAMILY_DATA,
+                                ProfileTaskLogRecord.OPERATION_TYPE)).intValue()))
+                .operationTime(
+                        ((Number) data.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE,
+                                ProfileTaskLogRecord.OPERATION_TIME)).longValue())
+                .build();
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java
deleted file mode 100644
index 4bf0996eb8..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.skywalking.banyandb.v1.client.RowEntity;
-import org.apache.skywalking.banyandb.v1.client.StreamQuery;
-import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
-import org.apache.skywalking.oap.server.core.profile.ProfileTaskRecord;
-import org.apache.skywalking.oap.server.core.query.type.ProfileTask;
-import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO;
-import org.apache.skywalking.oap.server.library.util.StringUtil;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.StreamMetaInfo;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Objects;
-import java.util.stream.Collectors;
-
-/**
- * {@link org.apache.skywalking.oap.server.core.profile.ProfileTaskRecord} is a stream
- */
-public class BanyanDBProfileTaskQueryDAO extends AbstractBanyanDBDAO implements IProfileTaskQueryDAO {
-    public BanyanDBProfileTaskQueryDAO(BanyanDBStorageClient client) {
-        super(client);
-    }
-
-    @Override
-    public List<ProfileTask> getTaskList(String serviceId, String endpointName, Long startTimeBucket, Long endTimeBucket, Integer limit) throws IOException {
-        StreamQueryResponse resp = query(ProfileTaskRecord.INDEX_NAME,
-                ImmutableList.of(StreamMetaInfo.ID, ProfileTaskRecord.SERVICE_ID, ProfileTaskRecord.ENDPOINT_NAME,
-                        ProfileTaskRecord.START_TIME, ProfileTaskRecord.DURATION, ProfileTaskRecord.MIN_DURATION_THRESHOLD,
-                        ProfileTaskRecord.DUMP_PERIOD, ProfileTaskRecord.CREATE_TIME, ProfileTaskRecord.MAX_SAMPLING_COUNT), new QueryBuilder() {
-                    @Override
-                    public void apply(StreamQuery query) {
-                        if (StringUtil.isNotEmpty(serviceId)) {
-                            query.appendCondition(eq(ProfileTaskRecord.SERVICE_ID, serviceId));
-                        }
-
-                        if (StringUtil.isNotEmpty(endpointName)) {
-                            query.appendCondition(eq(ProfileTaskRecord.ENDPOINT_NAME, endpointName));
-                        }
-
-                        if (Objects.nonNull(startTimeBucket)) {
-                            query.appendCondition(gte(ProfileTaskRecord.START_TIME, TimeBucket.getTimestamp(startTimeBucket)));
-                        }
-
-                        if (Objects.nonNull(endTimeBucket)) {
-                            query.appendCondition(lte(ProfileTaskRecord.START_TIME, TimeBucket.getTimestamp(endTimeBucket)));
-                        }
-
-                        if (Objects.nonNull(limit)) {
-                            query.setLimit(limit);
-                        }
-
-//                        query.setOrderBy(new StreamQuery.OrderBy(ProfileTaskRecord.START_TIME, StreamQuery.OrderBy.Type.DESC));
-                    }
-                });
-
-        return resp.getElements().stream().map(new ProfileTaskDeserializer()).collect(Collectors.toList());
-    }
-
-    @Override
-    public ProfileTask getById(String id) throws IOException {
-        if (StringUtil.isEmpty(id)) {
-            return null;
-        }
-
-        StreamQueryResponse resp = query(ProfileTaskRecord.INDEX_NAME,
-                ImmutableList.of(StreamMetaInfo.ID, ProfileTaskRecord.SERVICE_ID, ProfileTaskRecord.ENDPOINT_NAME,
-                        ProfileTaskRecord.START_TIME, ProfileTaskRecord.DURATION, ProfileTaskRecord.MIN_DURATION_THRESHOLD,
-                        ProfileTaskRecord.DUMP_PERIOD, ProfileTaskRecord.CREATE_TIME, ProfileTaskRecord.MAX_SAMPLING_COUNT),
-                new QueryBuilder() {
-                    @Override
-                    public void apply(StreamQuery query) {
-                        query.appendCondition(eq(StreamMetaInfo.ID, id));
-                        query.setLimit(1);
-                    }
-                });
-
-        return resp.getElements().stream().map(new ProfileTaskDeserializer()).findAny().orElse(null);
-    }
-
-    public static class ProfileTaskDeserializer implements RowEntityDeserializer<ProfileTask> {
-        @Override
-        public ProfileTask apply(RowEntity row) {
-            ProfileTask profileTask = new ProfileTask();
-            final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
-            profileTask.setId((String) searchable.get(0).getValue());
-            profileTask.setServiceId((String) searchable.get(1).getValue());
-            profileTask.setEndpointName((String) searchable.get(2).getValue());
-            profileTask.setStartTime(((Number) searchable.get(3).getValue()).longValue());
-            profileTask.setDuration(((Number) searchable.get(4).getValue()).intValue());
-            profileTask.setMinDurationThreshold(((Number) searchable.get(5).getValue()).intValue());
-            profileTask.setDumpPeriod(((Number) searchable.get(6).getValue()).intValue());
-            profileTask.setCreateTime(((Number) searchable.get(7).getValue()).intValue());
-            profileTask.setMaxSamplingCount(((Number) searchable.get(8).getValue()).intValue());
-            return profileTask;
-        }
-    }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java
index 6df30bb465..4c6ac7ddcd 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java
@@ -19,22 +19,25 @@
 package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
 
 import com.google.common.collect.ImmutableList;
-import com.google.protobuf.ByteString;
 import org.apache.skywalking.banyandb.v1.client.RowEntity;
 import org.apache.skywalking.banyandb.v1.client.StreamQuery;
 import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
 import org.apache.skywalking.oap.server.core.analysis.IDManager;
 import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
 import org.apache.skywalking.oap.server.core.profile.ProfileThreadSnapshotRecord;
 import org.apache.skywalking.oap.server.core.query.type.BasicTrace;
 import org.apache.skywalking.oap.server.core.storage.profile.IProfileThreadSnapshotQueryDAO;
+import org.apache.skywalking.oap.server.library.util.BooleanUtils;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBConverter;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.StreamMetadata;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -43,13 +46,22 @@ import java.util.stream.Collectors;
  * {@link ProfileThreadSnapshotRecord} is a stream
  */
 public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractBanyanDBDAO implements IProfileThreadSnapshotQueryDAO {
+    protected final ProfileThreadSnapshotRecord.Builder builder =
+            new ProfileThreadSnapshotRecord.Builder();
+
+    private final StreamMetadata profileThreadSnapshotMetadata =
+            MetadataRegistry.INSTANCE.findStreamMetadata(ProfileThreadSnapshotRecord.INDEX_NAME);
+
+    private final StreamMetadata segmentRecordMetadata =
+            MetadataRegistry.INSTANCE.findStreamMetadata(SegmentRecord.INDEX_NAME);
+
     public BanyanDBProfileThreadSnapshotQueryDAO(BanyanDBStorageClient client) {
         super(client);
     }
 
     @Override
     public List<BasicTrace> queryProfiledSegments(String taskId) throws IOException {
-        StreamQueryResponse resp = query(ProfileThreadSnapshotRecord.INDEX_NAME,
+        StreamQueryResponse resp = query(profileThreadSnapshotMetadata,
                 ImmutableList.of(ProfileThreadSnapshotRecord.TASK_ID, ProfileThreadSnapshotRecord.SEGMENT_ID,
                         ProfileThreadSnapshotRecord.DUMP_TIME, ProfileThreadSnapshotRecord.SEQUENCE),
                 new QueryBuilder() {
@@ -64,22 +76,39 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractBanyanDBDAO i
             return Collections.emptyList();
         }
 
-        List<String> segmentIDs = resp.getElements().stream()
-                .map(new ProfileThreadSnapshotRecordDeserializer())
-                .map(ProfileThreadSnapshotRecord::getSegmentId)
-                .collect(Collectors.toList());
+        final List<String> segmentIds = new LinkedList<>();
+        for (final RowEntity rowEntity : resp.getElements()) {
+            segmentIds.add(rowEntity.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, ProfileThreadSnapshotRecord.SEGMENT_ID));
+        }
 
         // TODO: support `IN` or `OR` logic operation in BanyanDB
         List<BasicTrace> basicTraces = new ArrayList<>();
-        for (String segmentID : segmentIDs) {
-            final StreamQueryResponse segmentRecordResp = query(SegmentRecord.INDEX_NAME, ImmutableList.of("trace_id", "state", "endpoint_id", "duration", "start_time"),
+        for (String segmentID : segmentIds) {
+            final StreamQueryResponse segmentRecordResp = query(segmentRecordMetadata,
+                    ImmutableList.of(SegmentRecord.TRACE_ID, SegmentRecord.IS_ERROR, SegmentRecord.ENDPOINT_ID, SegmentRecord.LATENCY, SegmentRecord.START_TIME),
                     new QueryBuilder() {
                         @Override
                         public void apply(StreamQuery traceQuery) {
                             traceQuery.appendCondition(eq(SegmentRecord.SEGMENT_ID, segmentID));
                         }
                     });
-            basicTraces.addAll(segmentRecordResp.getElements().stream().map(new BasicTraceDeserializer()).collect(Collectors.toList()));
+
+            for (final RowEntity row : segmentRecordResp.getElements()) {
+                BasicTrace basicTrace = new BasicTrace();
+
+                basicTrace.setSegmentId(row.getId());
+                basicTrace.setStart(String.valueOf(row.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.START_TIME)));
+                basicTrace.getEndpointNames().add(IDManager.EndpointID.analysisId(
+                        row.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.ENDPOINT_ID)
+                ).getEndpointName());
+                basicTrace.setDuration(((Number) row.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.LATENCY)).intValue());
+                basicTrace.setError(BooleanUtils.valueToBoolean(
+                        ((Number) row.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.IS_ERROR)).intValue()
+                ));
+                basicTrace.getTraceIds().add(row.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.TRACE_ID));
+
+                basicTraces.add(basicTrace);
+            }
         }
 
         // TODO: Sort in DB with DESC
@@ -104,7 +133,7 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractBanyanDBDAO i
 
     @Override
     public List<ProfileThreadSnapshotRecord> queryRecords(String segmentId, int minSequence, int maxSequence) throws IOException {
-        StreamQueryResponse resp = query(ProfileThreadSnapshotRecord.INDEX_NAME,
+        StreamQueryResponse resp = query(profileThreadSnapshotMetadata,
                 ImmutableList.of(ProfileThreadSnapshotRecord.TASK_ID, ProfileThreadSnapshotRecord.SEGMENT_ID,
                         ProfileThreadSnapshotRecord.DUMP_TIME, ProfileThreadSnapshotRecord.SEQUENCE),
                 new QueryBuilder() {
@@ -118,26 +147,52 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractBanyanDBDAO i
                     }
                 });
 
-        return resp.getElements().stream().map(new ProfileThreadSnapshotRecordDeserializer()).collect(Collectors.toList());
+        List<ProfileThreadSnapshotRecord> result = new ArrayList<>(maxSequence - minSequence);
+        for (final RowEntity rowEntity : resp.getElements()) {
+            ProfileThreadSnapshotRecord record = this.builder.storage2Entity(
+                    new BanyanDBConverter.StreamToEntity(profileThreadSnapshotMetadata, rowEntity));
+
+            result.add(record);
+        }
+        return result;
     }
 
     @Override
     public SegmentRecord getProfiledSegment(String segmentId) throws IOException {
-        StreamQueryResponse resp = query(SegmentRecord.INDEX_NAME,
+        StreamQueryResponse resp = query(segmentRecordMetadata,
                 ImmutableList.of(SegmentRecord.TRACE_ID, SegmentRecord.IS_ERROR, SegmentRecord.SERVICE_ID, SegmentRecord.SERVICE_INSTANCE_ID, SegmentRecord.ENDPOINT_ID, SegmentRecord.LATENCY, SegmentRecord.START_TIME),
                 new QueryBuilder() {
                     @Override
                     public void apply(StreamQuery query) {
-                        query.setDataProjections(Collections.singletonList("data_binary"));
+                        query.setDataProjections(Collections.singletonList(SegmentRecord.DATA_BINARY));
                         query.appendCondition(eq(SegmentRecord.INDEX_NAME, segmentId));
                     }
                 });
 
-        return resp.getElements().stream().map(new SegmentRecordDeserializer()).findFirst().orElse(null);
+        if (resp.size() == 0) {
+            return null;
+        }
+
+        final RowEntity rowEntity = resp.getElements().iterator().next();
+        final SegmentRecord segmentRecord = new SegmentRecord();
+        segmentRecord.setSegmentId(rowEntity.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.SEGMENT_ID));
+        segmentRecord.setTraceId(rowEntity.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.TRACE_ID));
+        segmentRecord.setServiceId(rowEntity.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.SERVICE_ID));
+        segmentRecord.setStartTime(
+                ((Number) rowEntity.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.START_TIME)).longValue());
+        segmentRecord.setLatency(
+                ((Number) rowEntity.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.LATENCY)).intValue());
+        segmentRecord.setIsError(
+                ((Number) rowEntity.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.IS_ERROR)).intValue());
+        byte[] dataBinary = rowEntity.getValue(StreamMetadata.TAG_FAMILY_DATA, SegmentRecord.DATA_BINARY);
+        if (dataBinary != null && dataBinary.length > 0) {
+            segmentRecord.setDataBinary(dataBinary);
+        }
+        return segmentRecord;
     }
 
     private int querySequenceWithAgg(AggType aggType, String segmentId, long start, long end) {
-        StreamQueryResponse resp = query(ProfileThreadSnapshotRecord.INDEX_NAME,
+        StreamQueryResponse resp = query(profileThreadSnapshotMetadata,
                 ImmutableList.of(ProfileThreadSnapshotRecord.TASK_ID, ProfileThreadSnapshotRecord.SEGMENT_ID,
                         ProfileThreadSnapshotRecord.DUMP_TIME, ProfileThreadSnapshotRecord.SEQUENCE),
                 new QueryBuilder() {
@@ -151,7 +206,13 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractBanyanDBDAO i
                     }
                 });
 
-        List<ProfileThreadSnapshotRecord> records = resp.getElements().stream().map(new ProfileThreadSnapshotRecordDeserializer()).collect(Collectors.toList());
+        List<ProfileThreadSnapshotRecord> records = new ArrayList<>();
+        for (final RowEntity rowEntity : resp.getElements()) {
+            ProfileThreadSnapshotRecord record = this.builder.storage2Entity(
+                    new BanyanDBConverter.StreamToEntity(profileThreadSnapshotMetadata, rowEntity));
+
+            records.add(record);
+        }
 
         switch (aggType) {
             case MIN:
@@ -176,55 +237,4 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractBanyanDBDAO i
     enum AggType {
         MIN, MAX
     }
-
-    public static class ProfileThreadSnapshotRecordDeserializer implements RowEntityDeserializer<ProfileThreadSnapshotRecord> {
-        @Override
-        public ProfileThreadSnapshotRecord apply(RowEntity row) {
-            ProfileThreadSnapshotRecord record = new ProfileThreadSnapshotRecord();
-            final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
-            record.setTaskId((String) searchable.get(0).getValue());
-            record.setSegmentId((String) searchable.get(1).getValue());
-            record.setDumpTime(((Number) searchable.get(2).getValue()).longValue());
-            record.setSequence(((Number) searchable.get(3).getValue()).intValue());
-            final List<TagAndValue<?>> data = row.getTagFamilies().get(1);
-            record.setStackBinary(((ByteString) data.get(0).getValue()).toByteArray());
-            return record;
-        }
-    }
-
-    public static class SegmentRecordDeserializer implements RowEntityDeserializer<SegmentRecord> {
-        @Override
-        public SegmentRecord apply(RowEntity row) {
-            SegmentRecord record = new SegmentRecord();
-            final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
-            record.setSegmentId(row.getId());
-            record.setTraceId((String) searchable.get(0).getValue());
-            record.setIsError(((Number) searchable.get(1).getValue()).intValue());
-            record.setServiceId((String) searchable.get(2).getValue());
-            record.setServiceInstanceId((String) searchable.get(3).getValue());
-            record.setEndpointId((String) searchable.get(4).getValue());
-            record.setLatency(((Number) searchable.get(5).getValue()).intValue());
-            record.setStartTime(((Number) searchable.get(6).getValue()).longValue());
-            final List<TagAndValue<?>> data = row.getTagFamilies().get(1);
-            record.setDataBinary(((ByteString) data.get(0).getValue()).toByteArray());
-            return record;
-        }
-    }
-
-    public static class BasicTraceDeserializer implements RowEntityDeserializer<BasicTrace> {
-        @Override
-        public BasicTrace apply(RowEntity row) {
-            BasicTrace trace = new BasicTrace();
-            trace.setSegmentId(row.getId());
-            final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
-            trace.getTraceIds().add((String) searchable.get(0).getValue());
-            trace.setError(((Long) searchable.get(1).getValue()).intValue() == 1);
-            trace.getEndpointNames().add(IDManager.EndpointID.analysisId(
-                    (String) searchable.get(2).getValue()
-            ).getEndpointName());
-            trace.setDuration(((Long) searchable.get(3).getValue()).intValue());
-            trace.setStart(String.valueOf(searchable.get(4).getValue()));
-            return trace;
-        }
-    }
 }
\ No newline at end of file
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java
index d028fa91c3..a1cf73edcd 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java
@@ -24,21 +24,34 @@ import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
 import org.apache.skywalking.oap.server.core.analysis.record.Record;
 import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
+import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
 import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.BanyanDBStorageDataBuilder;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBConverter;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.StreamMetadata;
 
 import java.io.IOException;
 
 @RequiredArgsConstructor
-public class BanyanDBRecordDAO<T extends Record> implements IRecordDAO {
-    private final BanyanDBStorageDataBuilder<T> storageBuilder;
+public class BanyanDBRecordDAO implements IRecordDAO {
+    private final StorageBuilder<Record> storageBuilder;
 
     @Override
     public InsertRequest prepareBatchInsert(Model model, Record record) throws IOException {
-        StreamWrite.StreamWriteBuilder builder = storageBuilder.entity2Storage((T) record)
-                .name(model.getName())
-                .timestamp(TimeBucket.getTimestamp(record.getTimeBucket(), model.getDownsampling()));
+        StreamMetadata metadata = MetadataRegistry.INSTANCE.findStreamMetadata(model.getName());
+        if (metadata == null) {
+            throw new IOException(model.getName() + " is not registered");
+        }
+        StreamWrite streamWrite = new StreamWrite(metadata.getGroup(), // group name
+                model.getName(), // index-name
+                record.id(), // identity
+                TimeBucket.getTimestamp(record.getTimeBucket(), model.getDownsampling()), // timestamp
+                metadata.getDataFamilySize(), // length of the "data" tag family
+                metadata.getSearchableFamilySize()); // length of the "searchable" tag family
+        Convert2Storage<StreamWrite> convert2Storage = new BanyanDBConverter.StreamToStorage(metadata, streamWrite);
+        storageBuilder.entity2Storage(record, convert2Storage);
 
-        return new BanyanDBStreamInsertRequest(builder.build());
+        return new BanyanDBStreamInsertRequest(convert2Storage.obtain());
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStorageDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStorageDAO.java
index 3c3b4b4a6c..c3f75c8cef 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStorageDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStorageDAO.java
@@ -23,24 +23,17 @@ import org.apache.skywalking.oap.server.core.analysis.config.NoneStream;
 import org.apache.skywalking.oap.server.core.analysis.management.ManagementData;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.analysis.record.Record;
-import org.apache.skywalking.oap.server.core.analysis.topn.TopN;
 import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
 import org.apache.skywalking.oap.server.core.storage.IManagementDAO;
 import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
 import org.apache.skywalking.oap.server.core.storage.INoneStreamDAO;
 import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
 import org.apache.skywalking.oap.server.core.storage.StorageDAO;
-import org.apache.skywalking.oap.server.core.storage.model.Model;
 import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
-import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
-import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBManagementDAO;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBMetricsDAO;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBNoneStreamDAO;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.BanyanDBStorageDataBuilder;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
 
 @Slf4j
 public class BanyanDBStorageDAO extends AbstractDAO<BanyanDBStorageClient> implements StorageDAO {
@@ -50,69 +43,21 @@ public class BanyanDBStorageDAO extends AbstractDAO<BanyanDBStorageClient> imple
 
     @Override
     public IMetricsDAO newMetricsDao(StorageBuilder storageBuilder) {
-        // SKIP:
-        // 1. OAL runtime metrics builder
-        // 2. Analysis Function builder
-        if (storageBuilder.getClass().getName().startsWith("org.apache.skywalking.oap.server.core.")) {
-            log.warn("metrics builder {} is not supported yet", storageBuilder.getClass());
-            return new IMetricsDAO() {
-                @Override
-                public List<Metrics> multiGet(Model model, List<Metrics> metrics) throws IOException {
-                    return Collections.emptyList();
-                }
-
-                @Override
-                public InsertRequest prepareBatchInsert(Model model, Metrics metrics) throws IOException {
-                    return new InsertRequest() {
-                    };
-                }
-
-                @Override
-                public UpdateRequest prepareBatchUpdate(Model model, Metrics metrics) throws IOException {
-                    return new UpdateRequest() {
-                    };
-                }
-            };
-        }
-        return new BanyanDBMetricsDAO<>((BanyanDBStorageDataBuilder<Metrics>) storageBuilder);
+        return new BanyanDBMetricsDAO((StorageBuilder<Metrics>) storageBuilder);
     }
 
     @Override
     public IRecordDAO newRecordDao(StorageBuilder storageBuilder) {
-        try {
-            final Class<?> returnType = storageBuilder.getClass().getDeclaredMethod("storage2Entity", Map.class).getReturnType();
-            // FIXME: this is currently a hack to avoid TopN insertion since we will impl TopN later in BanyanDB side
-            if (TopN.class.isAssignableFrom(returnType)) {
-                return new IRecordDAO() {
-                    @Override
-                    public InsertRequest prepareBatchInsert(Model model, Record record) throws IOException {
-                        return new InsertRequest() {
-                        };
-                    }
-                };
-            } else if (returnType.getName().equals("org.apache.skywalking.oap.server.storage.plugin.zipkin.ZipkinSpanRecord")) {
-                // SKIP ZipkinSpanRecord
-                return new IRecordDAO() {
-                    @Override
-                    public InsertRequest prepareBatchInsert(Model model, Record record) throws IOException {
-                        return new InsertRequest() {
-                        };
-                    }
-                };
-            }
-        } catch (NoSuchMethodException ex) {
-            log.error("fail to get declared method");
-        }
-        return new BanyanDBRecordDAO<>((BanyanDBStorageDataBuilder<Record>) storageBuilder);
+        return new BanyanDBRecordDAO((StorageBuilder<Record>) storageBuilder);
     }
 
     @Override
     public INoneStreamDAO newNoneStreamDao(StorageBuilder storageBuilder) {
-        return new BanyanDBNoneStreamDAO<>(getClient(), (BanyanDBStorageDataBuilder<NoneStream>) storageBuilder);
+        return new BanyanDBNoneStreamDAO(getClient(), (StorageBuilder<NoneStream>) storageBuilder);
     }
 
     @Override
     public IManagementDAO newManagementDao(StorageBuilder storageBuilder) {
-        return new BanyanDBManagementDAO<>(getClient(), (BanyanDBStorageDataBuilder<ManagementData>) storageBuilder);
+        return new BanyanDBManagementDAO(getClient(), (StorageBuilder<ManagementData>) storageBuilder);
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStreamInsertRequest.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStreamInsertRequest.java
index f8287973bc..b04e4422d5 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStreamInsertRequest.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStreamInsertRequest.java
@@ -24,7 +24,7 @@ import org.apache.skywalking.banyandb.v1.client.StreamWrite;
 import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
 
 @RequiredArgsConstructor
+@Getter
 public class BanyanDBStreamInsertRequest implements InsertRequest {
-    @Getter
     private final StreamWrite streamWrite;
 }
\ No newline at end of file
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTopologyQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTopologyQueryDAO.java
deleted file mode 100644
index 660190dd10..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTopologyQueryDAO.java
+++ /dev/null
@@ -1,276 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import io.vavr.Tuple2;
-import io.vavr.Tuple4;
-import lombok.RequiredArgsConstructor;
-import org.apache.skywalking.banyandb.v1.client.RowEntity;
-import org.apache.skywalking.banyandb.v1.client.StreamQuery;
-import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
-import org.apache.skywalking.banyandb.v1.client.TimestampRange;
-import org.apache.skywalking.oap.server.core.Const;
-import org.apache.skywalking.oap.server.core.UnexpectedException;
-import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
-import org.apache.skywalking.oap.server.core.analysis.manual.relation.endpoint.EndpointRelationServerSideMetrics;
-import org.apache.skywalking.oap.server.core.analysis.manual.relation.instance.ServiceInstanceRelationClientSideMetrics;
-import org.apache.skywalking.oap.server.core.analysis.manual.relation.instance.ServiceInstanceRelationServerSideMetrics;
-import org.apache.skywalking.oap.server.core.analysis.manual.relation.service.ServiceRelationClientSideMetrics;
-import org.apache.skywalking.oap.server.core.analysis.manual.relation.service.ServiceRelationServerSideMetrics;
-import org.apache.skywalking.oap.server.core.query.type.Call;
-import org.apache.skywalking.oap.server.core.source.DetectPoint;
-import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
-import org.apache.skywalking.oap.server.library.util.CollectionUtils;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-public class BanyanDBTopologyQueryDAO extends AbstractBanyanDBDAO implements ITopologyQueryDAO {
-    public BanyanDBTopologyQueryDAO(BanyanDBStorageClient client) {
-        super(client);
-    }
-
-    @Override
-    public List<Call.CallDetail> loadServiceRelationsDetectedAtServerSide(long startTB, long endTB, List<String> serviceIds) throws IOException {
-        if (CollectionUtils.isEmpty(serviceIds)) {
-            throw new UnexpectedException("Service id is empty");
-        }
-
-        return loadServiceCalls(
-                ServiceRelationServerSideMetrics.INDEX_NAME, startTB, endTB,
-                ServiceRelationServerSideMetrics.SOURCE_SERVICE_ID,
-                ServiceRelationServerSideMetrics.DEST_SERVICE_ID, serviceIds, DetectPoint.SERVER
-        );
-    }
-
-    @Override
-    public List<Call.CallDetail> loadServiceRelationDetectedAtClientSide(long startTB, long endTB, List<String> serviceIds) throws IOException {
-        if (CollectionUtils.isEmpty(serviceIds)) {
-            throw new UnexpectedException("Service id is empty");
-        }
-
-        return loadServiceCalls(
-                ServiceRelationClientSideMetrics.INDEX_NAME, startTB, endTB,
-                ServiceRelationClientSideMetrics.SOURCE_SERVICE_ID,
-                ServiceRelationClientSideMetrics.DEST_SERVICE_ID, serviceIds, DetectPoint.CLIENT
-        );
-    }
-
-    @Override
-    public List<Call.CallDetail> loadServiceRelationsDetectedAtServerSide(long startTB, long endTB) throws IOException {
-        return loadServiceCalls(
-                ServiceRelationServerSideMetrics.INDEX_NAME, startTB, endTB,
-                ServiceRelationServerSideMetrics.SOURCE_SERVICE_ID,
-                ServiceRelationServerSideMetrics.DEST_SERVICE_ID, Collections.emptyList(), DetectPoint.SERVER
-        );
-    }
-
-    @Override
-    public List<Call.CallDetail> loadServiceRelationDetectedAtClientSide(long startTB, long endTB) throws IOException {
-        return loadServiceCalls(
-                ServiceRelationClientSideMetrics.INDEX_NAME, startTB, endTB,
-                ServiceRelationClientSideMetrics.SOURCE_SERVICE_ID,
-                ServiceRelationClientSideMetrics.DEST_SERVICE_ID, Collections.emptyList(), DetectPoint.CLIENT
-        );
-    }
-
-    @Override
-    public List<Call.CallDetail> loadInstanceRelationDetectedAtServerSide(String clientServiceId, String serverServiceId, long startTB, long endTB) throws IOException {
-        return loadServiceInstanceCalls(
-                ServiceInstanceRelationServerSideMetrics.INDEX_NAME, startTB, endTB,
-                ServiceInstanceRelationServerSideMetrics.SOURCE_SERVICE_ID,
-                ServiceInstanceRelationServerSideMetrics.DEST_SERVICE_ID, clientServiceId, serverServiceId,
-                DetectPoint.SERVER
-        );
-    }
-
-    @Override
-    public List<Call.CallDetail> loadInstanceRelationDetectedAtClientSide(String clientServiceId, String serverServiceId, long startTB, long endTB) throws IOException {
-        return loadServiceInstanceCalls(
-                ServiceInstanceRelationClientSideMetrics.INDEX_NAME, startTB, endTB,
-                ServiceInstanceRelationClientSideMetrics.SOURCE_SERVICE_ID,
-                ServiceInstanceRelationClientSideMetrics.DEST_SERVICE_ID, clientServiceId, serverServiceId,
-                DetectPoint.CLIENT
-        );
-    }
-
-    @Override
-    public List<Call.CallDetail> loadEndpointRelation(long startTB, long endTB, String destEndpointId) throws IOException {
-        return loadEndpointCalls(
-                EndpointRelationServerSideMetrics.INDEX_NAME, startTB, endTB,
-                EndpointRelationServerSideMetrics.SOURCE_ENDPOINT,
-                EndpointRelationServerSideMetrics.DEST_ENDPOINT, destEndpointId
-        );
-    }
-
-    private List<Call.CallDetail> loadServiceCalls(String tableName,
-                                                   long startTB,
-                                                   long endTB,
-                                                   String sourceCName,
-                                                   String destCName,
-                                                   List<String> serviceIds,
-                                                   DetectPoint detectPoint) throws IOException {
-        TimestampRange timeRange = new TimestampRange(TimeBucket.getTimestamp(startTB), TimeBucket.getTimestamp(endTB));
-        List<Call.CallDetail> calls = new ArrayList<>();
-        if (serviceIds.isEmpty()) {
-            StreamQueryResponse resp = query(tableName, Collections.emptyList(), timeRange, new QueryBuilder() {
-                @Override
-                void apply(StreamQuery query) {
-                    // query component_id
-                    query.setDataProjections(Collections.singletonList(ServiceRelationServerSideMetrics.COMPONENT_ID));
-                }
-            });
-
-            calls.addAll(resp.getElements().stream().map(new ServiceCallDetailDeserializer(detectPoint)).collect(Collectors.toList()));
-        } else {
-            for (String fieldOfInterest : ImmutableList.of(sourceCName, destCName)) {
-                for (String serviceID : serviceIds) {
-                    StreamQueryResponse resp = query(tableName, ImmutableList.of(fieldOfInterest), timeRange, new QueryBuilder() {
-                        @Override
-                        void apply(StreamQuery query) {
-                            // query component_id
-                            query.setDataProjections(Collections.singletonList(ServiceRelationServerSideMetrics.COMPONENT_ID));
-
-                            query.appendCondition(eq(fieldOfInterest, serviceID));
-                        }
-                    });
-
-                    calls.addAll(resp.getElements().stream().map(new ServiceCallDetailDeserializer(detectPoint)).collect(Collectors.toList()));
-                }
-            }
-        }
-
-        return calls;
-    }
-
-    private List<Call.CallDetail> loadServiceInstanceCalls(String tableName,
-                                                           long startTB,
-                                                           long endTB,
-                                                           String sourceCName,
-                                                           String descCName,
-                                                           String sourceServiceId,
-                                                           String destServiceId,
-                                                           DetectPoint detectPoint) throws IOException {
-        TimestampRange timeRange = new TimestampRange(TimeBucket.getTimestamp(startTB), TimeBucket.getTimestamp(endTB));
-
-        Set<Tuple4<String, String, String, String>> productQuerySet = ImmutableSet.of(
-                new Tuple4<>(sourceCName, sourceServiceId, descCName, destServiceId),
-                new Tuple4<>(sourceCName, destServiceId, descCName, sourceServiceId)
-        );
-
-        List<Call.CallDetail> calls = new ArrayList<>();
-
-        for (Tuple4<String, String, String, String> querySet : productQuerySet) {
-            StreamQueryResponse resp = query(tableName, ImmutableList.of(querySet._1(), querySet._3()), timeRange, new QueryBuilder() {
-                @Override
-                void apply(StreamQuery query) {
-                    // query component_id
-                    query.setDataProjections(Collections.singletonList(ServiceRelationServerSideMetrics.COMPONENT_ID));
-
-                    query.appendCondition(eq(querySet._1(), querySet._2()));
-                    query.appendCondition(eq(querySet._3(), querySet._4()));
-                }
-            });
-
-            calls.addAll(resp.getElements().stream().map(new InstanceCallDetailDeserializer(detectPoint)).collect(Collectors.toList()));
-        }
-
-        return calls;
-    }
-
-    private List<Call.CallDetail> loadEndpointCalls(String tableName,
-                                                    long startTB,
-                                                    long endTB,
-                                                    String sourceCName,
-                                                    String destCName,
-                                                    String id) throws IOException {
-        TimestampRange timeRange = new TimestampRange(TimeBucket.getTimestamp(startTB), TimeBucket.getTimestamp(endTB));
-
-        Set<Tuple2<String, String>> allPossibleQuerySet = ImmutableSet.of(
-                new Tuple2<>(sourceCName, id),
-                new Tuple2<>(destCName, id)
-        );
-
-        List<Call.CallDetail> calls = new ArrayList<>();
-
-        for (Tuple2<String, String> querySet : allPossibleQuerySet) {
-            StreamQueryResponse resp = query(tableName, ImmutableList.of(querySet._1()), timeRange, new QueryBuilder() {
-                @Override
-                void apply(StreamQuery query) {
-                    query.appendCondition(eq(querySet._1(), querySet._2()));
-                }
-            });
-
-            calls.addAll(resp.getElements().stream().map(new EndpointCallDetailDeserializer()).collect(Collectors.toList()));
-        }
-
-        return calls;
-    }
-
-    @RequiredArgsConstructor
-    public static class ServiceCallDetailDeserializer implements RowEntityDeserializer<Call.CallDetail> {
-        private final DetectPoint detectPoint;
-
-        @Override
-        public Call.CallDetail apply(RowEntity rowEntity) {
-            Call.CallDetail call = new Call.CallDetail();
-            String[] idsSlice = rowEntity.getId().split(Const.ID_CONNECTOR);
-            String entityId = idsSlice[1];
-            int componentId = ((Number) rowEntity.getTagFamilies().get(1) // Tag Family: "data"
-                    .get(0).getValue()).intValue();
-            call.buildFromServiceRelation(entityId, componentId, this.detectPoint);
-            return call;
-        }
-    }
-
-    @RequiredArgsConstructor
-    public static class InstanceCallDetailDeserializer implements RowEntityDeserializer<Call.CallDetail> {
-        private final DetectPoint detectPoint;
-
-        @Override
-        public Call.CallDetail apply(RowEntity rowEntity) {
-            Call.CallDetail call = new Call.CallDetail();
-            String[] idsSlice = rowEntity.getId().split(Const.ID_CONNECTOR);
-            String entityId = idsSlice[1];
-            int componentId = ((Number) rowEntity.getTagFamilies().get(1) // Tag Family: "data"
-                    .get(0).getValue()).intValue();
-            call.buildFromInstanceRelation(entityId, componentId, this.detectPoint);
-            return call;
-        }
-    }
-
-    @RequiredArgsConstructor
-    public static class EndpointCallDetailDeserializer implements RowEntityDeserializer<Call.CallDetail> {
-        @Override
-        public Call.CallDetail apply(RowEntity rowEntity) {
-            Call.CallDetail call = new Call.CallDetail();
-            String[] idsSlice = rowEntity.getId().split(Const.ID_CONNECTOR);
-            String entityId = idsSlice[1];
-            call.buildFromEndpointRelation(entityId, DetectPoint.SERVER);
-            return call;
-        }
-    }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java
index 642ed519fc..fc0d458c29 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java
@@ -20,11 +20,9 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
 
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
-import com.google.protobuf.ByteString;
 import org.apache.skywalking.banyandb.v1.client.RowEntity;
 import org.apache.skywalking.banyandb.v1.client.StreamQuery;
 import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
 import org.apache.skywalking.banyandb.v1.client.TimestampRange;
 import org.apache.skywalking.oap.server.core.analysis.IDManager;
 import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
@@ -38,15 +36,20 @@ import org.apache.skywalking.oap.server.core.query.type.TraceState;
 import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
 import org.apache.skywalking.oap.server.library.util.BooleanUtils;
 import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBConverter;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.SegmentRecordBuilder;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.StreamMetadata;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.stream.Collectors;
 
 public class BanyanDBTraceQueryDAO extends AbstractBanyanDBDAO implements ITraceQueryDAO {
+    private final StreamMetadata segmentRecordMetadata =
+            MetadataRegistry.INSTANCE.findStreamMetadata(SegmentRecord.INDEX_NAME);
+
     public BanyanDBTraceQueryDAO(BanyanDBStorageClient client) {
         super(client);
     }
@@ -97,9 +100,8 @@ public class BanyanDBTraceQueryDAO extends AbstractBanyanDBDAO implements ITrace
 
                 if (CollectionUtils.isNotEmpty(tags)) {
                     for (final Tag tag : tags) {
-                        if (SegmentRecordBuilder.INDEXED_TAGS.contains(tag.getKey())) {
-                            query.appendCondition(eq(tag.getKey(), tag.getValue()));
-                        }
+                        // TODO: check if we have this tag indexed?
+                        query.appendCondition(eq(tag.getKey(), tag.getValue()));
                     }
                 }
 
@@ -114,7 +116,7 @@ public class BanyanDBTraceQueryDAO extends AbstractBanyanDBDAO implements ITrace
             tsRange = new TimestampRange(TimeBucket.getTimestamp(startSecondTB), TimeBucket.getTimestamp(endSecondTB));
         }
 
-        StreamQueryResponse resp = query(SegmentRecord.INDEX_NAME,
+        StreamQueryResponse resp = query(segmentRecordMetadata,
                 ImmutableList.of(SegmentRecord.TRACE_ID, // 0 - trace_id
                         SegmentRecord.IS_ERROR, // 1 - is_error
                         SegmentRecord.SERVICE_ID, // 2 - service_id
@@ -124,24 +126,39 @@ public class BanyanDBTraceQueryDAO extends AbstractBanyanDBDAO implements ITrace
                         SegmentRecord.START_TIME), // 6 - start_time
                 tsRange, q);
 
-        List<BasicTrace> basicTraces = resp.getElements().stream().map(new BasicTraceDeserializer()).collect(Collectors.toList());
+        TraceBrief traceBrief = new TraceBrief();
+        traceBrief.setTotal(resp.getElements().size());
+
+        for (final RowEntity row : resp.getElements()) {
+            BasicTrace basicTrace = new BasicTrace();
+
+            basicTrace.setSegmentId(row.getId());
+            basicTrace.setStart(String.valueOf(row.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.START_TIME)));
+            basicTrace.getEndpointNames().add(IDManager.EndpointID.analysisId(
+                    row.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.ENDPOINT_ID)
+            ).getEndpointName());
+            basicTrace.setDuration(((Number) row.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.LATENCY)).intValue());
+            basicTrace.setError(BooleanUtils.valueToBoolean(
+                    ((Number) row.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.IS_ERROR)).intValue()
+            ));
+            basicTrace.getTraceIds().add(row.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.TRACE_ID));
+
+            traceBrief.getTraces().add(basicTrace);
+        }
 
-        TraceBrief brief = new TraceBrief();
-        brief.setTotal(basicTraces.size());
-        brief.getTraces().addAll(basicTraces);
-        return brief;
+        return traceBrief;
     }
 
     @Override
     public List<SegmentRecord> queryByTraceId(String traceId) throws IOException {
-        StreamQueryResponse resp = query(SegmentRecord.INDEX_NAME,
-                ImmutableList.of(SegmentRecord.TRACE_ID, // 0 - trace_id
-                        SegmentRecord.IS_ERROR, // 1 - is_error
-                        SegmentRecord.SERVICE_ID, // 2 - service_id
-                        SegmentRecord.SERVICE_INSTANCE_ID, // 3 - service_instance_id
-                        SegmentRecord.ENDPOINT_ID, // 4 - endpoint_id
-                        SegmentRecord.LATENCY, // 5 - latency
-                        SegmentRecord.START_TIME), // 6 - start_time
+        StreamQueryResponse resp = query(segmentRecordMetadata,
+                ImmutableList.of(SegmentRecord.TRACE_ID,
+                        SegmentRecord.IS_ERROR,
+                        SegmentRecord.SERVICE_ID,
+                        SegmentRecord.SERVICE_INSTANCE_ID,
+                        SegmentRecord.ENDPOINT_ID,
+                        SegmentRecord.LATENCY,
+                        SegmentRecord.START_TIME),
                 new QueryBuilder() {
                     @Override
                     public void apply(StreamQuery query) {
@@ -150,47 +167,19 @@ public class BanyanDBTraceQueryDAO extends AbstractBanyanDBDAO implements ITrace
                     }
                 });
 
-        return resp.getElements().stream().map(new SegmentRecordDeserializer()).collect(Collectors.toList());
+        List<SegmentRecord> segmentRecords = new ArrayList<>(resp.getElements().size());
+
+        for (final RowEntity rowEntity : resp.getElements()) {
+            SegmentRecord segmentRecord = new SegmentRecord.Builder().storage2Entity(
+                    new BanyanDBConverter.StreamToEntity(segmentRecordMetadata, rowEntity));
+            segmentRecords.add(segmentRecord);
+        }
+
+        return segmentRecords;
     }
 
     @Override
     public List<Span> doFlexibleTraceQuery(String traceId) throws IOException {
         return Collections.emptyList();
     }
-
-    public static class BasicTraceDeserializer implements RowEntityDeserializer<BasicTrace> {
-        @Override
-        public BasicTrace apply(RowEntity row) {
-            BasicTrace trace = new BasicTrace();
-            trace.setSegmentId(row.getId());
-            final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
-            trace.getTraceIds().add((String) searchable.get(0).getValue());
-            trace.setError(((Long) searchable.get(1).getValue()).intValue() == 1);
-            trace.getEndpointNames().add(IDManager.EndpointID.analysisId(
-                    (String) searchable.get(4).getValue()
-            ).getEndpointName());
-            trace.setDuration(((Long) searchable.get(5).getValue()).intValue());
-            trace.setStart(String.valueOf(searchable.get(6).getValue()));
-            return trace;
-        }
-    }
-
-    public static class SegmentRecordDeserializer implements RowEntityDeserializer<SegmentRecord> {
-        @Override
-        public SegmentRecord apply(RowEntity row) {
-            SegmentRecord record = new SegmentRecord();
-            final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
-            record.setSegmentId(row.getId());
-            record.setTraceId((String) searchable.get(0).getValue());
-            record.setIsError(((Number) searchable.get(1).getValue()).intValue());
-            record.setServiceId((String) searchable.get(2).getValue());
-            record.setServiceInstanceId((String) searchable.get(3).getValue());
-            record.setEndpointId((String) searchable.get(4).getValue());
-            record.setLatency(((Number) searchable.get(5).getValue()).intValue());
-            record.setStartTime(((Number) searchable.get(6).getValue()).longValue());
-            final List<TagAndValue<?>> data = row.getTagFamilies().get(1);
-            record.setDataBinary(((ByteString) data.get(0).getValue()).toByteArray());
-            return record;
-        }
-    }
 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java
deleted file mode 100644
index 7888b0f56e..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.skywalking.banyandb.v1.client.RowEntity;
-import org.apache.skywalking.banyandb.v1.client.StreamQuery;
-import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
-import org.apache.skywalking.banyandb.v1.client.StreamWrite;
-import org.apache.skywalking.banyandb.v1.client.Tag;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.management.ui.template.UITemplate;
-import org.apache.skywalking.oap.server.core.query.enumeration.TemplateType;
-import org.apache.skywalking.oap.server.core.query.input.DashboardSetting;
-import org.apache.skywalking.oap.server.core.query.type.DashboardConfiguration;
-import org.apache.skywalking.oap.server.core.query.type.TemplateChangeStatus;
-import org.apache.skywalking.oap.server.core.storage.management.UITemplateManagementDAO;
-import org.apache.skywalking.oap.server.library.util.BooleanUtils;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
-
-import java.io.IOException;
-import java.time.Instant;
-import java.util.List;
-import java.util.stream.Collectors;
-
-/**
- * {@link org.apache.skywalking.oap.server.core.management.ui.template.UITemplate} is a stream
- */
-public class BanyanDBUITemplateManagementDAO extends AbstractBanyanDBDAO implements UITemplateManagementDAO {
-    public BanyanDBUITemplateManagementDAO(BanyanDBStorageClient client) {
-        super(client);
-    }
-
-    @Override
-    public List<DashboardConfiguration> getAllTemplates(Boolean includingDisabled) throws IOException {
-        StreamQueryResponse resp = query(UITemplate.INDEX_NAME, ImmutableList.of(UITemplate.NAME, UITemplate.DISABLED),
-                new QueryBuilder() {
-                    @Override
-                    public void apply(StreamQuery query) {
-                        query.setDataProjections(ImmutableList.of(UITemplate.ACTIVATED, UITemplate.CONFIGURATION, UITemplate.TYPE));
-                        query.setLimit(10000);
-                        if (!includingDisabled) {
-                            query.appendCondition(eq(UITemplate.DISABLED, BooleanUtils.FALSE));
-                        }
-                    }
-                });
-
-        return resp.getElements().stream().map(new DashboardConfigurationDeserializer()).collect(Collectors.toList());
-    }
-
-    @Override
-    public TemplateChangeStatus addTemplate(DashboardSetting setting) throws IOException {
-        final UITemplate uiTemplate = setting.toEntity();
-
-        StreamWrite request = StreamWrite.builder()
-                .name(UITemplate.INDEX_NAME)
-                // searchable - name
-                .searchableTag(Tag.stringField(uiTemplate.getName()))
-                // searchable - disabled
-                .searchableTag(Tag.longField(uiTemplate.getDisabled()))
-                // data - type
-                .dataTag(Tag.stringField(uiTemplate.getType()))
-                // data - configuration
-                .dataTag(Tag.stringField(uiTemplate.getConfiguration()))
-                // data - activated
-                .dataTag(Tag.longField(uiTemplate.getActivated()))
-                .timestamp(Instant.now().toEpochMilli())
-                .elementId(uiTemplate.id())
-                .build();
-        getClient().write(request);
-        return TemplateChangeStatus.builder().status(true).build();
-    }
-
-    @Override
-    public TemplateChangeStatus changeTemplate(DashboardSetting setting) throws IOException {
-        return TemplateChangeStatus.builder().status(false).message("Can't update the template").build();
-    }
-
-    @Override
-    public TemplateChangeStatus disableTemplate(String name) throws IOException {
-        return TemplateChangeStatus.builder().status(false).message("Can't disable the template").build();
-    }
-
-    public static class DashboardConfigurationDeserializer implements RowEntityDeserializer<DashboardConfiguration> {
-        @Override
-        public DashboardConfiguration apply(RowEntity row) {
-            DashboardConfiguration dashboardConfiguration = new DashboardConfiguration();
-            final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
-            // name
-            dashboardConfiguration.setName((String) searchable.get(0).getValue());
-            // disabled
-            dashboardConfiguration.setDisabled(BooleanUtils.valueToBoolean(((Number) searchable.get(1).getValue()).intValue()));
-            final List<TagAndValue<?>> data = row.getTagFamilies().get(1);
-            // activated
-            dashboardConfiguration.setActivated(BooleanUtils.valueToBoolean(((Number) data.get(0).getValue()).intValue()));
-            // configuration
-            dashboardConfiguration.setConfiguration((String) data.get(1).getValue());
-            // type
-            dashboardConfiguration.setType(TemplateType.forName((String) data.get(2).getValue()));
-            return dashboardConfiguration;
-        }
-    }
-}