You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by lu...@apache.org on 2022/04/30 09:03:28 UTC

[skywalking] 09/24: refactor deserializer

This is an automated email from the ASF dual-hosted git repository.

lujiajing pushed a commit to branch banyandb-integration-stream
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit a3bba672ec56a8b1bd31a2ff9928d4c1b011d214
Author: Megrez Lu <lu...@gmail.com>
AuthorDate: Wed Dec 8 16:45:12 2021 +0800

    refactor deserializer
---
 .../banyandb/BanyanDBStorageBuilderFactory.java    |  70 +++++++
 .../plugin/banyandb/BanyanDBStorageProvider.java   |   2 +-
 .../deserializer/AbstractBanyanDBDeserializer.java |  53 -----
 .../deserializer/AlarmMessageDeserializer.java     |  69 -------
 .../deserializer/BanyanDBDeserializerFactory.java  |  72 -------
 .../deserializer/BasicTraceDeserializer.java       |  49 -----
 .../deserializer/BrowserErrorLogDeserializer.java  |  72 -------
 .../DashboardConfigurationDeserializer.java        |  55 ------
 .../deserializer/DatabaseDeserializer.java         |  43 -----
 .../deserializer/EndpointDeserializer.java         |  43 -----
 .../banyandb/deserializer/EventDeserializer.java   |  55 ------
 .../banyandb/deserializer/LogDeserializer.java     |  69 -------
 .../NetworkAddressAliasDeserializer.java           |  53 -----
 .../deserializer/ProfileTaskDeserializer.java      |  54 ------
 .../deserializer/ProfileTaskLogDeserializer.java   |  53 -----
 .../ProfileThreadSnapshotRecordDeserializer.java   |  50 -----
 .../deserializer/RowEntityDeserializer.java        |  26 ---
 .../deserializer/SegmentRecordDeserializer.java    |  53 -----
 .../banyandb/deserializer/ServiceDeserializer.java |  44 -----
 .../deserializer/ServiceInstanceDeserializer.java  |  79 --------
 .../plugin/banyandb/schema/AlarmRecordBuilder.java |   2 +-
 .../banyandb/schema/BanyanDBMetricsBuilder.java    |  30 ---
 .../banyandb/schema/BanyanDBRecordBuilder.java     |  61 ------
 .../schema/BanyanDBStorageDataBuilder.java         |  56 +++++-
 .../schema/BrowserErrorLogRecordBuilder.java       |   2 +-
 .../plugin/banyandb/schema/EventBuilder.java       |   9 +-
 .../plugin/banyandb/schema/LogRecordBuilder.java   |   2 +-
 .../storage/plugin/banyandb/schema/Metadata.java   |   6 +-
 .../schema/NetworkAddressAliasBuilder.java         |   2 +-
 .../schema/ProfileTaskLogRecordBuilder.java        |   2 +-
 .../banyandb/schema/ProfileTaskRecordBuilder.java  |   2 +-
 .../schema/ProfileThreadSnapshotRecordBuilder.java |   2 +-
 .../banyandb/schema/SegmentRecordBuilder.java      |   2 +-
 .../plugin/banyandb/schema/UITemplateBuilder.java  |   8 -
 .../banyandb/stream/AbstractBanyanDBDAO.java       |  50 +++--
 .../banyandb/stream/BanyanDBAlarmQueryDAO.java     |  88 +++++++--
 .../stream/BanyanDBBrowserLogQueryDAO.java         |  76 ++++++--
 .../banyandb/stream/BanyanDBEventQueryDAO.java     |  94 ++++++---
 .../banyandb/stream/BanyanDBLogQueryDAO.java       |  77 ++++++--
 .../banyandb/stream/BanyanDBManagementDAO.java     |  42 +++-
 .../banyandb/stream/BanyanDBMetadataQueryDAO.java  | 215 ++++++++++++++++-----
 .../plugin/banyandb/stream/BanyanDBMetricsDAO.java |  14 +-
 .../stream/BanyanDBNetworkAddressAliasDAO.java     |  44 ++++-
 .../banyandb/stream/BanyanDBNoneStreamDAO.java     |  18 +-
 .../stream/BanyanDBProfileTaskLogQueryDAO.java     |  44 ++++-
 .../stream/BanyanDBProfileTaskQueryDAO.java        | 108 +++++++----
 .../BanyanDBProfileThreadSnapshotQueryDAO.java     | 166 ++++++++++++----
 .../plugin/banyandb/stream/BanyanDBRecordDAO.java  |  11 +-
 .../plugin/banyandb/stream/BanyanDBStorageDAO.java |  97 +---------
 .../banyandb/stream/BanyanDBTraceQueryDAO.java     |  96 ++++++---
 .../stream/BanyanDBUITemplateManagementDAO.java    |  52 +++--
 51 files changed, 1036 insertions(+), 1506 deletions(-)

diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageBuilderFactory.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageBuilderFactory.java
new file mode 100644
index 0000000000..a07fe1f106
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageBuilderFactory.java
@@ -0,0 +1,70 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb;
+
+import org.apache.skywalking.oap.server.core.alarm.AlarmRecord;
+import org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic;
+import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
+import org.apache.skywalking.oap.server.core.analysis.manual.log.LogRecord;
+import org.apache.skywalking.oap.server.core.analysis.manual.networkalias.NetworkAddressAlias;
+import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
+import org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraffic;
+import org.apache.skywalking.oap.server.core.browser.manual.errorlog.BrowserErrorLogRecord;
+import org.apache.skywalking.oap.server.core.management.ui.template.UITemplate;
+import org.apache.skywalking.oap.server.core.profile.ProfileTaskLogRecord;
+import org.apache.skywalking.oap.server.core.profile.ProfileTaskRecord;
+import org.apache.skywalking.oap.server.core.profile.ProfileThreadSnapshotRecord;
+import org.apache.skywalking.oap.server.core.source.Event;
+import org.apache.skywalking.oap.server.core.storage.StorageBuilderFactory;
+import org.apache.skywalking.oap.server.core.storage.StorageData;
+import org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder;
+import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.AlarmRecordBuilder;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.BrowserErrorLogRecordBuilder;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.EventBuilder;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.LogRecordBuilder;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.Metadata;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.NetworkAddressAliasBuilder;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.ProfileTaskLogRecordBuilder;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.ProfileTaskRecordBuilder;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.ProfileThreadSnapshotRecordBuilder;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.SegmentRecordBuilder;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.UITemplateBuilder;
+
+public class BanyanDBStorageBuilderFactory implements StorageBuilderFactory {
+    @Override
+    public BuilderTemplateDefinition builderTemplate() {
+        return new BuilderTemplateDefinition(StorageHashMapBuilder.class.getName(), "metrics-builder");
+    }
+
+    @Override
+    public Class<? extends StorageBuilder> builderOf(Class<? extends StorageData> dataType, Class<? extends StorageBuilder> defaultBuilder) {
+        if (SegmentRecord.class.equals(dataType)) {
+            return SegmentRecordBuilder.class;
+        } else if (AlarmRecord.class.equals(dataType)) {
+            return AlarmRecordBuilder.class;
+        } else if (BrowserErrorLogRecord.class.equals(dataType)) {
+            return BrowserErrorLogRecordBuilder.class;
+        } else if (LogRecord.class.equals(dataType)) {
+            return LogRecordBuilder.class;
+        } else if (ProfileTaskLogRecord.class.equals(dataType)) {
+            return ProfileTaskLogRecordBuilder.class;
+        } else if (ProfileThreadSnapshotRecord.class.equals(dataType)) {
+            return ProfileThreadSnapshotRecordBuilder.class;
+        } else if (ProfileTaskRecord.class.equals(dataType)) {
+            return ProfileTaskRecordBuilder.class;
+        } else if (UITemplate.class.equals(dataType)) {
+            return UITemplateBuilder.class;
+        } else if (Event.class.equals(dataType)) {
+            return EventBuilder.class;
+        } else if (ServiceTraffic.class.equals(dataType)) {
+            return Metadata.ServiceTrafficBuilder.class;
+        } else if (InstanceTraffic.class.equals(dataType)) {
+            return Metadata.InstanceTrafficBuilder.class;
+        } else if (EndpointTraffic.class.equals(dataType)) {
+            return Metadata.EndpointTrafficBuilder.class;
+        } else if (NetworkAddressAlias.class.equals(dataType)) {
+            return NetworkAddressAliasBuilder.class;
+        }
+
+        throw new UnsupportedOperationException("unsupported storage type");
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java
index 1df391d302..aade57b62b 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java
@@ -83,7 +83,7 @@ public class BanyanDBStorageProvider extends ModuleProvider {
 
     @Override
     public void prepare() throws ServiceNotProvidedException, ModuleStartException {
-        this.registerServiceImplementation(StorageBuilderFactory.class, new StorageBuilderFactory.Default());
+        this.registerServiceImplementation(StorageBuilderFactory.class, new BanyanDBStorageBuilderFactory());
 
         this.client = new BanyanDBStorageClient(config.getHost(), config.getPort(), config.getGroup());
 
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/AbstractBanyanDBDeserializer.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/AbstractBanyanDBDeserializer.java
deleted file mode 100644
index 0b2f0644ad..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/AbstractBanyanDBDeserializer.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
-
-import org.apache.skywalking.banyandb.v1.client.StreamQuery;
-import org.apache.skywalking.banyandb.v1.client.TimestampRange;
-
-import java.util.Collections;
-import java.util.List;
-
-public abstract class AbstractBanyanDBDeserializer<T> implements RowEntityDeserializer<T> {
-    private final String indexName;
-    private final List<String> searchableProjection;
-    private final List<String> dataProjection;
-
-    protected AbstractBanyanDBDeserializer(String indexName, List<String> searchableProjection) {
-        this(indexName, searchableProjection, Collections.emptyList());
-    }
-
-    protected AbstractBanyanDBDeserializer(String indexName, List<String> searchableProjection, List<String> dataProjection) {
-        this.indexName = indexName;
-        this.searchableProjection = searchableProjection;
-        this.dataProjection = dataProjection;
-    }
-
-    public StreamQuery buildStreamQuery() {
-        final StreamQuery query = new StreamQuery(this.indexName, this.searchableProjection);
-        query.setDataProjections(this.dataProjection);
-        return query;
-    }
-
-    public StreamQuery buildStreamQuery(long startTimestamp, long endTimestamp) {
-        final StreamQuery query = new StreamQuery(this.indexName, new TimestampRange(startTimestamp, endTimestamp), this.searchableProjection);
-        query.setDataProjections(this.dataProjection);
-        return query;
-    }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/AlarmMessageDeserializer.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/AlarmMessageDeserializer.java
deleted file mode 100644
index 2df0ef739b..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/AlarmMessageDeserializer.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
-
-import com.google.common.base.Charsets;
-import com.google.common.collect.ImmutableList;
-import com.google.gson.Gson;
-import com.google.gson.reflect.TypeToken;
-import com.google.protobuf.ByteString;
-import org.apache.skywalking.banyandb.v1.client.RowEntity;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.alarm.AlarmRecord;
-import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
-import org.apache.skywalking.oap.server.core.query.enumeration.Scope;
-import org.apache.skywalking.oap.server.core.query.type.AlarmMessage;
-import org.apache.skywalking.oap.server.core.query.type.KeyValue;
-
-import java.util.List;
-
-public class AlarmMessageDeserializer extends AbstractBanyanDBDeserializer<AlarmMessage> {
-    private static final Gson GSON = new Gson();
-
-    public AlarmMessageDeserializer() {
-        super(AlarmRecord.INDEX_NAME,
-                ImmutableList.of(AlarmRecord.SCOPE, AlarmRecord.START_TIME),
-                ImmutableList.of(AlarmRecord.ID0, AlarmRecord.ID1, AlarmRecord.ALARM_MESSAGE, AlarmRecord.TAGS_RAW_DATA));
-    }
-
-    @Override
-    public AlarmMessage map(RowEntity row) {
-        AlarmMessage alarmMessage = new AlarmMessage();
-        final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
-        int scopeID = ((Number) searchable.get(0).getValue()).intValue();
-        alarmMessage.setScopeId(scopeID);
-        alarmMessage.setScope(Scope.Finder.valueOf(scopeID));
-        alarmMessage.setStartTime(((Number) searchable.get(1).getValue()).longValue());
-        final List<TagAndValue<?>> data = row.getTagFamilies().get(1);
-        alarmMessage.setId((String) data.get(0).getValue());
-        alarmMessage.setId1((String) data.get(1).getValue());
-        alarmMessage.setMessage((String) data.get(2).getValue());
-        Object o = data.get(3).getValue();
-        if (o instanceof ByteString && !((ByteString) o).isEmpty()) {
-            this.parseDataBinary(((ByteString) o).toByteArray(), alarmMessage.getTags());
-        }
-        return alarmMessage;
-    }
-
-    void parseDataBinary(byte[] dataBinary, List<KeyValue> tags) {
-        List<Tag> tagList = GSON.fromJson(new String(dataBinary, Charsets.UTF_8), new TypeToken<List<Tag>>() {
-        }.getType());
-        tagList.forEach(pair -> tags.add(new KeyValue(pair.getKey(), pair.getValue())));
-    }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BanyanDBDeserializerFactory.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BanyanDBDeserializerFactory.java
deleted file mode 100644
index 859812530e..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BanyanDBDeserializerFactory.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
-
-import org.apache.skywalking.oap.server.core.analysis.manual.networkalias.NetworkAddressAlias;
-import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
-import org.apache.skywalking.oap.server.core.profile.ProfileThreadSnapshotRecord;
-import org.apache.skywalking.oap.server.core.query.type.AlarmMessage;
-import org.apache.skywalking.oap.server.core.query.type.BasicTrace;
-import org.apache.skywalking.oap.server.core.query.type.BrowserErrorLog;
-import org.apache.skywalking.oap.server.core.query.type.DashboardConfiguration;
-import org.apache.skywalking.oap.server.core.query.type.Database;
-import org.apache.skywalking.oap.server.core.query.type.Endpoint;
-import org.apache.skywalking.oap.server.core.query.type.Log;
-import org.apache.skywalking.oap.server.core.query.type.ProfileTask;
-import org.apache.skywalking.oap.server.core.query.type.ProfileTaskLog;
-import org.apache.skywalking.oap.server.core.query.type.Service;
-import org.apache.skywalking.oap.server.core.query.type.ServiceInstance;
-import org.apache.skywalking.oap.server.core.query.type.event.Event;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public enum BanyanDBDeserializerFactory {
-    INSTANCE;
-
-    private final Map<Class<?>, AbstractBanyanDBDeserializer<?>> registry;
-
-    BanyanDBDeserializerFactory() {
-        registry = new HashMap<>(10);
-        register(AlarmMessage.class, new AlarmMessageDeserializer());
-        register(BasicTrace.class, new BasicTraceDeserializer());
-        register(BrowserErrorLog.class, new BrowserErrorLogDeserializer());
-        register(DashboardConfiguration.class, new DashboardConfigurationDeserializer());
-        register(Database.class, new DatabaseDeserializer());
-        register(Endpoint.class, new EndpointDeserializer());
-        register(Event.class, new EventDeserializer());
-        register(Log.class, new LogDeserializer());
-        register(NetworkAddressAlias.class, new NetworkAddressAliasDeserializer());
-        register(ProfileTaskLog.class, new ProfileTaskLogDeserializer());
-        register(ProfileTask.class, new ProfileTaskDeserializer());
-        register(ProfileThreadSnapshotRecord.class, new ProfileThreadSnapshotRecordDeserializer());
-        register(SegmentRecord.class, new SegmentRecordDeserializer());
-        register(ServiceInstance.class, new ServiceInstanceDeserializer());
-        register(Service.class, new ServiceDeserializer());
-    }
-
-    private <T> void register(Class<T> clazz, AbstractBanyanDBDeserializer<T> mapper) {
-        this.registry.put(clazz, mapper);
-    }
-
-    @SuppressWarnings({"unchecked"})
-    public <T> AbstractBanyanDBDeserializer<T> findDeserializer(Class<T> clazz) {
-        return (AbstractBanyanDBDeserializer<T>) registry.get(clazz);
-    }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BasicTraceDeserializer.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BasicTraceDeserializer.java
deleted file mode 100644
index 949ae43e16..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BasicTraceDeserializer.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.skywalking.banyandb.v1.client.RowEntity;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.analysis.IDManager;
-import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
-import org.apache.skywalking.oap.server.core.query.type.BasicTrace;
-
-import java.util.List;
-
-public class BasicTraceDeserializer extends AbstractBanyanDBDeserializer<BasicTrace> {
-    public BasicTraceDeserializer() {
-        super(SegmentRecord.INDEX_NAME, ImmutableList.of("trace_id", "state", "endpoint_id", "duration", "start_time"));
-    }
-
-    @Override
-    public BasicTrace map(RowEntity row) {
-        BasicTrace trace = new BasicTrace();
-        trace.setSegmentId(row.getId());
-        final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
-        trace.getTraceIds().add((String) searchable.get(0).getValue());
-        trace.setError(((Long) searchable.get(1).getValue()).intValue() == 1);
-        trace.getEndpointNames().add(IDManager.EndpointID.analysisId(
-                (String) searchable.get(2).getValue()
-        ).getEndpointName());
-        trace.setDuration(((Long) searchable.get(3).getValue()).intValue());
-        trace.setStart(String.valueOf(searchable.get(4).getValue()));
-        return trace;
-    }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BrowserErrorLogDeserializer.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BrowserErrorLogDeserializer.java
deleted file mode 100644
index 92605afa67..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BrowserErrorLogDeserializer.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
-
-import com.google.common.collect.ImmutableList;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.skywalking.banyandb.v1.client.RowEntity;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.browser.manual.errorlog.BrowserErrorLogRecord;
-import org.apache.skywalking.oap.server.core.query.type.BrowserErrorLog;
-import org.apache.skywalking.oap.server.core.query.type.ErrorCategory;
-
-import java.util.Collections;
-import java.util.List;
-
-public class BrowserErrorLogDeserializer extends AbstractBanyanDBDeserializer<BrowserErrorLog> {
-    public BrowserErrorLogDeserializer() {
-        super(BrowserErrorLogRecord.INDEX_NAME,
-                ImmutableList.of(BrowserErrorLogRecord.SERVICE_ID,
-                        BrowserErrorLogRecord.SERVICE_VERSION_ID,
-                        BrowserErrorLogRecord.PAGE_PATH_ID,
-                        BrowserErrorLogRecord.ERROR_CATEGORY),
-                Collections.singletonList(BrowserErrorLogRecord.DATA_BINARY));
-    }
-
-    @Override
-    public BrowserErrorLog map(RowEntity row) {
-        // FIXME: use protobuf directly
-        BrowserErrorLog log = new BrowserErrorLog();
-        final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
-        log.setService((String) searchable.get(0).getValue());
-        log.setServiceVersion((String) searchable.get(1).getValue());
-        log.setPagePath((String) searchable.get(2).getValue());
-        log.setCategory(ErrorCategory.valueOf((String) searchable.get(3).getValue()));
-        log.setTime(row.getTimestamp());
-        final List<TagAndValue<?>> data = row.getTagFamilies().get(1);
-        Object o = data.get(0).getValue();
-        if (o instanceof ByteString && !((ByteString) o).isEmpty()) {
-            try {
-                org.apache.skywalking.apm.network.language.agent.v3.BrowserErrorLog browserErrorLog = org.apache.skywalking.apm.network.language.agent.v3.BrowserErrorLog
-                        .parseFrom((ByteString) o);
-                log.setGrade(browserErrorLog.getGrade());
-                log.setCol(browserErrorLog.getCol());
-                log.setLine(browserErrorLog.getLine());
-                log.setMessage(browserErrorLog.getMessage());
-                log.setErrorUrl(browserErrorLog.getErrorUrl());
-                log.setStack(browserErrorLog.getStack());
-                log.setFirstReportedError(browserErrorLog.getFirstReportedError());
-            } catch (InvalidProtocolBufferException ex) {
-                throw new RuntimeException("fail to parse proto buffer", ex);
-            }
-        }
-        return log;
-    }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/DashboardConfigurationDeserializer.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/DashboardConfigurationDeserializer.java
deleted file mode 100644
index c46db9df8d..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/DashboardConfigurationDeserializer.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.skywalking.banyandb.v1.client.RowEntity;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.management.ui.template.UITemplate;
-import org.apache.skywalking.oap.server.core.query.enumeration.TemplateType;
-import org.apache.skywalking.oap.server.core.query.type.DashboardConfiguration;
-import org.apache.skywalking.oap.server.library.util.BooleanUtils;
-
-import java.util.List;
-
-public class DashboardConfigurationDeserializer extends AbstractBanyanDBDeserializer<DashboardConfiguration> {
-    public DashboardConfigurationDeserializer() {
-        super(UITemplate.INDEX_NAME,
-                ImmutableList.of(UITemplate.NAME, UITemplate.DISABLED),
-                ImmutableList.of(UITemplate.ACTIVATED, UITemplate.CONFIGURATION, UITemplate.TYPE));
-    }
-
-    @Override
-    public DashboardConfiguration map(RowEntity row) {
-        DashboardConfiguration dashboardConfiguration = new DashboardConfiguration();
-        final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
-        // name
-        dashboardConfiguration.setName((String) searchable.get(0).getValue());
-        // disabled
-        dashboardConfiguration.setDisabled(BooleanUtils.valueToBoolean(((Number) searchable.get(1).getValue()).intValue()));
-        final List<TagAndValue<?>> data = row.getTagFamilies().get(1);
-        // activated
-        dashboardConfiguration.setActivated(BooleanUtils.valueToBoolean(((Number) data.get(0).getValue()).intValue()));
-        // configuration
-        dashboardConfiguration.setConfiguration((String) data.get(1).getValue());
-        // type
-        dashboardConfiguration.setType(TemplateType.forName((String) data.get(2).getValue()));
-        return dashboardConfiguration;
-    }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/DatabaseDeserializer.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/DatabaseDeserializer.java
deleted file mode 100644
index db6d875358..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/DatabaseDeserializer.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.skywalking.banyandb.v1.client.RowEntity;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraffic;
-import org.apache.skywalking.oap.server.core.query.type.Database;
-
-import java.util.List;
-
-/**
- * Turns a BanyanDB {@link RowEntity} into a {@link Database}.
- *
- * <p>Only the row id and the first tag (registered as {@link ServiceTraffic#NAME}) are copied;
- * the second registered tag is not read here.
- */
-public class DatabaseDeserializer extends AbstractBanyanDBDeserializer<Database> {
-    public DatabaseDeserializer() {
-        super(
-                ServiceTraffic.INDEX_NAME,
-                ImmutableList.of(ServiceTraffic.NAME, ServiceTraffic.NODE_TYPE)
-        );
-    }
-
-    /**
-     * @param row the row to convert
-     * @return a new {@link Database} carrying the row id and the name tag
-     */
-    @Override
-    public Database map(RowEntity row) {
-        final Database result = new Database();
-        final List<TagAndValue<?>> tags = row.getTagFamilies().get(0);
-        final String id = row.getId();
-        result.setId(id);
-        final String name = (String) tags.get(0).getValue();
-        result.setName(name);
-        return result;
-    }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/EndpointDeserializer.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/EndpointDeserializer.java
deleted file mode 100644
index e86867c771..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/EndpointDeserializer.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.skywalking.banyandb.v1.client.RowEntity;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic;
-import org.apache.skywalking.oap.server.core.query.type.Endpoint;
-
-import java.util.List;
-
-/**
- * Turns a BanyanDB {@link RowEntity} into an {@link Endpoint}.
- *
- * <p>Tags are read by position from the first tag family, in the order of the tag-name list
- * handed to the super constructor.
- */
-public class EndpointDeserializer extends AbstractBanyanDBDeserializer<Endpoint> {
-    public EndpointDeserializer() {
-        super(
-                EndpointTraffic.INDEX_NAME,
-                ImmutableList.of(EndpointTraffic.NAME, EndpointTraffic.SERVICE_ID)
-        );
-    }
-
-    /**
-     * @param row the row to convert
-     * @return a new {@link Endpoint} populated from the first tag family
-     */
-    @Override
-    public Endpoint map(RowEntity row) {
-        final Endpoint result = new Endpoint();
-        final List<TagAndValue<?>> tags = row.getTagFamilies().get(0);
-        final String name = (String) tags.get(0).getValue();
-        result.setName(name);
-        // NOTE(review): position 1 is registered as SERVICE_ID, yet it is stored as the endpoint id.
-        // Confirm whether row.getId() was intended here.
-        final String id = (String) tags.get(1).getValue();
-        result.setId(id);
-        return result;
-    }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/EventDeserializer.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/EventDeserializer.java
deleted file mode 100644
index 39783228d2..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/EventDeserializer.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.skywalking.banyandb.v1.client.RowEntity;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.query.type.event.EventType;
-import org.apache.skywalking.oap.server.core.query.type.event.Source;
-import org.apache.skywalking.oap.server.core.source.Event;
-
-import java.util.List;
-
-/**
- * Turns a BanyanDB {@link RowEntity} into a query-side
- * {@link org.apache.skywalking.oap.server.core.query.type.event.Event}.
- *
- * <p>The imported {@link Event} is the source-side class that supplies the tag names, so the
- * query-side type is always written with its fully qualified name.
- */
-public class EventDeserializer extends AbstractBanyanDBDeserializer<org.apache.skywalking.oap.server.core.query.type.event.Event> {
-    public EventDeserializer() {
-        super(
-                Event.INDEX_NAME,
-                ImmutableList.of(
-                        Event.UUID, Event.SERVICE, Event.SERVICE_INSTANCE, Event.ENDPOINT,
-                        Event.NAME, Event.TYPE, Event.START_TIME, Event.END_TIME),
-                ImmutableList.of(Event.MESSAGE, Event.PARAMETERS)
-        );
-    }
-
-    /**
-     * Reads the tags positionally, in the order they were registered in the constructor.
-     *
-     * @param row the row to convert
-     * @return a new query-side event populated from the first two tag families
-     */
-    @Override
-    public org.apache.skywalking.oap.server.core.query.type.event.Event map(RowEntity row) {
-        final org.apache.skywalking.oap.server.core.query.type.event.Event event =
-                new org.apache.skywalking.oap.server.core.query.type.event.Event();
-
-        // First tag family: uuid, service, service instance, endpoint, name, type, start, end.
-        final List<TagAndValue<?>> firstFamily = row.getTagFamilies().get(0);
-        event.setUuid((String) firstFamily.get(0).getValue());
-        final String service = (String) firstFamily.get(1).getValue();
-        final String serviceInstance = (String) firstFamily.get(2).getValue();
-        final String endpoint = (String) firstFamily.get(3).getValue();
-        event.setSource(new Source(service, serviceInstance, endpoint));
-        event.setName((String) firstFamily.get(4).getValue());
-        final String typeName = (String) firstFamily.get(5).getValue();
-        event.setType(EventType.parse(typeName));
-        final Number startTime = (Number) firstFamily.get(6).getValue();
-        event.setStartTime(startTime.longValue());
-        final Number endTime = (Number) firstFamily.get(7).getValue();
-        event.setEndTime(endTime.longValue());
-
-        // Second tag family: message, parameters.
-        final List<TagAndValue<?>> secondFamily = row.getTagFamilies().get(1);
-        event.setMessage((String) secondFamily.get(0).getValue());
-        event.setParameters((String) secondFamily.get(1).getValue());
-
-        return event;
-    }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/LogDeserializer.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/LogDeserializer.java
deleted file mode 100644
index f1bb3bec5d..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/LogDeserializer.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
-
-import com.google.common.collect.ImmutableList;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.skywalking.apm.network.common.v3.KeyStringValuePair;
-import org.apache.skywalking.apm.network.logging.v3.LogTags;
-import org.apache.skywalking.banyandb.v1.client.RowEntity;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.analysis.manual.log.AbstractLogRecord;
-import org.apache.skywalking.oap.server.core.analysis.manual.log.LogRecord;
-import org.apache.skywalking.oap.server.core.query.type.KeyValue;
-import org.apache.skywalking.oap.server.core.query.type.Log;
-
-import java.util.List;
-
-/**
- * Turns a BanyanDB {@link RowEntity} into a query-side {@link Log}.
- *
- * <p>Ids and the trace id come from the first tag family, the timestamp from the row itself, and
- * the log tags from the serialized {@link LogTags} stored at position 2 of the second family.
- */
-public class LogDeserializer extends AbstractBanyanDBDeserializer<Log> {
-    public LogDeserializer() {
-        super(
-                LogRecord.INDEX_NAME,
-                ImmutableList.of(
-                        AbstractLogRecord.SERVICE_ID, AbstractLogRecord.SERVICE_INSTANCE_ID,
-                        AbstractLogRecord.ENDPOINT_ID, AbstractLogRecord.TRACE_ID, AbstractLogRecord.TRACE_SEGMENT_ID,
-                        AbstractLogRecord.SPAN_ID),
-                ImmutableList.of(AbstractLogRecord.CONTENT_TYPE, AbstractLogRecord.CONTENT, AbstractLogRecord.TAGS_RAW_DATA)
-        );
-    }
-
-    /**
-     * @param row the row to convert
-     * @return a new {@link Log} populated from the row
-     * @throws RuntimeException wrapping {@link InvalidProtocolBufferException} when the stored
-     *                          tag bytes cannot be parsed as {@link LogTags}
-     */
-    @Override
-    public Log map(RowEntity row) {
-        final Log result = new Log();
-
-        final List<TagAndValue<?>> firstFamily = row.getTagFamilies().get(0);
-        result.setServiceId((String) firstFamily.get(0).getValue());
-        result.setServiceInstanceId((String) firstFamily.get(1).getValue());
-        result.setEndpointId((String) firstFamily.get(2).getValue());
-        result.setTraceId((String) firstFamily.get(3).getValue());
-        result.setTimestamp(row.getTimestamp());
-
-        final List<TagAndValue<?>> secondFamily = row.getTagFamilies().get(1);
-        final Object rawTags = secondFamily.get(2).getValue();
-        // NOTE(review): positions 0 and 1 (content type, content) are never copied onto the result;
-        // the content is only set, to an empty string, when no tag bytes are stored. Confirm intent.
-        if (rawTags == null || ((ByteString) rawTags).isEmpty()) {
-            result.setContent("");
-            return result;
-        }
-
-        try {
-            // Decode the serialized tags and expose each pair as a query-side KeyValue.
-            final LogTags decoded = LogTags.parseFrom((ByteString) rawTags);
-            for (final KeyStringValuePair entry : decoded.getDataList()) {
-                result.getTags().add(new KeyValue(entry.getKey(), entry.getValue()));
-            }
-        } catch (InvalidProtocolBufferException e) {
-            throw new RuntimeException(e);
-        }
-        return result;
-    }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/NetworkAddressAliasDeserializer.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/NetworkAddressAliasDeserializer.java
deleted file mode 100644
index 6255a0ea40..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/NetworkAddressAliasDeserializer.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.skywalking.banyandb.v1.client.RowEntity;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.analysis.manual.networkalias.NetworkAddressAlias;
-import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
-
-import java.util.List;
-
-/**
- * Turns a BanyanDB {@link RowEntity} into a {@link NetworkAddressAlias}.
- *
- * <p>Tag values are read by position, following the order of the two tag-name lists handed to
- * the super constructor.
- */
-public class NetworkAddressAliasDeserializer extends AbstractBanyanDBDeserializer<NetworkAddressAlias> {
-    public NetworkAddressAliasDeserializer() {
-        super(
-                NetworkAddressAlias.INDEX_NAME,
-                ImmutableList.of(NetworkAddressAlias.LAST_UPDATE_TIME_BUCKET),
-                ImmutableList.of(Metrics.TIME_BUCKET, "address", "represent_service_id", "represent_service_instance_id")
-        );
-    }
-
-    /**
-     * @param row the row to convert
-     * @return a new alias populated from the first two tag families of the row
-     */
-    @Override
-    public NetworkAddressAlias map(RowEntity row) {
-        final NetworkAddressAlias alias = new NetworkAddressAlias();
-
-        // First tag family: last_update_time_bucket.
-        final List<TagAndValue<?>> firstFamily = row.getTagFamilies().get(0);
-        final Number lastUpdateTimeBucket = (Number) firstFamily.get(0).getValue();
-        alias.setLastUpdateTimeBucket(lastUpdateTimeBucket.longValue());
-
-        // Second tag family: time_bucket, address, represent_service_id, represent_service_instance_id.
-        final List<TagAndValue<?>> secondFamily = row.getTagFamilies().get(1);
-        final Number timeBucket = (Number) secondFamily.get(0).getValue();
-        alias.setTimeBucket(timeBucket.longValue());
-        final String address = (String) secondFamily.get(1).getValue();
-        alias.setAddress(address);
-        final String representServiceId = (String) secondFamily.get(2).getValue();
-        alias.setRepresentServiceId(representServiceId);
-        final String representServiceInstanceId = (String) secondFamily.get(3).getValue();
-        alias.setRepresentServiceInstanceId(representServiceInstanceId);
-
-        return alias;
-    }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileTaskDeserializer.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileTaskDeserializer.java
deleted file mode 100644
index d3eddedec6..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileTaskDeserializer.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.skywalking.banyandb.v1.client.RowEntity;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.profile.ProfileTaskRecord;
-import org.apache.skywalking.oap.server.core.query.type.ProfileTask;
-
-import java.util.List;
-
-/**
- * Turns a BanyanDB {@link RowEntity} into a query-side {@link ProfileTask}.
- *
- * <p>All tags live in the first tag family and are read by position, following the order of the
- * tag-name list handed to the super constructor.
- */
-public class ProfileTaskDeserializer extends AbstractBanyanDBDeserializer<ProfileTask> {
-    public static final String ID = "profile_task_query_id";
-
-    public ProfileTaskDeserializer() {
-        super(ProfileTaskRecord.INDEX_NAME,
-                ImmutableList.of(ID, ProfileTaskRecord.SERVICE_ID, ProfileTaskRecord.ENDPOINT_NAME,
-                        ProfileTaskRecord.START_TIME, ProfileTaskRecord.DURATION, ProfileTaskRecord.MIN_DURATION_THRESHOLD,
-                        ProfileTaskRecord.DUMP_PERIOD, ProfileTaskRecord.CREATE_TIME, ProfileTaskRecord.MAX_SAMPLING_COUNT));
-    }
-
-    /**
-     * Builds a profile task from the first tag family of the row.
-     *
-     * @param row the row to convert
-     * @return the populated task; never {@code null}
-     */
-    @Override
-    public ProfileTask map(RowEntity row) {
-        ProfileTask profileTask = new ProfileTask();
-        final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
-        profileTask.setId((String) searchable.get(0).getValue());
-        profileTask.setServiceId((String) searchable.get(1).getValue());
-        profileTask.setEndpointName((String) searchable.get(2).getValue());
-        profileTask.setStartTime(((Number) searchable.get(3).getValue()).longValue());
-        profileTask.setDuration(((Number) searchable.get(4).getValue()).intValue());
-        profileTask.setMinDurationThreshold(((Number) searchable.get(5).getValue()).intValue());
-        profileTask.setDumpPeriod(((Number) searchable.get(6).getValue()).intValue());
-        // Read the create time as a long, consistent with the start time above, so the stored
-        // value is not narrowed to 32 bits.
-        profileTask.setCreateTime(((Number) searchable.get(7).getValue()).longValue());
-        profileTask.setMaxSamplingCount(((Number) searchable.get(8).getValue()).intValue());
-        // Return the task that was just populated; returning null discarded every mapped row.
-        return profileTask;
-    }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileTaskLogDeserializer.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileTaskLogDeserializer.java
deleted file mode 100644
index 79731bf5fb..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileTaskLogDeserializer.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.skywalking.banyandb.v1.client.RowEntity;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.profile.ProfileTaskLogRecord;
-import org.apache.skywalking.oap.server.core.query.type.ProfileTaskLog;
-import org.apache.skywalking.oap.server.core.query.type.ProfileTaskLogOperationType;
-
-import java.util.List;
-
-/**
- * Turns a BanyanDB {@link RowEntity} into a query-side {@link ProfileTaskLog}.
- *
- * <p>The operation time is read from the first tag family; the task id, instance id and operation
- * type are read from the second one.
- */
-public class ProfileTaskLogDeserializer extends AbstractBanyanDBDeserializer<ProfileTaskLog> {
-    public ProfileTaskLogDeserializer() {
-        super(
-                ProfileTaskLogRecord.INDEX_NAME,
-                ImmutableList.of(ProfileTaskLogRecord.OPERATION_TIME),
-                ImmutableList.of(
-                        ProfileTaskLogRecord.TASK_ID, ProfileTaskLogRecord.INSTANCE_ID,
-                        ProfileTaskLogRecord.OPERATION_TYPE)
-        );
-    }
-
-    /**
-     * @param row the row to convert
-     * @return a new task log populated from the first two tag families of the row
-     */
-    @Override
-    public ProfileTaskLog map(RowEntity row) {
-        final ProfileTaskLog taskLog = new ProfileTaskLog();
-
-        // First tag family: operation_time.
-        final List<TagAndValue<?>> firstFamily = row.getTagFamilies().get(0);
-        final Number operationTime = (Number) firstFamily.get(0).getValue();
-        taskLog.setOperationTime(operationTime.longValue());
-
-        // Second tag family: task_id, instance_id, operation_type.
-        final List<TagAndValue<?>> secondFamily = row.getTagFamilies().get(1);
-        final String taskId = (String) secondFamily.get(0).getValue();
-        taskLog.setTaskId(taskId);
-        final String instanceId = (String) secondFamily.get(1).getValue();
-        taskLog.setInstanceId(instanceId);
-        final int operationTypeCode = ((Number) secondFamily.get(2).getValue()).intValue();
-        taskLog.setOperationType(ProfileTaskLogOperationType.parse(operationTypeCode));
-
-        return taskLog;
-    }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileThreadSnapshotRecordDeserializer.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileThreadSnapshotRecordDeserializer.java
deleted file mode 100644
index be815f57c0..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileThreadSnapshotRecordDeserializer.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
-
-import com.google.common.collect.ImmutableList;
-import com.google.protobuf.ByteString;
-import org.apache.skywalking.banyandb.v1.client.RowEntity;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.profile.ProfileThreadSnapshotRecord;
-
-import java.util.Collections;
-import java.util.List;
-
-/**
- * Turns a BanyanDB {@link RowEntity} into a {@link ProfileThreadSnapshotRecord}.
- *
- * <p>Task id, segment id, dump time and sequence are read from the first tag family; the stack
- * bytes are read from the single tag of the second one.
- */
-public class ProfileThreadSnapshotRecordDeserializer extends AbstractBanyanDBDeserializer<ProfileThreadSnapshotRecord> {
-    public ProfileThreadSnapshotRecordDeserializer() {
-        super(
-                ProfileThreadSnapshotRecord.INDEX_NAME,
-                ImmutableList.of(
-                        ProfileThreadSnapshotRecord.TASK_ID, ProfileThreadSnapshotRecord.SEGMENT_ID,
-                        ProfileThreadSnapshotRecord.DUMP_TIME, ProfileThreadSnapshotRecord.SEQUENCE),
-                Collections.singletonList(ProfileThreadSnapshotRecord.STACK_BINARY)
-        );
-    }
-
-    /**
-     * @param row the row to convert
-     * @return a new snapshot record populated from the first two tag families of the row
-     */
-    @Override
-    public ProfileThreadSnapshotRecord map(RowEntity row) {
-        final ProfileThreadSnapshotRecord snapshot = new ProfileThreadSnapshotRecord();
-
-        final List<TagAndValue<?>> firstFamily = row.getTagFamilies().get(0);
-        final String taskId = (String) firstFamily.get(0).getValue();
-        snapshot.setTaskId(taskId);
-        final String segmentId = (String) firstFamily.get(1).getValue();
-        snapshot.setSegmentId(segmentId);
-        final Number dumpTime = (Number) firstFamily.get(2).getValue();
-        snapshot.setDumpTime(dumpTime.longValue());
-        final Number sequence = (Number) firstFamily.get(3).getValue();
-        snapshot.setSequence(sequence.intValue());
-
-        final List<TagAndValue<?>> secondFamily = row.getTagFamilies().get(1);
-        final ByteString stack = (ByteString) secondFamily.get(0).getValue();
-        snapshot.setStackBinary(stack.toByteArray());
-
-        return snapshot;
-    }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/RowEntityDeserializer.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/RowEntityDeserializer.java
deleted file mode 100644
index 2e33ed2373..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/RowEntityDeserializer.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
-
-import org.apache.skywalking.banyandb.v1.client.RowEntity;
-
-/**
- * Converts a single BanyanDB {@link RowEntity} into a value of type {@code T}.
- *
- * @param <T> the type produced from a row
- */
-@FunctionalInterface
-public interface RowEntityDeserializer<T> {
-    /**
-     * Maps one row onto a {@code T}.
-     *
-     * @param row the row to convert
-     * @return the value built from {@code row}
-     */
-    T map(RowEntity row);
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/SegmentRecordDeserializer.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/SegmentRecordDeserializer.java
deleted file mode 100644
index 0665d614af..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/SegmentRecordDeserializer.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
-
-import com.google.common.collect.ImmutableList;
-import com.google.protobuf.ByteString;
-import org.apache.skywalking.banyandb.v1.client.RowEntity;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
-
-import java.util.Collections;
-import java.util.List;
-
-/**
- * Turns a BanyanDB {@link RowEntity} into a {@link SegmentRecord}.
- *
- * <p>The segment id is the row id. The remaining fields are read by position, following the
- * order of the two tag-name lists handed to the super constructor.
- */
-public class SegmentRecordDeserializer extends AbstractBanyanDBDeserializer<SegmentRecord> {
-    public SegmentRecordDeserializer() {
-        super(
-                SegmentRecord.INDEX_NAME,
-                ImmutableList.of("trace_id", "state", "service_id", "service_instance_id", "endpoint_id", "duration", "start_time"),
-                Collections.singletonList("data_binary")
-        );
-    }
-
-    /**
-     * @param row the row to convert
-     * @return a new segment record populated from the row id and its first two tag families
-     */
-    @Override
-    public SegmentRecord map(RowEntity row) {
-        final SegmentRecord segment = new SegmentRecord();
-
-        final List<TagAndValue<?>> firstFamily = row.getTagFamilies().get(0);
-        segment.setSegmentId(row.getId());
-        final String traceId = (String) firstFamily.get(0).getValue();
-        segment.setTraceId(traceId);
-        // The tag registered as "state" is stored as the record's isError value.
-        final Number state = (Number) firstFamily.get(1).getValue();
-        segment.setIsError(state.intValue());
-        final String serviceId = (String) firstFamily.get(2).getValue();
-        segment.setServiceId(serviceId);
-        final String serviceInstanceId = (String) firstFamily.get(3).getValue();
-        segment.setServiceInstanceId(serviceInstanceId);
-        final String endpointId = (String) firstFamily.get(4).getValue();
-        segment.setEndpointId(endpointId);
-        // The tag registered as "duration" is stored as the record's latency.
-        final Number duration = (Number) firstFamily.get(5).getValue();
-        segment.setLatency(duration.intValue());
-        final Number startTime = (Number) firstFamily.get(6).getValue();
-        segment.setStartTime(startTime.longValue());
-
-        final List<TagAndValue<?>> secondFamily = row.getTagFamilies().get(1);
-        final ByteString dataBinary = (ByteString) secondFamily.get(0).getValue();
-        segment.setDataBinary(dataBinary.toByteArray());
-
-        return segment;
-    }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ServiceDeserializer.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ServiceDeserializer.java
deleted file mode 100644
index cf36fd9239..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ServiceDeserializer.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.skywalking.banyandb.v1.client.RowEntity;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraffic;
-import org.apache.skywalking.oap.server.core.query.type.Service;
-
-import java.util.List;
-
-/**
- * Turns a BanyanDB {@link RowEntity} into a query-side {@link Service}.
- *
- * <p>The service id is the row id. Name and group are read from positions 0 and 2 of the first
- * tag family; position 1 (registered as {@link ServiceTraffic#NODE_TYPE}) is not read here.
- */
-public class ServiceDeserializer extends AbstractBanyanDBDeserializer<Service> {
-    public ServiceDeserializer() {
-        super(
-                ServiceTraffic.INDEX_NAME,
-                ImmutableList.of(ServiceTraffic.NAME, ServiceTraffic.NODE_TYPE, ServiceTraffic.GROUP)
-        );
-    }
-
-    /**
-     * @param row the row to convert
-     * @return a new {@link Service} carrying the row id, name and group
-     */
-    @Override
-    public Service map(RowEntity row) {
-        final Service result = new Service();
-        final List<TagAndValue<?>> tags = row.getTagFamilies().get(0);
-        final String id = row.getId();
-        result.setId(id);
-        final String name = (String) tags.get(0).getValue();
-        result.setName(name);
-        final String group = (String) tags.get(2).getValue();
-        result.setGroup(group);
-        return result;
-    }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ServiceInstanceDeserializer.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ServiceInstanceDeserializer.java
deleted file mode 100644
index a95867c42e..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ServiceInstanceDeserializer.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
-
-import com.google.common.collect.ImmutableList;
-import com.google.gson.JsonElement;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.skywalking.banyandb.v1.client.RowEntity;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
-import org.apache.skywalking.oap.server.core.query.enumeration.Language;
-import org.apache.skywalking.oap.server.core.query.type.Attribute;
-import org.apache.skywalking.oap.server.core.query.type.ServiceInstance;
-import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-public class ServiceInstanceDeserializer extends AbstractBanyanDBDeserializer<ServiceInstance> {
-    public ServiceInstanceDeserializer() {
-        super(InstanceTraffic.INDEX_NAME,
-                ImmutableList.of(InstanceTraffic.SERVICE_ID, InstanceTraffic.LAST_PING_TIME_BUCKET),
-                Collections.singletonList("data_binary"));
-    }
-
-    @Override
-    public ServiceInstance map(RowEntity row) {
-        InstanceTraffic instanceTraffic = new InstanceTraffic();
-        final List<TagAndValue<?>> data = row.getTagFamilies().get(1);
-        Object o = data.get(0).getValue();
-        ServiceInstance serviceInstance = new ServiceInstance();
-        if (o instanceof ByteString && !((ByteString) o).isEmpty()) {
-            try {
-                RemoteData remoteData = RemoteData.parseFrom((ByteString) o);
-                instanceTraffic.deserialize(remoteData);
-                serviceInstance.setName(instanceTraffic.getName());
-                serviceInstance.setId(instanceTraffic.getServiceId());
-
-                if (instanceTraffic.getProperties() != null) {
-                    for (Map.Entry<String, JsonElement> property : instanceTraffic.getProperties().entrySet()) {
-                        String key = property.getKey();
-                        String value = property.getValue().getAsString();
-                        if (key.equals(InstanceTraffic.PropertyUtil.LANGUAGE)) {
-                            serviceInstance.setLanguage(Language.value(value));
-                        } else {
-                            serviceInstance.getAttributes().add(new Attribute(key, value));
-                        }
-                    }
-                } else {
-                    serviceInstance.setLanguage(Language.UNKNOWN);
-                }
-            } catch (InvalidProtocolBufferException ex) {
-                throw new RuntimeException("fail to parse remote data", ex);
-            }
-        } else {
-            throw new RuntimeException("unable to parse binary data");
-        }
-
-        return serviceInstance;
-    }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/AlarmRecordBuilder.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/AlarmRecordBuilder.java
index b154c9280d..a8a2863306 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/AlarmRecordBuilder.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/AlarmRecordBuilder.java
@@ -27,7 +27,7 @@ import org.apache.skywalking.oap.server.core.alarm.AlarmRecord;
 import java.util.ArrayList;
 import java.util.List;
 
-public class AlarmRecordBuilder extends BanyanDBRecordBuilder<AlarmRecord> {
+public class AlarmRecordBuilder extends BanyanDBStorageDataBuilder<AlarmRecord> {
     public static final List<String> INDEXED_TAGS = ImmutableList.of(
             "level"
     );
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/BanyanDBMetricsBuilder.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/BanyanDBMetricsBuilder.java
deleted file mode 100644
index 28c3055f47..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/BanyanDBMetricsBuilder.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.schema;
-
-import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
-import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
-import org.apache.skywalking.oap.server.core.storage.model.Model;
-
-public abstract class BanyanDBMetricsBuilder<T extends Metrics> extends BanyanDBStorageDataBuilder<T> {
-    @Override
-    protected long timestamp(Model model, T entity) {
-        return TimeBucket.getTimestamp(entity.getTimeBucket(), model.getDownsampling());
-    }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/BanyanDBRecordBuilder.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/BanyanDBRecordBuilder.java
deleted file mode 100644
index 0310c7173b..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/BanyanDBRecordBuilder.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.schema;
-
-import org.apache.skywalking.banyandb.v1.Banyandb;
-import org.apache.skywalking.banyandb.v1.client.SerializableTag;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
-import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
-import org.apache.skywalking.oap.server.core.analysis.record.Record;
-import org.apache.skywalking.oap.server.core.storage.model.Model;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public abstract class BanyanDBRecordBuilder<T extends Record> extends BanyanDBStorageDataBuilder<T> {
-    @Override
-    protected long timestamp(Model model, T entity) {
-        return TimeBucket.getTimestamp(entity.getTimeBucket(), model.getDownsampling());
-    }
-
-    protected List<SerializableTag<Banyandb.TagValue>> filterSearchableTags(List<Tag> rawTags, List<String> indexTags) {
-        if (rawTags == null) {
-            return Collections.emptyList();
-        }
-        Map<String, SerializableTag<Banyandb.TagValue>> map = new HashMap<>();
-        for (final Tag tag : rawTags) {
-            map.put(tag.getKey().toLowerCase(), TagAndValue.stringField(tag.getValue()));
-        }
-        final List<SerializableTag<Banyandb.TagValue>> tags = new ArrayList<>();
-        for (String indexedTag : indexTags) {
-            SerializableTag<Banyandb.TagValue> tag = map.get(indexedTag);
-            if (tag == null) {
-                tags.add(TagAndValue.nullField());
-            } else {
-                tags.add(tag);
-            }
-        }
-
-        return tags;
-    }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/BanyanDBStorageDataBuilder.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/BanyanDBStorageDataBuilder.java
index 6d545f8a9e..03ee0328f3 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/BanyanDBStorageDataBuilder.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/BanyanDBStorageDataBuilder.java
@@ -21,29 +21,65 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.schema;
 import org.apache.skywalking.banyandb.v1.Banyandb;
 import org.apache.skywalking.banyandb.v1.client.SerializableTag;
 import org.apache.skywalking.banyandb.v1.client.StreamWrite;
+import org.apache.skywalking.banyandb.v1.client.TagAndValue;
+import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
 import org.apache.skywalking.oap.server.core.storage.StorageData;
-import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
-public abstract class BanyanDBStorageDataBuilder<T extends StorageData> {
-    public StreamWrite entity2Storage(Model model, T entity) {
-        return StreamWrite.builder()
+public abstract class BanyanDBStorageDataBuilder<T extends StorageData> implements StorageBuilder<T, StreamWrite.StreamWriteBuilder> {
+    @Override
+    public T storage2Entity(StreamWrite.StreamWriteBuilder storageData) {
+        return null;
+    }
+
+    @Override
+    public StreamWrite.StreamWriteBuilder entity2Storage(T entity) {
+        StreamWrite.StreamWriteBuilder b = StreamWrite.builder()
                 .elementId(this.elementID(entity))
-                .name(model.getName())
-                .timestamp(this.timestamp(model, entity))
                 .searchableTags(this.searchableTags(entity))
-                .dataTags(this.dataTags(entity))
-                .build();
+                .dataTags(this.dataTags(entity));
+        Long ts = this.extractTimestamp(entity);
+        if (ts != null) {
+            b.timestamp(ts);
+        }
+        return b;
+    }
+
+    protected Long extractTimestamp(T entity) {
+        return null;
+    }
+
+    protected List<SerializableTag<Banyandb.TagValue>> filterSearchableTags(List<Tag> rawTags, List<String> indexTags) {
+        if (rawTags == null) {
+            return Collections.emptyList();
+        }
+        Map<String, SerializableTag<Banyandb.TagValue>> map = new HashMap<>();
+        for (final Tag tag : rawTags) {
+            map.put(tag.getKey().toLowerCase(), TagAndValue.stringField(tag.getValue()));
+        }
+        final List<SerializableTag<Banyandb.TagValue>> tags = new ArrayList<>();
+        for (String indexedTag : indexTags) {
+            SerializableTag<Banyandb.TagValue> tag = map.get(indexedTag);
+            if (tag == null) {
+                tags.add(TagAndValue.nullField());
+            } else {
+                tags.add(tag);
+            }
+        }
+
+        return tags;
     }
 
     protected String elementID(T entity) {
         return entity.id();
     }
 
-    abstract protected long timestamp(Model model, T entity);
-
     abstract protected List<SerializableTag<Banyandb.TagValue>> searchableTags(T entity);
 
     protected List<SerializableTag<Banyandb.TagValue>> dataTags(T entity) {
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/BrowserErrorLogRecordBuilder.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/BrowserErrorLogRecordBuilder.java
index 0d1923c6f0..98b9bd0e6b 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/BrowserErrorLogRecordBuilder.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/BrowserErrorLogRecordBuilder.java
@@ -27,7 +27,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
-public class BrowserErrorLogRecordBuilder extends BanyanDBRecordBuilder<BrowserErrorLogRecord> {
+public class BrowserErrorLogRecordBuilder extends BanyanDBStorageDataBuilder<BrowserErrorLogRecord> {
     @Override
     protected List<SerializableTag<Banyandb.TagValue>> searchableTags(BrowserErrorLogRecord entity) {
         List<SerializableTag<Banyandb.TagValue>> searchable = new ArrayList<>();
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/EventBuilder.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/EventBuilder.java
index cce48e8b71..9a99763c73 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/EventBuilder.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/EventBuilder.java
@@ -4,14 +4,12 @@ import com.google.common.collect.ImmutableList;
 import org.apache.skywalking.banyandb.v1.Banyandb;
 import org.apache.skywalking.banyandb.v1.client.SerializableTag;
 import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
 import org.apache.skywalking.oap.server.core.source.Event;
-import org.apache.skywalking.oap.server.core.storage.model.Model;
 
 import java.util.ArrayList;
 import java.util.List;
 
-public class EventBuilder extends BanyanDBMetricsBuilder<Event> {
+public class EventBuilder extends BanyanDBStorageDataBuilder<Event> {
     @Override
     protected List<SerializableTag<Banyandb.TagValue>> searchableTags(Event entity) {
         List<SerializableTag<Banyandb.TagValue>> searchable = new ArrayList<>(8);
@@ -26,11 +24,6 @@ public class EventBuilder extends BanyanDBMetricsBuilder<Event> {
         return searchable;
     }
 
-    @Override
-    protected long timestamp(Model model, Event entity) {
-        return TimeBucket.getTimestamp(entity.getTimeBucket(), model.getDownsampling());
-    }
-
     @Override
     protected List<SerializableTag<Banyandb.TagValue>> dataTags(Event entity) {
         return ImmutableList.of(
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/LogRecordBuilder.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/LogRecordBuilder.java
index 1d88799fd9..02809d68aa 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/LogRecordBuilder.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/LogRecordBuilder.java
@@ -27,7 +27,7 @@ import org.apache.skywalking.oap.server.core.analysis.manual.log.LogRecord;
 import java.util.ArrayList;
 import java.util.List;
 
-public class LogRecordBuilder extends BanyanDBRecordBuilder<LogRecord> {
+public class LogRecordBuilder extends BanyanDBStorageDataBuilder<LogRecord> {
     public static final List<String> INDEXED_TAGS = ImmutableList.of(
             "level"
     );
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/Metadata.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/Metadata.java
index 1fcd5f1587..66f3218965 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/Metadata.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/Metadata.java
@@ -30,7 +30,7 @@ import java.util.Collections;
 import java.util.List;
 
 public class Metadata {
-    public static class ServiceTrafficBuilder extends BanyanDBMetricsBuilder<ServiceTraffic> {
+    public static class ServiceTrafficBuilder extends BanyanDBStorageDataBuilder<ServiceTraffic> {
         @Override
         protected List<SerializableTag<Banyandb.TagValue>> searchableTags(ServiceTraffic entity) {
             List<SerializableTag<Banyandb.TagValue>> searchable = new ArrayList<>(3);
@@ -41,7 +41,7 @@ public class Metadata {
         }
     }
 
-    public static class EndpointTrafficBuilder extends BanyanDBMetricsBuilder<EndpointTraffic> {
+    public static class EndpointTrafficBuilder extends BanyanDBStorageDataBuilder<EndpointTraffic> {
         @Override
         protected List<SerializableTag<Banyandb.TagValue>> searchableTags(EndpointTraffic entity) {
             List<SerializableTag<Banyandb.TagValue>> searchable = new ArrayList<>(2);
@@ -51,7 +51,7 @@ public class Metadata {
         }
     }
 
-    public static class InstanceTrafficBuilder extends BanyanDBMetricsBuilder<InstanceTraffic> {
+    public static class InstanceTrafficBuilder extends BanyanDBStorageDataBuilder<InstanceTraffic> {
         @Override
         protected List<SerializableTag<Banyandb.TagValue>> searchableTags(InstanceTraffic entity) {
             List<SerializableTag<Banyandb.TagValue>> searchable = new ArrayList<>(2);
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/NetworkAddressAliasBuilder.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/NetworkAddressAliasBuilder.java
index 1054bd922f..83bf0abfab 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/NetworkAddressAliasBuilder.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/NetworkAddressAliasBuilder.java
@@ -27,7 +27,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
-public class NetworkAddressAliasBuilder extends BanyanDBMetricsBuilder<NetworkAddressAlias> {
+public class NetworkAddressAliasBuilder extends BanyanDBStorageDataBuilder<NetworkAddressAlias> {
     @Override
     protected List<SerializableTag<Banyandb.TagValue>> searchableTags(NetworkAddressAlias entity) {
         return Collections.singletonList(TagAndValue.longField(entity.getLastUpdateTimeBucket()));
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/ProfileTaskLogRecordBuilder.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/ProfileTaskLogRecordBuilder.java
index ee0141dc6d..888cac6b10 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/ProfileTaskLogRecordBuilder.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/ProfileTaskLogRecordBuilder.java
@@ -27,7 +27,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
-public class ProfileTaskLogRecordBuilder extends BanyanDBRecordBuilder<ProfileTaskLogRecord> {
+public class ProfileTaskLogRecordBuilder extends BanyanDBStorageDataBuilder<ProfileTaskLogRecord> {
     @Override
     protected List<SerializableTag<Banyandb.TagValue>> searchableTags(ProfileTaskLogRecord entity) {
         return Collections.singletonList(TagAndValue.longField(entity.getOperationTime()));
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/ProfileTaskRecordBuilder.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/ProfileTaskRecordBuilder.java
index 0ce548ffc6..b107cadc31 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/ProfileTaskRecordBuilder.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/ProfileTaskRecordBuilder.java
@@ -26,7 +26,7 @@ import org.apache.skywalking.oap.server.core.profile.ProfileTaskRecord;
 import java.util.ArrayList;
 import java.util.List;
 
-public class ProfileTaskRecordBuilder extends BanyanDBRecordBuilder<ProfileTaskRecord> {
+public class ProfileTaskRecordBuilder extends BanyanDBStorageDataBuilder<ProfileTaskRecord> {
     @Override
     protected List<SerializableTag<Banyandb.TagValue>> searchableTags(ProfileTaskRecord entity) {
         List<SerializableTag<Banyandb.TagValue>> searchable = new ArrayList<>(9);
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/ProfileThreadSnapshotRecordBuilder.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/ProfileThreadSnapshotRecordBuilder.java
index 29f6a8b708..10032246ec 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/ProfileThreadSnapshotRecordBuilder.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/ProfileThreadSnapshotRecordBuilder.java
@@ -27,7 +27,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
-public class ProfileThreadSnapshotRecordBuilder extends BanyanDBRecordBuilder<ProfileThreadSnapshotRecord> {
+public class ProfileThreadSnapshotRecordBuilder extends BanyanDBStorageDataBuilder<ProfileThreadSnapshotRecord> {
     @Override
     protected List<SerializableTag<Banyandb.TagValue>> searchableTags(ProfileThreadSnapshotRecord entity) {
         List<SerializableTag<Banyandb.TagValue>> searchable = new ArrayList<>(4);
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/SegmentRecordBuilder.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/SegmentRecordBuilder.java
index e93335643c..a87dc9aa86 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/SegmentRecordBuilder.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/SegmentRecordBuilder.java
@@ -28,7 +28,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
-public class SegmentRecordBuilder extends BanyanDBRecordBuilder<SegmentRecord> {
+public class SegmentRecordBuilder extends BanyanDBStorageDataBuilder<SegmentRecord> {
     public static final List<String> INDEXED_TAGS = ImmutableList.of(
             "http.method",
             "status_code",
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/UITemplateBuilder.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/UITemplateBuilder.java
index 1776e5af8d..41104ee588 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/UITemplateBuilder.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/UITemplateBuilder.java
@@ -23,19 +23,11 @@ import org.apache.skywalking.banyandb.v1.Banyandb;
 import org.apache.skywalking.banyandb.v1.client.SerializableTag;
 import org.apache.skywalking.banyandb.v1.client.TagAndValue;
 import org.apache.skywalking.oap.server.core.management.ui.template.UITemplate;
-import org.apache.skywalking.oap.server.core.storage.model.Model;
 
 import java.util.ArrayList;
 import java.util.List;
 
 public class UITemplateBuilder extends BanyanDBStorageDataBuilder<UITemplate> {
-    public static final long UI_TEMPLATE_TIMESTAMP = 1L;
-
-    @Override
-    protected long timestamp(Model model, UITemplate entity) {
-        return UI_TEMPLATE_TIMESTAMP;
-    }
-
     @Override
     protected List<SerializableTag<Banyandb.TagValue>> searchableTags(UITemplate entity) {
         return ImmutableList.of(
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java
index 71d45dc7b6..0b89545ee4 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java
@@ -18,42 +18,62 @@
 
 package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
 
+import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
+import org.apache.skywalking.banyandb.v1.client.RowEntity;
 import org.apache.skywalking.banyandb.v1.client.StreamQuery;
 import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
+import org.apache.skywalking.banyandb.v1.client.TimestampRange;
 import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.AbstractBanyanDBDeserializer;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.BanyanDBDeserializerFactory;
 
 import java.util.List;
-import java.util.stream.Collectors;
+import java.util.function.Function;
 
 public abstract class AbstractBanyanDBDAO extends AbstractDAO<BanyanDBStorageClient> {
     protected AbstractBanyanDBDAO(BanyanDBStorageClient client) {
         super(client);
     }
 
-    protected <T> List<T> query(Class<T> clazz, QueryBuilder builder) {
-        return this.query(clazz, builder, 0, 0);
+    protected StreamQueryResponse query(String indexName, List<String> searchableTags, QueryBuilder builder) {
+        return this.query(indexName, searchableTags, null, builder);
     }
 
-    protected <T> List<T> query(Class<T> clazz, QueryBuilder builder, long startTimestamp, long endTimestamp) {
-        AbstractBanyanDBDeserializer<T> deserializer = BanyanDBDeserializerFactory.INSTANCE.findDeserializer(clazz);
-
+    protected StreamQueryResponse query(String indexName, List<String> searchableTags, TimestampRange timestampRange,
+                                        QueryBuilder builder) {
         final StreamQuery query;
-        if (startTimestamp != 0 && endTimestamp != 0) {
-            query = deserializer.buildStreamQuery();
+        if (timestampRange == null) {
+            query = new StreamQuery(indexName, searchableTags);
         } else {
-            query = deserializer.buildStreamQuery(startTimestamp, endTimestamp);
+            query = new StreamQuery(indexName, timestampRange, searchableTags);
         }
 
         builder.apply(query);
 
-        final StreamQueryResponse resp = getClient().query(query);
-        return resp.getElements().stream().map(deserializer::map).collect(Collectors.toList());
+        return getClient().query(query);
+    }
+
+    protected abstract static class QueryBuilder {
+        protected static final String SEARCHABLE = "searchable";
+
+        abstract void apply(final StreamQuery query);
+
+        protected PairQueryCondition<Long> eq(String name, long value) {
+            return PairQueryCondition.LongQueryCondition.eq(SEARCHABLE, name, value);
+        }
+
+        protected PairQueryCondition<Long> lte(String name, long value) {
+            return PairQueryCondition.LongQueryCondition.le(SEARCHABLE, name, value);
+        }
+
+        protected PairQueryCondition<Long> gte(String name, long value) {
+            return PairQueryCondition.LongQueryCondition.ge(SEARCHABLE, name, value);
+        }
+
+        protected PairQueryCondition<String> eq(String name, String value) {
+            return PairQueryCondition.StringQueryCondition.eq(SEARCHABLE, name, value);
+        }
     }
 
-    interface QueryBuilder {
-        void apply(final StreamQuery query);
+    interface RowEntityDeserializer<T> extends Function<RowEntity, T> {
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java
index 999d89db79..08225e51e1 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java
@@ -18,13 +18,22 @@
 
 package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
 
-import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableList;
+import com.google.gson.reflect.TypeToken;
+import com.google.protobuf.ByteString;
+import org.apache.skywalking.banyandb.v1.client.RowEntity;
 import org.apache.skywalking.banyandb.v1.client.StreamQuery;
+import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
+import org.apache.skywalking.banyandb.v1.client.TagAndValue;
+import org.apache.skywalking.banyandb.v1.client.TimestampRange;
 import org.apache.skywalking.oap.server.core.alarm.AlarmRecord;
 import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
 import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
+import org.apache.skywalking.oap.server.core.query.enumeration.Scope;
 import org.apache.skywalking.oap.server.core.query.type.AlarmMessage;
 import org.apache.skywalking.oap.server.core.query.type.Alarms;
+import org.apache.skywalking.oap.server.core.query.type.KeyValue;
 import org.apache.skywalking.oap.server.core.storage.query.IAlarmQueryDAO;
 import org.apache.skywalking.oap.server.library.util.CollectionUtils;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
@@ -33,6 +42,7 @@ import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.AlarmReco
 import java.io.IOException;
 import java.util.List;
 import java.util.Objects;
+import java.util.stream.Collectors;
 
 /**
  * {@link org.apache.skywalking.oap.server.core.alarm.AlarmRecord} is a stream,
@@ -45,34 +55,70 @@ public class BanyanDBAlarmQueryDAO extends AbstractBanyanDBDAO implements IAlarm
 
     @Override
     public Alarms getAlarm(Integer scopeId, String keyword, int limit, int from, long startTB, long endTB, List<Tag> tags) throws IOException {
-        List<AlarmMessage> messages = query(AlarmMessage.class, new QueryBuilder() {
-            @Override
-            public void apply(StreamQuery query) {
-                if (Objects.nonNull(scopeId)) {
-                    query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", AlarmRecord.SCOPE, (long) scopeId));
-                }
-                if (startTB != 0 && endTB != 0) {
-                    query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", AlarmRecord.START_TIME, TimeBucket.getTimestamp(startTB)));
-                    query.appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", AlarmRecord.START_TIME, TimeBucket.getTimestamp(endTB)));
-                }
+        TimestampRange tsRange = null;
+        if (startTB > 0 && endTB > 0) { // a range is built only when both time buckets are positive; otherwise tsRange stays null
+            tsRange = new TimestampRange(TimeBucket.getTimestamp(startTB), TimeBucket.getTimestamp(endTB));
+        }
 
-                // TODO: support keyword search
+        StreamQueryResponse resp = query(AlarmRecord.INDEX_NAME,
+                ImmutableList.of(AlarmRecord.SCOPE, AlarmRecord.START_TIME), // presumably the tag-family-0 projection; the row mapper reads it by position (scope, start time)
+                tsRange,
+                new QueryBuilder() {
+                    @Override
+                    public void apply(StreamQuery query) {
+                        query.setDataProjections(ImmutableList.of(AlarmRecord.ID0, AlarmRecord.ID1, AlarmRecord.ALARM_MESSAGE, AlarmRecord.TAGS_RAW_DATA)); // read back by position from tag family 1
 
-                if (CollectionUtils.isNotEmpty(tags)) {
-                    for (final Tag tag : tags) {
-                        if (AlarmRecordBuilder.INDEXED_TAGS.contains(tag.getKey())) {
-                            query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", tag.getKey(), tag.getValue()));
+                        if (Objects.nonNull(scopeId)) {
+                            query.appendCondition(eq(AlarmRecord.SCOPE, (long) scopeId));
                         }
+
+                        // TODO: support keyword search (the keyword argument is currently ignored)
+
+                        if (CollectionUtils.isNotEmpty(tags)) {
+                            for (final Tag tag : tags) {
+                                if (AlarmRecordBuilder.INDEXED_TAGS.contains(tag.getKey())) { // tags outside INDEXED_TAGS are silently not filtered on
+                                    query.appendCondition(eq(tag.getKey(), tag.getValue()));
+                                }
+                            }
+                        }
+                        query.setLimit(limit);
+                        query.setOffset(from);
                     }
-                }
-                query.setLimit(limit);
-                query.setOffset(from);
-            }
-        });
+                });
+
+        List<AlarmMessage> messages = resp.getElements().stream().map(new AlarmMessageDeserializer()) // NOTE(review): the total set below is this page's size only
+                .collect(Collectors.toList());
 
         Alarms alarms = new Alarms();
         alarms.setTotal(messages.size());
         alarms.getMsgs().addAll(messages);
         return alarms;
     }
+
+    public static class AlarmMessageDeserializer implements RowEntityDeserializer<AlarmMessage> { // maps one RowEntity to an AlarmMessage
+        @Override
+        public AlarmMessage apply(RowEntity row) {
+            final AlarmMessage message = new AlarmMessage();
+            final List<TagAndValue<?>> indexedTags = row.getTagFamilies().get(0); // [0] scope id, [1] start time
+            final int scopeId = ((Number) indexedTags.get(0).getValue()).intValue();
+            message.setScopeId(scopeId);
+            message.setScope(Scope.Finder.valueOf(scopeId));
+            message.setStartTime(((Number) indexedTags.get(1).getValue()).longValue());
+            final List<TagAndValue<?>> payloadTags = row.getTagFamilies().get(1); // [0] id0, [1] id1, [2] message, [3] raw tags
+            message.setId((String) payloadTags.get(0).getValue());
+            message.setId1((String) payloadTags.get(1).getValue());
+            message.setMessage((String) payloadTags.get(2).getValue());
+            final Object rawTags = payloadTags.get(3).getValue();
+            final boolean hasTags = rawTags instanceof ByteString && !((ByteString) rawTags).isEmpty();
+            if (hasTags) {
+                parseDataBinary(((ByteString) rawTags).toByteArray(), message.getTags());
+            }
+            return message;
+        }
+
+        void parseDataBinary(byte[] dataBinary, List<KeyValue> tags) { // decodes a UTF-8 JSON list of Tag and appends each entry to tags as a KeyValue
+            final List<Tag> decoded = GSON.fromJson(new String(dataBinary, Charsets.UTF_8), new TypeToken<List<Tag>>() { }.getType());
+            decoded.forEach(tag -> tags.add(new KeyValue(tag.getKey(), tag.getValue())));
+        }
+    }
 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java
index 803b05e255..c1bbc16941 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java
@@ -18,20 +18,29 @@
 
 package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
 
-import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.skywalking.banyandb.v1.client.RowEntity;
 import org.apache.skywalking.banyandb.v1.client.StreamQuery;
+import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
+import org.apache.skywalking.banyandb.v1.client.TagAndValue;
+import org.apache.skywalking.banyandb.v1.client.TimestampRange;
 import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
 import org.apache.skywalking.oap.server.core.browser.manual.errorlog.BrowserErrorLogRecord;
 import org.apache.skywalking.oap.server.core.browser.source.BrowserErrorCategory;
 import org.apache.skywalking.oap.server.core.query.type.BrowserErrorLog;
 import org.apache.skywalking.oap.server.core.query.type.BrowserErrorLogs;
+import org.apache.skywalking.oap.server.core.query.type.ErrorCategory;
 import org.apache.skywalking.oap.server.core.storage.query.IBrowserLogQueryDAO;
 import org.apache.skywalking.oap.server.library.util.StringUtil;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
+import java.util.stream.Collectors;
 
 /**
  * {@link org.apache.skywalking.oap.server.core.browser.manual.errorlog.BrowserErrorLogRecord} is a stream
@@ -43,36 +52,71 @@ public class BanyanDBBrowserLogQueryDAO extends AbstractBanyanDBDAO implements I
 
     @Override
     public BrowserErrorLogs queryBrowserErrorLogs(String serviceId, String serviceVersionId, String pagePathId, BrowserErrorCategory category, long startSecondTB, long endSecondTB, int limit, int from) throws IOException {
+        TimestampRange tsRange = null;
+        if (startSecondTB > 0 && endSecondTB > 0) { // a range is built only when both time buckets are positive; otherwise tsRange stays null
+            tsRange = new TimestampRange(TimeBucket.getTimestamp(startSecondTB), TimeBucket.getTimestamp(endSecondTB));
+        }
 
-        final QueryBuilder qb = new QueryBuilder() {
+        final BrowserErrorLogs logs = new BrowserErrorLogs();
+        StreamQueryResponse resp = query(BrowserErrorLogRecord.INDEX_NAME, ImmutableList.of(BrowserErrorLogRecord.SERVICE_ID, // presumably the tag-family-0 projection, in this order
+                BrowserErrorLogRecord.SERVICE_VERSION_ID,
+                BrowserErrorLogRecord.PAGE_PATH_ID,
+                BrowserErrorLogRecord.ERROR_CATEGORY), tsRange, new QueryBuilder() {
             @Override
             public void apply(StreamQuery query) {
-                query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", BrowserErrorLogRecord.SERVICE_ID, serviceId));
+                query.setDataProjections(Collections.singletonList(BrowserErrorLogRecord.DATA_BINARY));
+                query.appendCondition(eq(BrowserErrorLogRecord.SERVICE_ID, serviceId)); // always applied, unlike the optional filters below
 
                 if (StringUtil.isNotEmpty(serviceVersionId)) {
-                    query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", BrowserErrorLogRecord.SERVICE_VERSION_ID, serviceVersionId));
+                    query.appendCondition(eq(BrowserErrorLogRecord.SERVICE_VERSION_ID, serviceVersionId));
                 }
+
                 if (StringUtil.isNotEmpty(pagePathId)) {
-                    query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", BrowserErrorLogRecord.PAGE_PATH_ID, pagePathId));
+                    query.appendCondition(eq(BrowserErrorLogRecord.PAGE_PATH_ID, pagePathId));
                 }
+
                 if (Objects.nonNull(category)) {
-                    query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", BrowserErrorLogRecord.ERROR_CATEGORY, (long) category.getValue()));
+                    query.appendCondition(eq(BrowserErrorLogRecord.ERROR_CATEGORY, category.getValue())); // numeric value: resolves to the long-typed eq overload
                 }
 
                 query.setOffset(from);
                 query.setLimit(limit);
             }
-        };
-
-        final BrowserErrorLogs logs = new BrowserErrorLogs();
-        final List<BrowserErrorLog> browserErrorLogs;
-        if (startSecondTB != 0 && endSecondTB != 0) {
-            browserErrorLogs = query(BrowserErrorLog.class, qb, TimeBucket.getTimestamp(startSecondTB), TimeBucket.getTimestamp(endSecondTB));
-        } else {
-            browserErrorLogs = query(BrowserErrorLog.class, qb);
-        }
-        logs.getLogs().addAll(browserErrorLogs);
+        });
+        logs.getLogs().addAll(resp.getElements().stream().map(new BrowserErrorLogDeserializer()).collect(Collectors.toList())); // NOTE(review): the total set below counts only this page
         logs.setTotal(logs.getLogs().size());
         return logs;
     }
+
+    public static class BrowserErrorLogDeserializer implements RowEntityDeserializer<BrowserErrorLog> { // maps one RowEntity to a BrowserErrorLog
+        @Override
+        public BrowserErrorLog apply(RowEntity row) {
+            // FIXME: use protobuf directly
+            BrowserErrorLog log = new BrowserErrorLog();
+            final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0); // read by position: [0] service id, [1] service version id, [2] page path id
+            log.setService((String) searchable.get(0).getValue());
+            log.setServiceVersion((String) searchable.get(1).getValue());
+            log.setPagePath((String) searchable.get(2).getValue());
+            log.setTime(row.getTimestamp());
+            final List<TagAndValue<?>> data = row.getTagFamilies().get(1);
+            Object o = data.get(0).getValue(); // DATA_BINARY: the serialized agent-side BrowserErrorLog
+            if (o instanceof ByteString && !((ByteString) o).isEmpty()) {
+                try {
+                    org.apache.skywalking.apm.network.language.agent.v3.BrowserErrorLog browserErrorLog = org.apache.skywalking.apm.network.language.agent.v3.BrowserErrorLog
+                            .parseFrom((ByteString) o);
+                    log.setCategory(ErrorCategory.valueOf(browserErrorLog.getCategory().name().toUpperCase(java.util.Locale.ROOT))); // searchable[3] is matched as a long tag, so it cannot be cast to String
+                    log.setGrade(browserErrorLog.getGrade());
+                    log.setCol(browserErrorLog.getCol());
+                    log.setLine(browserErrorLog.getLine());
+                    log.setMessage(browserErrorLog.getMessage());
+                    log.setErrorUrl(browserErrorLog.getErrorUrl());
+                    log.setStack(browserErrorLog.getStack());
+                    log.setFirstReportedError(browserErrorLog.getFirstReportedError());
+                } catch (InvalidProtocolBufferException ex) {
+                    throw new RuntimeException("fail to parse proto buffer", ex);
+                }
+            }
+            return log;
+        }
+    }
 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBEventQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBEventQueryDAO.java
index b85e07d860..44299c38e9 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBEventQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBEventQueryDAO.java
@@ -19,11 +19,15 @@
 package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
 
 import com.google.common.base.Strings;
-import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
+import com.google.common.collect.ImmutableList;
+import org.apache.skywalking.banyandb.v1.client.RowEntity;
 import org.apache.skywalking.banyandb.v1.client.StreamQuery;
+import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
+import org.apache.skywalking.banyandb.v1.client.TagAndValue;
 import org.apache.skywalking.oap.server.core.query.PaginationUtils;
 import org.apache.skywalking.oap.server.core.query.input.Duration;
 import org.apache.skywalking.oap.server.core.query.type.event.EventQueryCondition;
+import org.apache.skywalking.oap.server.core.query.type.event.EventType;
 import org.apache.skywalking.oap.server.core.query.type.event.Events;
 import org.apache.skywalking.oap.server.core.query.type.event.Source;
 import org.apache.skywalking.oap.server.core.source.Event;
@@ -31,6 +35,7 @@ import org.apache.skywalking.oap.server.core.storage.query.IEventQueryDAO;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
 
 import java.util.List;
+import java.util.stream.Collectors;
 
 /**
  * {@link org.apache.skywalking.oap.server.core.source.Event} is a stream
@@ -42,10 +47,13 @@ public class BanyanDBEventQueryDAO extends AbstractBanyanDBDAO implements IEvent
 
     @Override
     public Events queryEvents(EventQueryCondition condition) throws Exception {
-        List<org.apache.skywalking.oap.server.core.query.type.event.Event> eventList = query(org.apache.skywalking.oap.server.core.query.type.event.Event.class,
+        StreamQueryResponse resp = query(Event.INDEX_NAME,
+                ImmutableList.of(Event.UUID, Event.SERVICE, Event.SERVICE_INSTANCE, Event.ENDPOINT, Event.NAME, Event.TYPE, Event.START_TIME, Event.END_TIME),
                 new QueryBuilder() {
                     @Override
                     public void apply(StreamQuery query) {
+                        query.setDataProjections(ImmutableList.of(Event.MESSAGE, Event.PARAMETERS));
+
                         buildConditions(condition, query);
 
                         PaginationUtils.Page page = PaginationUtils.INSTANCE.exchange(condition.getPaging());
@@ -59,8 +67,43 @@ public class BanyanDBEventQueryDAO extends AbstractBanyanDBDAO implements IEvent
                                 query.setOrderBy(new StreamQuery.OrderBy("start_time", StreamQuery.OrderBy.Type.DESC));
                         }
                     }
+
+                    private void buildConditions(EventQueryCondition condition, final StreamQuery query) { // appends one condition per populated filter; empty or null filters are skipped
+                        if (!Strings.isNullOrEmpty(condition.getUuid())) {
+                            query.appendCondition(eq(Event.UUID, condition.getUuid()));
+                        }
+                        final Source source = condition.getSource();
+                        if (source != null) { // service / instance / endpoint filters are each optional
+                            if (!Strings.isNullOrEmpty(source.getService())) {
+                                query.appendCondition(eq(Event.SERVICE, source.getService()));
+                            }
+                            if (!Strings.isNullOrEmpty(source.getServiceInstance())) {
+                                query.appendCondition(eq(Event.SERVICE_INSTANCE, source.getServiceInstance()));
+                            }
+                            if (!Strings.isNullOrEmpty(source.getEndpoint())) {
+                                query.appendCondition(eq(Event.ENDPOINT, source.getEndpoint()));
+                            }
+                        }
+                        if (!Strings.isNullOrEmpty(condition.getName())) {
+                            query.appendCondition(eq(Event.NAME, condition.getName()));
+                        }
+                        if (condition.getType() != null) {
+                            query.appendCondition(eq(Event.TYPE, condition.getType().name())); // matched by the enum constant's name
+                        }
+                        final Duration time = condition.getTime();
+                        if (time != null) { // each bound is applied only when its timestamp is positive
+                            if (time.getStartTimestamp() > 0) {
+                                query.appendCondition(gte(Event.START_TIME, time.getStartTimestamp())); // START_TIME >= requested start
+                            }
+                            if (time.getEndTimestamp() > 0) {
+                                query.appendCondition(lte(Event.END_TIME, time.getEndTimestamp())); // END_TIME <= requested end
+                            }
+                        }
+                    }
                 });
 
+        List<org.apache.skywalking.oap.server.core.query.type.event.Event> eventList = resp.getElements().stream().map(new EventDeserializer()).collect(Collectors.toList());
+
         Events events = new Events();
         events.setEvents(eventList);
         events.setTotal(eventList.size());
@@ -83,36 +126,23 @@ public class BanyanDBEventQueryDAO extends AbstractBanyanDBDAO implements IEvent
         return events;
     }
 
-    private void buildConditions(EventQueryCondition condition, final StreamQuery query) {
-        if (!Strings.isNullOrEmpty(condition.getUuid())) {
-            query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", Event.UUID, condition.getUuid()));
-        }
-        final Source source = condition.getSource();
-        if (source != null) {
-            if (!Strings.isNullOrEmpty(source.getService())) {
-                query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", Event.SERVICE, source.getService()));
-            }
-            if (!Strings.isNullOrEmpty(source.getServiceInstance())) {
-                query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", Event.SERVICE_INSTANCE, source.getServiceInstance()));
-            }
-            if (!Strings.isNullOrEmpty(source.getEndpoint())) {
-                query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", Event.ENDPOINT, source.getEndpoint()));
-            }
-        }
-        if (!Strings.isNullOrEmpty(condition.getName())) {
-            query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", Event.NAME, condition.getName()));
-        }
-        if (condition.getType() != null) {
-            query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", Event.TYPE, condition.getType().name()));
-        }
-        final Duration time = condition.getTime();
-        if (time != null) {
-            if (time.getStartTimestamp() > 0) {
-                query.appendCondition(PairQueryCondition.LongQueryCondition.gt("searchable", Event.START_TIME, time.getStartTimestamp()));
-            }
-            if (time.getEndTimestamp() > 0) {
-                query.appendCondition(PairQueryCondition.LongQueryCondition.gt("searchable", Event.END_TIME, time.getEndTimestamp()));
-            }
+    public static class EventDeserializer implements RowEntityDeserializer<org.apache.skywalking.oap.server.core.query.type.event.Event> {
+        @Override
+        public org.apache.skywalking.oap.server.core.query.type.event.Event apply(RowEntity row) {
+            final org.apache.skywalking.oap.server.core.query.type.event.Event event = new org.apache.skywalking.oap.server.core.query.type.event.Event();
+            // tag family 0, read by position: uuid, service, service instance, endpoint, name, type, start time, end time
+            final List<TagAndValue<?>> indexedTags = row.getTagFamilies().get(0);
+            event.setUuid((String) indexedTags.get(0).getValue());
+            event.setSource(new Source((String) indexedTags.get(1).getValue(), (String) indexedTags.get(2).getValue(), (String) indexedTags.get(3).getValue()));
+            event.setName((String) indexedTags.get(4).getValue());
+            event.setType(EventType.parse((String) indexedTags.get(5).getValue()));
+            event.setStartTime(((Number) indexedTags.get(6).getValue()).longValue());
+            event.setEndTime(((Number) indexedTags.get(7).getValue()).longValue());
+            // tag family 1, read by position: message, parameters
+            final List<TagAndValue<?>> payloadTags = row.getTagFamilies().get(1);
+            event.setMessage((String) payloadTags.get(0).getValue());
+            event.setParameters((String) payloadTags.get(1).getValue());
+            return event;
         }
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java
index 8325618f88..6bce5767e7 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java
@@ -18,13 +18,23 @@
 
 package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
 
-import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.skywalking.apm.network.common.v3.KeyStringValuePair;
+import org.apache.skywalking.apm.network.logging.v3.LogTags;
+import org.apache.skywalking.banyandb.v1.client.RowEntity;
 import org.apache.skywalking.banyandb.v1.client.StreamQuery;
+import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
+import org.apache.skywalking.banyandb.v1.client.TagAndValue;
+import org.apache.skywalking.banyandb.v1.client.TimestampRange;
 import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
 import org.apache.skywalking.oap.server.core.analysis.manual.log.AbstractLogRecord;
+import org.apache.skywalking.oap.server.core.analysis.manual.log.LogRecord;
 import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
 import org.apache.skywalking.oap.server.core.query.enumeration.Order;
 import org.apache.skywalking.oap.server.core.query.input.TraceScopeCondition;
+import org.apache.skywalking.oap.server.core.query.type.KeyValue;
 import org.apache.skywalking.oap.server.core.query.type.Log;
 import org.apache.skywalking.oap.server.core.query.type.Logs;
 import org.apache.skywalking.oap.server.core.storage.query.ILogQueryDAO;
@@ -36,6 +46,7 @@ import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.LogRecord
 import java.io.IOException;
 import java.util.List;
 import java.util.Objects;
+import java.util.stream.Collectors;
 
 /**
  * {@link org.apache.skywalking.oap.server.core.analysis.manual.log.LogRecord} is a stream
@@ -53,48 +64,84 @@ public class BanyanDBLogQueryDAO extends AbstractBanyanDBDAO implements ILogQuer
         final QueryBuilder query = new QueryBuilder() {
             @Override
             public void apply(StreamQuery query) {
+                query.setDataProjections(ImmutableList.of(AbstractLogRecord.CONTENT_TYPE, AbstractLogRecord.CONTENT, AbstractLogRecord.TAGS_RAW_DATA));
+
                 if (StringUtil.isNotEmpty(serviceId)) {
-                    query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", AbstractLogRecord.SERVICE_ID, serviceId));
+                    query.appendCondition(eq(AbstractLogRecord.SERVICE_ID, serviceId));
                 }
 
                 if (StringUtil.isNotEmpty(serviceInstanceId)) {
-                    query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", AbstractLogRecord.SERVICE_INSTANCE_ID, serviceInstanceId));
+                    query.appendCondition(eq(AbstractLogRecord.SERVICE_INSTANCE_ID, serviceInstanceId));
                 }
                 if (StringUtil.isNotEmpty(endpointId)) {
-                    query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", AbstractLogRecord.ENDPOINT_ID, endpointId));
+                    query.appendCondition(eq(AbstractLogRecord.ENDPOINT_ID, endpointId));
                 }
                 if (Objects.nonNull(relatedTrace)) {
                     if (StringUtil.isNotEmpty(relatedTrace.getTraceId())) {
-                        query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", AbstractLogRecord.TRACE_ID, relatedTrace.getTraceId()));
+                        query.appendCondition(eq(AbstractLogRecord.TRACE_ID, relatedTrace.getTraceId()));
                     }
                     if (StringUtil.isNotEmpty(relatedTrace.getSegmentId())) {
-                        query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", AbstractLogRecord.TRACE_SEGMENT_ID, relatedTrace.getSegmentId()));
+                        query.appendCondition(eq(AbstractLogRecord.TRACE_SEGMENT_ID, relatedTrace.getSegmentId()));
                     }
                     if (Objects.nonNull(relatedTrace.getSpanId())) {
-                        query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", AbstractLogRecord.SPAN_ID, (long) relatedTrace.getSpanId()));
+                        query.appendCondition(eq(AbstractLogRecord.SPAN_ID, (long) relatedTrace.getSpanId()));
                     }
                 }
 
                 if (CollectionUtils.isNotEmpty(tags)) {
                     for (final Tag tag : tags) {
                         if (LogRecordBuilder.INDEXED_TAGS.contains(tag.getKey())) {
-                            query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", tag.getKey(), tag.getValue()));
+                            query.appendCondition(eq(tag.getKey(), tag.getValue()));
                         }
                     }
                 }
             }
         };
 
-        final List<Log> entities;
-        if (startTB != 0 && endTB != 0) {
-            entities = query(Log.class, query, TimeBucket.getTimestamp(startTB), TimeBucket.getTimestamp(endTB));
-        } else {
-            entities = query(Log.class, query);
+        TimestampRange tsRange = null;
+        if (startTB > 0 && endTB > 0) {
+            tsRange = new TimestampRange(TimeBucket.getTimestamp(startTB), TimeBucket.getTimestamp(endTB));
         }
+
+        StreamQueryResponse resp = query(LogRecord.INDEX_NAME,
+                ImmutableList.of(AbstractLogRecord.SERVICE_ID, AbstractLogRecord.SERVICE_INSTANCE_ID,
+                        AbstractLogRecord.ENDPOINT_ID, AbstractLogRecord.TRACE_ID, AbstractLogRecord.TRACE_SEGMENT_ID,
+                        AbstractLogRecord.SPAN_ID), tsRange, query);
+
+        List<Log> logEntities = resp.getElements().stream().map(new LogDeserializer()).collect(Collectors.toList());
+
         Logs logs = new Logs();
-        logs.getLogs().addAll(entities);
-        logs.setTotal(entities.size());
+        logs.getLogs().addAll(logEntities);
+        logs.setTotal(logEntities.size());
 
         return logs;
     }
+
+    public static class LogDeserializer implements RowEntityDeserializer<Log> { // maps one RowEntity to a Log
+        @Override
+        public Log apply(RowEntity row) {
+            Log log = new Log();
+            final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0); // read by position: service, instance, endpoint, trace id
+            log.setServiceId((String) searchable.get(0).getValue());
+            log.setServiceInstanceId((String) searchable.get(1).getValue());
+            log.setEndpointId((String) searchable.get(2).getValue());
+            log.setTraceId((String) searchable.get(3).getValue());
+            log.setTimestamp(row.getTimestamp());
+            final List<TagAndValue<?>> data = row.getTagFamilies().get(1); // NOTE(review): CONTENT_TYPE (data[0]) is projected but still not mapped
+            // The data projection is CONTENT_TYPE, CONTENT, TAGS_RAW_DATA, so the body is data[1] whether or not any tags exist.
+            log.setContent(data.get(1).getValue() instanceof String ? (String) data.get(1).getValue() : "");
+            final Object rawTags = data.get(2).getValue();
+            if (rawTags instanceof ByteString && !((ByteString) rawTags).isEmpty()) { // instanceof also covers null and unexpected value types
+                try {
+                    LogTags logTags = LogTags.parseFrom((ByteString) rawTags);
+                    for (final KeyStringValuePair pair : logTags.getDataList()) {
+                        log.getTags().add(new KeyValue(pair.getKey(), pair.getValue()));
+                    }
+                } catch (InvalidProtocolBufferException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+            return log;
+        }
+    }
 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBManagementDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBManagementDAO.java
index aecdd13dfd..7e37288428 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBManagementDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBManagementDAO.java
@@ -18,24 +18,54 @@
 
 package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
 
-import lombok.RequiredArgsConstructor;
+import org.apache.skywalking.banyandb.v1.client.StreamQuery;
+import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
 import org.apache.skywalking.banyandb.v1.client.StreamWrite;
+import org.apache.skywalking.banyandb.v1.client.TimestampRange;
 import org.apache.skywalking.oap.server.core.analysis.management.ManagementData;
+import org.apache.skywalking.oap.server.core.management.ui.template.UITemplate;
 import org.apache.skywalking.oap.server.core.storage.IManagementDAO;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.BanyanDBStorageDataBuilder;
 
 import java.io.IOException;
+import java.util.Collections;
 
-@RequiredArgsConstructor
-public class BanyanDBManagementDAO<T extends ManagementData> implements IManagementDAO {
-    private final BanyanDBStorageClient client;
+/**
+ * UITemplate insertion DAO
+ *
+ * @param <T> The only ManagementData we have now is {@link UITemplate}
+ */
+public class BanyanDBManagementDAO<T extends ManagementData> extends AbstractBanyanDBDAO implements IManagementDAO {
     private final BanyanDBStorageDataBuilder<T> storageBuilder;
 
+    public BanyanDBManagementDAO(BanyanDBStorageClient client, BanyanDBStorageDataBuilder<T> storageBuilder) {
+        super(client);
+        this.storageBuilder = storageBuilder;
+    }
+
     @Override
     public void insert(Model model, ManagementData storageData) throws IOException {
-        StreamWrite streamWrite = this.storageBuilder.entity2Storage(model, (T) storageData);
-        client.write(streamWrite);
+        // ensure the record is inserted only once: skip the write if a matching entry already exists
+        StreamQueryResponse resp = query(UITemplate.INDEX_NAME,
+                Collections.singletonList(UITemplate.NAME),
+                new TimestampRange(0L, 2L),
+                new QueryBuilder() {
+                    @Override
+                    public void apply(StreamQuery query) {
+                        query.appendCondition(eq(UITemplate.NAME, storageData.id()));
+                    }
+                });
+
+        if (resp != null && resp.getElements().size() > 0) {
+            return;
+        }
+
+        StreamWrite.StreamWriteBuilder streamWrite = this.storageBuilder
+                .entity2Storage((T) storageData)
+                .name(model.getName())
+                .timestamp(1L);
+        getClient().write(streamWrite.build());
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBMetadataQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBMetadataQueryDAO.java
index b3b6fece70..fdc1ccb71f 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBMetadataQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBMetadataQueryDAO.java
@@ -18,23 +18,35 @@
 
 package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
 
+import com.google.common.collect.ImmutableList;
+import com.google.gson.JsonElement;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
+import org.apache.skywalking.banyandb.v1.client.RowEntity;
 import org.apache.skywalking.banyandb.v1.client.StreamQuery;
+import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
+import org.apache.skywalking.banyandb.v1.client.TagAndValue;
 import org.apache.skywalking.oap.server.core.analysis.NodeType;
 import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
 import org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic;
 import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
 import org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraffic;
+import org.apache.skywalking.oap.server.core.query.enumeration.Language;
+import org.apache.skywalking.oap.server.core.query.type.Attribute;
 import org.apache.skywalking.oap.server.core.query.type.Database;
 import org.apache.skywalking.oap.server.core.query.type.Endpoint;
 import org.apache.skywalking.oap.server.core.query.type.Service;
 import org.apache.skywalking.oap.server.core.query.type.ServiceInstance;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
 import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
 import org.apache.skywalking.oap.server.library.util.StringUtil;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 /**
@@ -50,83 +62,184 @@ public class BanyanDBMetadataQueryDAO extends AbstractBanyanDBDAO implements IMe
 
     @Override
     public List<Service> getAllServices(String group) throws IOException {
-        return query(Service.class, new QueryBuilder() {
-            @Override
-            public void apply(StreamQuery query) {
-                if (StringUtil.isNotEmpty(group)) {
-                    query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", ServiceTraffic.GROUP, group));
-                }
-            }
-        });
+        StreamQueryResponse resp = query(ServiceTraffic.INDEX_NAME,
+                ImmutableList.of(ServiceTraffic.NAME, ServiceTraffic.NODE_TYPE, ServiceTraffic.GROUP),
+                new QueryBuilder() {
+                    @Override
+                    public void apply(StreamQuery query) {
+                        if (StringUtil.isNotEmpty(group)) {
+                            query.appendCondition(eq(ServiceTraffic.GROUP, group));
+                        }
+                    }
+                });
+
+        return resp.getElements().stream().map(new ServiceDeserializer()).collect(Collectors.toList());
     }
 
     @Override
     public List<Service> getAllBrowserServices() throws IOException {
-        return query(Service.class, new QueryBuilder() {
-            @Override
-            public void apply(StreamQuery query) {
-                query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", ServiceTraffic.NODE_TYPE, (long) NodeType.Browser.value()));
-            }
-        });
+        StreamQueryResponse resp = query(ServiceTraffic.INDEX_NAME,
+                ImmutableList.of(ServiceTraffic.NAME, ServiceTraffic.NODE_TYPE, ServiceTraffic.GROUP),
+                new QueryBuilder() {
+                    @Override
+                    public void apply(StreamQuery query) {
+                        query.appendCondition(eq(ServiceTraffic.NODE_TYPE, NodeType.Browser.value()));
+                    }
+                });
+
+        return resp.getElements().stream().map(new ServiceDeserializer()).collect(Collectors.toList());
     }
 
     @Override
     public List<Database> getAllDatabases() throws IOException {
-        return query(Database.class, new QueryBuilder() {
-            @Override
-            public void apply(StreamQuery query) {
-                query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", ServiceTraffic.NODE_TYPE, (long) NodeType.Database.value()));
-            }
-        });
+        StreamQueryResponse resp = query(ServiceTraffic.INDEX_NAME, ImmutableList.of(ServiceTraffic.NAME, ServiceTraffic.NODE_TYPE),
+                new QueryBuilder() {
+                    @Override
+                    public void apply(StreamQuery query) {
+                        query.appendCondition(eq(ServiceTraffic.NODE_TYPE, (long) NodeType.Database.value()));
+                    }
+                });
+
+        return resp.getElements().stream().map(new DatabaseDeserializer()).collect(Collectors.toList());
     }
 
     @Override
     public List<Service> searchServices(NodeType nodeType, String keyword) throws IOException {
-        return query(Service.class, new QueryBuilder() {
-            @Override
-            public void apply(StreamQuery query) {
-                query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", ServiceTraffic.NODE_TYPE, (long) nodeType.value()));
-            }
-        }).stream().filter(s -> s.getName().contains(keyword)) // TODO: support analyzer in database
+        StreamQueryResponse resp = query(ServiceTraffic.INDEX_NAME,
+                ImmutableList.of(ServiceTraffic.NAME, ServiceTraffic.NODE_TYPE, ServiceTraffic.GROUP),
+                new QueryBuilder() {
+                    @Override
+                    public void apply(StreamQuery query) {
+                        query.appendCondition(eq(ServiceTraffic.NODE_TYPE, nodeType.value()));
+                    }
+                });
+
+        return resp.getElements().stream().map(new ServiceDeserializer())
+                .filter(s -> s.getName().contains(keyword)) // TODO: support analyzer in database
                 .collect(Collectors.toList());
     }
 
     @Override
     public Service searchService(NodeType nodeType, String serviceCode) throws IOException {
-        return query(Service.class, new QueryBuilder() {
-            @Override
-            public void apply(StreamQuery query) {
-                query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", ServiceTraffic.NAME, serviceCode));
-                query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", ServiceTraffic.NODE_TYPE, (long) nodeType.value()));
-                // only get one
-                query.setLimit(1);
-            }
-        }).stream().findAny().orElse(null);
+        StreamQueryResponse resp = query(ServiceTraffic.INDEX_NAME,
+                ImmutableList.of(ServiceTraffic.NAME, ServiceTraffic.NODE_TYPE, ServiceTraffic.GROUP),
+                new QueryBuilder() {
+                    @Override
+                    public void apply(StreamQuery query) {
+                        query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", ServiceTraffic.NAME, serviceCode));
+                        query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", ServiceTraffic.NODE_TYPE, (long) nodeType.value()));
+                        // only get one
+                        query.setLimit(1);
+                    }
+                });
+
+        return resp.getElements().stream().map(new ServiceDeserializer()).findAny().orElse(null);
     }
 
     @Override
     public List<Endpoint> searchEndpoint(String keyword, String serviceId, int limit) throws IOException {
-        return query(Endpoint.class, new QueryBuilder() {
-            @Override
-            public void apply(StreamQuery query) {
-                query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", EndpointTraffic.SERVICE_ID, serviceId));
-            }
-        }).stream().filter(e -> e.getName().contains(keyword))
+        StreamQueryResponse resp = query(EndpointTraffic.INDEX_NAME,
+                ImmutableList.of(EndpointTraffic.NAME, EndpointTraffic.SERVICE_ID), new QueryBuilder() {
+                    @Override
+                    public void apply(StreamQuery query) {
+                        query.appendCondition(eq(EndpointTraffic.SERVICE_ID, serviceId));
+                    }
+                });
+
+        return resp.getElements().stream().map(new EndpointDeserializer()).filter(e -> e.getName().contains(keyword))
                 .limit(limit).collect(Collectors.toList());
     }
 
     @Override
     public List<ServiceInstance> getServiceInstances(long startTimestamp, long endTimestamp, String serviceId) throws IOException {
-        return query(ServiceInstance.class, new QueryBuilder() {
-            @Override
-            public void apply(StreamQuery query) {
-                final long startMinuteTimeBucket = TimeBucket.getMinuteTimeBucket(startTimestamp);
-                final long endMinuteTimeBucket = TimeBucket.getMinuteTimeBucket(endTimestamp);
-
-                query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", InstanceTraffic.LAST_PING_TIME_BUCKET, startMinuteTimeBucket));
-                query.appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", InstanceTraffic.LAST_PING_TIME_BUCKET, endMinuteTimeBucket));
-                query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", InstanceTraffic.SERVICE_ID, serviceId));
+        StreamQueryResponse resp = query(InstanceTraffic.INDEX_NAME,
+                ImmutableList.of(InstanceTraffic.SERVICE_ID, InstanceTraffic.LAST_PING_TIME_BUCKET),
+                new QueryBuilder() {
+                    @Override
+                    public void apply(StreamQuery query) {
+                        query.setDataProjections(Collections.singletonList("data_binary"));
+
+                        final long startMinuteTimeBucket = TimeBucket.getMinuteTimeBucket(startTimestamp);
+                        final long endMinuteTimeBucket = TimeBucket.getMinuteTimeBucket(endTimestamp);
+
+                        query.appendCondition(gte(InstanceTraffic.LAST_PING_TIME_BUCKET, startMinuteTimeBucket));
+                        query.appendCondition(lte(InstanceTraffic.LAST_PING_TIME_BUCKET, endMinuteTimeBucket));
+                        query.appendCondition(eq(InstanceTraffic.SERVICE_ID, serviceId));
+                    }
+                });
+
+        return resp.getElements().stream().map(new ServiceInstanceDeserializer()).collect(Collectors.toList());
+    }
+
+    public static class DatabaseDeserializer implements RowEntityDeserializer<Database> {
+        @Override
+        public Database apply(RowEntity row) {
+            Database database = new Database();
+            final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
+            database.setId(row.getId());
+            database.setName((String) searchable.get(0).getValue());
+            return database;
+        }
+    }
+
+    public static class EndpointDeserializer implements RowEntityDeserializer<Endpoint> {
+        @Override
+        public Endpoint apply(RowEntity row) {
+            Endpoint endpoint = new Endpoint();
+            final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
+            endpoint.setName((String) searchable.get(0).getValue());
+            endpoint.setId((String) searchable.get(1).getValue());
+            return endpoint;
+        }
+    }
+
+    public static class ServiceDeserializer implements RowEntityDeserializer<Service> {
+        @Override
+        public Service apply(RowEntity row) {
+            Service service = new Service();
+            final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
+            service.setId(row.getId());
+            service.setName((String) searchable.get(0).getValue());
+            service.setGroup((String) searchable.get(2).getValue());
+            return service;
+        }
+    }
+
+    public static class ServiceInstanceDeserializer implements RowEntityDeserializer<ServiceInstance> {
+        @Override
+        public ServiceInstance apply(RowEntity row) {
+            InstanceTraffic instanceTraffic = new InstanceTraffic();
+            final List<TagAndValue<?>> data = row.getTagFamilies().get(1);
+            Object o = data.get(0).getValue();
+            ServiceInstance serviceInstance = new ServiceInstance();
+            if (o instanceof ByteString && !((ByteString) o).isEmpty()) {
+                try {
+                    RemoteData remoteData = RemoteData.parseFrom((ByteString) o);
+                    instanceTraffic.deserialize(remoteData);
+                    serviceInstance.setName(instanceTraffic.getName());
+                    serviceInstance.setId(instanceTraffic.getServiceId());
+
+                    if (instanceTraffic.getProperties() != null) {
+                        for (Map.Entry<String, JsonElement> property : instanceTraffic.getProperties().entrySet()) {
+                            String key = property.getKey();
+                            String value = property.getValue().getAsString();
+                            if (key.equals(InstanceTraffic.PropertyUtil.LANGUAGE)) {
+                                serviceInstance.setLanguage(Language.value(value));
+                            } else {
+                                serviceInstance.getAttributes().add(new Attribute(key, value));
+                            }
+                        }
+                    } else {
+                        serviceInstance.setLanguage(Language.UNKNOWN);
+                    }
+                } catch (InvalidProtocolBufferException ex) {
+                    throw new RuntimeException("fail to parse remote data", ex);
+                }
+            } else {
+                throw new RuntimeException("unable to parse binary data");
             }
-        });
+
+            return serviceInstance;
+        }
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBMetricsDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBMetricsDAO.java
index 618ff86af9..5aa1b7657d 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBMetricsDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBMetricsDAO.java
@@ -20,12 +20,13 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
 
 import lombok.RequiredArgsConstructor;
 import org.apache.skywalking.banyandb.v1.client.StreamWrite;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
 import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
 import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.BanyanDBMetricsBuilder;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.BanyanDBStorageDataBuilder;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -33,7 +34,7 @@ import java.util.List;
 
 @RequiredArgsConstructor
 public class BanyanDBMetricsDAO<T extends Metrics> implements IMetricsDAO {
-    private final BanyanDBMetricsBuilder<T> storageBuilder;
+    private final BanyanDBStorageDataBuilder<T> storageBuilder;
 
     @Override
     public List<Metrics> multiGet(Model model, List<Metrics> metrics) throws IOException {
@@ -42,12 +43,15 @@ public class BanyanDBMetricsDAO<T extends Metrics> implements IMetricsDAO {
 
     @Override
     public InsertRequest prepareBatchInsert(Model model, Metrics metrics) throws IOException {
-        StreamWrite streamWrite = this.storageBuilder.entity2Storage(model, (T) metrics);
-        return new BanyanDBStreamInsertRequest(streamWrite);
+        StreamWrite.StreamWriteBuilder builder = this.storageBuilder.entity2Storage((T) metrics)
+                .name(model.getName())
+                .timestamp(TimeBucket.getTimeBucket(metrics.getTimeBucket(), model.getDownsampling()));
+        return new BanyanDBStreamInsertRequest(builder.build());
     }
 
     @Override
     public UpdateRequest prepareBatchUpdate(Model model, Metrics metrics) throws IOException {
-        return null;
+        return new UpdateRequest() {
+        };
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBNetworkAddressAliasDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBNetworkAddressAliasDAO.java
index ce36573617..33aec37bac 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBNetworkAddressAliasDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBNetworkAddressAliasDAO.java
@@ -18,13 +18,18 @@
 
 package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
 
-import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
+import com.google.common.collect.ImmutableList;
+import org.apache.skywalking.banyandb.v1.client.RowEntity;
 import org.apache.skywalking.banyandb.v1.client.StreamQuery;
+import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
+import org.apache.skywalking.banyandb.v1.client.TagAndValue;
 import org.apache.skywalking.oap.server.core.analysis.manual.networkalias.NetworkAddressAlias;
+import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
 
 import java.util.List;
+import java.util.stream.Collectors;
 
 /**
  * {@link NetworkAddressAlias} is a stream
@@ -36,11 +41,36 @@ public class BanyanDBNetworkAddressAliasDAO extends AbstractBanyanDBDAO implemen
 
     @Override
     public List<NetworkAddressAlias> loadLastUpdate(long timeBucket) {
-        return query(NetworkAddressAlias.class, new QueryBuilder() {
-            @Override
-            public void apply(StreamQuery query) {
-                query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", NetworkAddressAlias.LAST_UPDATE_TIME_BUCKET, timeBucket));
-            }
-        });
+        StreamQueryResponse resp = query(NetworkAddressAlias.INDEX_NAME,
+                ImmutableList.of(NetworkAddressAlias.LAST_UPDATE_TIME_BUCKET),
+                new QueryBuilder() {
+                    @Override
+                    public void apply(StreamQuery query) {
+                        query.setDataProjections(ImmutableList.of(Metrics.TIME_BUCKET, "address", "represent_service_id", "represent_service_instance_id"));
+                        query.appendCondition(gte(NetworkAddressAlias.LAST_UPDATE_TIME_BUCKET, timeBucket));
+                    }
+                });
+
+        return resp.getElements().stream().map(new NetworkAddressAliasDeserializer()).collect(Collectors.toList());
+    }
+
+    public static class NetworkAddressAliasDeserializer implements RowEntityDeserializer<NetworkAddressAlias> {
+        @Override
+        public NetworkAddressAlias apply(RowEntity row) {
+            NetworkAddressAlias model = new NetworkAddressAlias();
+            final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
+            // searchable - last_update_time_bucket
+            model.setLastUpdateTimeBucket(((Number) searchable.get(0).getValue()).longValue());
+            final List<TagAndValue<?>> data = row.getTagFamilies().get(1);
+            // data - time_bucket
+            model.setTimeBucket(((Number) data.get(0).getValue()).longValue());
+            // data - address
+            model.setAddress((String) data.get(1).getValue());
+            // data - represent_service_id
+            model.setRepresentServiceId((String) data.get(2).getValue());
+            // data - represent_service_instance_id
+            model.setRepresentServiceInstanceId((String) data.get(3).getValue());
+            return model;
+        }
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBNoneStreamDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBNoneStreamDAO.java
index 30ed279432..e19ff9af98 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBNoneStreamDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBNoneStreamDAO.java
@@ -20,22 +20,32 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
 
 import lombok.RequiredArgsConstructor;
 import org.apache.skywalking.banyandb.v1.client.StreamWrite;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
 import org.apache.skywalking.oap.server.core.analysis.config.NoneStream;
 import org.apache.skywalking.oap.server.core.storage.INoneStreamDAO;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.BanyanDBRecordBuilder;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.BanyanDBStorageDataBuilder;
 
 import java.io.IOException;
 
+/**
+ * DAO for NoneStream, specifically ProfileTaskRecord
+ *
+ * @param <T> For NoneStream, we only have {@link org.apache.skywalking.oap.server.core.profile.ProfileTaskRecord}
+ */
 @RequiredArgsConstructor
 public class BanyanDBNoneStreamDAO<T extends NoneStream> implements INoneStreamDAO {
     private final BanyanDBStorageClient client;
-    private final BanyanDBRecordBuilder<T> storageBuilder;
+    private final BanyanDBStorageDataBuilder<T> storageBuilder;
 
     @Override
     public void insert(Model model, NoneStream noneStream) throws IOException {
-        StreamWrite streamWrite = this.storageBuilder.entity2Storage(model, (T) noneStream);
-        this.client.write(streamWrite);
+        final long timestamp = TimeBucket.getTimeBucket(noneStream.getTimeBucket(), model.getDownsampling());
+        StreamWrite.StreamWriteBuilder builder =
+                this.storageBuilder.entity2Storage((T) noneStream)
+                        .name(model.getName())
+                        .timestamp(timestamp);
+        this.client.write(builder.build());
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java
index 3a3c12e119..3a4910f3b5 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java
@@ -18,8 +18,14 @@
 
 package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
 
+import com.google.common.collect.ImmutableList;
+import org.apache.skywalking.banyandb.v1.client.RowEntity;
 import org.apache.skywalking.banyandb.v1.client.StreamQuery;
+import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
+import org.apache.skywalking.banyandb.v1.client.TagAndValue;
+import org.apache.skywalking.oap.server.core.profile.ProfileTaskLogRecord;
 import org.apache.skywalking.oap.server.core.query.type.ProfileTaskLog;
+import org.apache.skywalking.oap.server.core.query.type.ProfileTaskLogOperationType;
 import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
 
@@ -41,12 +47,38 @@ public class BanyanDBProfileTaskLogQueryDAO extends AbstractBanyanDBDAO implemen
 
     @Override
     public List<ProfileTaskLog> getTaskLogList() throws IOException {
-        return query(ProfileTaskLog.class, new QueryBuilder() {
-            @Override
-            public void apply(StreamQuery query) {
-                query.setLimit(BanyanDBProfileTaskLogQueryDAO.this.queryMaxSize);
-            }
-        }).stream().sorted(Comparator.comparingLong(ProfileTaskLog::getOperationTime))
+        StreamQueryResponse resp = query(ProfileTaskLogRecord.INDEX_NAME,
+                ImmutableList.of(ProfileTaskLogRecord.OPERATION_TIME),
+                new QueryBuilder() {
+                    @Override
+                    public void apply(StreamQuery query) {
+                        query.setDataProjections(ImmutableList.of(ProfileTaskLogRecord.TASK_ID,
+                                ProfileTaskLogRecord.INSTANCE_ID,
+                                ProfileTaskLogRecord.OPERATION_TYPE));
+                        query.setLimit(BanyanDBProfileTaskLogQueryDAO.this.queryMaxSize);
+                    }
+                });
+
+        return resp.getElements().stream().map(new ProfileTaskLogDeserializer())
+                .sorted(Comparator.comparingLong(ProfileTaskLog::getOperationTime))
                 .collect(Collectors.toList());
     }
+
+    public static class ProfileTaskLogDeserializer implements RowEntityDeserializer<ProfileTaskLog> {
+        @Override
+        public ProfileTaskLog apply(RowEntity row) {
+            ProfileTaskLog profileTaskLog = new ProfileTaskLog();
+            final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
+            // searchable - operation_time
+            profileTaskLog.setOperationTime(((Number) searchable.get(0).getValue()).longValue());
+            final List<TagAndValue<?>> data = row.getTagFamilies().get(1);
+            // data - task_id
+            profileTaskLog.setTaskId((String) data.get(0).getValue());
+            // data - instance_id
+            profileTaskLog.setInstanceId((String) data.get(1).getValue());
+            // data - operation_type
+            profileTaskLog.setOperationType(ProfileTaskLogOperationType.parse(((Number) data.get(2).getValue()).intValue()));
+            return profileTaskLog;
+        }
+    }
 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java
index 47c7c57a00..1cbaea95af 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java
@@ -18,60 +18,66 @@
 
 package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
 
-import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
+import com.google.common.collect.ImmutableList;
+import org.apache.skywalking.banyandb.v1.client.RowEntity;
 import org.apache.skywalking.banyandb.v1.client.StreamQuery;
+import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
+import org.apache.skywalking.banyandb.v1.client.TagAndValue;
 import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
 import org.apache.skywalking.oap.server.core.profile.ProfileTaskRecord;
 import org.apache.skywalking.oap.server.core.query.type.ProfileTask;
 import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO;
 import org.apache.skywalking.oap.server.library.util.StringUtil;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.ProfileTaskDeserializer;
 
 import java.io.IOException;
 import java.util.List;
 import java.util.Objects;
+import java.util.stream.Collectors;
 
 /**
  * {@link org.apache.skywalking.oap.server.core.profile.ProfileTaskRecord} is a stream
  */
 public class BanyanDBProfileTaskQueryDAO extends AbstractBanyanDBDAO implements IProfileTaskQueryDAO {
+    public static final String ID = "profile_task_query_id";
+
     public BanyanDBProfileTaskQueryDAO(BanyanDBStorageClient client) {
         super(client);
     }
 
     @Override
     public List<ProfileTask> getTaskList(String serviceId, String endpointName, Long startTimeBucket, Long endTimeBucket, Integer limit) throws IOException {
-        return query(ProfileTask.class, new QueryBuilder() {
-            @Override
-            public void apply(StreamQuery query) {
-                if (StringUtil.isNotEmpty(serviceId)) {
-                    query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable",
-                            ProfileTaskRecord.SERVICE_ID, serviceId));
-                }
-
-                if (StringUtil.isNotEmpty(endpointName)) {
-                    query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable",
-                            ProfileTaskRecord.ENDPOINT_NAME, endpointName));
-                }
-
-                if (Objects.nonNull(startTimeBucket)) {
-                    query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable",
-                            ProfileTaskRecord.START_TIME, TimeBucket.getTimestamp(startTimeBucket)));
-                }
-
-                if (Objects.nonNull(endTimeBucket)) {
-                    query.appendCondition(PairQueryCondition.LongQueryCondition.le("searchable",
-                            ProfileTaskRecord.START_TIME, TimeBucket.getTimestamp(endTimeBucket)));
-                }
-
-                if (Objects.nonNull(limit)) {
-                    query.setLimit(limit);
-                }
-
-                query.setOrderBy(new StreamQuery.OrderBy(ProfileTaskRecord.START_TIME, StreamQuery.OrderBy.Type.DESC));
-            }
-        });
+        StreamQueryResponse resp = query(ProfileTaskRecord.INDEX_NAME,
+                ImmutableList.of(ID, ProfileTaskRecord.SERVICE_ID, ProfileTaskRecord.ENDPOINT_NAME,
+                        ProfileTaskRecord.START_TIME, ProfileTaskRecord.DURATION, ProfileTaskRecord.MIN_DURATION_THRESHOLD,
+                        ProfileTaskRecord.DUMP_PERIOD, ProfileTaskRecord.CREATE_TIME, ProfileTaskRecord.MAX_SAMPLING_COUNT), new QueryBuilder() {
+                    @Override
+                    public void apply(StreamQuery query) {
+                        if (StringUtil.isNotEmpty(serviceId)) {
+                            query.appendCondition(eq(ProfileTaskRecord.SERVICE_ID, serviceId));
+                        }
+
+                        if (StringUtil.isNotEmpty(endpointName)) {
+                            query.appendCondition(eq(ProfileTaskRecord.ENDPOINT_NAME, endpointName));
+                        }
+
+                        if (Objects.nonNull(startTimeBucket)) {
+                            query.appendCondition(gte(ProfileTaskRecord.START_TIME, TimeBucket.getTimestamp(startTimeBucket)));
+                        }
+
+                        if (Objects.nonNull(endTimeBucket)) {
+                            query.appendCondition(lte(ProfileTaskRecord.START_TIME, TimeBucket.getTimestamp(endTimeBucket)));
+                        }
+
+                        if (Objects.nonNull(limit)) {
+                            query.setLimit(limit);
+                        }
+
+                        query.setOrderBy(new StreamQuery.OrderBy(ProfileTaskRecord.START_TIME, StreamQuery.OrderBy.Type.DESC));
+                    }
+                });
+
+        return resp.getElements().stream().map(new ProfileTaskDeserializer()).collect(Collectors.toList());
     }
 
     @Override
@@ -80,12 +86,36 @@ public class BanyanDBProfileTaskQueryDAO extends AbstractBanyanDBDAO implements
             return null;
         }
 
-        return query(ProfileTask.class, new QueryBuilder() {
-            @Override
-            public void apply(StreamQuery query) {
-                query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", ProfileTaskDeserializer.ID, id));
-                query.setLimit(1);
-            }
-        }).stream().findAny().orElse(null);
+        StreamQueryResponse resp = query(ProfileTaskRecord.INDEX_NAME,
+                ImmutableList.of(ID, ProfileTaskRecord.SERVICE_ID, ProfileTaskRecord.ENDPOINT_NAME,
+                        ProfileTaskRecord.START_TIME, ProfileTaskRecord.DURATION, ProfileTaskRecord.MIN_DURATION_THRESHOLD,
+                        ProfileTaskRecord.DUMP_PERIOD, ProfileTaskRecord.CREATE_TIME, ProfileTaskRecord.MAX_SAMPLING_COUNT),
+                new QueryBuilder() {
+                    @Override
+                    public void apply(StreamQuery query) {
+                        query.appendCondition(eq(ID, id));
+                        query.setLimit(1);
+                    }
+                });
+
+        return resp.getElements().stream().map(new ProfileTaskDeserializer()).findAny().orElse(null);
+    }
+
+    public static class ProfileTaskDeserializer implements RowEntityDeserializer<ProfileTask> {
+        @Override
+        public ProfileTask apply(RowEntity row) {
+            ProfileTask profileTask = new ProfileTask();
+            final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0); // tags are read by position, in the projection order used by the queries in this DAO
+            profileTask.setId((String) searchable.get(0).getValue());
+            profileTask.setServiceId((String) searchable.get(1).getValue());
+            profileTask.setEndpointName((String) searchable.get(2).getValue());
+            profileTask.setStartTime(((Number) searchable.get(3).getValue()).longValue());
+            profileTask.setDuration(((Number) searchable.get(4).getValue()).intValue());
+            profileTask.setMinDurationThreshold(((Number) searchable.get(5).getValue()).intValue());
+            profileTask.setDumpPeriod(((Number) searchable.get(6).getValue()).intValue());
+            profileTask.setCreateTime(((Number) searchable.get(7).getValue()).longValue()); // read as long like start_time; intValue() would truncate the value to 32 bits
+            profileTask.setMaxSamplingCount(((Number) searchable.get(8).getValue()).intValue());
+            return profileTask;
+        }
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java
index c6b9cdec9f..4da87ac9e5 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java
@@ -18,8 +18,13 @@
 
 package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
 
-import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.ByteString;
+import org.apache.skywalking.banyandb.v1.client.RowEntity;
 import org.apache.skywalking.banyandb.v1.client.StreamQuery;
+import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
+import org.apache.skywalking.banyandb.v1.client.TagAndValue;
+import org.apache.skywalking.oap.server.core.analysis.IDManager;
 import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
 import org.apache.skywalking.oap.server.core.profile.ProfileThreadSnapshotRecord;
 import org.apache.skywalking.oap.server.core.query.type.BasicTrace;
@@ -27,9 +32,9 @@ import org.apache.skywalking.oap.server.core.storage.profile.IProfileThreadSnaps
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -44,30 +49,37 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractBanyanDBDAO i
 
     @Override
     public List<BasicTrace> queryProfiledSegments(String taskId) throws IOException {
-        List<ProfileThreadSnapshotRecord> resp = query(ProfileThreadSnapshotRecord.class, new QueryBuilder() {
-            @Override
-            public void apply(StreamQuery query) {
-                query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", ProfileThreadSnapshotRecord.TASK_ID, taskId))
-                        .appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", ProfileThreadSnapshotRecord.SEQUENCE, 0L));
-            }
-        });
-
-        if (resp.isEmpty()) {
+        StreamQueryResponse resp = query(ProfileThreadSnapshotRecord.INDEX_NAME,
+                ImmutableList.of(ProfileThreadSnapshotRecord.TASK_ID, ProfileThreadSnapshotRecord.SEGMENT_ID,
+                        ProfileThreadSnapshotRecord.DUMP_TIME, ProfileThreadSnapshotRecord.SEQUENCE),
+                new QueryBuilder() {
+                    @Override
+                    public void apply(StreamQuery query) {
+                        query.appendCondition(eq(ProfileThreadSnapshotRecord.TASK_ID, taskId))
+                                .appendCondition(eq(ProfileThreadSnapshotRecord.SEQUENCE, 0L));
+                    }
+                });
+
+        if (resp.getElements().isEmpty()) {
             return Collections.emptyList();
         }
 
-        final List<String> segmentIDs = resp.stream().map(ProfileThreadSnapshotRecord::getSegmentId).collect(Collectors.toList());
+        List<String> segmentIDs = resp.getElements().stream()
+                .map(new ProfileThreadSnapshotRecordDeserializer())
+                .map(ProfileThreadSnapshotRecord::getSegmentId)
+                .collect(Collectors.toList());
 
         // TODO: support `IN` or `OR` logic operation in BanyanDB
-        List<BasicTrace> basicTraces = new LinkedList<>();
+        List<BasicTrace> basicTraces = new ArrayList<>();
         for (String segmentID : segmentIDs) {
-            List<BasicTrace> subSet = query(BasicTrace.class, new QueryBuilder() {
-                @Override
-                public void apply(StreamQuery traceQuery) {
-                    traceQuery.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", SegmentRecord.SEGMENT_ID, segmentID));
-                }
-            });
-            basicTraces.addAll(subSet);
+            final StreamQueryResponse segmentRecordResp = query(SegmentRecord.INDEX_NAME, ImmutableList.of("trace_id", "state", "endpoint_id", "duration", "start_time"),
+                    new QueryBuilder() {
+                        @Override
+                        public void apply(StreamQuery traceQuery) {
+                            traceQuery.appendCondition(eq(SegmentRecord.SEGMENT_ID, segmentID));
+                        }
+                    });
+            basicTraces.addAll(segmentRecordResp.getElements().stream().map(new BasicTraceDeserializer()).collect(Collectors.toList()));
         }
 
         // TODO: Sort in DB with DESC
@@ -92,35 +104,54 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractBanyanDBDAO i
 
     @Override
     public List<ProfileThreadSnapshotRecord> queryRecords(String segmentId, int minSequence, int maxSequence) throws IOException {
-        return query(ProfileThreadSnapshotRecord.class, new QueryBuilder() {
-            @Override
-            public void apply(StreamQuery query) {
-                query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", ProfileThreadSnapshotRecord.SEGMENT_ID, segmentId))
-                        .appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", ProfileThreadSnapshotRecord.SEQUENCE, (long) maxSequence))
-                        .appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", ProfileThreadSnapshotRecord.SEQUENCE, (long) minSequence));
-            }
-        });
+        StreamQueryResponse resp = query(ProfileThreadSnapshotRecord.INDEX_NAME,
+                ImmutableList.of(ProfileThreadSnapshotRecord.TASK_ID, ProfileThreadSnapshotRecord.SEGMENT_ID,
+                        ProfileThreadSnapshotRecord.DUMP_TIME, ProfileThreadSnapshotRecord.SEQUENCE),
+                new QueryBuilder() {
+                    @Override
+                    public void apply(StreamQuery query) {
+                        query.setDataProjections(Collections.singletonList(ProfileThreadSnapshotRecord.STACK_BINARY));
+
+                        query.appendCondition(eq(ProfileThreadSnapshotRecord.SEGMENT_ID, segmentId))
+                                .appendCondition(lte(ProfileThreadSnapshotRecord.SEQUENCE, maxSequence))
+                                .appendCondition(gte(ProfileThreadSnapshotRecord.SEQUENCE, minSequence));
+                    }
+                });
+
+        return resp.getElements().stream().map(new ProfileThreadSnapshotRecordDeserializer()).collect(Collectors.toList());
     }
 
     @Override
     public SegmentRecord getProfiledSegment(String segmentId) throws IOException {
-        return query(SegmentRecord.class, new QueryBuilder() {
-            @Override
-            public void apply(StreamQuery query) {
-                query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", SegmentRecord.INDEX_NAME, segmentId));
-            }
-        }).stream().findFirst().orElse(null);
+        StreamQueryResponse resp = query(SegmentRecord.INDEX_NAME,
+                ImmutableList.of("trace_id", "state", "service_id", "service_instance_id", "endpoint_id", "duration", "start_time"),
+                new QueryBuilder() {
+                    @Override
+                    public void apply(StreamQuery query) {
+                        query.setDataProjections(Collections.singletonList("data_binary"));
+                        query.appendCondition(eq(SegmentRecord.SEGMENT_ID, segmentId)); // filter by segment id; INDEX_NAME is only the stream name
+                    }
+                });
+
+        return resp.getElements().stream().map(new SegmentRecordDeserializer()).findFirst().orElse(null);
     }
 
     private int querySequenceWithAgg(AggType aggType, String segmentId, long start, long end) {
-        List<ProfileThreadSnapshotRecord> records = query(ProfileThreadSnapshotRecord.class, new QueryBuilder() {
-            @Override
-            public void apply(StreamQuery query) {
-                query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", ProfileThreadSnapshotRecord.SEGMENT_ID, segmentId))
-                        .appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", ProfileThreadSnapshotRecord.DUMP_TIME, end))
-                        .appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", ProfileThreadSnapshotRecord.DUMP_TIME, start));
-            }
-        });
+        StreamQueryResponse resp = query(ProfileThreadSnapshotRecord.INDEX_NAME,
+                ImmutableList.of(ProfileThreadSnapshotRecord.TASK_ID, ProfileThreadSnapshotRecord.SEGMENT_ID,
+                        ProfileThreadSnapshotRecord.DUMP_TIME, ProfileThreadSnapshotRecord.SEQUENCE),
+                new QueryBuilder() {
+                    @Override
+                    public void apply(StreamQuery query) {
+                        query.setDataProjections(Collections.singletonList(ProfileThreadSnapshotRecord.STACK_BINARY));
+
+                        query.appendCondition(eq(ProfileThreadSnapshotRecord.SEGMENT_ID, segmentId))
+                                .appendCondition(lte(ProfileThreadSnapshotRecord.DUMP_TIME, end))
+                                .appendCondition(gte(ProfileThreadSnapshotRecord.DUMP_TIME, start));
+                    }
+                });
+
+        List<ProfileThreadSnapshotRecord> records = resp.getElements().stream().map(new ProfileThreadSnapshotRecordDeserializer()).collect(Collectors.toList());
 
         switch (aggType) {
             case MIN:
@@ -145,4 +176,55 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractBanyanDBDAO i
     enum AggType {
         MIN, MAX
     }
+
+    public static class ProfileThreadSnapshotRecordDeserializer implements RowEntityDeserializer<ProfileThreadSnapshotRecord> {
+        @Override
+        public ProfileThreadSnapshotRecord apply(RowEntity row) {
+            // Tag family 0 is read by position (task id, segment id, dump time, sequence); tag family 1 holds the stack binary.
+            final ProfileThreadSnapshotRecord snapshot = new ProfileThreadSnapshotRecord();
+            final List<TagAndValue<?>> indexedTags = row.getTagFamilies().get(0);
+            snapshot.setTaskId((String) indexedTags.get(0).getValue());
+            snapshot.setSegmentId((String) indexedTags.get(1).getValue());
+            snapshot.setDumpTime(((Number) indexedTags.get(2).getValue()).longValue());
+            snapshot.setSequence(((Number) indexedTags.get(3).getValue()).intValue());
+            snapshot.setStackBinary(((ByteString) row.getTagFamilies().get(1).get(0).getValue()).toByteArray());
+            return snapshot;
+        }
+    }
+
+    public static class SegmentRecordDeserializer implements RowEntityDeserializer<SegmentRecord> {
+        @Override
+        public SegmentRecord apply(RowEntity row) {
+            // Tag family 0 is read by position; tag family 1 holds the serialized segment. The row id becomes the segment id.
+            final SegmentRecord segment = new SegmentRecord();
+            final List<TagAndValue<?>> indexedTags = row.getTagFamilies().get(0);
+            segment.setSegmentId(row.getId());
+            segment.setTraceId((String) indexedTags.get(0).getValue());
+            segment.setIsError(((Number) indexedTags.get(1).getValue()).intValue());
+            segment.setServiceId((String) indexedTags.get(2).getValue());
+            segment.setServiceInstanceId((String) indexedTags.get(3).getValue());
+            segment.setEndpointId((String) indexedTags.get(4).getValue());
+            segment.setLatency(((Number) indexedTags.get(5).getValue()).intValue());
+            segment.setStartTime(((Number) indexedTags.get(6).getValue()).longValue());
+            segment.setDataBinary(((ByteString) row.getTagFamilies().get(1).get(0).getValue()).toByteArray());
+            return segment;
+        }
+    }
+
+    public static class BasicTraceDeserializer implements RowEntityDeserializer<BasicTrace> {
+        @Override
+        public BasicTrace apply(RowEntity row) {
+            BasicTrace trace = new BasicTrace();
+            trace.setSegmentId(row.getId());
+            // Tags are read by position: trace_id, state, endpoint_id, duration, start_time (the projection order used by the caller).
+            final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
+            trace.getTraceIds().add((String) searchable.get(0).getValue());
+            // Cast through Number, as SegmentRecordDeserializer does, so a non-Long numeric tag cannot raise ClassCastException.
+            trace.setError(((Number) searchable.get(1).getValue()).intValue() == 1);
+            trace.getEndpointNames().add(IDManager.EndpointID.analysisId((String) searchable.get(2).getValue()).getEndpointName());
+            trace.setDuration(((Number) searchable.get(3).getValue()).intValue());
+            trace.setStart(String.valueOf(searchable.get(4).getValue()));
+            return trace;
+        }
+    }
 }
\ No newline at end of file
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java
index 0bd0e30a82..400731d83e 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java
@@ -20,22 +20,25 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
 
 import lombok.RequiredArgsConstructor;
 import org.apache.skywalking.banyandb.v1.client.StreamWrite;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
 import org.apache.skywalking.oap.server.core.analysis.record.Record;
 import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
 import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.BanyanDBRecordBuilder;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.BanyanDBStorageDataBuilder;
 
 import java.io.IOException;
 
 @RequiredArgsConstructor
 public class BanyanDBRecordDAO<T extends Record> implements IRecordDAO {
-    private final BanyanDBRecordBuilder<T> storageBuilder;
+    private final BanyanDBStorageDataBuilder<T> storageBuilder;
 
     @Override
     public InsertRequest prepareBatchInsert(Model model, Record record) throws IOException {
-        StreamWrite streamWrite = storageBuilder.entity2Storage(model, (T) record);
+        StreamWrite.StreamWriteBuilder builder = storageBuilder.entity2Storage((T) record)
+                .name(model.getName())
+                .timestamp(TimeBucket.getTimestamp(record.getTimeBucket())); // bucket -> timestamp; getTimeBucket() converts the other way
 
-        return new BanyanDBStreamInsertRequest(streamWrite);
+        return new BanyanDBStreamInsertRequest(builder.build());
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStorageDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStorageDAO.java
index 76c0359153..98f31ebd7e 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStorageDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStorageDAO.java
@@ -19,19 +19,10 @@
 package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.skywalking.oap.server.core.alarm.AlarmRecord;
-import org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic;
-import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
-import org.apache.skywalking.oap.server.core.analysis.manual.log.LogRecord;
-import org.apache.skywalking.oap.server.core.analysis.manual.networkalias.NetworkAddressAlias;
-import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
-import org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraffic;
-import org.apache.skywalking.oap.server.core.browser.manual.errorlog.BrowserErrorLogRecord;
-import org.apache.skywalking.oap.server.core.management.ui.template.UITemplate;
-import org.apache.skywalking.oap.server.core.profile.ProfileTaskLogRecord;
-import org.apache.skywalking.oap.server.core.profile.ProfileTaskRecord;
-import org.apache.skywalking.oap.server.core.profile.ProfileThreadSnapshotRecord;
-import org.apache.skywalking.oap.server.core.source.Event;
+import org.apache.skywalking.oap.server.core.analysis.config.NoneStream;
+import org.apache.skywalking.oap.server.core.analysis.management.ManagementData;
+import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
+import org.apache.skywalking.oap.server.core.analysis.record.Record;
 import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
 import org.apache.skywalking.oap.server.core.storage.IManagementDAO;
 import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
@@ -40,19 +31,7 @@ import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
 import org.apache.skywalking.oap.server.core.storage.StorageDAO;
 import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.AlarmRecordBuilder;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.BrowserErrorLogRecordBuilder;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.EventBuilder;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.LogRecordBuilder;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.Metadata;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.NetworkAddressAliasBuilder;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.ProfileTaskLogRecordBuilder;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.ProfileTaskRecordBuilder;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.ProfileThreadSnapshotRecordBuilder;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.SegmentRecordBuilder;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.UITemplateBuilder;
-
-import java.util.Map;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.BanyanDBStorageDataBuilder;
 
 @Slf4j
 public class BanyanDBStorageDAO extends AbstractDAO<BanyanDBStorageClient> implements StorageDAO {
@@ -62,79 +41,21 @@ public class BanyanDBStorageDAO extends AbstractDAO<BanyanDBStorageClient> imple
 
     @Override
     public IMetricsDAO newMetricsDao(StorageBuilder storageBuilder) {
-        try {
-            Class<?> returnType = storageBuilder.getClass().getMethod("storage2Entity", Map.class).getReturnType();
-            if (Event.class.equals(returnType)) {
-                return new BanyanDBMetricsDAO<>(new EventBuilder());
-            } else if (ServiceTraffic.class.equals(returnType)) {
-                return new BanyanDBMetricsDAO<>(new Metadata.ServiceTrafficBuilder());
-            } else if (InstanceTraffic.class.equals(returnType)) {
-                return new BanyanDBMetricsDAO<>(new Metadata.InstanceTrafficBuilder());
-            } else if (EndpointTraffic.class.equals(returnType)) {
-                return new BanyanDBMetricsDAO<>(new Metadata.EndpointTrafficBuilder());
-            } else if (NetworkAddressAlias.class.equals(returnType)) {
-                return new BanyanDBMetricsDAO<>(new NetworkAddressAliasBuilder());
-            } else {
-                throw new IllegalStateException("record type is not supported");
-            }
-        } catch (NoSuchMethodException noSuchMethodException) {
-            log.error("cannot find method storage2Entity", noSuchMethodException);
-            throw new RuntimeException("cannot find method storage2Entity");
-        }
+        return new BanyanDBMetricsDAO<>((BanyanDBStorageDataBuilder<Metrics>) storageBuilder);
     }
 
     @Override
     public IRecordDAO newRecordDao(StorageBuilder storageBuilder) {
-        try {
-            Class<?> returnType = storageBuilder.getClass().getMethod("storage2Entity", Map.class).getReturnType();
-            if (SegmentRecord.class.equals(returnType)) {
-                return new BanyanDBRecordDAO<>(new SegmentRecordBuilder());
-            } else if (AlarmRecord.class.equals(returnType)) {
-                return new BanyanDBRecordDAO<>(new AlarmRecordBuilder());
-            } else if (BrowserErrorLogRecord.class.equals(returnType)) {
-                return new BanyanDBRecordDAO<>(new BrowserErrorLogRecordBuilder());
-            } else if (LogRecord.class.equals(returnType)) {
-                return new BanyanDBRecordDAO<>(new LogRecordBuilder());
-            } else if (ProfileTaskLogRecord.class.equals(returnType)) {
-                return new BanyanDBRecordDAO<>(new ProfileTaskLogRecordBuilder());
-            } else if (ProfileThreadSnapshotRecord.class.equals(returnType)) {
-                return new BanyanDBRecordDAO<>(new ProfileThreadSnapshotRecordBuilder());
-            } else {
-                throw new IllegalStateException("record type is not supported");
-            }
-        } catch (NoSuchMethodException noSuchMethodException) {
-            log.error("cannot find method storage2Entity", noSuchMethodException);
-            throw new RuntimeException("cannot find method storage2Entity");
-        }
+        return new BanyanDBRecordDAO<>((BanyanDBStorageDataBuilder<Record>) storageBuilder);
     }
 
     @Override
     public INoneStreamDAO newNoneStreamDao(StorageBuilder storageBuilder) {
-        try {
-            Class<?> returnType = storageBuilder.getClass().getMethod("storage2Entity", Map.class).getReturnType();
-            if (ProfileTaskRecord.class.equals(returnType)) {
-                return new BanyanDBNoneStreamDAO<>(getClient(), new ProfileTaskRecordBuilder());
-            } else {
-                throw new IllegalStateException("record type is not supported");
-            }
-        } catch (NoSuchMethodException noSuchMethodException) {
-            log.error("cannot find method storage2Entity", noSuchMethodException);
-            throw new RuntimeException("cannot find method storage2Entity");
-        }
+        return new BanyanDBNoneStreamDAO<>(getClient(), (BanyanDBStorageDataBuilder<NoneStream>) storageBuilder);
     }
 
     @Override
     public IManagementDAO newManagementDao(StorageBuilder storageBuilder) {
-        try {
-            Class<?> returnType = storageBuilder.getClass().getMethod("storage2Entity", Map.class).getReturnType();
-            if (UITemplate.class.equals(returnType)) {
-                return new BanyanDBManagementDAO<>(getClient(), new UITemplateBuilder());
-            } else {
-                throw new IllegalStateException("record type is not supported");
-            }
-        } catch (NoSuchMethodException noSuchMethodException) {
-            log.error("cannot find method storage2Entity", noSuchMethodException);
-            throw new RuntimeException("cannot find method storage2Entity");
-        }
+        return new BanyanDBManagementDAO<>(getClient(), (BanyanDBStorageDataBuilder<ManagementData>) storageBuilder);
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java
index 73e80c8f94..c5bb950bc6 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java
@@ -19,9 +19,15 @@
 package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
 
 import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.ByteString;
 import lombok.Getter;
-import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
+import org.apache.skywalking.banyandb.v1.client.RowEntity;
 import org.apache.skywalking.banyandb.v1.client.StreamQuery;
+import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
+import org.apache.skywalking.banyandb.v1.client.TagAndValue;
+import org.apache.skywalking.banyandb.v1.client.TimestampRange;
+import org.apache.skywalking.oap.server.core.analysis.IDManager;
 import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
 import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
 import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
@@ -38,6 +44,7 @@ import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.SegmentRe
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
+import java.util.stream.Collectors;
 
 public class BanyanDBTraceQueryDAO extends AbstractBanyanDBDAO implements ITraceQueryDAO {
     public BanyanDBTraceQueryDAO(BanyanDBStorageClient client) {
@@ -46,39 +53,39 @@ public class BanyanDBTraceQueryDAO extends AbstractBanyanDBDAO implements ITrace
 
     @Override
     public TraceBrief queryBasicTraces(long startSecondTB, long endSecondTB, long minDuration, long maxDuration, String serviceId, String serviceInstanceId, String endpointId, String traceId, int limit, int from, TraceState traceState, QueryOrder queryOrder, List<Tag> tags) throws IOException {
-        final QueryBuilder builder = new QueryBuilder() {
+        final QueryBuilder q = new QueryBuilder() {
             @Override
             public void apply(StreamQuery query) {
                 if (minDuration != 0) {
                     // duration >= minDuration
-                    query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", "duration", minDuration));
+                    query.appendCondition(gte("duration", minDuration));
                 }
                 if (maxDuration != 0) {
                     // duration <= maxDuration
-                    query.appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", "duration", maxDuration));
+                    query.appendCondition(lte("duration", maxDuration));
                 }
 
                 if (!Strings.isNullOrEmpty(serviceId)) {
-                    query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", "service_id", serviceId));
+                    query.appendCondition(eq("service_id", serviceId));
                 }
 
                 if (!Strings.isNullOrEmpty(serviceInstanceId)) {
-                    query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", "service_instance_id", serviceInstanceId));
+                    query.appendCondition(eq("service_instance_id", serviceInstanceId));
                 }
 
                 if (!Strings.isNullOrEmpty(endpointId)) {
-                    query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", "endpoint_id", endpointId));
+                    query.appendCondition(eq("endpoint_id", endpointId));
                 }
 
                 switch (traceState) {
                     case ERROR:
-                        query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", "state", (long) TraceStateStorage.ERROR.getState()));
+                        query.appendCondition(eq("state", TraceStateStorage.ERROR.getState()));
                         break;
                     case SUCCESS:
-                        query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", "state", (long) TraceStateStorage.SUCCESS.getState()));
+                        query.appendCondition(eq("state", TraceStateStorage.SUCCESS.getState()));
                         break;
                     default:
-                        query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", "state", (long) TraceStateStorage.ALL.getState()));
+                        query.appendCondition(eq("state", TraceStateStorage.ALL.getState()));
                         break;
                 }
 
@@ -94,7 +101,7 @@ public class BanyanDBTraceQueryDAO extends AbstractBanyanDBDAO implements ITrace
                 if (CollectionUtils.isNotEmpty(tags)) {
                     for (final Tag tag : tags) {
                         if (SegmentRecordBuilder.INDEXED_TAGS.contains(tag.getKey())) {
-                            query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", tag.getKey(), tag.getValue()));
+                            query.appendCondition(eq(tag.getKey(), tag.getValue()));
                         }
                     }
                 }
@@ -104,13 +111,17 @@ public class BanyanDBTraceQueryDAO extends AbstractBanyanDBDAO implements ITrace
             }
         };
 
-        final List<BasicTrace> basicTraces;
-        if (startSecondTB != 0 && endSecondTB != 0) {
-            basicTraces = query(BasicTrace.class, builder, TimeBucket.getTimestamp(startSecondTB), TimeBucket.getTimestamp(endSecondTB));
-        } else {
-            basicTraces = query(BasicTrace.class, builder);
+        TimestampRange tsRange = null;
+
+        if (startSecondTB > 0 && endSecondTB > 0) {
+            tsRange = new TimestampRange(TimeBucket.getTimestamp(startSecondTB), TimeBucket.getTimestamp(endSecondTB));
         }
 
+        StreamQueryResponse resp = query(SegmentRecord.INDEX_NAME,
+                ImmutableList.of("trace_id", "state", "endpoint_id", "duration", "start_time"), tsRange, q);
+
+        List<BasicTrace> basicTraces = resp.getElements().stream().map(new BasicTraceDeserializer()).collect(Collectors.toList());
+
         TraceBrief brief = new TraceBrief();
         brief.setTotal(basicTraces.size());
         brief.getTraces().addAll(basicTraces);
@@ -119,12 +130,17 @@ public class BanyanDBTraceQueryDAO extends AbstractBanyanDBDAO implements ITrace
 
     @Override
     public List<SegmentRecord> queryByTraceId(String traceId) throws IOException {
-        return query(SegmentRecord.class, new QueryBuilder() {
-            @Override
-            public void apply(StreamQuery query) {
-                query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", "trace_id", traceId));
-            }
-        });
+        StreamQueryResponse resp = query(SegmentRecord.INDEX_NAME, // no TimestampRange is passed here, unlike queryBasicTraces
+                ImmutableList.of("trace_id", "state", "service_id", "service_instance_id", "endpoint_id", "duration", "start_time"), // order is significant: SegmentRecordDeserializer reads these tags by position
+                new QueryBuilder() {
+                    @Override
+                    public void apply(StreamQuery query) {
+                        query.setDataProjections(Collections.singletonList("data_binary")); // read back by SegmentRecordDeserializer from the second tag family
+                        query.appendCondition(eq(SegmentRecord.TRACE_ID, traceId)); // only elements whose trace id equals the requested one
+                    }
+                });
+
+        return resp.getElements().stream().map(new SegmentRecordDeserializer()).collect(Collectors.toList()); // one SegmentRecord per returned element
     }
 
     @Override
@@ -142,4 +158,40 @@ public class BanyanDBTraceQueryDAO extends AbstractBanyanDBDAO implements ITrace
             this.state = state;
         }
     }
+
+    public static class BasicTraceDeserializer implements RowEntityDeserializer<BasicTrace> { // turns one row returned by queryBasicTraces into a BasicTrace
+        @Override
+        public BasicTrace apply(RowEntity row) {
+            BasicTrace trace = new BasicTrace();
+            trace.setSegmentId(row.getId()); // the row id is used as the segment id
+            final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0); // read by position; order follows the projection in queryBasicTraces: trace_id, state, endpoint_id, duration, start_time
+            trace.getTraceIds().add((String) searchable.get(0).getValue());
+            trace.setError(((Number) searchable.get(1).getValue()).intValue() == 1); // cast via Number, as the other deserializers in this package do, so any boxed numeric type is accepted
+            trace.getEndpointNames().add(IDManager.EndpointID.analysisId(
+                    (String) searchable.get(2).getValue()
+            ).getEndpointName()); // the stored endpoint_id is decoded back into an endpoint name
+            trace.setDuration(((Number) searchable.get(3).getValue()).intValue());
+            trace.setStart(String.valueOf(searchable.get(4).getValue()));
+            return trace;
+        }
+    }
+
+    public static class SegmentRecordDeserializer implements RowEntityDeserializer<SegmentRecord> { // rebuilds a SegmentRecord from one row returned by queryByTraceId
+        @Override
+        public SegmentRecord apply(RowEntity row) {
+            SegmentRecord record = new SegmentRecord();
+            final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0); // read by position; order follows the tag list passed in queryByTraceId
+            record.setSegmentId(row.getId());
+            record.setTraceId((String) searchable.get(0).getValue()); // trace_id
+            record.setIsError(((Number) searchable.get(1).getValue()).intValue()); // state
+            record.setServiceId((String) searchable.get(2).getValue()); // service_id
+            record.setServiceInstanceId((String) searchable.get(3).getValue()); // service_instance_id
+            record.setEndpointId((String) searchable.get(4).getValue()); // endpoint_id
+            record.setLatency(((Number) searchable.get(5).getValue()).intValue()); // duration
+            record.setStartTime(((Number) searchable.get(6).getValue()).longValue()); // start_time
+            final List<TagAndValue<?>> data = row.getTagFamilies().get(1); // second family: the data projection requested in queryByTraceId
+            record.setDataBinary(((ByteString) data.get(0).getValue()).toByteArray()); // data_binary
+            return record;
+        }
+    }
 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java
index 9ed0af8447..1d9f977b0a 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java
@@ -18,21 +18,25 @@
 
 package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
 
-import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
+import com.google.common.collect.ImmutableList;
+import org.apache.skywalking.banyandb.v1.client.RowEntity;
 import org.apache.skywalking.banyandb.v1.client.StreamQuery;
+import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
 import org.apache.skywalking.banyandb.v1.client.StreamWrite;
 import org.apache.skywalking.banyandb.v1.client.Tag;
+import org.apache.skywalking.banyandb.v1.client.TagAndValue;
 import org.apache.skywalking.oap.server.core.management.ui.template.UITemplate;
+import org.apache.skywalking.oap.server.core.query.enumeration.TemplateType;
 import org.apache.skywalking.oap.server.core.query.input.DashboardSetting;
 import org.apache.skywalking.oap.server.core.query.type.DashboardConfiguration;
 import org.apache.skywalking.oap.server.core.query.type.TemplateChangeStatus;
 import org.apache.skywalking.oap.server.core.storage.management.UITemplateManagementDAO;
 import org.apache.skywalking.oap.server.library.util.BooleanUtils;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.UITemplateBuilder;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.stream.Collectors;
 
 /**
  * {@link org.apache.skywalking.oap.server.core.management.ui.template.UITemplate} is a stream
@@ -44,15 +48,19 @@ public class BanyanDBUITemplateManagementDAO extends AbstractBanyanDBDAO impleme
 
     @Override
     public List<DashboardConfiguration> getAllTemplates(Boolean includingDisabled) throws IOException {
-        return query(DashboardConfiguration.class, new QueryBuilder() {
-            @Override
-            public void apply(StreamQuery query) {
-                query.setLimit(10000);
-                if (!includingDisabled) {
-                    query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", UITemplate.DISABLED, (long) BooleanUtils.FALSE));
-                }
-            }
-        });
+        StreamQueryResponse resp = query(UITemplate.INDEX_NAME, ImmutableList.of(UITemplate.NAME, UITemplate.DISABLED), // order is significant: DashboardConfigurationDeserializer reads these tags by position
+                new QueryBuilder() {
+                    @Override
+                    public void apply(StreamQuery query) {
+                        query.setDataProjections(ImmutableList.of(UITemplate.ACTIVATED, UITemplate.CONFIGURATION, UITemplate.TYPE)); // likewise read by position from the second tag family
+                        query.setLimit(10000);
+                        if (!includingDisabled) { // NOTE(review): includingDisabled is a boxed Boolean; a null argument would throw on unboxing here
+                            query.appendCondition(eq(UITemplate.DISABLED, BooleanUtils.FALSE)); // keep only templates whose disabled tag equals BooleanUtils.FALSE
+                        }
+                    }
+                });
+
+        return resp.getElements().stream().map(new DashboardConfigurationDeserializer()).collect(Collectors.toList()); // one DashboardConfiguration per returned element
     }
 
     @Override
@@ -71,7 +79,7 @@ public class BanyanDBUITemplateManagementDAO extends AbstractBanyanDBDAO impleme
                 .dataTag(Tag.stringField(uiTemplate.getConfiguration()))
                 // data - activated
                 .dataTag(Tag.longField(uiTemplate.getActivated()))
-                .timestamp(UITemplateBuilder.UI_TEMPLATE_TIMESTAMP)
+                .timestamp(1L)
                 .elementId(uiTemplate.id())
                 .build();
         getClient().write(request);
@@ -87,4 +95,24 @@ public class BanyanDBUITemplateManagementDAO extends AbstractBanyanDBDAO impleme
     public TemplateChangeStatus disableTemplate(String name) throws IOException {
         return TemplateChangeStatus.builder().status(false).message("Can't disable the template").build();
     }
+
+    public static class DashboardConfigurationDeserializer implements RowEntityDeserializer<DashboardConfiguration> { // turns one row returned by getAllTemplates into a DashboardConfiguration
+        @Override
+        public DashboardConfiguration apply(RowEntity row) {
+            DashboardConfiguration dashboardConfiguration = new DashboardConfiguration();
+            final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0); // read by position; order follows the tag list passed in getAllTemplates
+            // searchable[0]: name
+            dashboardConfiguration.setName((String) searchable.get(0).getValue());
+            // searchable[1]: disabled, a numeric flag converted with BooleanUtils.valueToBoolean
+            dashboardConfiguration.setDisabled(BooleanUtils.valueToBoolean(((Number) searchable.get(1).getValue()).intValue()));
+            final List<TagAndValue<?>> data = row.getTagFamilies().get(1); // second family: the data projection set in getAllTemplates
+            // data[0]: activated, a numeric flag converted the same way as disabled
+            dashboardConfiguration.setActivated(BooleanUtils.valueToBoolean(((Number) data.get(0).getValue()).intValue()));
+            // data[1]: configuration, kept as the stored string
+            dashboardConfiguration.setConfiguration((String) data.get(1).getValue());
+            // data[2]: type, resolved from its stored name through TemplateType.forName
+            dashboardConfiguration.setType(TemplateType.forName((String) data.get(2).getValue()));
+            return dashboardConfiguration;
+        }
+    }
 }