You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by lu...@apache.org on 2022/04/22 03:24:24 UTC

[skywalking] 03/22: complete UITemplate, ProfileTaskRecord and ProfileThreasSnapshot query

This is an automated email from the ASF dual-hosted git repository.

lujiajing pushed a commit to branch banyandb-integration-stream
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit a363ba045b05072aa6272290566c177e68e00f50
Author: Megrez Lu <lu...@gmail.com>
AuthorDate: Tue Nov 30 19:47:54 2021 +0800

    complete UITemplate, ProfileTaskRecord and ProfileThreasSnapshot query
---
 .../plugin/banyandb/BanyanDBStorageClient.java     |   5 +-
 .../banyandb/converter/BasicTraceMapper.java       |  12 +++
 .../converter/DashboardConfigurationMapper.java    |  23 ++++-
 .../banyandb/converter/ProfileTaskMapper.java      |  28 ++++++
 .../ProfileThreadSnapshotRecordMapper.java         |  27 ++++++
 .../plugin/banyandb/converter/RowEntityMapper.java |   6 ++
 .../banyandb/converter/SegmentRecordMapper.java    |  16 +++-
 .../stream/BanyanDBProfileTaskQueryDAO.java        |  52 +++++++++-
 .../BanyanDBProfileThreadSnapshotQueryDAO.java     | 105 +++++++++++++++++++--
 .../plugin/banyandb/stream/BanyanDBRecordDAO.java  |   6 +-
 .../banyandb/stream/BanyanDBTraceQueryDAO.java     |  19 ++--
 .../stream/BanyanDBUITemplateManagementDAO.java    |  33 +++++--
 12 files changed, 297 insertions(+), 35 deletions(-)

diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
index 6ae741f971..3ad4daf918 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
@@ -4,6 +4,7 @@ import org.apache.skywalking.banyandb.v1.client.BanyanDBClient;
 import org.apache.skywalking.banyandb.v1.client.StreamBulkWriteProcessor;
 import org.apache.skywalking.banyandb.v1.client.StreamQuery;
 import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
+import org.apache.skywalking.banyandb.v1.client.StreamWrite;
 import org.apache.skywalking.oap.server.library.client.Client;
 import org.apache.skywalking.oap.server.library.client.healthcheck.DelegatedHealthChecker;
 import org.apache.skywalking.oap.server.library.client.healthcheck.HealthCheckable;
@@ -44,8 +45,8 @@ public class BanyanDBStorageClient implements Client, HealthCheckable {
         }
     }
 
-    public void write() {
-        this.client
+    public void write(StreamWrite streamWrite) {
+        this.client.write(streamWrite);
     }
 
     public StreamBulkWriteProcessor createBulkProcessor(int maxBulkSize, int flushInterval, int concurrency) {
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/BasicTraceMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/BasicTraceMapper.java
index bb43a8080a..b3512158f2 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/BasicTraceMapper.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/BasicTraceMapper.java
@@ -1,10 +1,12 @@
 package org.apache.skywalking.oap.server.storage.plugin.banyandb.converter;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.skywalking.banyandb.v1.client.RowEntity;
 import org.apache.skywalking.banyandb.v1.client.TagAndValue;
 import org.apache.skywalking.oap.server.core.analysis.IDManager;
 import org.apache.skywalking.oap.server.core.query.type.BasicTrace;
 
+import java.util.Collections;
 import java.util.List;
 
 public class BasicTraceMapper implements RowEntityMapper<BasicTrace> {
@@ -22,4 +24,14 @@ public class BasicTraceMapper implements RowEntityMapper<BasicTrace> {
         trace.setStart(String.valueOf(searchable.get(4).getValue()));
         return trace;
     }
+
+    @Override
+    public List<String> searchableProjection() {
+        return ImmutableList.of("trace_id", "state", "endpoint_id", "duration", "start_time");
+    }
+
+    @Override
+    public List<String> dataProjection() {
+        return Collections.emptyList();
+    }
 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/DashboardConfigurationMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/DashboardConfigurationMapper.java
index f4c4c19beb..3f7df9cb72 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/DashboardConfigurationMapper.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/DashboardConfigurationMapper.java
@@ -1,7 +1,10 @@
 package org.apache.skywalking.oap.server.storage.plugin.banyandb.converter;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.skywalking.banyandb.v1.client.RowEntity;
 import org.apache.skywalking.banyandb.v1.client.TagAndValue;
+import org.apache.skywalking.oap.server.core.management.ui.template.UITemplate;
+import org.apache.skywalking.oap.server.core.query.enumeration.TemplateType;
 import org.apache.skywalking.oap.server.core.query.type.DashboardConfiguration;
 import org.apache.skywalking.oap.server.library.util.BooleanUtils;
 
@@ -12,9 +15,27 @@ public class DashboardConfigurationMapper implements RowEntityMapper<DashboardCo
     public DashboardConfiguration map(RowEntity row) {
         DashboardConfiguration dashboardConfiguration = new DashboardConfiguration();
         final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
+        // name
         dashboardConfiguration.setName((String) searchable.get(0).getValue());
+        // disabled
         dashboardConfiguration.setDisabled(BooleanUtils.valueToBoolean(((Number) searchable.get(1).getValue()).intValue()));
-        // TODO: convert back from data?
+        final List<TagAndValue<?>> data = row.getTagFamilies().get(1);
+        // activated
+        dashboardConfiguration.setActivated(BooleanUtils.valueToBoolean(((Number) data.get(0).getValue()).intValue()));
+        // configuration
+        dashboardConfiguration.setConfiguration((String) data.get(1).getValue());
+        // type
+        dashboardConfiguration.setType(TemplateType.forName((String) data.get(2).getValue()));
         return dashboardConfiguration;
     }
+
+    @Override
+    public List<String> searchableProjection() {
+        return ImmutableList.of(UITemplate.NAME, UITemplate.DISABLED);
+    }
+
+    @Override
+    public List<String> dataProjection() {
+        return ImmutableList.of(UITemplate.ACTIVATED, UITemplate.CONFIGURATION, UITemplate.TYPE);
+    }
 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/ProfileTaskMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/ProfileTaskMapper.java
new file mode 100644
index 0000000000..ea1ca09cb0
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/ProfileTaskMapper.java
@@ -0,0 +1,28 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.converter;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.skywalking.banyandb.v1.client.RowEntity;
+import org.apache.skywalking.oap.server.core.profile.ProfileTaskRecord;
+import org.apache.skywalking.oap.server.core.query.type.ProfileTask;
+
+import java.util.Collections;
+import java.util.List;
+
+public class ProfileTaskMapper implements RowEntityMapper<ProfileTask> {
+    @Override
+    public List<String> searchableProjection() {
+        return ImmutableList.of(ProfileTaskRecord.SERVICE_ID, ProfileTaskRecord.ENDPOINT_NAME, ProfileTaskRecord.START_TIME,
+                ProfileTaskRecord.DURATION, ProfileTaskRecord.MIN_DURATION_THRESHOLD, ProfileTaskRecord.DUMP_PERIOD,
+                ProfileTaskRecord.CREATE_TIME, ProfileTaskRecord.MAX_SAMPLING_COUNT);
+    }
+
+    @Override
+    public List<String> dataProjection() {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public ProfileTask map(RowEntity row) {
+        return null;
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/ProfileThreadSnapshotRecordMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/ProfileThreadSnapshotRecordMapper.java
new file mode 100644
index 0000000000..9a25688132
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/ProfileThreadSnapshotRecordMapper.java
@@ -0,0 +1,27 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.converter;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.skywalking.banyandb.v1.client.RowEntity;
+import org.apache.skywalking.oap.server.core.profile.ProfileThreadSnapshotRecord;
+
+import java.util.Collections;
+import java.util.List;
+
+public class ProfileThreadSnapshotRecordMapper implements RowEntityMapper<ProfileThreadSnapshotRecord> {
+    @Override
+    public List<String> searchableProjection() {
+        return ImmutableList.of(ProfileThreadSnapshotRecord.TASK_ID, ProfileThreadSnapshotRecord.SEGMENT_ID,
+                ProfileThreadSnapshotRecord.DUMP_TIME, ProfileThreadSnapshotRecord.SEQUENCE,
+                ProfileThreadSnapshotRecord.STACK_BINARY);
+    }
+
+    @Override
+    public List<String> dataProjection() {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public ProfileThreadSnapshotRecord map(RowEntity row) {
+        return null;
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/RowEntityMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/RowEntityMapper.java
index eb9b5fadba..b2facb3373 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/RowEntityMapper.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/RowEntityMapper.java
@@ -2,6 +2,12 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.converter;
 
 import org.apache.skywalking.banyandb.v1.client.RowEntity;
 
+import java.util.List;
+
 public interface RowEntityMapper<T> {
+    List<String> searchableProjection();
+
+    List<String> dataProjection();
+
     T map(RowEntity row);
 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/SegmentRecordMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/SegmentRecordMapper.java
index 2b20227931..6ea579ff55 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/SegmentRecordMapper.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/SegmentRecordMapper.java
@@ -1,9 +1,12 @@
 package org.apache.skywalking.oap.server.storage.plugin.banyandb.converter;
 
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.ByteString;
 import org.apache.skywalking.banyandb.v1.client.RowEntity;
 import org.apache.skywalking.banyandb.v1.client.TagAndValue;
 import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
 
+import java.util.Collections;
 import java.util.List;
 
 public class SegmentRecordMapper implements RowEntityMapper<SegmentRecord> {
@@ -20,8 +23,17 @@ public class SegmentRecordMapper implements RowEntityMapper<SegmentRecord> {
         record.setLatency(((Number) searchable.get(5).getValue()).intValue());
         record.setStartTime(((Number) searchable.get(6).getValue()).longValue());
         final List<TagAndValue<?>> data = row.getTagFamilies().get(1);
-        // TODO: support binary data in the client SDK
-        record.setDataBinary((byte[]) data.get(0).getValue());
+        record.setDataBinary(((ByteString) data.get(0).getValue()).toByteArray());
         return record;
     }
+
+    @Override
+    public List<String> searchableProjection() {
+        return ImmutableList.of("trace_id", "state", "service_id", "service_instance_id", "endpoint_id", "duration", "start_time");
+    }
+
+    @Override
+    public List<String> dataProjection() {
+        return Collections.singletonList("data_binary");
+    }
 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java
index c99d267cbe..34af74eb93 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java
@@ -1,23 +1,67 @@
 package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
 
+import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
+import org.apache.skywalking.banyandb.v1.client.StreamQuery;
+import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
+import org.apache.skywalking.oap.server.core.profile.ProfileTaskRecord;
 import org.apache.skywalking.oap.server.core.query.type.ProfileTask;
+import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
 import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO;
+import org.apache.skywalking.oap.server.library.util.StringUtil;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.converter.ProfileTaskMapper;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.converter.RowEntityMapper;
 
 import java.io.IOException;
-import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
 
 /**
  * {@link org.apache.skywalking.oap.server.core.profile.ProfileTaskRecord} is a stream
  */
-public class BanyanDBProfileTaskQueryDAO implements IProfileTaskQueryDAO {
+public class BanyanDBProfileTaskQueryDAO extends AbstractDAO<BanyanDBStorageClient> implements IProfileTaskQueryDAO {
+    private static final RowEntityMapper<ProfileTask> MAPPER = new ProfileTaskMapper();
+
+    public BanyanDBProfileTaskQueryDAO(BanyanDBStorageClient client) {
+        super(client);
+    }
+
     @Override
     public List<ProfileTask> getTaskList(String serviceId, String endpointName, Long startTimeBucket, Long endTimeBucket, Integer limit) throws IOException {
-        return Collections.emptyList();
+        final StreamQuery query = new StreamQuery(ProfileTaskRecord.INDEX_NAME, MAPPER.searchableProjection());
+        query.setDataProjections(MAPPER.dataProjection());
+
+        if (StringUtil.isNotEmpty(serviceId)) {
+            query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", ProfileTaskRecord.SERVICE_ID, serviceId));
+        }
+
+        if (StringUtil.isNotEmpty(endpointName)) {
+            query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", ProfileTaskRecord.ENDPOINT_NAME, endpointName));
+        }
+
+        if (Objects.nonNull(startTimeBucket)) {
+            query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", ProfileTaskRecord.START_TIME, TimeBucket.getTimestamp(startTimeBucket)));
+        }
+
+        if (Objects.nonNull(endTimeBucket)) {
+            query.appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", ProfileTaskRecord.START_TIME, TimeBucket.getTimestamp(endTimeBucket)));
+        }
+
+        // TODO: why delete?
+
+        if (Objects.nonNull(limit)) {
+            query.setLimit(limit);
+        }
+
+        StreamQueryResponse resp = getClient().query(query);
+        return resp.getElements().stream().map(MAPPER::map).collect(Collectors.toList());
     }
 
     @Override
     public ProfileTask getById(String id) throws IOException {
-        return null;
+        // TODO: support id query
+        throw new UnsupportedOperationException("element id get is not supported");
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java
index 8a27e885fe..456636f287 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java
@@ -1,40 +1,133 @@
 package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
 
+import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
+import org.apache.skywalking.banyandb.v1.client.StreamQuery;
+import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
 import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
 import org.apache.skywalking.oap.server.core.profile.ProfileThreadSnapshotRecord;
 import org.apache.skywalking.oap.server.core.query.type.BasicTrace;
+import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
 import org.apache.skywalking.oap.server.core.storage.profile.IProfileThreadSnapshotQueryDAO;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.converter.BasicTraceMapper;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.converter.ProfileThreadSnapshotRecordMapper;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.converter.RowEntityMapper;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.converter.SegmentRecordMapper;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 /**
  * {@link ProfileThreadSnapshotRecord} is a stream
  */
-public class BanyanDBProfileThreadSnapshotQueryDAO implements IProfileThreadSnapshotQueryDAO {
+public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractDAO<BanyanDBStorageClient> implements IProfileThreadSnapshotQueryDAO {
+    private static final RowEntityMapper<ProfileThreadSnapshotRecord> MAPPER = new ProfileThreadSnapshotRecordMapper();
+    private static final RowEntityMapper<BasicTrace> BASIC_TRACE_MAPPER = new BasicTraceMapper();
+    private static final RowEntityMapper<SegmentRecord> SEGMENT_RECORD_MAPPER = new SegmentRecordMapper();
+
+    public BanyanDBProfileThreadSnapshotQueryDAO(BanyanDBStorageClient client) {
+        super(client);
+    }
+
     @Override
     public List<BasicTrace> queryProfiledSegments(String taskId) throws IOException {
-        return Collections.emptyList();
+        final StreamQuery query = new StreamQuery(ProfileThreadSnapshotRecord.INDEX_NAME, MAPPER.searchableProjection());
+        query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", ProfileThreadSnapshotRecord.TASK_ID, taskId))
+                .appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", ProfileThreadSnapshotRecord.SEQUENCE, 0L));
+        StreamQueryResponse resp = getClient().query(query);
+
+        final List<String> segmentIDs = new ArrayList<>(resp.size());
+        resp.getElements().forEach(elem -> segmentIDs.add(MAPPER.map(elem).getSegmentId()));
+        if (segmentIDs.isEmpty()) {
+            return Collections.emptyList();
+        }
+
+        // TODO: support `IN` or `OR` logic operation in BanyanDB
+        List<BasicTrace> basicTraces = new LinkedList<>();
+        for (String segmentID : segmentIDs) {
+            final StreamQuery traceQuery = new StreamQuery(SegmentRecord.INDEX_NAME, BASIC_TRACE_MAPPER.searchableProjection());
+            traceQuery.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", SegmentRecord.SEGMENT_ID, segmentID));
+            StreamQueryResponse traceResponse = getClient().query(traceQuery);
+            basicTraces.addAll(traceResponse.getElements().stream().map(BASIC_TRACE_MAPPER::map).collect(Collectors.toList()));
+        }
+
+        // TODO: Sort in DB with DESC
+        basicTraces = basicTraces.stream()
+                // comparing start_time
+                .sorted(Comparator.comparing((Function<BasicTrace, Long>) basicTrace -> Long.parseLong(basicTrace.getStart()))
+                        // and sort in reverse order
+                        .reversed())
+                .collect(Collectors.toList());
+        return basicTraces;
     }
 
     @Override
     public int queryMinSequence(String segmentId, long start, long end) throws IOException {
-        return 0;
+        return querySequenceWithAgg(AggType.MIN, segmentId, start, end);
     }
 
     @Override
     public int queryMaxSequence(String segmentId, long start, long end) throws IOException {
-        return 0;
+        return querySequenceWithAgg(AggType.MAX, segmentId, start, end);
     }
 
     @Override
     public List<ProfileThreadSnapshotRecord> queryRecords(String segmentId, int minSequence, int maxSequence) throws IOException {
-        return Collections.emptyList();
+        final StreamQuery query = new StreamQuery(ProfileThreadSnapshotRecord.INDEX_NAME, MAPPER.searchableProjection());
+        query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", ProfileThreadSnapshotRecord.SEGMENT_ID, segmentId))
+                .appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", ProfileThreadSnapshotRecord.SEQUENCE, (long) maxSequence))
+                .appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", ProfileThreadSnapshotRecord.SEQUENCE, (long) minSequence));
+        query.setDataProjections(MAPPER.dataProjection());
+        StreamQueryResponse resp = getClient().query(query);
+        return resp.getElements().stream().map(MAPPER::map).collect(Collectors.toList());
     }
 
     @Override
     public SegmentRecord getProfiledSegment(String segmentId) throws IOException {
-        return null;
+        final StreamQuery query = new StreamQuery(SegmentRecord.INDEX_NAME, SEGMENT_RECORD_MAPPER.searchableProjection());
+        query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", SegmentRecord.INDEX_NAME, segmentId));
+        query.setDataProjections(SEGMENT_RECORD_MAPPER.dataProjection());
+        StreamQueryResponse resp = getClient().query(query);
+        return resp.getElements().stream().map(SEGMENT_RECORD_MAPPER::map).findFirst().orElse(null);
+    }
+
+    private int querySequenceWithAgg(AggType aggType, String segmentId, long start, long end) {
+        final StreamQuery query = new StreamQuery(ProfileThreadSnapshotRecord.INDEX_NAME, MAPPER.searchableProjection());
+        query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", ProfileThreadSnapshotRecord.SEGMENT_ID, segmentId))
+                .appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", ProfileThreadSnapshotRecord.DUMP_TIME, end))
+                .appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", ProfileThreadSnapshotRecord.DUMP_TIME, start));
+        query.setDataProjections(MAPPER.dataProjection());
+
+        StreamQueryResponse resp = getClient().query(query);
+        List<ProfileThreadSnapshotRecord> records = resp.getElements().stream().map(MAPPER::map).collect(Collectors.toList());
+
+        switch (aggType) {
+            case MIN:
+                int minValue = Integer.MAX_VALUE;
+                for (final ProfileThreadSnapshotRecord record : records) {
+                    int sequence = record.getSequence();
+                    minValue = Math.min(minValue, sequence);
+                }
+                return minValue;
+            case MAX:
+                int maxValue = Integer.MIN_VALUE;
+                for (ProfileThreadSnapshotRecord record : records) {
+                    int sequence = record.getSequence();
+                    maxValue = Math.max(maxValue, sequence);
+                }
+                return maxValue;
+            default:
+                throw new IllegalArgumentException("should not reach this line");
+        }
+    }
+
+    enum AggType {
+        MIN, MAX
     }
 }
\ No newline at end of file
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java
index f59bc25f9e..54870d98c6 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java
@@ -45,11 +45,11 @@ public class BanyanDBRecordDAO implements IRecordDAO {
         if (SegmentRecord.INDEX_NAME.equals(model.getName())) {
             SegmentRecord segmentRecord = (SegmentRecord) record;
             StreamWrite streamWrite = StreamWrite.builder()
-                    .name(BanyanDBSchema.NAME)
-                    .binary(segmentRecord.getDataBinary())
+                    .name(SegmentRecord.INDEX_NAME)
+                    .dataTag(Tag.binaryField(segmentRecord.getDataBinary()))
                     .timestamp(segmentRecord.getStartTime())
                     .elementId(segmentRecord.getSegmentId())
-                    .tags(buildFieldObjects(this.storageBuilder.entity2Storage(segmentRecord)))
+                    .searchableTags(buildFieldObjects(this.storageBuilder.entity2Storage(segmentRecord)))
                     .build();
             return new BanyanDBStreamInsertRequest(streamWrite);
         }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java
index 0a3f0e53b9..a37cfbb54c 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java
@@ -19,9 +19,10 @@
 package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
 
 import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableList;
-import org.apache.skywalking.banyandb.v1.client.*;
-import org.apache.skywalking.oap.server.core.analysis.IDManager;
+import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
+import org.apache.skywalking.banyandb.v1.client.StreamQuery;
+import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
+import org.apache.skywalking.banyandb.v1.client.TimestampRange;
 import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
 import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
 import org.apache.skywalking.oap.server.core.query.type.*;
@@ -48,9 +49,6 @@ public class BanyanDBTraceQueryDAO extends AbstractDAO<BanyanDBStorageClient> im
 
     private static final DateTimeFormatter YYYYMMDDHHMMSS = DateTimeFormat.forPattern("yyyyMMddHHmmss");
 
-    private static final List<String> BASIC_QUERY_PROJ = ImmutableList.of("trace_id", "state", "endpoint_id", "duration", "start_time");
-    private static final List<String> TRACE_ID_QUERY_PROJ = ImmutableList.of("trace_id", "state", "service_id", "service_instance_id", "endpoint_id", "duration", "start_time");
-
     public BanyanDBTraceQueryDAO(BanyanDBStorageClient client) {
         super(client);
     }
@@ -59,9 +57,10 @@ public class BanyanDBTraceQueryDAO extends AbstractDAO<BanyanDBStorageClient> im
     public TraceBrief queryBasicTraces(long startSecondTB, long endSecondTB, long minDuration, long maxDuration, String serviceId, String serviceInstanceId, String endpointId, String traceId, int limit, int from, TraceState traceState, QueryOrder queryOrder, List<Tag> tags) throws IOException {
         StreamQuery query;
         if (startSecondTB != 0 && endSecondTB != 0) {
-            query = new StreamQuery(BanyanDBSchema.NAME, new TimestampRange(parseMillisFromStartSecondTB(startSecondTB), parseMillisFromEndSecondTB(endSecondTB)), BASIC_QUERY_PROJ);
+            query = new StreamQuery(BanyanDBSchema.NAME, new TimestampRange(parseMillisFromStartSecondTB(startSecondTB),
+                    parseMillisFromEndSecondTB(endSecondTB)), BASIC_TRACE_MAPPER.searchableProjection());
         } else {
-            query = new StreamQuery(BanyanDBSchema.NAME, BASIC_QUERY_PROJ);
+            query = new StreamQuery(BanyanDBSchema.NAME, BASIC_TRACE_MAPPER.searchableProjection());
         }
         if (minDuration != 0) {
             // duration >= minDuration
@@ -126,9 +125,9 @@ public class BanyanDBTraceQueryDAO extends AbstractDAO<BanyanDBStorageClient> im
 
     @Override
     public List<SegmentRecord> queryByTraceId(String traceId) throws IOException {
-        StreamQuery query = new StreamQuery(BanyanDBSchema.NAME, TRACE_ID_QUERY_PROJ);
+        StreamQuery query = new StreamQuery(BanyanDBSchema.NAME, SEGMENT_RECORD_MAPPER.searchableProjection());
         query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", "trace_id", traceId));
-        query.setDataBinary(true);
+        query.setDataProjections(SEGMENT_RECORD_MAPPER.dataProjection());
         StreamQueryResponse response = this.getClient().query(query);
         return response.getElements().stream().map(SEGMENT_RECORD_MAPPER::map).collect(Collectors.toList());
     }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java
index 102540c7b5..a4875e65f1 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java
@@ -1,9 +1,10 @@
 package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
 
-import com.google.common.collect.ImmutableList;
 import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
 import org.apache.skywalking.banyandb.v1.client.StreamQuery;
 import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
+import org.apache.skywalking.banyandb.v1.client.StreamWrite;
+import org.apache.skywalking.banyandb.v1.client.Tag;
 import org.apache.skywalking.oap.server.core.management.ui.template.UITemplate;
 import org.apache.skywalking.oap.server.core.query.input.DashboardSetting;
 import org.apache.skywalking.oap.server.core.query.type.DashboardConfiguration;
@@ -25,27 +26,45 @@ import java.util.stream.Collectors;
 public class BanyanDBUITemplateManagementDAO extends AbstractDAO<BanyanDBStorageClient> implements UITemplateManagementDAO {
     private static final RowEntityMapper<DashboardConfiguration> MAPPER = new DashboardConfigurationMapper();
 
+    private static final long UI_TEMPLATE_TIMESTAMP = 1L;
+
     public BanyanDBUITemplateManagementDAO(BanyanDBStorageClient client) {
         super(client);
     }
 
     @Override
     public List<DashboardConfiguration> getAllTemplates(Boolean includingDisabled) throws IOException {
-        StreamQuery query = new StreamQuery(UITemplate.INDEX_NAME, ImmutableList.of(
-                UITemplate.NAME,
-                UITemplate.DISABLED
-        ));
+        StreamQuery query = new StreamQuery(UITemplate.INDEX_NAME, MAPPER.dataProjection());
         query.setLimit(10000);
         if (!includingDisabled) {
             query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", UITemplate.DISABLED, (long) BooleanUtils.FALSE));
         }
+        query.setDataProjections(MAPPER.dataProjection());
         StreamQueryResponse resp = this.getClient().query(query);
         return resp.getElements().stream().map(MAPPER::map).collect(Collectors.toList());
     }
 
     @Override
     public TemplateChangeStatus addTemplate(DashboardSetting setting) throws IOException {
-        // TODO: support single write
+        final UITemplate uiTemplate = setting.toEntity();
+
+        StreamWrite request = StreamWrite.builder()
+                .name(UITemplate.INDEX_NAME)
+                // searchable - name
+                .searchableTag(Tag.stringField(uiTemplate.getName()))
+                // searchable - disabled
+                .searchableTag(Tag.longField(uiTemplate.getDisabled()))
+                // data - type
+                .dataTag(Tag.stringField(uiTemplate.getType()))
+                // data - configuration
+                .dataTag(Tag.stringField(uiTemplate.getConfiguration()))
+                // data - activated
+                .dataTag(Tag.longField(uiTemplate.getActivated()))
+                .timestamp(UI_TEMPLATE_TIMESTAMP)
+                .elementId(uiTemplate.id())
+                .build();
+        getClient().write(request);
+        return TemplateChangeStatus.builder().status(true).build();
     }
 
     @Override
@@ -55,6 +74,6 @@ public class BanyanDBUITemplateManagementDAO extends AbstractDAO<BanyanDBStorage
 
     @Override
     public TemplateChangeStatus disableTemplate(String name) throws IOException {
-        return TemplateChangeStatus.builder().status(false).message("Can't add/update the template").build();
+        return TemplateChangeStatus.builder().status(false).message("Can't disable the template").build();
     }
 }