You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by lu...@apache.org on 2022/04/30 09:03:24 UTC

[skywalking] 05/24: refactor deser

This is an automated email from the ASF dual-hosted git repository.

lujiajing pushed a commit to branch banyandb-integration-stream
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit 7fb9a6ad37e96be4fc4104ae6e022514683a0492
Author: Megrez Lu <lu...@gmail.com>
AuthorDate: Thu Dec 2 22:13:57 2021 +0800

    refactor deser
---
 .../deserializer/AbstractBanyanDBDeserializer.java |  35 +++++
 .../banyandb/deserializer/AlarmMessageMapper.java  |  36 ++---
 .../deserializer/BanyanDBDeserializerFactory.java  |  54 +++++++
 .../banyandb/deserializer/BasicTraceMapper.java    |  18 +--
 .../deserializer/BrowserErrorLogMapper.java        |  24 ++-
 .../deserializer/DashboardConfigurationMapper.java |  18 +--
 .../banyandb/deserializer/DatabaseMapper.java      |  25 ++++
 .../banyandb/deserializer/EndpointMapper.java      |  25 ++++
 .../plugin/banyandb/deserializer/EventMapper.java  |  37 +++--
 .../plugin/banyandb/deserializer/LogMapper.java    |  26 +---
 .../deserializer/NetworkAddressAliasMapper.java    |  15 +-
 .../deserializer/ProfileTaskLogMapper.java         |  16 +-
 .../banyandb/deserializer/ProfileTaskMapper.java   |  18 +--
 .../ProfileThreadSnapshotRecordMapper.java         |  16 +-
 .../banyandb/deserializer/RowEntityMapper.java     |   7 +-
 .../banyandb/deserializer/SegmentRecordMapper.java |  18 +--
 .../deserializer/ServiceInstanceMapper.java        |  70 +++++++++
 .../banyandb/deserializer/ServiceMapper.java       |  26 ++++
 .../banyandb/stream/AbstractBanyanDBDAO.java       |  42 ++++++
 .../banyandb/stream/BanyanDBAlarmQueryDAO.java     |  49 +++----
 .../stream/BanyanDBBrowserLogQueryDAO.java         |  59 ++++----
 .../banyandb/stream/BanyanDBEventQueryDAO.java     |  86 ++++++++++-
 .../banyandb/stream/BanyanDBLogQueryDAO.java       |  71 ++++-----
 .../banyandb/stream/BanyanDBMetadataQueryDAO.java  |  77 ++++++++--
 .../stream/BanyanDBNetworkAddressAliasDAO.java     |  20 +--
 .../stream/BanyanDBProfileTaskLogQueryDAO.java     |  19 +--
 .../stream/BanyanDBProfileTaskQueryDAO.java        |  69 +++++----
 .../BanyanDBProfileThreadSnapshotQueryDAO.java     |  77 +++++-----
 .../plugin/banyandb/stream/BanyanDBStorageDAO.java |   6 +-
 .../banyandb/stream/BanyanDBTraceQueryDAO.java     | 161 +++++++++------------
 .../stream/BanyanDBUITemplateManagementDAO.java    |  26 ++--
 31 files changed, 782 insertions(+), 464 deletions(-)

diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/AbstractBanyanDBDeserializer.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/AbstractBanyanDBDeserializer.java
new file mode 100644
index 0000000000..44adcb8869
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/AbstractBanyanDBDeserializer.java
@@ -0,0 +1,35 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
+
+import org.apache.skywalking.banyandb.v1.client.StreamQuery;
+import org.apache.skywalking.banyandb.v1.client.TimestampRange;
+
+import java.util.Collections;
+import java.util.List;
+
+public abstract class AbstractBanyanDBDeserializer<T> implements RowEntityMapper<T> {
+    private final String indexName;
+    private final List<String> searchableProjection;
+    private final List<String> dataProjection;
+
+    protected AbstractBanyanDBDeserializer(String indexName, List<String> searchableProjection) {
+        this(indexName, searchableProjection, Collections.emptyList());
+    }
+
+    protected AbstractBanyanDBDeserializer(String indexName, List<String> searchableProjection, List<String> dataProjection) {
+        this.indexName = indexName;
+        this.searchableProjection = searchableProjection;
+        this.dataProjection = dataProjection;
+    }
+
+    public StreamQuery buildStreamQuery() {
+        final StreamQuery query = new StreamQuery(this.indexName, this.searchableProjection);
+        query.setDataProjections(this.dataProjection);
+        return query;
+    }
+
+    public StreamQuery buildStreamQuery(long startTimestamp, long endTimestamp) {
+        final StreamQuery query = new StreamQuery(this.indexName, new TimestampRange(startTimestamp, endTimestamp), this.searchableProjection);
+        query.setDataProjections(this.dataProjection);
+        return query;
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/AlarmMessageMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/AlarmMessageMapper.java
index d906dbb78f..939496e061 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/AlarmMessageMapper.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/AlarmMessageMapper.java
@@ -1,33 +1,27 @@
 package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
 
+import com.google.common.base.Charsets;
 import com.google.common.collect.ImmutableList;
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
 import com.google.protobuf.ByteString;
-import lombok.RequiredArgsConstructor;
 import org.apache.skywalking.banyandb.v1.client.RowEntity;
 import org.apache.skywalking.banyandb.v1.client.TagAndValue;
 import org.apache.skywalking.oap.server.core.alarm.AlarmRecord;
+import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
 import org.apache.skywalking.oap.server.core.query.enumeration.Scope;
 import org.apache.skywalking.oap.server.core.query.type.AlarmMessage;
-import org.apache.skywalking.oap.server.core.storage.query.IAlarmQueryDAO;
+import org.apache.skywalking.oap.server.core.query.type.KeyValue;
 
 import java.util.List;
 
-@RequiredArgsConstructor
-public class AlarmMessageMapper implements RowEntityMapper<AlarmMessage> {
-    private final IAlarmQueryDAO alarmQueryDAO;
+public class AlarmMessageMapper extends AbstractBanyanDBDeserializer<AlarmMessage> {
+    private final Gson GSON = new Gson();
 
-    @Override
-    public List<String> searchableProjection() {
-        return ImmutableList.of(AlarmRecord.SCOPE, // 0
-                AlarmRecord.START_TIME); // 1
-    }
-
-    @Override
-    public List<String> dataProjection() {
-        return ImmutableList.of(AlarmRecord.ID0, // 0
-                AlarmRecord.ID1, // 1
-                AlarmRecord.ALARM_MESSAGE, // 2
-                AlarmRecord.TAGS_RAW_DATA); // 3
+    public AlarmMessageMapper() {
+        super(AlarmRecord.INDEX_NAME,
+                ImmutableList.of(AlarmRecord.SCOPE, AlarmRecord.START_TIME),
+                ImmutableList.of(AlarmRecord.ID0, AlarmRecord.ID1, AlarmRecord.ALARM_MESSAGE, AlarmRecord.TAGS_RAW_DATA));
     }
 
     @Override
@@ -44,8 +38,14 @@ public class AlarmMessageMapper implements RowEntityMapper<AlarmMessage> {
         alarmMessage.setMessage((String) data.get(2).getValue());
         Object o = data.get(3).getValue();
         if (o instanceof ByteString && !((ByteString) o).isEmpty()) {
-            this.alarmQueryDAO.parserDataBinary(((ByteString) o).toByteArray(), alarmMessage.getTags());
+            this.parseDataBinary(((ByteString) o).toByteArray(), alarmMessage.getTags());
         }
         return alarmMessage;
     }
+
+    void parseDataBinary(byte[] dataBinary, List<KeyValue> tags) {
+        List<Tag> tagList = GSON.fromJson(new String(dataBinary, Charsets.UTF_8), new TypeToken<List<Tag>>() {
+        }.getType());
+        tagList.forEach(pair -> tags.add(new KeyValue(pair.getKey(), pair.getValue())));
+    }
 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BanyanDBDeserializerFactory.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BanyanDBDeserializerFactory.java
new file mode 100644
index 0000000000..dd4af41736
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BanyanDBDeserializerFactory.java
@@ -0,0 +1,54 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
+
+import org.apache.skywalking.oap.server.core.analysis.manual.networkalias.NetworkAddressAlias;
+import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
+import org.apache.skywalking.oap.server.core.profile.ProfileThreadSnapshotRecord;
+import org.apache.skywalking.oap.server.core.query.type.AlarmMessage;
+import org.apache.skywalking.oap.server.core.query.type.BasicTrace;
+import org.apache.skywalking.oap.server.core.query.type.BrowserErrorLog;
+import org.apache.skywalking.oap.server.core.query.type.DashboardConfiguration;
+import org.apache.skywalking.oap.server.core.query.type.Database;
+import org.apache.skywalking.oap.server.core.query.type.Endpoint;
+import org.apache.skywalking.oap.server.core.query.type.Log;
+import org.apache.skywalking.oap.server.core.query.type.ProfileTask;
+import org.apache.skywalking.oap.server.core.query.type.ProfileTaskLog;
+import org.apache.skywalking.oap.server.core.query.type.Service;
+import org.apache.skywalking.oap.server.core.query.type.ServiceInstance;
+import org.apache.skywalking.oap.server.core.query.type.event.Event;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public enum BanyanDBDeserializerFactory {
+    INSTANCE;
+
+    private final Map<Class<?>, AbstractBanyanDBDeserializer<?>> registry;
+
+    BanyanDBDeserializerFactory() {
+        registry = new HashMap<>(10);
+        register(AlarmMessage.class, new AlarmMessageMapper());
+        register(BasicTrace.class, new BasicTraceMapper());
+        register(BrowserErrorLog.class, new BrowserErrorLogMapper());
+        register(DashboardConfiguration.class, new DashboardConfigurationMapper());
+        register(Database.class, new DatabaseMapper());
+        register(Endpoint.class, new EndpointMapper());
+        register(Event.class, new EventMapper());
+        register(Log.class, new LogMapper());
+        register(NetworkAddressAlias.class, new NetworkAddressAliasMapper());
+        register(ProfileTaskLog.class, new ProfileTaskLogMapper());
+        register(ProfileTask.class, new ProfileTaskMapper());
+        register(ProfileThreadSnapshotRecord.class, new ProfileThreadSnapshotRecordMapper());
+        register(SegmentRecord.class, new SegmentRecordMapper());
+        register(ServiceInstance.class, new ServiceInstanceMapper());
+        register(Service.class, new ServiceMapper());
+    }
+
+    private <T> void register(Class<T> clazz, AbstractBanyanDBDeserializer<T> mapper) {
+        this.registry.put(clazz, mapper);
+    }
+
+    @SuppressWarnings({"unchecked"})
+    public <T> AbstractBanyanDBDeserializer<T> findDeserializer(Class<T> clazz) {
+        return (AbstractBanyanDBDeserializer<T>) registry.get(clazz);
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BasicTraceMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BasicTraceMapper.java
index bbcf7091d0..5d7ae36590 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BasicTraceMapper.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BasicTraceMapper.java
@@ -4,12 +4,16 @@ import com.google.common.collect.ImmutableList;
 import org.apache.skywalking.banyandb.v1.client.RowEntity;
 import org.apache.skywalking.banyandb.v1.client.TagAndValue;
 import org.apache.skywalking.oap.server.core.analysis.IDManager;
+import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
 import org.apache.skywalking.oap.server.core.query.type.BasicTrace;
 
-import java.util.Collections;
 import java.util.List;
 
-public class BasicTraceMapper implements RowEntityMapper<BasicTrace> {
+public class BasicTraceMapper extends AbstractBanyanDBDeserializer<BasicTrace> {
+    public BasicTraceMapper() {
+        super(SegmentRecord.INDEX_NAME, ImmutableList.of("trace_id", "state", "endpoint_id", "duration", "start_time"));
+    }
+
     @Override
     public BasicTrace map(RowEntity row) {
         BasicTrace trace = new BasicTrace();
@@ -24,14 +28,4 @@ public class BasicTraceMapper implements RowEntityMapper<BasicTrace> {
         trace.setStart(String.valueOf(searchable.get(4).getValue()));
         return trace;
     }
-
-    @Override
-    public List<String> searchableProjection() {
-        return ImmutableList.of("trace_id", "state", "endpoint_id", "duration", "start_time");
-    }
-
-    @Override
-    public List<String> dataProjection() {
-        return Collections.emptyList();
-    }
 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BrowserErrorLogMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BrowserErrorLogMapper.java
index 0807955376..0189a77049 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BrowserErrorLogMapper.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BrowserErrorLogMapper.java
@@ -9,22 +9,18 @@ import org.apache.skywalking.oap.server.core.browser.manual.errorlog.BrowserErro
 import org.apache.skywalking.oap.server.core.query.type.BrowserErrorLog;
 import org.apache.skywalking.oap.server.core.query.type.ErrorCategory;
 
+import java.util.Collections;
 import java.util.List;
 
-public class BrowserErrorLogMapper implements RowEntityMapper<BrowserErrorLog> {
-    @Override
-    public List<String> searchableProjection() {
-        return ImmutableList.of(BrowserErrorLogRecord.SERVICE_ID,
-                BrowserErrorLogRecord.SERVICE_VERSION_ID,
-                BrowserErrorLogRecord.PAGE_PATH_ID,
-                BrowserErrorLogRecord.ERROR_CATEGORY,
-                BrowserErrorLogRecord.TIMESTAMP
-        );
-    }
-
-    @Override
-    public List<String> dataProjection() {
-        return ImmutableList.of(BrowserErrorLogRecord.DATA_BINARY);
+public class BrowserErrorLogMapper extends AbstractBanyanDBDeserializer<BrowserErrorLog> {
+    public BrowserErrorLogMapper() {
+        super(BrowserErrorLogRecord.INDEX_NAME,
+                ImmutableList.of(BrowserErrorLogRecord.SERVICE_ID,
+                        BrowserErrorLogRecord.SERVICE_VERSION_ID,
+                        BrowserErrorLogRecord.PAGE_PATH_ID,
+                        BrowserErrorLogRecord.ERROR_CATEGORY,
+                        BrowserErrorLogRecord.TIMESTAMP),
+                Collections.singletonList(BrowserErrorLogRecord.DATA_BINARY));
     }
 
     @Override
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/DashboardConfigurationMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/DashboardConfigurationMapper.java
index 6b9e0962e6..cb6b4f3c2f 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/DashboardConfigurationMapper.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/DashboardConfigurationMapper.java
@@ -10,7 +10,13 @@ import org.apache.skywalking.oap.server.library.util.BooleanUtils;
 
 import java.util.List;
 
-public class DashboardConfigurationMapper implements RowEntityMapper<DashboardConfiguration> {
+public class DashboardConfigurationMapper extends AbstractBanyanDBDeserializer<DashboardConfiguration> {
+    public DashboardConfigurationMapper() {
+        super(UITemplate.INDEX_NAME,
+                ImmutableList.of(UITemplate.NAME, UITemplate.DISABLED),
+                ImmutableList.of(UITemplate.ACTIVATED, UITemplate.CONFIGURATION, UITemplate.TYPE));
+    }
+
     @Override
     public DashboardConfiguration map(RowEntity row) {
         DashboardConfiguration dashboardConfiguration = new DashboardConfiguration();
@@ -28,14 +34,4 @@ public class DashboardConfigurationMapper implements RowEntityMapper<DashboardCo
         dashboardConfiguration.setType(TemplateType.forName((String) data.get(2).getValue()));
         return dashboardConfiguration;
     }
-
-    @Override
-    public List<String> searchableProjection() {
-        return ImmutableList.of(UITemplate.NAME, UITemplate.DISABLED);
-    }
-
-    @Override
-    public List<String> dataProjection() {
-        return ImmutableList.of(UITemplate.ACTIVATED, UITemplate.CONFIGURATION, UITemplate.TYPE);
-    }
 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/DatabaseMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/DatabaseMapper.java
new file mode 100644
index 0000000000..1c5a15f774
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/DatabaseMapper.java
@@ -0,0 +1,25 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.skywalking.banyandb.v1.client.RowEntity;
+import org.apache.skywalking.banyandb.v1.client.TagAndValue;
+import org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraffic;
+import org.apache.skywalking.oap.server.core.query.type.Database;
+
+import java.util.List;
+
+public class DatabaseMapper extends AbstractBanyanDBDeserializer<Database> {
+    public DatabaseMapper() {
+        super(ServiceTraffic.INDEX_NAME,
+                ImmutableList.of(ServiceTraffic.NAME, ServiceTraffic.NODE_TYPE));
+    }
+
+    @Override
+    public Database map(RowEntity row) {
+        Database database = new Database();
+        final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
+        database.setId(row.getId());
+        database.setName((String) searchable.get(0).getValue());
+        return database;
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/EndpointMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/EndpointMapper.java
new file mode 100644
index 0000000000..8f81df039c
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/EndpointMapper.java
@@ -0,0 +1,25 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.skywalking.banyandb.v1.client.RowEntity;
+import org.apache.skywalking.banyandb.v1.client.TagAndValue;
+import org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic;
+import org.apache.skywalking.oap.server.core.query.type.Endpoint;
+
+import java.util.List;
+
+public class EndpointMapper extends AbstractBanyanDBDeserializer<Endpoint> {
+    public EndpointMapper() {
+        super(EndpointTraffic.INDEX_NAME,
+                ImmutableList.of(EndpointTraffic.NAME, EndpointTraffic.SERVICE_ID));
+    }
+
+    @Override
+    public Endpoint map(RowEntity row) {
+        Endpoint endpoint = new Endpoint();
+        final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
+        endpoint.setName((String) searchable.get(0).getValue());
+        endpoint.setId((String) searchable.get(1).getValue());
+        return endpoint;
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/EventMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/EventMapper.java
index 1c94903cbd..63c9dc802c 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/EventMapper.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/EventMapper.java
@@ -2,23 +2,36 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
 
 import com.google.common.collect.ImmutableList;
 import org.apache.skywalking.banyandb.v1.client.RowEntity;
-import org.apache.skywalking.oap.server.core.query.type.event.Event;
+import org.apache.skywalking.banyandb.v1.client.TagAndValue;
+import org.apache.skywalking.oap.server.core.query.type.event.EventType;
+import org.apache.skywalking.oap.server.core.query.type.event.Source;
+import org.apache.skywalking.oap.server.core.source.Event;
 
 import java.util.List;
 
-public class EventMapper implements RowEntityMapper<Event> {
-    @Override
-    public List<String> searchableProjection() {
-        return ImmutableList.of();
-    }
-
-    @Override
-    public List<String> dataProjection() {
-        return null;
+public class EventMapper extends AbstractBanyanDBDeserializer<org.apache.skywalking.oap.server.core.query.type.event.Event> {
+    public EventMapper() {
+        super(Event.INDEX_NAME,
+                ImmutableList.of(Event.UUID, Event.SERVICE, Event.SERVICE_INSTANCE, Event.ENDPOINT, Event.NAME,
+                        Event.TYPE, Event.START_TIME, Event.END_TIME),
+                ImmutableList.of(Event.MESSAGE, Event.PARAMETERS));
     }
 
     @Override
-    public Event map(RowEntity row) {
-        return null;
+    public org.apache.skywalking.oap.server.core.query.type.event.Event map(RowEntity row) {
+        final org.apache.skywalking.oap.server.core.query.type.event.Event resultEvent = new org.apache.skywalking.oap.server.core.query.type.event.Event();
+        // searchable
+        final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
+        resultEvent.setUuid((String) searchable.get(0).getValue());
+        resultEvent.setSource(new Source((String) searchable.get(1).getValue(), (String) searchable.get(2).getValue(), (String) searchable.get(3).getValue()));
+        resultEvent.setName((String) searchable.get(4).getValue());
+        resultEvent.setType(EventType.parse((String) searchable.get(5).getValue()));
+        resultEvent.setStartTime(((Number) searchable.get(6).getValue()).longValue());
+        resultEvent.setEndTime(((Number) searchable.get(7).getValue()).longValue());
+        // data
+        final List<TagAndValue<?>> data = row.getTagFamilies().get(1);
+        resultEvent.setMessage((String) data.get(0).getValue());
+        resultEvent.setParameters((String) data.get(1).getValue());
+        return resultEvent;
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/LogMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/LogMapper.java
index f1c5aaf442..3f65a82cea 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/LogMapper.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/LogMapper.java
@@ -8,29 +8,19 @@ import org.apache.skywalking.apm.network.logging.v3.LogTags;
 import org.apache.skywalking.banyandb.v1.client.RowEntity;
 import org.apache.skywalking.banyandb.v1.client.TagAndValue;
 import org.apache.skywalking.oap.server.core.analysis.manual.log.AbstractLogRecord;
+import org.apache.skywalking.oap.server.core.analysis.manual.log.LogRecord;
 import org.apache.skywalking.oap.server.core.query.type.KeyValue;
 import org.apache.skywalking.oap.server.core.query.type.Log;
 
 import java.util.List;
 
-public class LogMapper implements RowEntityMapper<Log> {
-    @Override
-    public List<String> searchableProjection() {
-        return ImmutableList.of(
-                AbstractLogRecord.SERVICE_ID, // 0
-                AbstractLogRecord.SERVICE_INSTANCE_ID, // 1
-                AbstractLogRecord.ENDPOINT_ID, // 2
-                AbstractLogRecord.TRACE_ID, // 3
-                AbstractLogRecord.TRACE_SEGMENT_ID,
-                AbstractLogRecord.SPAN_ID,
-                AbstractLogRecord.TIMESTAMP); // 6
-    }
-
-    @Override
-    public List<String> dataProjection() {
-        return ImmutableList.of(AbstractLogRecord.CONTENT_TYPE,
-                AbstractLogRecord.CONTENT,
-                AbstractLogRecord.TAGS_RAW_DATA); // 2
+public class LogMapper extends AbstractBanyanDBDeserializer<Log> {
+    public LogMapper() {
+        super(LogRecord.INDEX_NAME, ImmutableList.of(
+                        AbstractLogRecord.SERVICE_ID, AbstractLogRecord.SERVICE_INSTANCE_ID,
+                        AbstractLogRecord.ENDPOINT_ID, AbstractLogRecord.TRACE_ID, AbstractLogRecord.TRACE_SEGMENT_ID,
+                        AbstractLogRecord.SPAN_ID, AbstractLogRecord.TIMESTAMP),
+                ImmutableList.of(AbstractLogRecord.CONTENT_TYPE, AbstractLogRecord.CONTENT, AbstractLogRecord.TAGS_RAW_DATA));
     }
 
     @Override
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/NetworkAddressAliasMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/NetworkAddressAliasMapper.java
index aeb823c752..7c231df86a 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/NetworkAddressAliasMapper.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/NetworkAddressAliasMapper.java
@@ -8,16 +8,11 @@ import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 
 import java.util.List;
 
-public class NetworkAddressAliasMapper implements RowEntityMapper<NetworkAddressAlias> {
-    @Override
-    public List<String> searchableProjection() {
-        return ImmutableList.of(NetworkAddressAlias.LAST_UPDATE_TIME_BUCKET);
-    }
-
-    @Override
-    public List<String> dataProjection() {
-        // TODO: make these static fields public
-        return ImmutableList.of(Metrics.TIME_BUCKET, "address", "represent_service_id", "represent_service_instance_id");
+public class NetworkAddressAliasMapper extends AbstractBanyanDBDeserializer<NetworkAddressAlias> {
+    public NetworkAddressAliasMapper() {
+        super(NetworkAddressAlias.INDEX_NAME,
+                ImmutableList.of(NetworkAddressAlias.LAST_UPDATE_TIME_BUCKET),
+                ImmutableList.of(Metrics.TIME_BUCKET, "address", "represent_service_id", "represent_service_instance_id"));
     }
 
     @Override
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileTaskLogMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileTaskLogMapper.java
index bc157253f0..de51d2f95c 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileTaskLogMapper.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileTaskLogMapper.java
@@ -9,16 +9,12 @@ import org.apache.skywalking.oap.server.core.query.type.ProfileTaskLogOperationT
 
 import java.util.List;
 
-public class ProfileTaskLogMapper implements RowEntityMapper<ProfileTaskLog> {
-    @Override
-    public List<String> searchableProjection() {
-        return ImmutableList.of(ProfileTaskLogRecord.OPERATION_TIME);
-    }
-
-    @Override
-    public List<String> dataProjection() {
-        return ImmutableList.of(ProfileTaskLogRecord.TASK_ID, ProfileTaskLogRecord.INSTANCE_ID,
-                ProfileTaskLogRecord.OPERATION_TYPE);
+public class ProfileTaskLogMapper extends AbstractBanyanDBDeserializer<ProfileTaskLog> {
+    public ProfileTaskLogMapper() {
+        super(ProfileTaskLogRecord.INDEX_NAME,
+                ImmutableList.of(ProfileTaskLogRecord.OPERATION_TIME),
+                ImmutableList.of(ProfileTaskLogRecord.TASK_ID, ProfileTaskLogRecord.INSTANCE_ID,
+                        ProfileTaskLogRecord.OPERATION_TYPE));
     }
 
     @Override
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileTaskMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileTaskMapper.java
index 8a49404d23..2626a91c7d 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileTaskMapper.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileTaskMapper.java
@@ -6,22 +6,16 @@ import org.apache.skywalking.banyandb.v1.client.TagAndValue;
 import org.apache.skywalking.oap.server.core.profile.ProfileTaskRecord;
 import org.apache.skywalking.oap.server.core.query.type.ProfileTask;
 
-import java.util.Collections;
 import java.util.List;
 
-public class ProfileTaskMapper implements RowEntityMapper<ProfileTask> {
+public class ProfileTaskMapper extends AbstractBanyanDBDeserializer<ProfileTask> {
     public static final String ID = "profile_task_query_id";
 
-    @Override
-    public List<String> searchableProjection() {
-        return ImmutableList.of(ID, ProfileTaskRecord.SERVICE_ID, ProfileTaskRecord.ENDPOINT_NAME,
-                ProfileTaskRecord.START_TIME, ProfileTaskRecord.DURATION, ProfileTaskRecord.MIN_DURATION_THRESHOLD,
-                ProfileTaskRecord.DUMP_PERIOD, ProfileTaskRecord.CREATE_TIME, ProfileTaskRecord.MAX_SAMPLING_COUNT);
-    }
-
-    @Override
-    public List<String> dataProjection() {
-        return Collections.emptyList();
+    public ProfileTaskMapper() {
+        super(ProfileTaskRecord.INDEX_NAME,
+                ImmutableList.of(ID, ProfileTaskRecord.SERVICE_ID, ProfileTaskRecord.ENDPOINT_NAME,
+                        ProfileTaskRecord.START_TIME, ProfileTaskRecord.DURATION, ProfileTaskRecord.MIN_DURATION_THRESHOLD,
+                        ProfileTaskRecord.DUMP_PERIOD, ProfileTaskRecord.CREATE_TIME, ProfileTaskRecord.MAX_SAMPLING_COUNT));
     }
 
     @Override
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileThreadSnapshotRecordMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileThreadSnapshotRecordMapper.java
index 99f23117f2..2d966a5955 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileThreadSnapshotRecordMapper.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileThreadSnapshotRecordMapper.java
@@ -9,16 +9,12 @@ import org.apache.skywalking.oap.server.core.profile.ProfileThreadSnapshotRecord
 import java.util.Collections;
 import java.util.List;
 
-public class ProfileThreadSnapshotRecordMapper implements RowEntityMapper<ProfileThreadSnapshotRecord> {
-    @Override
-    public List<String> searchableProjection() {
-        return ImmutableList.of(ProfileThreadSnapshotRecord.TASK_ID, ProfileThreadSnapshotRecord.SEGMENT_ID,
-                ProfileThreadSnapshotRecord.DUMP_TIME, ProfileThreadSnapshotRecord.SEQUENCE);
-    }
-
-    @Override
-    public List<String> dataProjection() {
-        return Collections.singletonList(ProfileThreadSnapshotRecord.STACK_BINARY);
+public class ProfileThreadSnapshotRecordMapper extends AbstractBanyanDBDeserializer<ProfileThreadSnapshotRecord> {
+    public ProfileThreadSnapshotRecordMapper() {
+        super(ProfileThreadSnapshotRecord.INDEX_NAME,
+                ImmutableList.of(ProfileThreadSnapshotRecord.TASK_ID, ProfileThreadSnapshotRecord.SEGMENT_ID,
+                        ProfileThreadSnapshotRecord.DUMP_TIME, ProfileThreadSnapshotRecord.SEQUENCE),
+                Collections.singletonList(ProfileThreadSnapshotRecord.STACK_BINARY));
     }
 
     @Override
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/RowEntityMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/RowEntityMapper.java
index cc1d48d94f..51f9a5d687 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/RowEntityMapper.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/RowEntityMapper.java
@@ -2,12 +2,7 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
 
 import org.apache.skywalking.banyandb.v1.client.RowEntity;
 
-import java.util.List;
-
+@FunctionalInterface
 public interface RowEntityMapper<T> {
-    List<String> searchableProjection();
-
-    List<String> dataProjection();
-
     T map(RowEntity row);
 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/SegmentRecordMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/SegmentRecordMapper.java
index 70318136f6..da7be9de35 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/SegmentRecordMapper.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/SegmentRecordMapper.java
@@ -9,7 +9,13 @@ import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentReco
 import java.util.Collections;
 import java.util.List;
 
-public class SegmentRecordMapper implements RowEntityMapper<SegmentRecord> {
+public class SegmentRecordMapper extends AbstractBanyanDBDeserializer<SegmentRecord> {
+    public SegmentRecordMapper() {
+        super(SegmentRecord.INDEX_NAME,
+                ImmutableList.of("trace_id", "state", "service_id", "service_instance_id", "endpoint_id", "duration", "start_time"),
+                Collections.singletonList("data_binary"));
+    }
+
     @Override
     public SegmentRecord map(RowEntity row) {
         SegmentRecord record = new SegmentRecord();
@@ -26,14 +32,4 @@ public class SegmentRecordMapper implements RowEntityMapper<SegmentRecord> {
         record.setDataBinary(((ByteString) data.get(0).getValue()).toByteArray());
         return record;
     }
-
-    @Override
-    public List<String> searchableProjection() {
-        return ImmutableList.of("trace_id", "state", "service_id", "service_instance_id", "endpoint_id", "duration", "start_time");
-    }
-
-    @Override
-    public List<String> dataProjection() {
-        return Collections.singletonList("data_binary");
-    }
 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ServiceInstanceMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ServiceInstanceMapper.java
new file mode 100644
index 0000000000..f6925d3fd3
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ServiceInstanceMapper.java
@@ -0,0 +1,70 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
+
+import com.google.common.collect.ImmutableList;
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.skywalking.banyandb.v1.client.RowEntity;
+import org.apache.skywalking.banyandb.v1.client.TagAndValue;
+import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
+import org.apache.skywalking.oap.server.core.query.enumeration.Language;
+import org.apache.skywalking.oap.server.core.query.type.Attribute;
+import org.apache.skywalking.oap.server.core.query.type.ServiceInstance;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
+import org.apache.skywalking.oap.server.library.util.StringUtil;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class ServiceInstanceMapper extends AbstractBanyanDBDeserializer<ServiceInstance> {
+    private static final Gson GSON = new Gson();
+
+    public ServiceInstanceMapper() {
+        super(InstanceTraffic.INDEX_NAME,
+                ImmutableList.of(InstanceTraffic.SERVICE_ID, InstanceTraffic.LAST_PING_TIME_BUCKET),
+                Collections.singletonList("data_binary"));
+    }
+
+    @Override
+    public ServiceInstance map(RowEntity row) {
+        ServiceInstance serviceInstance = new ServiceInstance();
+        final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
+        serviceInstance.setId((String) searchable.get(0).getValue());
+        serviceInstance.setInstanceUUID((String) searchable.get(0).getValue());
+        final List<TagAndValue<?>> data = row.getTagFamilies().get(1);
+        Object o = data.get(0).getValue();
+        if (o instanceof ByteString && !((ByteString) o).isEmpty()) {
+            try {
+                RemoteData remoteData = RemoteData.parseFrom((ByteString) o);
+                serviceInstance.setName(remoteData.getDataStrings(1));
+                final String propString = remoteData.getDataStrings(2);
+                if (StringUtil.isNotEmpty(propString)) {
+                    JsonObject properties = GSON.fromJson(propString, JsonObject.class);
+                    if (properties != null) {
+                        for (Map.Entry<String, JsonElement> property : properties.entrySet()) {
+                            String key = property.getKey();
+                            String value = property.getValue().getAsString();
+                            if (key.equals(InstanceTraffic.PropertyUtil.LANGUAGE)) {
+                                serviceInstance.setLanguage(Language.value(value));
+                            } else {
+                                serviceInstance.getAttributes().add(new Attribute(key, value));
+                            }
+                        }
+                    } else {
+                        serviceInstance.setLanguage(Language.UNKNOWN);
+                    }
+                } else {
+                    serviceInstance.setLanguage(Language.UNKNOWN);
+                }
+            } catch (InvalidProtocolBufferException ex) {
+                throw new RuntimeException("fail to parse remote data", ex);
+            }
+        } else {
+            serviceInstance.setLanguage(Language.UNKNOWN);
+        }
+        return serviceInstance;
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ServiceMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ServiceMapper.java
new file mode 100644
index 0000000000..e006a1107d
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ServiceMapper.java
@@ -0,0 +1,26 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.skywalking.banyandb.v1.client.RowEntity;
+import org.apache.skywalking.banyandb.v1.client.TagAndValue;
+import org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraffic;
+import org.apache.skywalking.oap.server.core.query.type.Service;
+
+import java.util.List;
+
+public class ServiceMapper extends AbstractBanyanDBDeserializer<Service> {
+    public ServiceMapper() {
+        super(ServiceTraffic.INDEX_NAME,
+                ImmutableList.of(ServiceTraffic.NAME, ServiceTraffic.NODE_TYPE, ServiceTraffic.GROUP));
+    }
+
+    @Override
+    public Service map(RowEntity row) {
+        Service service = new Service();
+        final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
+        service.setId(row.getId());
+        service.setName((String) searchable.get(0).getValue());
+        service.setGroup((String) searchable.get(2).getValue());
+        return service;
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java
new file mode 100644
index 0000000000..b11cca6ed4
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java
@@ -0,0 +1,42 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
+
+import org.apache.skywalking.banyandb.v1.client.StreamQuery;
+import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
+import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.AbstractBanyanDBDeserializer;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.BanyanDBDeserializerFactory;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+public abstract class AbstractBanyanDBDAO extends AbstractDAO<BanyanDBStorageClient> {
+    protected AbstractBanyanDBDAO(BanyanDBStorageClient client) {
+        super(client);
+    }
+
+    protected <T> List<T> query(Class<T> clazz, QueryBuilder builder) {
+        return this.query(clazz, builder, 0, 0);
+    }
+
+    protected <T> List<T> query(Class<T> clazz, QueryBuilder builder, long startTimestamp, long endTimestamp) {
+        AbstractBanyanDBDeserializer<T> deserializer = BanyanDBDeserializerFactory.INSTANCE.findDeserializer(clazz);
+
+        final StreamQuery query;
+        if (startTimestamp != 0 && endTimestamp != 0) {
+            query = deserializer.buildStreamQuery();
+        } else {
+            query = deserializer.buildStreamQuery(startTimestamp, endTimestamp);
+        }
+
+        builder.apply(query);
+
+        final StreamQueryResponse resp = getClient().query(query);
+        return resp.getElements().stream().map(deserializer::map).collect(Collectors.toList());
+    }
+
+
+    interface QueryBuilder {
+        void apply(final StreamQuery query);
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java
index 7e122611bf..57f658a380 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java
@@ -2,57 +2,48 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
 
 import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
 import org.apache.skywalking.banyandb.v1.client.StreamQuery;
-import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
 import org.apache.skywalking.oap.server.core.alarm.AlarmRecord;
 import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
 import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
 import org.apache.skywalking.oap.server.core.query.type.AlarmMessage;
 import org.apache.skywalking.oap.server.core.query.type.Alarms;
-import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
 import org.apache.skywalking.oap.server.core.storage.query.IAlarmQueryDAO;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.AlarmMessageMapper;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.RowEntityMapper;
 
 import java.io.IOException;
 import java.util.List;
 import java.util.Objects;
-import java.util.stream.Collectors;
 
 /**
  * {@link org.apache.skywalking.oap.server.core.alarm.AlarmRecord} is a stream,
  * which can be used to build a {@link org.apache.skywalking.oap.server.core.query.type.AlarmMessage}
  */
-public class BanyanDBAlarmQueryDAO extends AbstractDAO<BanyanDBStorageClient> implements IAlarmQueryDAO {
-    private final RowEntityMapper<AlarmMessage> mapper;
-
+public class BanyanDBAlarmQueryDAO extends AbstractBanyanDBDAO implements IAlarmQueryDAO {
     public BanyanDBAlarmQueryDAO(BanyanDBStorageClient client) {
         super(client);
-        mapper = new AlarmMessageMapper(this);
     }
 
     @Override
     public Alarms getAlarm(Integer scopeId, String keyword, int limit, int from, long startTB, long endTB, List<Tag> tags) throws IOException {
-        final StreamQuery query = new StreamQuery(AlarmRecord.INDEX_NAME, mapper.searchableProjection());
-
-        if (Objects.nonNull(scopeId)) {
-            query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", AlarmRecord.SCOPE, (long) scopeId));
-        }
-        if (startTB != 0 && endTB != 0) {
-            query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", AlarmRecord.START_TIME, TimeBucket.getTimestamp(startTB)));
-            query.appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", AlarmRecord.START_TIME, TimeBucket.getTimestamp(endTB)));
-        }
-
-        // TODO: support keyword search
-
-        // TODO: support tag search
-
-        query.setLimit(limit);
-        query.setOffset(from);
-
-        StreamQueryResponse resp = getClient().query(query);
-
-        List<AlarmMessage> messages = resp.getElements().stream().map(mapper::map).collect(Collectors.toList());
+        List<AlarmMessage> messages = query(AlarmMessage.class, new QueryBuilder() {
+            @Override
+            public void apply(StreamQuery query) {
+                if (Objects.nonNull(scopeId)) {
+                    query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", AlarmRecord.SCOPE, (long) scopeId));
+                }
+                if (startTB != 0 && endTB != 0) {
+                    query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", AlarmRecord.START_TIME, TimeBucket.getTimestamp(startTB)));
+                    query.appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", AlarmRecord.START_TIME, TimeBucket.getTimestamp(endTB)));
+                }
+
+                // TODO: support keyword search
+
+                // TODO: support tag search
+
+                query.setLimit(limit);
+                query.setOffset(from);
+            }
+        });
 
         Alarms alarms = new Alarms();
         alarms.setTotal(messages.size());
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java
index 25f2c27f4b..3bd3af8ad5 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java
@@ -2,61 +2,54 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
 
 import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
 import org.apache.skywalking.banyandb.v1.client.StreamQuery;
-import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
 import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
 import org.apache.skywalking.oap.server.core.browser.manual.errorlog.BrowserErrorLogRecord;
 import org.apache.skywalking.oap.server.core.browser.source.BrowserErrorCategory;
 import org.apache.skywalking.oap.server.core.query.type.BrowserErrorLog;
 import org.apache.skywalking.oap.server.core.query.type.BrowserErrorLogs;
-import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
 import org.apache.skywalking.oap.server.core.storage.query.IBrowserLogQueryDAO;
 import org.apache.skywalking.oap.server.library.util.StringUtil;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.BrowserErrorLogMapper;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.RowEntityMapper;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Objects;
-import java.util.stream.Collectors;
 
 /**
  * {@link org.apache.skywalking.oap.server.core.browser.manual.errorlog.BrowserErrorLogRecord} is a stream
  */
-public class BanyanDBBrowserLogQueryDAO extends AbstractDAO<BanyanDBStorageClient> implements IBrowserLogQueryDAO {
-    private static final RowEntityMapper<BrowserErrorLog> MAPPER = new BrowserErrorLogMapper();
-
+public class BanyanDBBrowserLogQueryDAO extends AbstractBanyanDBDAO implements IBrowserLogQueryDAO {
     public BanyanDBBrowserLogQueryDAO(BanyanDBStorageClient client) {
         super(client);
     }
 
     @Override
     public BrowserErrorLogs queryBrowserErrorLogs(String serviceId, String serviceVersionId, String pagePathId, BrowserErrorCategory category, long startSecondTB, long endSecondTB, int limit, int from) throws IOException {
-        final StreamQuery query = new StreamQuery(BrowserErrorLogRecord.INDEX_NAME, MAPPER.searchableProjection());
-        query.setDataProjections(MAPPER.dataProjection());
-
-        query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", BrowserErrorLogRecord.SERVICE_ID, serviceId));
-
-        if (startSecondTB != 0 && endSecondTB != 0) {
-            query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", BrowserErrorLogRecord.TIMESTAMP, TimeBucket.getTimestamp(startSecondTB)));
-            query.appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", BrowserErrorLogRecord.TIMESTAMP, TimeBucket.getTimestamp(endSecondTB)));
-        }
-        if (StringUtil.isNotEmpty(serviceVersionId)) {
-            query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", BrowserErrorLogRecord.SERVICE_VERSION_ID, serviceVersionId));
-        }
-        if (StringUtil.isNotEmpty(pagePathId)) {
-            query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", BrowserErrorLogRecord.PAGE_PATH_ID, pagePathId));
-        }
-        if (Objects.nonNull(category)) {
-            query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", BrowserErrorLogRecord.ERROR_CATEGORY, (long) category.getValue()));
-        }
-
-        query.setOffset(from);
-        query.setLimit(limit);
-
-        final StreamQueryResponse resp = getClient().query(query);
-
         final BrowserErrorLogs logs = new BrowserErrorLogs();
-        logs.getLogs().addAll(resp.getElements().stream().map(MAPPER::map).collect(Collectors.toList()));
+        List<BrowserErrorLog> browserErrorLogs = query(BrowserErrorLog.class, new QueryBuilder() {
+            @Override
+            public void apply(StreamQuery query) {
+                query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", BrowserErrorLogRecord.SERVICE_ID, serviceId));
+
+                if (startSecondTB != 0 && endSecondTB != 0) {
+                    query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", BrowserErrorLogRecord.TIMESTAMP, TimeBucket.getTimestamp(startSecondTB)));
+                    query.appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", BrowserErrorLogRecord.TIMESTAMP, TimeBucket.getTimestamp(endSecondTB)));
+                }
+                if (StringUtil.isNotEmpty(serviceVersionId)) {
+                    query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", BrowserErrorLogRecord.SERVICE_VERSION_ID, serviceVersionId));
+                }
+                if (StringUtil.isNotEmpty(pagePathId)) {
+                    query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", BrowserErrorLogRecord.PAGE_PATH_ID, pagePathId));
+                }
+                if (Objects.nonNull(category)) {
+                    query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", BrowserErrorLogRecord.ERROR_CATEGORY, (long) category.getValue()));
+                }
+
+                query.setOffset(from);
+                query.setLimit(limit);
+            }
+        });
+        logs.getLogs().addAll(browserErrorLogs);
         logs.setTotal(logs.getLogs().size());
         return logs;
     }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBEventQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBEventQueryDAO.java
index 5ab67d95c6..9b2f0d2be5 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBEventQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBEventQueryDAO.java
@@ -1,23 +1,101 @@
 package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
 
+import com.google.common.base.Strings;
+import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
+import org.apache.skywalking.banyandb.v1.client.StreamQuery;
+import org.apache.skywalking.oap.server.core.query.PaginationUtils;
+import org.apache.skywalking.oap.server.core.query.input.Duration;
 import org.apache.skywalking.oap.server.core.query.type.event.EventQueryCondition;
 import org.apache.skywalking.oap.server.core.query.type.event.Events;
+import org.apache.skywalking.oap.server.core.query.type.event.Source;
+import org.apache.skywalking.oap.server.core.source.Event;
 import org.apache.skywalking.oap.server.core.storage.query.IEventQueryDAO;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
 
 import java.util.List;
 
 /**
- * ???
  * {@link org.apache.skywalking.oap.server.core.source.Event} is a stream
  */
-public class BanyanDBEventQueryDAO implements IEventQueryDAO {
+public class BanyanDBEventQueryDAO extends AbstractBanyanDBDAO implements IEventQueryDAO {
+    public BanyanDBEventQueryDAO(BanyanDBStorageClient client) {
+        super(client);
+    }
+
     @Override
     public Events queryEvents(EventQueryCondition condition) throws Exception {
-        return new Events();
+        List<org.apache.skywalking.oap.server.core.query.type.event.Event> eventList = query(org.apache.skywalking.oap.server.core.query.type.event.Event.class,
+                new QueryBuilder() {
+                    @Override
+                    public void apply(StreamQuery query) {
+                        buildConditions(condition, query);
+
+                        PaginationUtils.Page page = PaginationUtils.INSTANCE.exchange(condition.getPaging());
+                        query.setLimit(page.getLimit());
+                        query.setOffset(page.getFrom());
+                        switch (condition.getOrder()) {
+                            case ASC:
+                                query.setOrderBy(new StreamQuery.OrderBy("start_time", StreamQuery.OrderBy.Type.ASC));
+                                break;
+                            case DES:
+                                query.setOrderBy(new StreamQuery.OrderBy("start_time", StreamQuery.OrderBy.Type.DESC));
+                        }
+                    }
+                });
+
+        Events events = new Events();
+        events.setEvents(eventList);
+        // TODO: how to set total???
+        events.setTotal(eventList.size());
+        return events;
     }
 
     @Override
     public Events queryEvents(List<EventQueryCondition> conditionList) throws Exception {
-        return new Events();
+        Events events = new Events();
+        for (final EventQueryCondition condition : conditionList) {
+            Events subEvents = this.queryEvents(condition);
+            if (subEvents.getEvents().size() == 0) {
+                continue;
+            }
+
+            events.getEvents().addAll(subEvents.getEvents());
+            events.setTotal(events.getTotal() + subEvents.getTotal());
+        }
+
+        return events;
+    }
+
+    private void buildConditions(EventQueryCondition condition, final StreamQuery query) {
+        if (!Strings.isNullOrEmpty(condition.getUuid())) {
+            query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", Event.UUID, condition.getUuid()));
+        }
+        final Source source = condition.getSource();
+        if (source != null) {
+            if (!Strings.isNullOrEmpty(source.getService())) {
+                query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", Event.SERVICE, source.getService()));
+            }
+            if (!Strings.isNullOrEmpty(source.getServiceInstance())) {
+                query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", Event.SERVICE_INSTANCE, source.getServiceInstance()));
+            }
+            if (!Strings.isNullOrEmpty(source.getEndpoint())) {
+                query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", Event.ENDPOINT, source.getEndpoint()));
+            }
+        }
+        if (!Strings.isNullOrEmpty(condition.getName())) {
+            query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", Event.NAME, condition.getName()));
+        }
+        if (condition.getType() != null) {
+            query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", Event.TYPE, condition.getType().name()));
+        }
+        final Duration time = condition.getTime();
+        if (time != null) {
+            if (time.getStartTimestamp() > 0) {
+                query.appendCondition(PairQueryCondition.LongQueryCondition.gt("searchable", Event.START_TIME, time.getStartTimestamp()));
+            }
+            if (time.getEndTimestamp() > 0) {
+                query.appendCondition(PairQueryCondition.LongQueryCondition.gt("searchable", Event.END_TIME, time.getEndTimestamp()));
+            }
+        }
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java
index 661f65b930..404d60017c 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java
@@ -2,33 +2,25 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
 
 import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
 import org.apache.skywalking.banyandb.v1.client.StreamQuery;
-import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
 import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
 import org.apache.skywalking.oap.server.core.analysis.manual.log.AbstractLogRecord;
-import org.apache.skywalking.oap.server.core.analysis.manual.log.LogRecord;
 import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
 import org.apache.skywalking.oap.server.core.query.enumeration.Order;
 import org.apache.skywalking.oap.server.core.query.input.TraceScopeCondition;
 import org.apache.skywalking.oap.server.core.query.type.Log;
 import org.apache.skywalking.oap.server.core.query.type.Logs;
-import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
 import org.apache.skywalking.oap.server.core.storage.query.ILogQueryDAO;
 import org.apache.skywalking.oap.server.library.util.StringUtil;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.LogMapper;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.RowEntityMapper;
 
 import java.io.IOException;
 import java.util.List;
 import java.util.Objects;
-import java.util.stream.Collectors;
 
 /**
  * {@link org.apache.skywalking.oap.server.core.analysis.manual.log.LogRecord} is a stream
  */
-public class BanyanDBLogQueryDAO extends AbstractDAO<BanyanDBStorageClient> implements ILogQueryDAO {
-    private static final RowEntityMapper<Log> MAPPER = new LogMapper();
-
+public class BanyanDBLogQueryDAO extends AbstractBanyanDBDAO implements ILogQueryDAO {
     public BanyanDBLogQueryDAO(BanyanDBStorageClient client) {
         super(client);
     }
@@ -38,45 +30,44 @@ public class BanyanDBLogQueryDAO extends AbstractDAO<BanyanDBStorageClient> impl
                           TraceScopeCondition relatedTrace, Order queryOrder, int from, int limit,
                           long startTB, long endTB, List<Tag> tags, List<String> keywordsOfContent,
                           List<String> excludingKeywordsOfContent) throws IOException {
-        final StreamQuery query = new StreamQuery(LogRecord.INDEX_NAME, MAPPER.searchableProjection());
-        if (StringUtil.isNotEmpty(serviceId)) {
-            query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", AbstractLogRecord.SERVICE_ID, serviceId));
-        }
+        List<Log> entities = query(Log.class, new QueryBuilder() {
+            @Override
+            public void apply(StreamQuery query) {
+                if (StringUtil.isNotEmpty(serviceId)) {
+                    query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", AbstractLogRecord.SERVICE_ID, serviceId));
+                }
 
-        if (startTB != 0 && endTB != 0) {
-            query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", AbstractLogRecord.TIMESTAMP, TimeBucket.getTimestamp(startTB)));
-            query.appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", AbstractLogRecord.TIMESTAMP, TimeBucket.getTimestamp(endTB)));
-        }
+                if (startTB != 0 && endTB != 0) {
+                    query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", AbstractLogRecord.TIMESTAMP, TimeBucket.getTimestamp(startTB)));
+                    query.appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", AbstractLogRecord.TIMESTAMP, TimeBucket.getTimestamp(endTB)));
+                }
 
-        if (StringUtil.isNotEmpty(serviceInstanceId)) {
-            query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", AbstractLogRecord.SERVICE_INSTANCE_ID, serviceInstanceId));
-        }
-        if (StringUtil.isNotEmpty(endpointId)) {
-            query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", AbstractLogRecord.ENDPOINT_ID, endpointId));
-        }
-        if (Objects.nonNull(relatedTrace)) {
-            if (StringUtil.isNotEmpty(relatedTrace.getTraceId())) {
-                query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", AbstractLogRecord.TRACE_ID, relatedTrace.getTraceId()));
-            }
-            if (StringUtil.isNotEmpty(relatedTrace.getSegmentId())) {
-                query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", AbstractLogRecord.TRACE_SEGMENT_ID, relatedTrace.getSegmentId()));
-            }
-            if (Objects.nonNull(relatedTrace.getSpanId())) {
-                query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", AbstractLogRecord.SPAN_ID, (long) relatedTrace.getSpanId()));
-            }
-        }
+                if (StringUtil.isNotEmpty(serviceInstanceId)) {
+                    query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", AbstractLogRecord.SERVICE_INSTANCE_ID, serviceInstanceId));
+                }
+                if (StringUtil.isNotEmpty(endpointId)) {
+                    query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", AbstractLogRecord.ENDPOINT_ID, endpointId));
+                }
+                if (Objects.nonNull(relatedTrace)) {
+                    if (StringUtil.isNotEmpty(relatedTrace.getTraceId())) {
+                        query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", AbstractLogRecord.TRACE_ID, relatedTrace.getTraceId()));
+                    }
+                    if (StringUtil.isNotEmpty(relatedTrace.getSegmentId())) {
+                        query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", AbstractLogRecord.TRACE_SEGMENT_ID, relatedTrace.getSegmentId()));
+                    }
+                    if (Objects.nonNull(relatedTrace.getSpanId())) {
+                        query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", AbstractLogRecord.SPAN_ID, (long) relatedTrace.getSpanId()));
+                    }
+                }
 
-        // TODO: if we allow to index tags?
+                // TODO: if we allow to index tags?
 //        if (CollectionUtils.isNotEmpty(tags)) {
 //            for (final Tag tag : tags) {
 //                query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", tag.getKey(), tag.getValue()));
 //            }
 //        }
-
-        StreamQueryResponse resp = getClient().query(query);
-
-        List<Log> entities = resp.getElements().stream().map(MAPPER::map).collect(Collectors.toList());
-
+            }
+        });
         Logs logs = new Logs();
         logs.getLogs().addAll(entities);
         logs.setTotal(entities.size());
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBMetadataQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBMetadataQueryDAO.java
index 678b981f16..dd3d9623a6 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBMetadataQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBMetadataQueryDAO.java
@@ -1,15 +1,23 @@
 package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
 
+import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
+import org.apache.skywalking.banyandb.v1.client.StreamQuery;
 import org.apache.skywalking.oap.server.core.analysis.NodeType;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
+import org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic;
+import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
+import org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraffic;
 import org.apache.skywalking.oap.server.core.query.type.Database;
 import org.apache.skywalking.oap.server.core.query.type.Endpoint;
 import org.apache.skywalking.oap.server.core.query.type.Service;
 import org.apache.skywalking.oap.server.core.query.type.ServiceInstance;
 import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
+import org.apache.skywalking.oap.server.library.util.StringUtil;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
 
 import java.io.IOException;
-import java.util.Collections;
 import java.util.List;
+import java.util.stream.Collectors;
 
 /**
  * {@link org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraffic},
@@ -17,39 +25,90 @@ import java.util.List;
  * {@link org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic}
  * are all streams.
  */
-public class BanyanDBMetadataQueryDAO implements IMetadataQueryDAO {
+public class BanyanDBMetadataQueryDAO extends AbstractBanyanDBDAO implements IMetadataQueryDAO {
+    public BanyanDBMetadataQueryDAO(BanyanDBStorageClient client) {
+        super(client);
+    }
+
     @Override
     public List<Service> getAllServices(String group) throws IOException {
-        return Collections.emptyList();
+        return query(Service.class, new QueryBuilder() {
+            @Override
+            public void apply(StreamQuery query) {
+                if (StringUtil.isNotEmpty(group)) {
+                    query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", ServiceTraffic.GROUP, group));
+                }
+            }
+        });
     }
 
     @Override
     public List<Service> getAllBrowserServices() throws IOException {
-        return Collections.emptyList();
+        return query(Service.class, new QueryBuilder() {
+            @Override
+            public void apply(StreamQuery query) {
+                query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", ServiceTraffic.NODE_TYPE, (long) NodeType.Browser.value()));
+            }
+        });
     }
 
     @Override
     public List<Database> getAllDatabases() throws IOException {
-        return Collections.emptyList();
+        return query(Database.class, new QueryBuilder() {
+            @Override
+            public void apply(StreamQuery query) {
+                query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", ServiceTraffic.NODE_TYPE, (long) NodeType.Database.value()));
+            }
+        });
     }
 
     @Override
     public List<Service> searchServices(NodeType nodeType, String keyword) throws IOException {
-        return null;
+        return query(Service.class, new QueryBuilder() {
+            @Override
+            public void apply(StreamQuery query) {
+                query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", ServiceTraffic.NODE_TYPE, (long) nodeType.value()));
+            }
+        }).stream().filter((s) -> s.getName().contains(keyword)) // TODO: support analyzer in database
+                .collect(Collectors.toList());
     }
 
     @Override
     public Service searchService(NodeType nodeType, String serviceCode) throws IOException {
-        return null;
+        return query(Service.class, new QueryBuilder() {
+            @Override
+            public void apply(StreamQuery query) {
+                query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", ServiceTraffic.NAME, serviceCode));
+                query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", ServiceTraffic.NODE_TYPE, (long) nodeType.value()));
+                // only get one
+                query.setLimit(1);
+            }
+        }).stream().findAny().orElse(null);
     }
 
     @Override
     public List<Endpoint> searchEndpoint(String keyword, String serviceId, int limit) throws IOException {
-        return Collections.emptyList();
+        return query(Endpoint.class, new QueryBuilder() {
+            @Override
+            public void apply(StreamQuery query) {
+                query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", EndpointTraffic.SERVICE_ID, serviceId));
+            }
+        }).stream().filter((e) -> e.getName().contains(keyword))
+                .limit(limit).collect(Collectors.toList());
     }
 
     @Override
     public List<ServiceInstance> getServiceInstances(long startTimestamp, long endTimestamp, String serviceId) throws IOException {
-        return Collections.emptyList();
+        return query(ServiceInstance.class, new QueryBuilder() {
+            @Override
+            public void apply(StreamQuery query) {
+                final long startMinuteTimeBucket = TimeBucket.getMinuteTimeBucket(startTimestamp);
+                final long endMinuteTimeBucket = TimeBucket.getMinuteTimeBucket(endTimestamp);
+
+                query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", InstanceTraffic.LAST_PING_TIME_BUCKET, startMinuteTimeBucket));
+                query.appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", InstanceTraffic.LAST_PING_TIME_BUCKET, endMinuteTimeBucket));
+                query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", InstanceTraffic.SERVICE_ID, serviceId));
+            }
+        });
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBNetworkAddressAliasDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBNetworkAddressAliasDAO.java
index 337c288e96..6112593a55 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBNetworkAddressAliasDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBNetworkAddressAliasDAO.java
@@ -2,33 +2,27 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
 
 import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
 import org.apache.skywalking.banyandb.v1.client.StreamQuery;
-import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
 import org.apache.skywalking.oap.server.core.analysis.manual.networkalias.NetworkAddressAlias;
-import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
 import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.NetworkAddressAliasMapper;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.RowEntityMapper;
 
 import java.util.List;
-import java.util.stream.Collectors;
 
 /**
  * {@link NetworkAddressAlias} is a stream
  */
-public class BanyanDBNetworkAddressAliasDAO extends AbstractDAO<BanyanDBStorageClient> implements INetworkAddressAliasDAO {
-    private static final RowEntityMapper<NetworkAddressAlias> MAPPER = new NetworkAddressAliasMapper();
-
+public class BanyanDBNetworkAddressAliasDAO extends AbstractBanyanDBDAO implements INetworkAddressAliasDAO {
     public BanyanDBNetworkAddressAliasDAO(BanyanDBStorageClient client) {
         super(client);
     }
 
     @Override
     public List<NetworkAddressAlias> loadLastUpdate(long timeBucket) {
-        StreamQuery query = new StreamQuery(NetworkAddressAlias.INDEX_NAME, MAPPER.searchableProjection());
-        query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", NetworkAddressAlias.LAST_UPDATE_TIME_BUCKET, timeBucket));
-
-        StreamQueryResponse resp = getClient().query(query);
-        return resp.getElements().stream().map(MAPPER::map).collect(Collectors.toList());
+        return query(NetworkAddressAlias.class, new QueryBuilder() {
+            @Override
+            public void apply(StreamQuery query) {
+                query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", NetworkAddressAlias.LAST_UPDATE_TIME_BUCKET, timeBucket));
+            }
+        });
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java
index 6c19e2f701..0095965109 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java
@@ -1,10 +1,7 @@
 package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
 
 import org.apache.skywalking.banyandb.v1.client.StreamQuery;
-import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
-import org.apache.skywalking.oap.server.core.profile.ProfileTaskLogRecord;
 import org.apache.skywalking.oap.server.core.query.type.ProfileTaskLog;
-import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
 import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.ProfileTaskLogMapper;
@@ -18,7 +15,7 @@ import java.util.stream.Collectors;
 /**
  * {@link org.apache.skywalking.oap.server.core.profile.ProfileTaskLogRecord} is a stream
  */
-public class BanyanDBProfileTaskLogQueryDAO extends AbstractDAO<BanyanDBStorageClient> implements IProfileTaskLogQueryDAO {
+public class BanyanDBProfileTaskLogQueryDAO extends AbstractBanyanDBDAO implements IProfileTaskLogQueryDAO {
     private static final RowEntityMapper<ProfileTaskLog> MAPPER = new ProfileTaskLogMapper();
 
     private final int queryMaxSize;
@@ -30,14 +27,12 @@ public class BanyanDBProfileTaskLogQueryDAO extends AbstractDAO<BanyanDBStorageC
 
     @Override
     public List<ProfileTaskLog> getTaskLogList() throws IOException {
-        final StreamQuery query = new StreamQuery(ProfileTaskLogRecord.INDEX_NAME, MAPPER.searchableProjection());
-        query.setDataProjections(MAPPER.dataProjection());
-        query.setLimit(this.queryMaxSize);
-
-        StreamQueryResponse resp = getClient().query(query);
-        return resp.getElements().stream()
-                .map(MAPPER::map)
-                .sorted(Comparator.comparingLong(ProfileTaskLog::getOperationTime))
+        return query(ProfileTaskLog.class, new QueryBuilder() {
+            @Override
+            public void apply(StreamQuery query) {
+                query.setLimit(BanyanDBProfileTaskLogQueryDAO.this.queryMaxSize);
+            }
+        }).stream().sorted(Comparator.comparingLong(ProfileTaskLog::getOperationTime))
                 .collect(Collectors.toList());
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java
index 67f28ce62f..23fb8c7d1b 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java
@@ -2,11 +2,9 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
 
 import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
 import org.apache.skywalking.banyandb.v1.client.StreamQuery;
-import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
 import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
 import org.apache.skywalking.oap.server.core.profile.ProfileTaskRecord;
 import org.apache.skywalking.oap.server.core.query.type.ProfileTask;
-import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
 import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO;
 import org.apache.skywalking.oap.server.library.util.StringUtil;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
@@ -16,12 +14,11 @@ import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.Row
 import java.io.IOException;
 import java.util.List;
 import java.util.Objects;
-import java.util.stream.Collectors;
 
 /**
  * {@link org.apache.skywalking.oap.server.core.profile.ProfileTaskRecord} is a stream
  */
-public class BanyanDBProfileTaskQueryDAO extends AbstractDAO<BanyanDBStorageClient> implements IProfileTaskQueryDAO {
+public class BanyanDBProfileTaskQueryDAO extends AbstractBanyanDBDAO implements IProfileTaskQueryDAO {
     private static final RowEntityMapper<ProfileTask> MAPPER = new ProfileTaskMapper();
 
     public BanyanDBProfileTaskQueryDAO(BanyanDBStorageClient client) {
@@ -30,37 +27,36 @@ public class BanyanDBProfileTaskQueryDAO extends AbstractDAO<BanyanDBStorageClie
 
     @Override
     public List<ProfileTask> getTaskList(String serviceId, String endpointName, Long startTimeBucket, Long endTimeBucket, Integer limit) throws IOException {
-        final StreamQuery query = new StreamQuery(ProfileTaskRecord.INDEX_NAME, MAPPER.searchableProjection());
-        query.setDataProjections(MAPPER.dataProjection());
+        return query(ProfileTask.class, new QueryBuilder() {
+            @Override
+            public void apply(StreamQuery query) {
+                if (StringUtil.isNotEmpty(serviceId)) {
+                    query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable",
+                            ProfileTaskRecord.SERVICE_ID, serviceId));
+                }
 
-        if (StringUtil.isNotEmpty(serviceId)) {
-            query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable",
-                    ProfileTaskRecord.SERVICE_ID, serviceId));
-        }
-
-        if (StringUtil.isNotEmpty(endpointName)) {
-            query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable",
-                    ProfileTaskRecord.ENDPOINT_NAME, endpointName));
-        }
-
-        if (Objects.nonNull(startTimeBucket)) {
-            query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable",
-                    ProfileTaskRecord.START_TIME, TimeBucket.getTimestamp(startTimeBucket)));
-        }
+                if (StringUtil.isNotEmpty(endpointName)) {
+                    query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable",
+                            ProfileTaskRecord.ENDPOINT_NAME, endpointName));
+                }
 
-        if (Objects.nonNull(endTimeBucket)) {
-            query.appendCondition(PairQueryCondition.LongQueryCondition.le("searchable",
-                    ProfileTaskRecord.START_TIME, TimeBucket.getTimestamp(endTimeBucket)));
-        }
+                if (Objects.nonNull(startTimeBucket)) {
+                    query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable",
+                            ProfileTaskRecord.START_TIME, TimeBucket.getTimestamp(startTimeBucket)));
+                }
 
-        if (Objects.nonNull(limit)) {
-            query.setLimit(limit);
-        }
+                if (Objects.nonNull(endTimeBucket)) {
+                    query.appendCondition(PairQueryCondition.LongQueryCondition.le("searchable",
+                            ProfileTaskRecord.START_TIME, TimeBucket.getTimestamp(endTimeBucket)));
+                }
 
-        query.setOrderBy(new StreamQuery.OrderBy(ProfileTaskRecord.START_TIME, StreamQuery.OrderBy.Type.DESC));
+                if (Objects.nonNull(limit)) {
+                    query.setLimit(limit);
+                }
 
-        StreamQueryResponse resp = getClient().query(query);
-        return resp.getElements().stream().map(MAPPER::map).collect(Collectors.toList());
+                query.setOrderBy(new StreamQuery.OrderBy(ProfileTaskRecord.START_TIME, StreamQuery.OrderBy.Type.DESC));
+            }
+        });
     }
 
     @Override
@@ -69,11 +65,12 @@ public class BanyanDBProfileTaskQueryDAO extends AbstractDAO<BanyanDBStorageClie
             return null;
         }
 
-        final StreamQuery query = new StreamQuery(ProfileTaskRecord.INDEX_NAME, MAPPER.searchableProjection());
-        query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", ProfileTaskMapper.ID, id));
-        query.setLimit(1);
-
-        StreamQueryResponse resp = getClient().query(query);
-        return resp.getElements().stream().map(MAPPER::map).findAny().orElse(null);
+        return query(ProfileTask.class, new QueryBuilder() {
+            @Override
+            public void apply(StreamQuery query) {
+                query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", ProfileTaskMapper.ID, id));
+                query.setLimit(1);
+            }
+        }).stream().findAny().orElse(null);
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java
index cb5067e890..e92a0b24ff 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java
@@ -2,11 +2,9 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
 
 import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
 import org.apache.skywalking.banyandb.v1.client.StreamQuery;
-import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
 import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
 import org.apache.skywalking.oap.server.core.profile.ProfileThreadSnapshotRecord;
 import org.apache.skywalking.oap.server.core.query.type.BasicTrace;
-import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
 import org.apache.skywalking.oap.server.core.storage.profile.IProfileThreadSnapshotQueryDAO;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.BasicTraceMapper;
@@ -15,7 +13,6 @@ import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.Row
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.SegmentRecordMapper;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.LinkedList;
@@ -26,7 +23,7 @@ import java.util.stream.Collectors;
 /**
  * {@link ProfileThreadSnapshotRecord} is a stream
  */
-public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractDAO<BanyanDBStorageClient> implements IProfileThreadSnapshotQueryDAO {
+public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractBanyanDBDAO implements IProfileThreadSnapshotQueryDAO {
     private static final RowEntityMapper<ProfileThreadSnapshotRecord> MAPPER = new ProfileThreadSnapshotRecordMapper();
     private static final RowEntityMapper<BasicTrace> BASIC_TRACE_MAPPER = new BasicTraceMapper();
     private static final RowEntityMapper<SegmentRecord> SEGMENT_RECORD_MAPPER = new SegmentRecordMapper();
@@ -37,24 +34,30 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractDAO<BanyanDBS
 
     @Override
     public List<BasicTrace> queryProfiledSegments(String taskId) throws IOException {
-        final StreamQuery query = new StreamQuery(ProfileThreadSnapshotRecord.INDEX_NAME, MAPPER.searchableProjection());
-        query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", ProfileThreadSnapshotRecord.TASK_ID, taskId))
-                .appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", ProfileThreadSnapshotRecord.SEQUENCE, 0L));
-        StreamQueryResponse resp = getClient().query(query);
-
-        final List<String> segmentIDs = new ArrayList<>(resp.size());
-        resp.getElements().forEach(elem -> segmentIDs.add(MAPPER.map(elem).getSegmentId()));
-        if (segmentIDs.isEmpty()) {
+        List<ProfileThreadSnapshotRecord> resp = query(ProfileThreadSnapshotRecord.class, new QueryBuilder() {
+            @Override
+            public void apply(StreamQuery query) {
+                query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", ProfileThreadSnapshotRecord.TASK_ID, taskId))
+                        .appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", ProfileThreadSnapshotRecord.SEQUENCE, 0L));
+            }
+        });
+
+        if (resp.isEmpty()) {
             return Collections.emptyList();
         }
 
+        final List<String> segmentIDs = resp.stream().map(ProfileThreadSnapshotRecord::getSegmentId).collect(Collectors.toList());
+
         // TODO: support `IN` or `OR` logic operation in BanyanDB
         List<BasicTrace> basicTraces = new LinkedList<>();
         for (String segmentID : segmentIDs) {
-            final StreamQuery traceQuery = new StreamQuery(SegmentRecord.INDEX_NAME, BASIC_TRACE_MAPPER.searchableProjection());
-            traceQuery.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", SegmentRecord.SEGMENT_ID, segmentID));
-            StreamQueryResponse traceResponse = getClient().query(traceQuery);
-            basicTraces.addAll(traceResponse.getElements().stream().map(BASIC_TRACE_MAPPER::map).collect(Collectors.toList()));
+            List<BasicTrace> subSet = query(BasicTrace.class, new QueryBuilder() {
+                @Override
+                public void apply(StreamQuery traceQuery) {
+                    traceQuery.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", SegmentRecord.SEGMENT_ID, segmentID));
+                }
+            });
+            basicTraces.addAll(subSet);
         }
 
         // TODO: Sort in DB with DESC
@@ -79,33 +82,35 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractDAO<BanyanDBS
 
     @Override
     public List<ProfileThreadSnapshotRecord> queryRecords(String segmentId, int minSequence, int maxSequence) throws IOException {
-        final StreamQuery query = new StreamQuery(ProfileThreadSnapshotRecord.INDEX_NAME, MAPPER.searchableProjection());
-        query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", ProfileThreadSnapshotRecord.SEGMENT_ID, segmentId))
-                .appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", ProfileThreadSnapshotRecord.SEQUENCE, (long) maxSequence))
-                .appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", ProfileThreadSnapshotRecord.SEQUENCE, (long) minSequence));
-        query.setDataProjections(MAPPER.dataProjection());
-        StreamQueryResponse resp = getClient().query(query);
-        return resp.getElements().stream().map(MAPPER::map).collect(Collectors.toList());
+        return query(ProfileThreadSnapshotRecord.class, new QueryBuilder() {
+            @Override
+            public void apply(StreamQuery query) {
+                query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", ProfileThreadSnapshotRecord.SEGMENT_ID, segmentId))
+                        .appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", ProfileThreadSnapshotRecord.SEQUENCE, (long) maxSequence))
+                        .appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", ProfileThreadSnapshotRecord.SEQUENCE, (long) minSequence));
+            }
+        });
     }
 
     @Override
     public SegmentRecord getProfiledSegment(String segmentId) throws IOException {
-        final StreamQuery query = new StreamQuery(SegmentRecord.INDEX_NAME, SEGMENT_RECORD_MAPPER.searchableProjection());
-        query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", SegmentRecord.INDEX_NAME, segmentId));
-        query.setDataProjections(SEGMENT_RECORD_MAPPER.dataProjection());
-        StreamQueryResponse resp = getClient().query(query);
-        return resp.getElements().stream().map(SEGMENT_RECORD_MAPPER::map).findFirst().orElse(null);
+        return query(SegmentRecord.class, new QueryBuilder() {
+            @Override
+            public void apply(StreamQuery query) {
+                query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", SegmentRecord.INDEX_NAME, segmentId));
+            }
+        }).stream().findFirst().orElse(null);
     }
 
     private int querySequenceWithAgg(AggType aggType, String segmentId, long start, long end) {
-        final StreamQuery query = new StreamQuery(ProfileThreadSnapshotRecord.INDEX_NAME, MAPPER.searchableProjection());
-        query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", ProfileThreadSnapshotRecord.SEGMENT_ID, segmentId))
-                .appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", ProfileThreadSnapshotRecord.DUMP_TIME, end))
-                .appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", ProfileThreadSnapshotRecord.DUMP_TIME, start));
-        query.setDataProjections(MAPPER.dataProjection());
-
-        StreamQueryResponse resp = getClient().query(query);
-        List<ProfileThreadSnapshotRecord> records = resp.getElements().stream().map(MAPPER::map).collect(Collectors.toList());
+        List<ProfileThreadSnapshotRecord> records = query(ProfileThreadSnapshotRecord.class, new QueryBuilder() {
+            @Override
+            public void apply(StreamQuery query) {
+                query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", ProfileThreadSnapshotRecord.SEGMENT_ID, segmentId))
+                        .appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", ProfileThreadSnapshotRecord.DUMP_TIME, end))
+                        .appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", ProfileThreadSnapshotRecord.DUMP_TIME, start));
+            }
+        });
 
         switch (aggType) {
             case MIN:
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStorageDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStorageDAO.java
index f5ee5fb255..be81608a37 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStorageDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStorageDAO.java
@@ -21,7 +21,11 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
-import org.apache.skywalking.oap.server.core.storage.*;
+import org.apache.skywalking.oap.server.core.storage.IManagementDAO;
+import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
+import org.apache.skywalking.oap.server.core.storage.INoneStreamDAO;
+import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
+import org.apache.skywalking.oap.server.core.storage.StorageDAO;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
 import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
 import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java
index 838e8a1eb3..bbfd4e3150 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java
@@ -21,12 +21,14 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
 import com.google.common.base.Strings;
 import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
 import org.apache.skywalking.banyandb.v1.client.StreamQuery;
-import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
-import org.apache.skywalking.banyandb.v1.client.TimestampRange;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
 import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
 import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
-import org.apache.skywalking.oap.server.core.query.type.*;
-import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
+import org.apache.skywalking.oap.server.core.query.type.BasicTrace;
+import org.apache.skywalking.oap.server.core.query.type.QueryOrder;
+import org.apache.skywalking.oap.server.core.query.type.Span;
+import org.apache.skywalking.oap.server.core.query.type.TraceBrief;
+import org.apache.skywalking.oap.server.core.query.type.TraceState;
 import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
 import org.apache.skywalking.oap.server.library.util.CollectionUtils;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBSchema;
@@ -34,16 +36,14 @@ import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageC
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.BasicTraceMapper;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.RowEntityMapper;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.SegmentRecordMapper;
-import org.joda.time.DateTimeZone;
 import org.joda.time.format.DateTimeFormat;
 import org.joda.time.format.DateTimeFormatter;
 
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
-import java.util.stream.Collectors;
 
-public class BanyanDBTraceQueryDAO extends AbstractDAO<BanyanDBStorageClient> implements ITraceQueryDAO {
+public class BanyanDBTraceQueryDAO extends AbstractBanyanDBDAO implements ITraceQueryDAO {
     private static final RowEntityMapper<SegmentRecord> SEGMENT_RECORD_MAPPER = new SegmentRecordMapper();
     private static final RowEntityMapper<BasicTrace> BASIC_TRACE_MAPPER = new BasicTraceMapper();
 
@@ -55,110 +55,89 @@ public class BanyanDBTraceQueryDAO extends AbstractDAO<BanyanDBStorageClient> im
 
     @Override
     public TraceBrief queryBasicTraces(long startSecondTB, long endSecondTB, long minDuration, long maxDuration, String serviceId, String serviceInstanceId, String endpointId, String traceId, int limit, int from, TraceState traceState, QueryOrder queryOrder, List<Tag> tags) throws IOException {
-        StreamQuery query;
-        if (startSecondTB != 0 && endSecondTB != 0) {
-            query = new StreamQuery(BanyanDBSchema.NAME, new TimestampRange(parseMillisFromStartSecondTB(startSecondTB),
-                    parseMillisFromEndSecondTB(endSecondTB)), BASIC_TRACE_MAPPER.searchableProjection());
-        } else {
-            query = new StreamQuery(BanyanDBSchema.NAME, BASIC_TRACE_MAPPER.searchableProjection());
-        }
-        if (minDuration != 0) {
-            // duration >= minDuration
-            query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", "duration", minDuration));
-        }
-        if (maxDuration != 0) {
-            // duration <= maxDuration
-            query.appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", "duration", maxDuration));
-        }
+        final QueryBuilder builder = new QueryBuilder() {
+            @Override
+            public void apply(StreamQuery query) {
+                if (minDuration != 0) {
+                    // duration >= minDuration
+                    query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", "duration", minDuration));
+                }
+                if (maxDuration != 0) {
+                    // duration <= maxDuration
+                    query.appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", "duration", maxDuration));
+                }
 
-        if (!Strings.isNullOrEmpty(serviceId)) {
-            query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", "service_id", serviceId));
-        }
+                if (!Strings.isNullOrEmpty(serviceId)) {
+                    query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", "service_id", serviceId));
+                }
 
-        if (!Strings.isNullOrEmpty(serviceInstanceId)) {
-            query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", "service_instance_id", serviceInstanceId));
-        }
+                if (!Strings.isNullOrEmpty(serviceInstanceId)) {
+                    query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", "service_instance_id", serviceInstanceId));
+                }
 
-        if (!Strings.isNullOrEmpty(endpointId)) {
-            query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", "endpoint_id", endpointId));
-        }
+                if (!Strings.isNullOrEmpty(endpointId)) {
+                    query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", "endpoint_id", endpointId));
+                }
 
-        switch (traceState) {
-            case ERROR:
-                query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", "state", (long) BanyanDBSchema.TraceState.ERROR.getState()));
-                break;
-            case SUCCESS:
-                query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", "state", (long) BanyanDBSchema.TraceState.SUCCESS.getState()));
-                break;
-            default:
-                query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", "state", (long) BanyanDBSchema.TraceState.ALL.getState()));
-                break;
-        }
+                switch (traceState) {
+                    case ERROR:
+                        query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", "state", (long) BanyanDBSchema.TraceState.ERROR.getState()));
+                        break;
+                    case SUCCESS:
+                        query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", "state", (long) BanyanDBSchema.TraceState.SUCCESS.getState()));
+                        break;
+                    default:
+                        query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", "state", (long) BanyanDBSchema.TraceState.ALL.getState()));
+                        break;
+                }
 
-        switch (queryOrder) {
-            case BY_START_TIME:
-                query.setOrderBy(new StreamQuery.OrderBy("start_time", StreamQuery.OrderBy.Type.DESC));
-                break;
-            case BY_DURATION:
-                query.setOrderBy(new StreamQuery.OrderBy("duration", StreamQuery.OrderBy.Type.DESC));
-                break;
-        }
+                switch (queryOrder) {
+                    case BY_START_TIME:
+                        query.setOrderBy(new StreamQuery.OrderBy("start_time", StreamQuery.OrderBy.Type.DESC));
+                        break;
+                    case BY_DURATION:
+                        query.setOrderBy(new StreamQuery.OrderBy("duration", StreamQuery.OrderBy.Type.DESC));
+                        break;
+                }
 
-        if (CollectionUtils.isNotEmpty(tags)) {
-            for (final Tag tag : tags) {
-                if (BanyanDBSchema.INDEX_FIELDS.contains(tag.getKey())) {
-                    query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", tag.getKey(), tag.getValue()));
+                if (CollectionUtils.isNotEmpty(tags)) {
+                    for (final Tag tag : tags) {
+                        if (BanyanDBSchema.INDEX_FIELDS.contains(tag.getKey())) {
+                            query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", tag.getKey(), tag.getValue()));
+                        }
+                    }
                 }
+
+                query.setLimit(limit);
+                query.setOffset(from);
             }
-        }
+        };
 
-        query.setLimit(limit);
-        query.setOffset(from);
+        final List<BasicTrace> basicTraces;
+        if (startSecondTB != 0 && endSecondTB != 0) {
+            basicTraces = query(BasicTrace.class, builder, TimeBucket.getTimestamp(startSecondTB), TimeBucket.getTimestamp(endSecondTB));
+        } else {
+            basicTraces = query(BasicTrace.class, builder);
+        }
 
-        // build request
-        StreamQueryResponse response = this.getClient().query(query);
         TraceBrief brief = new TraceBrief();
-        brief.setTotal(response.size());
-        brief.getTraces().addAll(response.getElements().stream().map(BASIC_TRACE_MAPPER::map).collect(Collectors.toList()));
+        brief.setTotal(basicTraces.size());
+        brief.getTraces().addAll(basicTraces);
         return brief;
     }
 
     @Override
     public List<SegmentRecord> queryByTraceId(String traceId) throws IOException {
-        StreamQuery query = new StreamQuery(BanyanDBSchema.NAME, SEGMENT_RECORD_MAPPER.searchableProjection());
-        query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", "trace_id", traceId));
-        query.setDataProjections(SEGMENT_RECORD_MAPPER.dataProjection());
-        StreamQueryResponse response = this.getClient().query(query);
-        return response.getElements().stream().map(SEGMENT_RECORD_MAPPER::map).collect(Collectors.toList());
+        return query(SegmentRecord.class, new QueryBuilder() {
+            @Override
+            public void apply(StreamQuery query) {
+                query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", "trace_id", traceId));
+            }
+        });
     }
 
     @Override
     public List<Span> doFlexibleTraceQuery(String traceId) throws IOException {
         return Collections.emptyList();
     }
-
-    static long parseMillisFromStartSecondTB(long startSecondTB) {
-        return YYYYMMDDHHMMSS.withZone(DateTimeZone.UTC).parseMillis(String.valueOf(startSecondTB));
-    }
-
-    static long parseMillisFromEndSecondTB(long endSecondTB) {
-        long t = endSecondTB;
-        long second = t % 100;
-        if (second > 59) {
-            second = 0;
-        }
-        t = t / 100;
-        long minute = t % 100;
-        if (minute > 59) {
-            minute = 0;
-        }
-        t = t / 100;
-        long hour = t % 100;
-        if (hour > 23) {
-            hour = 0;
-        }
-        t = t / 100;
-        return YYYYMMDDHHMMSS.withZone(DateTimeZone.UTC)
-                .parseMillis(String.valueOf(((t * 100 + hour) * 100 + minute) * 100 + second));
-    }
 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java
index 33a69a969c..95731220fd 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java
@@ -2,30 +2,23 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
 
 import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
 import org.apache.skywalking.banyandb.v1.client.StreamQuery;
-import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
 import org.apache.skywalking.banyandb.v1.client.StreamWrite;
 import org.apache.skywalking.banyandb.v1.client.Tag;
 import org.apache.skywalking.oap.server.core.management.ui.template.UITemplate;
 import org.apache.skywalking.oap.server.core.query.input.DashboardSetting;
 import org.apache.skywalking.oap.server.core.query.type.DashboardConfiguration;
 import org.apache.skywalking.oap.server.core.query.type.TemplateChangeStatus;
-import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
 import org.apache.skywalking.oap.server.core.storage.management.UITemplateManagementDAO;
 import org.apache.skywalking.oap.server.library.util.BooleanUtils;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.DashboardConfigurationMapper;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.RowEntityMapper;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.stream.Collectors;
 
 /**
  * {@link org.apache.skywalking.oap.server.core.management.ui.template.UITemplate} is a stream
  */
-public class BanyanDBUITemplateManagementDAO extends AbstractDAO<BanyanDBStorageClient> implements UITemplateManagementDAO {
-    private static final RowEntityMapper<DashboardConfiguration> MAPPER = new DashboardConfigurationMapper();
-
+public class BanyanDBUITemplateManagementDAO extends AbstractBanyanDBDAO implements UITemplateManagementDAO {
     private static final long UI_TEMPLATE_TIMESTAMP = 1L;
 
     public BanyanDBUITemplateManagementDAO(BanyanDBStorageClient client) {
@@ -34,14 +27,15 @@ public class BanyanDBUITemplateManagementDAO extends AbstractDAO<BanyanDBStorage
 
     @Override
     public List<DashboardConfiguration> getAllTemplates(Boolean includingDisabled) throws IOException {
-        StreamQuery query = new StreamQuery(UITemplate.INDEX_NAME, MAPPER.dataProjection());
-        query.setLimit(10000);
-        if (!includingDisabled) {
-            query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", UITemplate.DISABLED, (long) BooleanUtils.FALSE));
-        }
-        query.setDataProjections(MAPPER.dataProjection());
-        StreamQueryResponse resp = this.getClient().query(query);
-        return resp.getElements().stream().map(MAPPER::map).collect(Collectors.toList());
+        return query(DashboardConfiguration.class, new QueryBuilder() {
+            @Override
+            public void apply(StreamQuery query) {
+                query.setLimit(10000);
+                if (!includingDisabled) {
+                    query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", UITemplate.DISABLED, (long) BooleanUtils.FALSE));
+                }
+            }
+        });
     }
 
     @Override