You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by lu...@apache.org on 2022/04/30 09:03:23 UTC

[skywalking] 04/24: add alarm, networkAddressAlias and logs

This is an automated email from the ASF dual-hosted git repository.

lujiajing pushed a commit to branch banyandb-integration-stream
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit cb705ed0cc8d014839f2f13078442307b6e2b9df
Author: Megrez Lu <lu...@gmail.com>
AuthorDate: Wed Dec 1 23:29:21 2021 +0800

    add alarm, networkAddressAlias and logs
---
 .../banyandb/converter/ProfileTaskMapper.java      | 28 ---------
 .../ProfileThreadSnapshotRecordMapper.java         | 27 --------
 .../banyandb/deserializer/AlarmMessageMapper.java  | 51 +++++++++++++++
 .../BasicTraceMapper.java                          |  2 +-
 .../deserializer/BrowserErrorLogMapper.java        | 59 ++++++++++++++++++
 .../DashboardConfigurationMapper.java              |  2 +-
 .../plugin/banyandb/deserializer/EventMapper.java  | 24 ++++++++
 .../plugin/banyandb/deserializer/LogMapper.java    | 61 ++++++++++++++++++
 .../deserializer/NetworkAddressAliasMapper.java    | 40 ++++++++++++
 .../deserializer/ProfileTaskLogMapper.java         | 39 ++++++++++++
 .../banyandb/deserializer/ProfileTaskMapper.java   | 42 +++++++++++++
 .../ProfileThreadSnapshotRecordMapper.java         | 36 +++++++++++
 .../RowEntityMapper.java                           |  2 +-
 .../SegmentRecordMapper.java                       |  2 +-
 .../banyandb/stream/BanyanDBAlarmQueryDAO.java     | 47 +++++++++++++-
 .../stream/BanyanDBBrowserLogQueryDAO.java         | 50 ++++++++++++++-
 .../banyandb/stream/BanyanDBLogQueryDAO.java       | 72 +++++++++++++++++++++-
 .../stream/BanyanDBNetworkAddressAliasDAO.java     | 23 ++++++-
 .../stream/BanyanDBProfileTaskLogQueryDAO.java     | 31 +++++++++-
 .../stream/BanyanDBProfileTaskQueryDAO.java        | 32 +++++++---
 .../BanyanDBProfileThreadSnapshotQueryDAO.java     |  8 +--
 .../banyandb/stream/BanyanDBTraceQueryDAO.java     |  6 +-
 .../stream/BanyanDBUITemplateManagementDAO.java    |  4 +-
 23 files changed, 597 insertions(+), 91 deletions(-)

diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/ProfileTaskMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/ProfileTaskMapper.java
deleted file mode 100644
index ea1ca09cb0..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/ProfileTaskMapper.java
+++ /dev/null
@@ -1,28 +0,0 @@
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.converter;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.skywalking.banyandb.v1.client.RowEntity;
-import org.apache.skywalking.oap.server.core.profile.ProfileTaskRecord;
-import org.apache.skywalking.oap.server.core.query.type.ProfileTask;
-
-import java.util.Collections;
-import java.util.List;
-
-public class ProfileTaskMapper implements RowEntityMapper<ProfileTask> {
-    @Override
-    public List<String> searchableProjection() {
-        return ImmutableList.of(ProfileTaskRecord.SERVICE_ID, ProfileTaskRecord.ENDPOINT_NAME, ProfileTaskRecord.START_TIME,
-                ProfileTaskRecord.DURATION, ProfileTaskRecord.MIN_DURATION_THRESHOLD, ProfileTaskRecord.DUMP_PERIOD,
-                ProfileTaskRecord.CREATE_TIME, ProfileTaskRecord.MAX_SAMPLING_COUNT);
-    }
-
-    @Override
-    public List<String> dataProjection() {
-        return Collections.emptyList();
-    }
-
-    @Override
-    public ProfileTask map(RowEntity row) {
-        return null;
-    }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/ProfileThreadSnapshotRecordMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/ProfileThreadSnapshotRecordMapper.java
deleted file mode 100644
index 9a25688132..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/ProfileThreadSnapshotRecordMapper.java
+++ /dev/null
@@ -1,27 +0,0 @@
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.converter;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.skywalking.banyandb.v1.client.RowEntity;
-import org.apache.skywalking.oap.server.core.profile.ProfileThreadSnapshotRecord;
-
-import java.util.Collections;
-import java.util.List;
-
-public class ProfileThreadSnapshotRecordMapper implements RowEntityMapper<ProfileThreadSnapshotRecord> {
-    @Override
-    public List<String> searchableProjection() {
-        return ImmutableList.of(ProfileThreadSnapshotRecord.TASK_ID, ProfileThreadSnapshotRecord.SEGMENT_ID,
-                ProfileThreadSnapshotRecord.DUMP_TIME, ProfileThreadSnapshotRecord.SEQUENCE,
-                ProfileThreadSnapshotRecord.STACK_BINARY);
-    }
-
-    @Override
-    public List<String> dataProjection() {
-        return Collections.emptyList();
-    }
-
-    @Override
-    public ProfileThreadSnapshotRecord map(RowEntity row) {
-        return null;
-    }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/AlarmMessageMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/AlarmMessageMapper.java
new file mode 100644
index 0000000000..d906dbb78f
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/AlarmMessageMapper.java
@@ -0,0 +1,51 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
+
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.ByteString;
+import lombok.RequiredArgsConstructor;
+import org.apache.skywalking.banyandb.v1.client.RowEntity;
+import org.apache.skywalking.banyandb.v1.client.TagAndValue;
+import org.apache.skywalking.oap.server.core.alarm.AlarmRecord;
+import org.apache.skywalking.oap.server.core.query.enumeration.Scope;
+import org.apache.skywalking.oap.server.core.query.type.AlarmMessage;
+import org.apache.skywalking.oap.server.core.storage.query.IAlarmQueryDAO;
+
+import java.util.List;
+
+@RequiredArgsConstructor
+public class AlarmMessageMapper implements RowEntityMapper<AlarmMessage> {
+    private final IAlarmQueryDAO alarmQueryDAO;
+
+    @Override
+    public List<String> searchableProjection() {
+        return ImmutableList.of(AlarmRecord.SCOPE, // 0
+                AlarmRecord.START_TIME); // 1
+    }
+
+    @Override
+    public List<String> dataProjection() {
+        return ImmutableList.of(AlarmRecord.ID0, // 0
+                AlarmRecord.ID1, // 1
+                AlarmRecord.ALARM_MESSAGE, // 2
+                AlarmRecord.TAGS_RAW_DATA); // 3
+    }
+
+    @Override
+    public AlarmMessage map(RowEntity row) {
+        AlarmMessage alarmMessage = new AlarmMessage();
+        final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
+        int scopeID = ((Number) searchable.get(0).getValue()).intValue();
+        alarmMessage.setScopeId(scopeID);
+        alarmMessage.setScope(Scope.Finder.valueOf(scopeID));
+        alarmMessage.setStartTime(((Number) searchable.get(1).getValue()).longValue());
+        final List<TagAndValue<?>> data = row.getTagFamilies().get(1);
+        alarmMessage.setId((String) data.get(0).getValue());
+        alarmMessage.setId1((String) data.get(1).getValue());
+        alarmMessage.setMessage((String) data.get(2).getValue());
+        Object o = data.get(3).getValue();
+        if (o instanceof ByteString && !((ByteString) o).isEmpty()) {
+            this.alarmQueryDAO.parserDataBinary(((ByteString) o).toByteArray(), alarmMessage.getTags());
+        }
+        return alarmMessage;
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/BasicTraceMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BasicTraceMapper.java
similarity index 98%
rename from oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/BasicTraceMapper.java
rename to oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BasicTraceMapper.java
index b3512158f2..bbcf7091d0 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/BasicTraceMapper.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BasicTraceMapper.java
@@ -1,4 +1,4 @@
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.converter;
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
 
 import com.google.common.collect.ImmutableList;
 import org.apache.skywalking.banyandb.v1.client.RowEntity;
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BrowserErrorLogMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BrowserErrorLogMapper.java
new file mode 100644
index 0000000000..0807955376
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BrowserErrorLogMapper.java
@@ -0,0 +1,59 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
+
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.skywalking.banyandb.v1.client.RowEntity;
+import org.apache.skywalking.banyandb.v1.client.TagAndValue;
+import org.apache.skywalking.oap.server.core.browser.manual.errorlog.BrowserErrorLogRecord;
+import org.apache.skywalking.oap.server.core.query.type.BrowserErrorLog;
+import org.apache.skywalking.oap.server.core.query.type.ErrorCategory;
+
+import java.util.List;
+
+public class BrowserErrorLogMapper implements RowEntityMapper<BrowserErrorLog> {
+    @Override
+    public List<String> searchableProjection() {
+        return ImmutableList.of(BrowserErrorLogRecord.SERVICE_ID,
+                BrowserErrorLogRecord.SERVICE_VERSION_ID,
+                BrowserErrorLogRecord.PAGE_PATH_ID,
+                BrowserErrorLogRecord.ERROR_CATEGORY,
+                BrowserErrorLogRecord.TIMESTAMP
+        );
+    }
+
+    @Override
+    public List<String> dataProjection() {
+        return ImmutableList.of(BrowserErrorLogRecord.DATA_BINARY);
+    }
+
+    @Override
+    public BrowserErrorLog map(RowEntity row) {
+        // FIXME: use protobuf directly
+        BrowserErrorLog log = new BrowserErrorLog();
+        final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
+        log.setService((String) searchable.get(0).getValue());
+        log.setServiceVersion((String) searchable.get(1).getValue());
+        log.setPagePath((String) searchable.get(2).getValue());
+        log.setCategory(ErrorCategory.valueOf((String) searchable.get(3).getValue()));
+        log.setTime(((Number) searchable.get(4).getValue()).longValue());
+        final List<TagAndValue<?>> data = row.getTagFamilies().get(1);
+        Object o = data.get(0).getValue();
+        if (o instanceof ByteString && !((ByteString) o).isEmpty()) {
+            try {
+                org.apache.skywalking.apm.network.language.agent.v3.BrowserErrorLog browserErrorLog = org.apache.skywalking.apm.network.language.agent.v3.BrowserErrorLog
+                        .parseFrom((ByteString) o);
+                log.setGrade(browserErrorLog.getGrade());
+                log.setCol(browserErrorLog.getCol());
+                log.setLine(browserErrorLog.getLine());
+                log.setMessage(browserErrorLog.getMessage());
+                log.setErrorUrl(browserErrorLog.getErrorUrl());
+                log.setStack(browserErrorLog.getStack());
+                log.setFirstReportedError(browserErrorLog.getFirstReportedError());
+            } catch (InvalidProtocolBufferException ex) {
+                throw new RuntimeException("fail to parse proto buffer", ex);
+            }
+        }
+        return log;
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/DashboardConfigurationMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/DashboardConfigurationMapper.java
similarity index 99%
rename from oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/DashboardConfigurationMapper.java
rename to oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/DashboardConfigurationMapper.java
index 3f7df9cb72..6b9e0962e6 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/DashboardConfigurationMapper.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/DashboardConfigurationMapper.java
@@ -1,4 +1,4 @@
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.converter;
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
 
 import com.google.common.collect.ImmutableList;
 import org.apache.skywalking.banyandb.v1.client.RowEntity;
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/EventMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/EventMapper.java
new file mode 100644
index 0000000000..1c94903cbd
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/EventMapper.java
@@ -0,0 +1,24 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.skywalking.banyandb.v1.client.RowEntity;
+import org.apache.skywalking.oap.server.core.query.type.event.Event;
+
+import java.util.List;
+
+public class EventMapper implements RowEntityMapper<Event> {
+    @Override
+    public List<String> searchableProjection() {
+        return ImmutableList.of();
+    }
+
+    @Override
+    public List<String> dataProjection() {
+        return ImmutableList.of(); // empty list, matching searchableProjection(), instead of null
+    }
+
+    @Override
+    public Event map(RowEntity row) {
+        return null; // NOTE(review): placeholder - no Event is built from the row yet
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/LogMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/LogMapper.java
new file mode 100644
index 0000000000..f1c5aaf442
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/LogMapper.java
@@ -0,0 +1,61 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
+
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.skywalking.apm.network.common.v3.KeyStringValuePair;
+import org.apache.skywalking.apm.network.logging.v3.LogTags;
+import org.apache.skywalking.banyandb.v1.client.RowEntity;
+import org.apache.skywalking.banyandb.v1.client.TagAndValue;
+import org.apache.skywalking.oap.server.core.analysis.manual.log.AbstractLogRecord;
+import org.apache.skywalking.oap.server.core.query.type.KeyValue;
+import org.apache.skywalking.oap.server.core.query.type.Log;
+
+import java.util.List;
+
+public class LogMapper implements RowEntityMapper<Log> {
+    @Override
+    public List<String> searchableProjection() {
+        return ImmutableList.of(
+                AbstractLogRecord.SERVICE_ID, // 0
+                AbstractLogRecord.SERVICE_INSTANCE_ID, // 1
+                AbstractLogRecord.ENDPOINT_ID, // 2
+                AbstractLogRecord.TRACE_ID, // 3
+                AbstractLogRecord.TRACE_SEGMENT_ID,
+                AbstractLogRecord.SPAN_ID,
+                AbstractLogRecord.TIMESTAMP); // 6
+    }
+
+    @Override
+    public List<String> dataProjection() {
+        return ImmutableList.of(AbstractLogRecord.CONTENT_TYPE,
+                AbstractLogRecord.CONTENT,
+                AbstractLogRecord.TAGS_RAW_DATA); // 2
+    }
+
+    @Override
+    public Log map(RowEntity row) {
+        Log log = new Log();
+        final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
+        log.setServiceId((String) searchable.get(0).getValue());
+        log.setServiceInstanceId((String) searchable.get(1).getValue());
+        log.setEndpointId((String) searchable.get(2).getValue());
+        log.setTraceId((String) searchable.get(3).getValue());
+        log.setTimestamp(((Number) searchable.get(6).getValue()).longValue());
+        final List<TagAndValue<?>> data = row.getTagFamilies().get(1);
+        if (data.get(2).getValue() == null || ((ByteString) data.get(2).getValue()).isEmpty()) {
+            log.setContent(""); // NOTE(review): data 0 (CONTENT_TYPE) and data 1 (CONTENT) are projected but never read - confirm
+        } else {
+            try {
+                // Decode the serialized LogTags (data index 2) and copy each key/value pair into the log's tags.
+                LogTags logTags = LogTags.parseFrom((ByteString) data.get(2).getValue());
+                for (final KeyStringValuePair pair : logTags.getDataList()) {
+                    log.getTags().add(new KeyValue(pair.getKey(), pair.getValue()));
+                }
+            } catch (InvalidProtocolBufferException e) {
+                throw new RuntimeException(e);
+            }
+        }
+        return log;
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/NetworkAddressAliasMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/NetworkAddressAliasMapper.java
new file mode 100644
index 0000000000..aeb823c752
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/NetworkAddressAliasMapper.java
@@ -0,0 +1,40 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.skywalking.banyandb.v1.client.RowEntity;
+import org.apache.skywalking.banyandb.v1.client.TagAndValue;
+import org.apache.skywalking.oap.server.core.analysis.manual.networkalias.NetworkAddressAlias;
+import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
+
+import java.util.List;
+
+public class NetworkAddressAliasMapper implements RowEntityMapper<NetworkAddressAlias> {
+    @Override
+    public List<String> searchableProjection() {
+        return ImmutableList.of(NetworkAddressAlias.LAST_UPDATE_TIME_BUCKET);
+    }
+
+    @Override
+    public List<String> dataProjection() {
+        // TODO: make these static fields public
+        return ImmutableList.of(Metrics.TIME_BUCKET, "address", "represent_service_id", "represent_service_instance_id");
+    }
+
+    @Override
+    public NetworkAddressAlias map(RowEntity row) {
+        NetworkAddressAlias model = new NetworkAddressAlias();
+        final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
+        // searchable - last_update_time_bucket
+        model.setLastUpdateTimeBucket(((Number) searchable.get(0).getValue()).longValue());
+        final List<TagAndValue<?>> data = row.getTagFamilies().get(1);
+        // data - time_bucket
+        model.setTimeBucket(((Number) data.get(0).getValue()).longValue());
+        // data - address
+        model.setAddress((String) data.get(1).getValue());
+        // data - represent_service_id
+        model.setRepresentServiceId((String) data.get(2).getValue());
+        // data - represent_service_instance_id
+        model.setRepresentServiceInstanceId((String) data.get(3).getValue());
+        return model;
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileTaskLogMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileTaskLogMapper.java
new file mode 100644
index 0000000000..bc157253f0
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileTaskLogMapper.java
@@ -0,0 +1,39 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.skywalking.banyandb.v1.client.RowEntity;
+import org.apache.skywalking.banyandb.v1.client.TagAndValue;
+import org.apache.skywalking.oap.server.core.profile.ProfileTaskLogRecord;
+import org.apache.skywalking.oap.server.core.query.type.ProfileTaskLog;
+import org.apache.skywalking.oap.server.core.query.type.ProfileTaskLogOperationType;
+
+import java.util.List;
+
+public class ProfileTaskLogMapper implements RowEntityMapper<ProfileTaskLog> {
+    @Override
+    public List<String> searchableProjection() {
+        return ImmutableList.of(ProfileTaskLogRecord.OPERATION_TIME);
+    }
+
+    @Override
+    public List<String> dataProjection() {
+        return ImmutableList.of(ProfileTaskLogRecord.TASK_ID, ProfileTaskLogRecord.INSTANCE_ID,
+                ProfileTaskLogRecord.OPERATION_TYPE);
+    }
+
+    @Override
+    public ProfileTaskLog map(RowEntity row) {
+        ProfileTaskLog profileTaskLog = new ProfileTaskLog();
+        final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
+        // searchable - operation_time
+        profileTaskLog.setOperationTime(((Number) searchable.get(0).getValue()).longValue());
+        final List<TagAndValue<?>> data = row.getTagFamilies().get(1);
+        // data - task_id
+        profileTaskLog.setTaskId((String) data.get(0).getValue());
+        // data - instance_id
+        profileTaskLog.setInstanceId((String) data.get(1).getValue());
+        // data - operation_type
+        profileTaskLog.setOperationType(ProfileTaskLogOperationType.parse(((Number) data.get(2).getValue()).intValue()));
+        return profileTaskLog;
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileTaskMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileTaskMapper.java
new file mode 100644
index 0000000000..8a49404d23
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileTaskMapper.java
@@ -0,0 +1,42 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.skywalking.banyandb.v1.client.RowEntity;
+import org.apache.skywalking.banyandb.v1.client.TagAndValue;
+import org.apache.skywalking.oap.server.core.profile.ProfileTaskRecord;
+import org.apache.skywalking.oap.server.core.query.type.ProfileTask;
+
+import java.util.Collections;
+import java.util.List;
+
+public class ProfileTaskMapper implements RowEntityMapper<ProfileTask> {
+    public static final String ID = "profile_task_query_id";
+
+    @Override
+    public List<String> searchableProjection() {
+        return ImmutableList.of(ID, ProfileTaskRecord.SERVICE_ID, ProfileTaskRecord.ENDPOINT_NAME,
+                ProfileTaskRecord.START_TIME, ProfileTaskRecord.DURATION, ProfileTaskRecord.MIN_DURATION_THRESHOLD,
+                ProfileTaskRecord.DUMP_PERIOD, ProfileTaskRecord.CREATE_TIME, ProfileTaskRecord.MAX_SAMPLING_COUNT);
+    }
+
+    @Override
+    public List<String> dataProjection() {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public ProfileTask map(RowEntity row) {
+        ProfileTask profileTask = new ProfileTask();
+        final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0); // indexes follow searchableProjection()
+        profileTask.setId((String) searchable.get(0).getValue());
+        profileTask.setServiceId((String) searchable.get(1).getValue());
+        profileTask.setEndpointName((String) searchable.get(2).getValue());
+        profileTask.setStartTime(((Number) searchable.get(3).getValue()).longValue());
+        profileTask.setDuration(((Number) searchable.get(4).getValue()).intValue());
+        profileTask.setMinDurationThreshold(((Number) searchable.get(5).getValue()).intValue());
+        profileTask.setDumpPeriod(((Number) searchable.get(6).getValue()).intValue());
+        profileTask.setCreateTime(((Number) searchable.get(7).getValue()).longValue()); // read as long, like START_TIME, to avoid narrowing
+        profileTask.setMaxSamplingCount(((Number) searchable.get(8).getValue()).intValue());
+        return profileTask; // return the populated task; previously this returned null
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileThreadSnapshotRecordMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileThreadSnapshotRecordMapper.java
new file mode 100644
index 0000000000..99f23117f2
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileThreadSnapshotRecordMapper.java
@@ -0,0 +1,36 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
+
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.ByteString;
+import org.apache.skywalking.banyandb.v1.client.RowEntity;
+import org.apache.skywalking.banyandb.v1.client.TagAndValue;
+import org.apache.skywalking.oap.server.core.profile.ProfileThreadSnapshotRecord;
+
+import java.util.Collections;
+import java.util.List;
+
+public class ProfileThreadSnapshotRecordMapper implements RowEntityMapper<ProfileThreadSnapshotRecord> {
+    @Override
+    public List<String> searchableProjection() {
+        return ImmutableList.of(ProfileThreadSnapshotRecord.TASK_ID, ProfileThreadSnapshotRecord.SEGMENT_ID,
+                ProfileThreadSnapshotRecord.DUMP_TIME, ProfileThreadSnapshotRecord.SEQUENCE);
+    }
+
+    @Override
+    public List<String> dataProjection() {
+        return Collections.singletonList(ProfileThreadSnapshotRecord.STACK_BINARY);
+    }
+
+    @Override
+    public ProfileThreadSnapshotRecord map(RowEntity row) {
+        ProfileThreadSnapshotRecord record = new ProfileThreadSnapshotRecord();
+        final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
+        record.setTaskId((String) searchable.get(0).getValue());
+        record.setSegmentId((String) searchable.get(1).getValue());
+        record.setDumpTime(((Number) searchable.get(2).getValue()).longValue());
+        record.setSequence(((Number) searchable.get(3).getValue()).intValue());
+        final List<TagAndValue<?>> data = row.getTagFamilies().get(1);
+        record.setStackBinary(((ByteString) data.get(0).getValue()).toByteArray());
+        return record;
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/RowEntityMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/RowEntityMapper.java
similarity index 95%
rename from oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/RowEntityMapper.java
rename to oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/RowEntityMapper.java
index b2facb3373..cc1d48d94f 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/RowEntityMapper.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/RowEntityMapper.java
@@ -1,4 +1,4 @@
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.converter;
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
 
 import org.apache.skywalking.banyandb.v1.client.RowEntity;
 
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/SegmentRecordMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/SegmentRecordMapper.java
similarity index 99%
rename from oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/SegmentRecordMapper.java
rename to oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/SegmentRecordMapper.java
index 6ea579ff55..70318136f6 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/SegmentRecordMapper.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/SegmentRecordMapper.java
@@ -1,4 +1,4 @@
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.converter;
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
 
 import com.google.common.collect.ImmutableList;
 import com.google.protobuf.ByteString;
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java
index c7bc9cc13d..7e122611bf 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java
@@ -1,19 +1,76 @@
 package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
 
+import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
+import org.apache.skywalking.banyandb.v1.client.StreamQuery;
+import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
+import org.apache.skywalking.oap.server.core.alarm.AlarmRecord;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
 import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
+import org.apache.skywalking.oap.server.core.query.type.AlarmMessage;
 import org.apache.skywalking.oap.server.core.query.type.Alarms;
+import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
 import org.apache.skywalking.oap.server.core.storage.query.IAlarmQueryDAO;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.AlarmMessageMapper;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.RowEntityMapper;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
 
 /**
  * {@link org.apache.skywalking.oap.server.core.alarm.AlarmRecord} is a stream,
  * which can be used to build a {@link org.apache.skywalking.oap.server.core.query.type.AlarmMessage}
  */
-public class BanyanDBAlarmQueryDAO implements IAlarmQueryDAO {
+public class BanyanDBAlarmQueryDAO extends AbstractDAO<BanyanDBStorageClient> implements IAlarmQueryDAO {
+    /**
+     * Turns the rows returned by a stream query into {@link AlarmMessage} objects.
+     */
+    private final RowEntityMapper<AlarmMessage> mapper;
+
+    public BanyanDBAlarmQueryDAO(BanyanDBStorageClient client) {
+        super(client);
+        this.mapper = new AlarmMessageMapper(this);
+    }
+
+    /**
+     * Queries one page of alarm records.
+     * Filtering by {@code keyword} and {@code tags} is not implemented yet, and the reported
+     * total is the number of messages on the returned page.
+     */
     @Override
     public Alarms getAlarm(Integer scopeId, String keyword, int limit, int from, long startTB, long endTB, List<Tag> tags) throws IOException {
-        return new Alarms();
+        final StreamQuery alarmQuery = new StreamQuery(AlarmRecord.INDEX_NAME, mapper.searchableProjection());
+
+        // The scope filter is optional.
+        if (Objects.nonNull(scopeId)) {
+            alarmQuery.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", AlarmRecord.SCOPE, scopeId.longValue()));
+        }
+
+        // A time range is only applied when both bounds are set (non-zero).
+        if (startTB != 0 && endTB != 0) {
+            final long startTimestamp = TimeBucket.getTimestamp(startTB);
+            final long endTimestamp = TimeBucket.getTimestamp(endTB);
+            alarmQuery.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", AlarmRecord.START_TIME, startTimestamp));
+            alarmQuery.appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", AlarmRecord.START_TIME, endTimestamp));
+        }
+
+        // TODO: support keyword search
+
+        // TODO: support tag search
+
+        alarmQuery.setLimit(limit);
+        alarmQuery.setOffset(from);
+
+        final StreamQueryResponse response = getClient().query(alarmQuery);
+        final List<AlarmMessage> page = response.getElements().stream()
+                .map(mapper::map)
+                .collect(Collectors.toList());
+
+        final Alarms result = new Alarms();
+        result.getMsgs().addAll(page);
+        result.setTotal(page.size());
+        return result;
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java
index 7607668eba..25f2c27f4b 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java
@@ -1,17 +1,73 @@
 package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
 
+import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
+import org.apache.skywalking.banyandb.v1.client.StreamQuery;
+import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
+import org.apache.skywalking.oap.server.core.browser.manual.errorlog.BrowserErrorLogRecord;
 import org.apache.skywalking.oap.server.core.browser.source.BrowserErrorCategory;
+import org.apache.skywalking.oap.server.core.query.type.BrowserErrorLog;
 import org.apache.skywalking.oap.server.core.query.type.BrowserErrorLogs;
+import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
 import org.apache.skywalking.oap.server.core.storage.query.IBrowserLogQueryDAO;
+import org.apache.skywalking.oap.server.library.util.StringUtil;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.BrowserErrorLogMapper;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.RowEntityMapper;
 
 import java.io.IOException;
+import java.util.Objects;
+import java.util.stream.Collectors;
 
 /**
  * {@link org.apache.skywalking.oap.server.core.browser.manual.errorlog.BrowserErrorLogRecord} is a stream
  */
-public class BanyanDBBrowserLogQueryDAO implements IBrowserLogQueryDAO {
+public class BanyanDBBrowserLogQueryDAO extends AbstractDAO<BanyanDBStorageClient> implements IBrowserLogQueryDAO {
+    private static final RowEntityMapper<BrowserErrorLog> MAPPER = new BrowserErrorLogMapper();
+
+    public BanyanDBBrowserLogQueryDAO(BanyanDBStorageClient client) {
+        super(client);
+    }
+
+    /**
+     * Queries one page of browser error logs.
+     * A filter is only appended for an argument that is set, and the time range is only applied
+     * when both bounds are non-zero.
+     * The reported total is the number of logs on the returned page.
+     */
     @Override
     public BrowserErrorLogs queryBrowserErrorLogs(String serviceId, String serviceVersionId, String pagePathId, BrowserErrorCategory category, long startSecondTB, long endSecondTB, int limit, int from) throws IOException {
-        return new BrowserErrorLogs();
+        final StreamQuery query = new StreamQuery(BrowserErrorLogRecord.INDEX_NAME, MAPPER.searchableProjection());
+        query.setDataProjections(MAPPER.dataProjection());
+
+        // Guard the service filter like the other optional filters, so that a missing service id
+        // does not end up in the query as a null or empty match value.
+        if (StringUtil.isNotEmpty(serviceId)) {
+            query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", BrowserErrorLogRecord.SERVICE_ID, serviceId));
+        }
+
+        if (startSecondTB != 0 && endSecondTB != 0) {
+            query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", BrowserErrorLogRecord.TIMESTAMP, TimeBucket.getTimestamp(startSecondTB)));
+            query.appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", BrowserErrorLogRecord.TIMESTAMP, TimeBucket.getTimestamp(endSecondTB)));
+        }
+        if (StringUtil.isNotEmpty(serviceVersionId)) {
+            query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", BrowserErrorLogRecord.SERVICE_VERSION_ID, serviceVersionId));
+        }
+        if (StringUtil.isNotEmpty(pagePathId)) {
+            query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", BrowserErrorLogRecord.PAGE_PATH_ID, pagePathId));
+        }
+        if (Objects.nonNull(category)) {
+            query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", BrowserErrorLogRecord.ERROR_CATEGORY, (long) category.getValue()));
+        }
+
+        query.setOffset(from);
+        query.setLimit(limit);
+
+        final StreamQueryResponse resp = getClient().query(query);
+
+        final BrowserErrorLogs logs = new BrowserErrorLogs();
+        logs.getLogs().addAll(resp.getElements().stream().map(MAPPER::map).collect(Collectors.toList()));
+        logs.setTotal(logs.getLogs().size());
+        return logs;
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java
index baba93b886..661f65b930 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java
@@ -1,20 +1,97 @@
 package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
 
+import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
+import org.apache.skywalking.banyandb.v1.client.StreamQuery;
+import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
+import org.apache.skywalking.oap.server.core.analysis.manual.log.AbstractLogRecord;
+import org.apache.skywalking.oap.server.core.analysis.manual.log.LogRecord;
 import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
 import org.apache.skywalking.oap.server.core.query.enumeration.Order;
 import org.apache.skywalking.oap.server.core.query.input.TraceScopeCondition;
+import org.apache.skywalking.oap.server.core.query.type.Log;
 import org.apache.skywalking.oap.server.core.query.type.Logs;
+import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
 import org.apache.skywalking.oap.server.core.storage.query.ILogQueryDAO;
+import org.apache.skywalking.oap.server.library.util.StringUtil;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.LogMapper;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.RowEntityMapper;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
 
 /**
  * {@link org.apache.skywalking.oap.server.core.analysis.manual.log.LogRecord} is a stream
  */
-public class BanyanDBLogQueryDAO implements ILogQueryDAO {
+public class BanyanDBLogQueryDAO extends AbstractDAO<BanyanDBStorageClient> implements ILogQueryDAO {
+    private static final RowEntityMapper<Log> MAPPER = new LogMapper();
+
+    public BanyanDBLogQueryDAO(BanyanDBStorageClient client) {
+        super(client);
+    }
+
+    /**
+     * Queries one page of log records.
+     * A filter is only appended for an argument that is set, and the time range is only applied
+     * when both bounds are non-zero. {@code queryOrder}, {@code tags}, {@code keywordsOfContent}
+     * and {@code excludingKeywordsOfContent} are not applied yet.
+     * The reported total is the number of logs on the returned page.
+     */
     @Override
-    public Logs queryLogs(String serviceId, String serviceInstanceId, String endpointId, TraceScopeCondition relatedTrace, Order queryOrder, int from, int limit, long startTB, long endTB, List<Tag> tags, List<String> keywordsOfContent, List<String> excludingKeywordsOfContent) throws IOException {
-        return new Logs();
+    public Logs queryLogs(String serviceId, String serviceInstanceId, String endpointId,
+                          TraceScopeCondition relatedTrace, Order queryOrder, int from, int limit,
+                          long startTB, long endTB, List<Tag> tags, List<String> keywordsOfContent,
+                          List<String> excludingKeywordsOfContent) throws IOException {
+        final StreamQuery query = new StreamQuery(LogRecord.INDEX_NAME, MAPPER.searchableProjection());
+        if (StringUtil.isNotEmpty(serviceId)) {
+            query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", AbstractLogRecord.SERVICE_ID, serviceId));
+        }
+
+        if (startTB != 0 && endTB != 0) {
+            query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", AbstractLogRecord.TIMESTAMP, TimeBucket.getTimestamp(startTB)));
+            query.appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", AbstractLogRecord.TIMESTAMP, TimeBucket.getTimestamp(endTB)));
+        }
+
+        if (StringUtil.isNotEmpty(serviceInstanceId)) {
+            query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", AbstractLogRecord.SERVICE_INSTANCE_ID, serviceInstanceId));
+        }
+        if (StringUtil.isNotEmpty(endpointId)) {
+            query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", AbstractLogRecord.ENDPOINT_ID, endpointId));
+        }
+        if (Objects.nonNull(relatedTrace)) {
+            if (StringUtil.isNotEmpty(relatedTrace.getTraceId())) {
+                query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", AbstractLogRecord.TRACE_ID, relatedTrace.getTraceId()));
+            }
+            if (StringUtil.isNotEmpty(relatedTrace.getSegmentId())) {
+                query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", AbstractLogRecord.TRACE_SEGMENT_ID, relatedTrace.getSegmentId()));
+            }
+            if (Objects.nonNull(relatedTrace.getSpanId())) {
+                query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", AbstractLogRecord.SPAN_ID, (long) relatedTrace.getSpanId()));
+            }
+        }
+
+        // TODO: decide whether tags may be indexed; if so, filter on them as sketched below
+//        if (CollectionUtils.isNotEmpty(tags)) {
+//            for (final Tag tag : tags) {
+//                query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", tag.getKey(), tag.getValue()));
+//            }
+//        }
+
+        // Apply the requested paging window: limit is the page size, from is the offset.
+        query.setLimit(limit);
+        query.setOffset(from);
+
+        StreamQueryResponse resp = getClient().query(query);
+
+        List<Log> entities = resp.getElements().stream().map(MAPPER::map).collect(Collectors.toList());
+
+        Logs logs = new Logs();
+        logs.getLogs().addAll(entities);
+        logs.setTotal(entities.size());
+
+        return logs;
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBNetworkAddressAliasDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBNetworkAddressAliasDAO.java
index 10675a49a1..337c288e96 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBNetworkAddressAliasDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBNetworkAddressAliasDAO.java
@@ -1,17 +1,40 @@
 package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
 
+import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
+import org.apache.skywalking.banyandb.v1.client.StreamQuery;
+import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
 import org.apache.skywalking.oap.server.core.analysis.manual.networkalias.NetworkAddressAlias;
+import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
 import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.NetworkAddressAliasMapper;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.RowEntityMapper;
 
-import java.util.Collections;
 import java.util.List;
+import java.util.stream.Collectors;
 
 /**
  * {@link NetworkAddressAlias} is a stream
  */
-public class BanyanDBNetworkAddressAliasDAO implements INetworkAddressAliasDAO {
+public class BanyanDBNetworkAddressAliasDAO extends AbstractDAO<BanyanDBStorageClient> implements INetworkAddressAliasDAO {
+    private static final RowEntityMapper<NetworkAddressAlias> MAPPER = new NetworkAddressAliasMapper();
+
+    public BanyanDBNetworkAddressAliasDAO(BanyanDBStorageClient client) {
+        super(client);
+    }
+
+    /**
+     * Queries the aliases whose last-update time bucket is greater than or equal to {@code timeBucket}.
+     */
     @Override
     public List<NetworkAddressAlias> loadLastUpdate(long timeBucket) {
-        return Collections.emptyList();
+        final StreamQuery aliasQuery = new StreamQuery(NetworkAddressAlias.INDEX_NAME, MAPPER.searchableProjection());
+        aliasQuery.appendCondition(
+                PairQueryCondition.LongQueryCondition.ge("searchable", NetworkAddressAlias.LAST_UPDATE_TIME_BUCKET, timeBucket));
+
+        final StreamQueryResponse response = getClient().query(aliasQuery);
+        return response.getElements().stream()
+                .map(MAPPER::map)
+                .collect(Collectors.toList());
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java
index 2b6290e2d2..6c19e2f701 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java
@@ -1,18 +1,51 @@
 package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
 
+import org.apache.skywalking.banyandb.v1.client.StreamQuery;
+import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
+import org.apache.skywalking.oap.server.core.profile.ProfileTaskLogRecord;
 import org.apache.skywalking.oap.server.core.query.type.ProfileTaskLog;
+import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
 import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.ProfileTaskLogMapper;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.RowEntityMapper;
 
 import java.io.IOException;
-import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
+import java.util.stream.Collectors;
 
 /**
  * {@link org.apache.skywalking.oap.server.core.profile.ProfileTaskLogRecord} is a stream
  */
-public class BanyanDBProfileTaskLogQueryDAO implements IProfileTaskLogQueryDAO {
+public class BanyanDBProfileTaskLogQueryDAO extends AbstractDAO<BanyanDBStorageClient> implements IProfileTaskLogQueryDAO {
+    private static final RowEntityMapper<ProfileTaskLog> MAPPER = new ProfileTaskLogMapper();
+
+    /**
+     * Upper bound on the number of task logs fetched by one query.
+     */
+    private final int queryMaxSize;
+
+    public BanyanDBProfileTaskLogQueryDAO(BanyanDBStorageClient client, int queryMaxSize) {
+        super(client);
+        this.queryMaxSize = queryMaxSize;
+    }
+
+    /**
+     * Fetches at most {@code queryMaxSize} profile task logs and returns them sorted by
+     * ascending operation time.
+     */
     @Override
     public List<ProfileTaskLog> getTaskLogList() throws IOException {
-        return Collections.emptyList();
+        final StreamQuery taskLogQuery = new StreamQuery(ProfileTaskLogRecord.INDEX_NAME, MAPPER.searchableProjection());
+        taskLogQuery.setLimit(this.queryMaxSize);
+        taskLogQuery.setDataProjections(MAPPER.dataProjection());
+
+        final StreamQueryResponse response = getClient().query(taskLogQuery);
+        final Comparator<ProfileTaskLog> byOperationTime = Comparator.comparingLong(ProfileTaskLog::getOperationTime);
+        return response.getElements().stream()
+                .map(MAPPER::map)
+                .sorted(byOperationTime)
+                .collect(Collectors.toList());
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java
index 34af74eb93..67f28ce62f 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java
@@ -10,8 +10,8 @@ import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
 import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO;
 import org.apache.skywalking.oap.server.library.util.StringUtil;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.converter.ProfileTaskMapper;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.converter.RowEntityMapper;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.ProfileTaskMapper;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.RowEntityMapper;
 
 import java.io.IOException;
 import java.util.List;
@@ -34,34 +34,46 @@ public class BanyanDBProfileTaskQueryDAO extends AbstractDAO<BanyanDBStorageClie
         query.setDataProjections(MAPPER.dataProjection());
 
         if (StringUtil.isNotEmpty(serviceId)) {
-            query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", ProfileTaskRecord.SERVICE_ID, serviceId));
+            query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable",
+                    ProfileTaskRecord.SERVICE_ID, serviceId));
         }
 
         if (StringUtil.isNotEmpty(endpointName)) {
-            query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", ProfileTaskRecord.ENDPOINT_NAME, endpointName));
+            query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable",
+                    ProfileTaskRecord.ENDPOINT_NAME, endpointName));
         }
 
         if (Objects.nonNull(startTimeBucket)) {
-            query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", ProfileTaskRecord.START_TIME, TimeBucket.getTimestamp(startTimeBucket)));
+            query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable",
+                    ProfileTaskRecord.START_TIME, TimeBucket.getTimestamp(startTimeBucket)));
         }
 
         if (Objects.nonNull(endTimeBucket)) {
-            query.appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", ProfileTaskRecord.START_TIME, TimeBucket.getTimestamp(endTimeBucket)));
+            query.appendCondition(PairQueryCondition.LongQueryCondition.le("searchable",
+                    ProfileTaskRecord.START_TIME, TimeBucket.getTimestamp(endTimeBucket)));
         }
 
-        // TODO: why delete?
-
         if (Objects.nonNull(limit)) {
             query.setLimit(limit);
         }
 
+        query.setOrderBy(new StreamQuery.OrderBy(ProfileTaskRecord.START_TIME, StreamQuery.OrderBy.Type.DESC));
+
         StreamQueryResponse resp = getClient().query(query);
         return resp.getElements().stream().map(MAPPER::map).collect(Collectors.toList());
     }
 
     @Override
     public ProfileTask getById(String id) throws IOException {
-        // TODO: support id query
-        throw new UnsupportedOperationException("element id get is not supported");
+        if (StringUtil.isEmpty(id)) {
+            return null; // nothing to look up without an id
+        }
+
+        // Match on the task id tag; a single element is all that is needed.
+        final StreamQuery idQuery = new StreamQuery(ProfileTaskRecord.INDEX_NAME, MAPPER.searchableProjection());
+        idQuery.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", ProfileTaskMapper.ID, id));
+        idQuery.setLimit(1);
+        final StreamQueryResponse response = getClient().query(idQuery);
+        return response.getElements().stream().map(MAPPER::map).findAny().orElse(null);
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java
index 456636f287..cb5067e890 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java
@@ -9,10 +9,10 @@ import org.apache.skywalking.oap.server.core.query.type.BasicTrace;
 import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
 import org.apache.skywalking.oap.server.core.storage.profile.IProfileThreadSnapshotQueryDAO;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.converter.BasicTraceMapper;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.converter.ProfileThreadSnapshotRecordMapper;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.converter.RowEntityMapper;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.converter.SegmentRecordMapper;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.BasicTraceMapper;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.ProfileThreadSnapshotRecordMapper;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.RowEntityMapper;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.SegmentRecordMapper;
 
 import java.io.IOException;
 import java.util.ArrayList;
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java
index a37cfbb54c..838e8a1eb3 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java
@@ -31,9 +31,9 @@ import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
 import org.apache.skywalking.oap.server.library.util.CollectionUtils;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBSchema;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.converter.BasicTraceMapper;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.converter.RowEntityMapper;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.converter.SegmentRecordMapper;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.BasicTraceMapper;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.RowEntityMapper;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.SegmentRecordMapper;
 import org.joda.time.DateTimeZone;
 import org.joda.time.format.DateTimeFormat;
 import org.joda.time.format.DateTimeFormatter;
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java
index a4875e65f1..33a69a969c 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java
@@ -13,8 +13,8 @@ import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
 import org.apache.skywalking.oap.server.core.storage.management.UITemplateManagementDAO;
 import org.apache.skywalking.oap.server.library.util.BooleanUtils;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.converter.DashboardConfigurationMapper;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.converter.RowEntityMapper;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.DashboardConfigurationMapper;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.RowEntityMapper;
 
 import java.io.IOException;
 import java.util.List;