You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain text copy.
Posted to notifications@skywalking.apache.org by lu...@apache.org on 2023/02/25 14:52:53 UTC

[skywalking] branch banyandb-topn created (now 307e13eebd)

This is an automated email from the ASF dual-hosted git repository.

lujiajing pushed a change to branch banyandb-topn
in repository https://gitbox.apache.org/repos/asf/skywalking.git


      at 307e13eebd support topn query

This branch includes the following new commits:

     new 438e8054ea registry TopN
     new 307e13eebd support topn query

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[skywalking] 01/02: registry TopN

Posted by lu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lujiajing pushed a commit to branch banyandb-topn
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit 438e8054ea37ce8c149d89ef8685af23d1b6d11e
Author: Megrez Lu <lu...@gmail.com>
AuthorDate: Sat Feb 25 21:28:21 2023 +0800

    registry TopN
---
 oap-server-bom/pom.xml                             |  2 +-
 .../plugin/banyandb/BanyanDBIndexInstaller.java    |  7 ++-
 .../plugin/banyandb/BanyanDBStorageClient.java     | 11 ++++
 .../storage/plugin/banyandb/MetadataRegistry.java  | 62 ++++++++++++++++++++--
 4 files changed, 77 insertions(+), 5 deletions(-)

diff --git a/oap-server-bom/pom.xml b/oap-server-bom/pom.xml
index 289353d600..6be20194f7 100644
--- a/oap-server-bom/pom.xml
+++ b/oap-server-bom/pom.xml
@@ -72,7 +72,7 @@
         <awaitility.version>3.0.0</awaitility.version>
         <httpcore.version>4.4.13</httpcore.version>
         <commons-compress.version>1.21</commons-compress.version>
-        <banyandb-java-client.version>0.3.0</banyandb-java-client.version>
+        <banyandb-java-client.version>0.4.0-SNAPSHOT</banyandb-java-client.version>
         <kafka-clients.version>2.8.1</kafka-clients.version>
         <spring-kafka-test.version>2.4.6.RELEASE</spring-kafka-test.version>
         <consul.client.version>1.5.3</consul.client.version>
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java
index b765dd4f63..e96aed80c3 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java
@@ -88,12 +88,17 @@ public class BanyanDBIndexInstaller extends ModelInstaller {
             } else { // measure
                 Measure measure = MetadataRegistry.INSTANCE.registerMeasureModel(model, config, configService);
                 if (measure != null) {
-                    log.info("install measure schema {}", model.getName());
+                    log.info("install measure schema {}", measure.name());
                     ((BanyanDBStorageClient) client).define(measure);
+                    final BanyanDBClient c = ((BanyanDBStorageClient) this.client).client;
+                    MetadataRegistry.INSTANCE.findMetadata(model).installTopNAggregation(c);
+                    log.info("installed TopN schema for measure {}", measure.name());
                 }
             }
         } catch (IOException ex) {
             throw new StorageException("fail to install schema", ex);
+        } catch (BanyanDBException ex) {
+            throw new StorageException("fail to install TopN schema", ex);
         }
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
index f2408f8695..73b527872d 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
@@ -31,6 +31,7 @@ import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException
 import org.apache.skywalking.banyandb.v1.client.metadata.Measure;
 import org.apache.skywalking.banyandb.v1.client.metadata.Property;
 import org.apache.skywalking.banyandb.v1.client.metadata.Stream;
+import org.apache.skywalking.banyandb.v1.client.metadata.TopNAggregation;
 import org.apache.skywalking.oap.server.library.client.Client;
 import org.apache.skywalking.oap.server.library.client.healthcheck.DelegatedHealthChecker;
 import org.apache.skywalking.oap.server.library.client.healthcheck.HealthCheckable;
@@ -146,6 +147,16 @@ public class BanyanDBStorageClient implements Client, HealthCheckable {
         }
     }
 
+    public void define(TopNAggregation topNAggregation) throws IOException {
+        try {
+            this.client.define(topNAggregation);
+            this.healthChecker.health();
+        } catch (BanyanDBException ex) {
+            healthChecker.unHealth(ex);
+            throw new IOException("fail to define TopNAggregation", ex);
+        }
+    }
+
     public void write(StreamWrite streamWrite) {
         this.client.write(streamWrite);
     }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
index 5756bd710e..ed8bc0b42d 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
 import com.google.gson.JsonObject;
 import io.grpc.Status;
 
@@ -36,6 +37,7 @@ import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import lombok.Builder;
 import lombok.Data;
@@ -47,6 +49,7 @@ import lombok.Setter;
 import lombok.Singular;
 import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.banyandb.v1.client.AbstractQuery;
 import org.apache.skywalking.banyandb.v1.client.BanyanDBClient;
 import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
 import org.apache.skywalking.banyandb.v1.client.metadata.Catalog;
@@ -59,11 +62,14 @@ import org.apache.skywalking.banyandb.v1.client.metadata.NamedSchema;
 import org.apache.skywalking.banyandb.v1.client.metadata.ResourceExist;
 import org.apache.skywalking.banyandb.v1.client.metadata.Stream;
 import org.apache.skywalking.banyandb.v1.client.metadata.TagFamilySpec;
+import org.apache.skywalking.banyandb.v1.client.metadata.TopNAggregation;
+import org.apache.skywalking.library.elasticsearch.requests.search.Sort;
 import org.apache.skywalking.oap.server.core.analysis.DownSampling;
 import org.apache.skywalking.oap.server.core.analysis.metrics.IntList;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.analysis.record.Record;
 import org.apache.skywalking.oap.server.core.config.ConfigService;
+import org.apache.skywalking.oap.server.core.query.enumeration.Order;
 import org.apache.skywalking.oap.server.core.query.enumeration.Step;
 import org.apache.skywalking.oap.server.core.storage.StorageException;
 import org.apache.skywalking.oap.server.core.storage.annotation.BanyanDB;
@@ -72,6 +78,7 @@ import org.apache.skywalking.oap.server.core.storage.annotation.ValueColumnMetad
 import org.apache.skywalking.oap.server.core.storage.model.Model;
 import org.apache.skywalking.oap.server.core.storage.model.ModelColumn;
 import org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObject;
+import org.apache.skywalking.oap.server.library.util.CollectionUtils;
 import org.apache.skywalking.oap.server.library.util.StringUtil;
 
 @Slf4j
@@ -132,7 +139,7 @@ public enum MetadataRegistry {
         // parse and set sharding keys
         List<String> shardingColumns = parseEntityNames(modelColumnMap);
         if (shardingColumns.isEmpty()) {
-           throw new StorageException("model " + model.getName() + " doesn't contain series id");
+            throw new StorageException("model " + model.getName() + " doesn't contain series id");
         }
         // parse tag metadata
         // this can be used to build both
@@ -167,10 +174,26 @@ public enum MetadataRegistry {
             builder.addField(field);
             schemaBuilder.field(field.getName());
         }
+        // parse TopN
+        schemaBuilder.topNSpec(parseTopNSpec(model, tagsAndFields));
+
         registry.put(schemaMetadata.name(), schemaBuilder.build());
         return builder.build();
     }
 
+    private TopNSpec parseTopNSpec(final Model model, final MeasureMetadata tagsAndFields) {
+        if (CollectionUtils.isEmpty(tagsAndFields.fields)) {
+            return null;
+        }
+        // TODO: how to configure parameters?
+        return TopNSpec.builder()
+                .lruSize(5)
+                .countersNumber(10)
+                .fieldName(tagsAndFields.fields.get(0).getName())
+                .sort(AbstractQuery.Sort.UNSPECIFIED) // include both TopN and BottomN
+                .build();
+    }
+
     public Schema findMetadata(final Model model) {
         if (model.isRecord()) {
             return findRecordMetadata(model.getName());
@@ -339,8 +362,6 @@ public enum MetadataRegistry {
      */
     MeasureMetadata parseTagAndFieldMetadata(Model model, Schema.SchemaBuilder builder, List<String> shardingColumns) {
         // skip metric
-        Optional<ValueColumnMetadata.ValueColumn> valueColumnOpt = ValueColumnMetadata.INSTANCE
-                .readValueColumnDefinition(model.getName());
         MeasureMetadata.MeasureMetadataBuilder result = MeasureMetadata.builder();
         for (final ModelColumn col : model.getColumns()) {
             final String columnStorageName = col.getColumnName().getStorageName();
@@ -575,6 +596,9 @@ public enum MetadataRegistry {
             }
         }
 
+        /**
+         * @return name of the Stream/Measure in the BanyanDB
+         */
         public String name() {
             if (this.kind == Kind.MEASURE) {
                 return formatName(this.modelName, this.downSampling);
@@ -638,9 +662,41 @@ public enum MetadataRegistry {
         @Getter
         private final String timestampColumn4Stream;
 
+        @Getter
+        @Nullable
+        private final TopNSpec topNSpec;
+
         public ColumnSpec getSpec(String columnName) {
             return this.specs.get(columnName);
         }
+
+        public void installTopNAggregation(BanyanDBClient client) throws BanyanDBException {
+            if (this.topNSpec == null) {
+                if (this.metadata.kind == Kind.MEASURE) {
+                    log.debug("skip null TopN Schema for [{}]", metadata.getModelName());
+                }
+                return;
+            }
+            client.define(TopNAggregation.create(getMetadata().getGroup(), getMetadata().name() + "_topn")
+                    .setSourceMeasureName(getMetadata().name())
+                    .setFieldValueSort(topNSpec.sort)
+                    .setFieldName(topNSpec.fieldName)
+                    .setGroupByTagNames(topNSpec.groupByTagNames)
+                    .setCountersNumber(topNSpec.countersNumber)
+                    .setLruSize(topNSpec.lruSize)
+                    .build());
+        }
+    }
+
+    @Builder
+    @EqualsAndHashCode
+    public static class TopNSpec {
+        @Singular
+        private final List<String> groupByTagNames;
+        private final String fieldName;
+        private final AbstractQuery.Sort sort;
+        private final int lruSize;
+        private final int countersNumber;
     }
 
     @RequiredArgsConstructor


[skywalking] 02/02: support topn query

Posted by lu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lujiajing pushed a commit to branch banyandb-topn
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit 307e13eebdc8724a27b4b609c9ccbf92edf85774
Author: Megrez Lu <lu...@gmail.com>
AuthorDate: Sat Feb 25 22:51:10 2023 +0800

    support topn query
---
 .../banyandb/BanyanDBAggregationQueryDAO.java      | 60 +++++++++++++++++-----
 .../plugin/banyandb/BanyanDBStorageClient.java     | 13 +++++
 .../storage/plugin/banyandb/MetadataRegistry.java  | 11 +++-
 .../banyandb/stream/AbstractBanyanDBDAO.java       | 18 +++++++
 4 files changed, 86 insertions(+), 16 deletions(-)

diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBAggregationQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBAggregationQueryDAO.java
index 396cb47202..4c52bcec7b 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBAggregationQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBAggregationQueryDAO.java
@@ -23,6 +23,7 @@ import org.apache.skywalking.banyandb.v1.client.DataPoint;
 import org.apache.skywalking.banyandb.v1.client.MeasureQuery;
 import org.apache.skywalking.banyandb.v1.client.MeasureQueryResponse;
 import org.apache.skywalking.banyandb.v1.client.TimestampRange;
+import org.apache.skywalking.banyandb.v1.client.TopNQueryResponse;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.query.enumeration.Order;
 import org.apache.skywalking.oap.server.core.query.input.Duration;
@@ -51,6 +52,43 @@ public class BanyanDBAggregationQueryDAO extends AbstractBanyanDBDAO implements
     public List<SelectedRecord> sortMetrics(TopNCondition condition, String valueColumnName, Duration duration, List<KeyValue> additionalConditions) throws IOException {
         final String modelName = condition.getName();
         final TimestampRange timestampRange = new TimestampRange(duration.getStartTimestamp(), duration.getEndTimestamp());
+        // fast-path: BanyanDB server-side TopN support
+        MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(modelName, duration.getStep());
+        if (schema == null) {
+            throw new IOException("schema is not registered");
+        }
+
+        MetadataRegistry.ColumnSpec spec = schema.getSpec(valueColumnName);
+        if (spec == null) {
+            throw new IOException("field spec is not registered");
+        }
+
+        if (schema.hasTopNAggregation()) {
+            TopNQueryResponse resp = null;
+            if (condition.getOrder() == Order.DES) {
+                resp = topN(schema, timestampRange, condition.getTopN());
+            } else {
+                resp = bottomN(schema, timestampRange, condition.getTopN());
+            }
+
+            if (resp.getTopNLists().isEmpty()) {
+                return Collections.emptyList();
+            } else if (resp.getTopNLists().size() > 1) { // since we have done aggregation, i.e. MEAN
+                throw new IOException("invalid TopN response");
+            }
+
+            final List<SelectedRecord> topNList = new ArrayList<>();
+            for (TopNQueryResponse.Item item : resp.getTopNLists().get(0).getItems()) {
+                SelectedRecord record = new SelectedRecord();
+                record.setId(item.getName());
+                record.setValue(extractFieldValueAsString(spec, item.getValue()));
+                topNList.add(record);
+            }
+
+            return topNList;
+        }
+
+        // slow-path: TopN using vanilla Measure query
         MeasureQueryResponse resp = query(modelName, TAGS, Collections.singleton(valueColumnName),
                 timestampRange, new QueryBuilder<MeasureQuery>() {
                     @Override
@@ -75,16 +113,6 @@ public class BanyanDBAggregationQueryDAO extends AbstractBanyanDBDAO implements
             return Collections.emptyList();
         }
 
-        MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(modelName, duration.getStep());
-        if (schema == null) {
-            throw new IOException("schema is not registered");
-        }
-
-        MetadataRegistry.ColumnSpec spec = schema.getSpec(valueColumnName);
-        if (spec == null) {
-            throw new IOException("field spec is not registered");
-        }
-
         final List<SelectedRecord> topNList = new ArrayList<>();
         for (DataPoint dataPoint : resp.getDataPoints()) {
             SelectedRecord record = new SelectedRecord();
@@ -96,13 +124,17 @@ public class BanyanDBAggregationQueryDAO extends AbstractBanyanDBDAO implements
         return topNList;
     }
 
-    private String extractFieldValueAsString(MetadataRegistry.ColumnSpec spec, String fieldName, DataPoint dataPoint) throws IOException {
+    private static String extractFieldValueAsString(MetadataRegistry.ColumnSpec spec, String fieldName, DataPoint dataPoint) throws IOException {
+        return extractFieldValueAsString(spec, dataPoint.getFieldValue(fieldName));
+    }
+
+    private static String extractFieldValueAsString(MetadataRegistry.ColumnSpec spec, Object fieldValue) throws IOException {
         if (double.class.equals(spec.getColumnClass())) {
-            return String.valueOf(ByteUtil.bytes2Double(dataPoint.getFieldValue(fieldName)).longValue());
+            return String.valueOf(ByteUtil.bytes2Double((byte[]) fieldValue).longValue());
         } else if (String.class.equals(spec.getColumnClass())) {
-            return dataPoint.getFieldValue(fieldName);
+            return (String) fieldValue;
         } else {
-            return String.valueOf(((Number) dataPoint.getFieldValue(fieldName)).longValue());
+            return String.valueOf(((Number) fieldValue).longValue());
         }
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
index 73b527872d..0b460eead7 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
@@ -27,6 +27,8 @@ import org.apache.skywalking.banyandb.v1.client.StreamBulkWriteProcessor;
 import org.apache.skywalking.banyandb.v1.client.StreamQuery;
 import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
 import org.apache.skywalking.banyandb.v1.client.StreamWrite;
+import org.apache.skywalking.banyandb.v1.client.TopNQuery;
+import org.apache.skywalking.banyandb.v1.client.TopNQueryResponse;
 import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
 import org.apache.skywalking.banyandb.v1.client.metadata.Measure;
 import org.apache.skywalking.banyandb.v1.client.metadata.Property;
@@ -117,6 +119,17 @@ public class BanyanDBStorageClient implements Client, HealthCheckable {
         }
     }
 
+    public TopNQueryResponse query(TopNQuery q) throws IOException {
+        try {
+            TopNQueryResponse response = this.client.query(q);
+            this.healthChecker.health();
+            return response;
+        } catch (BanyanDBException ex) {
+            healthChecker.unHealth(ex);
+            throw new IOException("fail to query topn", ex);
+        }
+    }
+
     public void define(Property property) throws IOException {
         try {
             this.client.apply(property);
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
index ed8bc0b42d..fb01e338a7 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
@@ -175,18 +175,19 @@ public enum MetadataRegistry {
             schemaBuilder.field(field.getName());
         }
         // parse TopN
-        schemaBuilder.topNSpec(parseTopNSpec(model, tagsAndFields));
+        schemaBuilder.topNSpec(parseTopNSpec(schemaMetadata.name(), tagsAndFields));
 
         registry.put(schemaMetadata.name(), schemaBuilder.build());
         return builder.build();
     }
 
-    private TopNSpec parseTopNSpec(final Model model, final MeasureMetadata tagsAndFields) {
+    private TopNSpec parseTopNSpec(final String measureName, final MeasureMetadata tagsAndFields) {
         if (CollectionUtils.isEmpty(tagsAndFields.fields)) {
             return null;
         }
         // TODO: how to configure parameters?
         return TopNSpec.builder()
+                .name(measureName + "_topn")
                 .lruSize(5)
                 .countersNumber(10)
                 .fieldName(tagsAndFields.fields.get(0).getName())
@@ -670,6 +671,10 @@ public enum MetadataRegistry {
             return this.specs.get(columnName);
         }
 
+        public boolean hasTopNAggregation() {
+            return topNSpec != null;
+        }
+
         public void installTopNAggregation(BanyanDBClient client) throws BanyanDBException {
             if (this.topNSpec == null) {
                 if (this.metadata.kind == Kind.MEASURE) {
@@ -690,7 +695,9 @@ public enum MetadataRegistry {
 
     @Builder
     @EqualsAndHashCode
+    @Getter
     public static class TopNSpec {
+        private final String name;
         @Singular
         private final List<String> groupByTagNames;
         private final String fieldName;
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java
index fd53ab114d..b27eade3cc 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java
@@ -28,6 +28,8 @@ import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
 import org.apache.skywalking.banyandb.v1.client.StreamQuery;
 import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
 import org.apache.skywalking.banyandb.v1.client.TimestampRange;
+import org.apache.skywalking.banyandb.v1.client.TopNQuery;
+import org.apache.skywalking.banyandb.v1.client.TopNQueryResponse;
 import org.apache.skywalking.oap.server.core.analysis.DownSampling;
 import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
@@ -75,6 +77,22 @@ public abstract class AbstractBanyanDBDAO extends AbstractDAO<BanyanDBStorageCli
         return this.query(measureModelName, tags, fields, null, builder);
     }
 
+    protected TopNQueryResponse topN(MetadataRegistry.Schema schema, TimestampRange timestampRange, int number) throws IOException {
+        final TopNQuery q = new TopNQuery(schema.getMetadata().getGroup(), schema.getTopNSpec().getName(),
+                timestampRange,
+                number, AbstractQuery.Sort.DESC);
+        q.setAggregationType(MeasureQuery.Aggregation.Type.MEAN);
+        return getClient().query(q);
+    }
+
+    protected TopNQueryResponse bottomN(MetadataRegistry.Schema schema, TimestampRange timestampRange, int number) throws IOException {
+        final TopNQuery q = new TopNQuery(schema.getMetadata().getGroup(), schema.getTopNSpec().getName(),
+                timestampRange,
+                number, AbstractQuery.Sort.ASC);
+        q.setAggregationType(MeasureQuery.Aggregation.Type.MEAN);
+        return getClient().query(q);
+    }
+
     protected MeasureQueryResponse query(String measureModelName, Set<String> tags, Set<String> fields,
                                          TimestampRange timestampRange, QueryBuilder<MeasureQuery> builder) throws IOException {
         MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(measureModelName, DownSampling.Minute);