You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2023/04/25 02:38:03 UTC

[skywalking] branch master updated: Integrate BanyanDB server-side TopN (#10448)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 06558fcf5a Integrate BanyanDB server-side TopN (#10448)
06558fcf5a is described below

commit 06558fcf5ab6afc6eaf3b30aa0ca541545e2d3ea
Author: Jiajing LU <lu...@gmail.com>
AuthorDate: Tue Apr 25 10:37:55 2023 +0800

    Integrate BanyanDB server-side TopN (#10448)
---
 docs/en/changes/changes.md                         |  1 +
 oap-server-bom/pom.xml                             |  2 +-
 .../org/apache/skywalking/oal/rt/OALRuntime.java   |  5 ++
 .../skywalking/oal/rt/parser/SourceColumn.java     |  5 +-
 .../oal/rt/parser/SourceColumnsFactory.java        |  2 +-
 .../core/browser/source/BrowserAppPagePerf.java    |  2 +-
 .../oap/server/core/source/DefaultScopeDefine.java |  4 +-
 .../oap/server/core/source/Endpoint.java           |  2 +-
 .../oap/server/core/source/ScopeDefaultColumn.java | 15 +++-
 .../server/core/storage/annotation/BanyanDB.java   | 44 ++++++++++--
 .../core/storage/model/BanyanDBModelExtension.java | 33 +++++++++
 .../server/core/storage/model/StorageModels.java   | 22 ++++--
 .../banyandb/BanyanDBAggregationQueryDAO.java      | 80 +++++++++++++++++-----
 .../plugin/banyandb/BanyanDBIndexInstaller.java    |  6 +-
 .../plugin/banyandb/BanyanDBStorageClient.java     | 24 +++++++
 .../storage/plugin/banyandb/MetadataRegistry.java  | 74 +++++++++++++++++++-
 .../banyandb/stream/AbstractBanyanDBDAO.java       | 31 +++++++++
 test/e2e-v2/cases/storage/banyandb/e2e.yaml        |  4 +-
 .../storage/expected/metrics-top-endpoint.yml}     | 19 ++---
 test/e2e-v2/cases/storage/storage-cases.yaml       |  4 +-
 test/e2e-v2/script/env                             |  2 +-
 21 files changed, 320 insertions(+), 61 deletions(-)

diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index c960fdb489..01f655aa21 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -36,6 +36,7 @@
 * Bump up armeria to 1.23.1
 * Support Elasticsearch Monitoring.
 * Fix PromQL HTTP API `/api/v1/series` response missing `service` label when matching metric.
+* Support server-side TopN for BanyanDB.
 
 #### UI
 * Revert: cpm5d function. This feature is cancelled from backend.
diff --git a/oap-server-bom/pom.xml b/oap-server-bom/pom.xml
index d2611c2e22..52200e205e 100644
--- a/oap-server-bom/pom.xml
+++ b/oap-server-bom/pom.xml
@@ -72,7 +72,7 @@
         <awaitility.version>3.0.0</awaitility.version>
         <httpcore.version>4.4.13</httpcore.version>
         <commons-compress.version>1.21</commons-compress.version>
-        <banyandb-java-client.version>0.3.1</banyandb-java-client.version>
+        <banyandb-java-client.version>0.4.0-rc0</banyandb-java-client.version>
         <kafka-clients.version>2.8.1</kafka-clients.version>
         <spring-kafka-test.version>2.4.6.RELEASE</spring-kafka-test.version>
         <consul.client.version>1.5.3</consul.client.version>
diff --git a/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/OALRuntime.java b/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/OALRuntime.java
index 72905bbefe..53937f7b29 100644
--- a/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/OALRuntime.java
+++ b/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/OALRuntime.java
@@ -277,6 +277,11 @@ public class OALRuntime implements OALEngine {
                     annotationsAttribute.addAnnotation(banyanShardingKeyAnnotation);
                 }
 
+                if (field.isGroupByCondInTopN()) {
+                    Annotation banyanTopNAggregationAnnotation = new Annotation(BanyanDB.TopNAggregation.class.getName(), constPool);
+                    annotationsAttribute.addAnnotation(banyanTopNAggregationAnnotation);
+                }
+
                 newField.getFieldInfo().addAttribute(annotationsAttribute);
             } catch (CannotCompileException e) {
                 log.error(
diff --git a/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/parser/SourceColumn.java b/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/parser/SourceColumn.java
index b1d6618c09..f8cea14003 100644
--- a/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/parser/SourceColumn.java
+++ b/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/parser/SourceColumn.java
@@ -34,8 +34,10 @@ public class SourceColumn {
     private int length;
     private String fieldSetter;
     private String fieldGetter;
+    private final boolean groupByCondInTopN;
 
-    public SourceColumn(String fieldName, String columnName, Class<?> type, boolean isID, int length) {
+    public SourceColumn(String fieldName, String columnName, Class<?> type, boolean isID, int length,
+                        boolean groupByCondInTopN) {
         this.fieldName = fieldName;
         this.columnName = columnName;
         this.type = type;
@@ -45,6 +47,7 @@ public class SourceColumn {
 
         this.fieldGetter = ClassMethodUtil.toGetMethod(fieldName);
         this.fieldSetter = ClassMethodUtil.toSetMethod(fieldName);
+        this.groupByCondInTopN = groupByCondInTopN;
     }
 
     public void setFieldName(String fieldName) {
diff --git a/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/parser/SourceColumnsFactory.java b/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/parser/SourceColumnsFactory.java
index 2339513bf0..fdd3c0d6d7 100644
--- a/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/parser/SourceColumnsFactory.java
+++ b/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/parser/SourceColumnsFactory.java
@@ -31,7 +31,7 @@ public class SourceColumnsFactory {
         for (ScopeDefaultColumn defaultColumn : columns) {
             sourceColumns.add(
                 new SourceColumn(defaultColumn.getFieldName(), defaultColumn.getColumnName(), defaultColumn
-                    .getType(), defaultColumn.isID(), defaultColumn.getLength()));
+                    .getType(), defaultColumn.isID(), defaultColumn.getLength(), defaultColumn.isGroupByCondInTopN()));
         }
         return sourceColumns;
     }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/browser/source/BrowserAppPagePerf.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/browser/source/BrowserAppPagePerf.java
index c9ded0962a..bede297ef0 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/browser/source/BrowserAppPagePerf.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/browser/source/BrowserAppPagePerf.java
@@ -40,7 +40,7 @@ public class BrowserAppPagePerf extends BrowserAppPerfSource {
     }
 
     @Getter
-    @ScopeDefaultColumn.DefinedByField(columnName = "service_id")
+    @ScopeDefaultColumn.DefinedByField(columnName = "service_id", groupByCondInTopN = true)
     private String serviceId;
     @Getter
     @Setter
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java
index df445c002b..838d7af936 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java
@@ -205,7 +205,7 @@ public class DefaultScopeDefine {
         if (virtualColumn != null) {
             scopeDefaultColumns.add(
                 new ScopeDefaultColumn(virtualColumn.fieldName(), virtualColumn.columnName(), virtualColumn
-                    .type(), virtualColumn.isID(), virtualColumn.length()));
+                    .type(), virtualColumn.isID(), virtualColumn.length(), false));
         }
         Field[] scopeClassField = originalClass.getDeclaredFields();
         if (scopeClassField != null) {
@@ -217,7 +217,7 @@ public class DefaultScopeDefine {
                         scopeDefaultColumns.add(
                             new ScopeDefaultColumn(
                                 field.getName(), definedByField.columnName(), field.getType(), false,
-                                definedByField.length()
+                                definedByField.length(), definedByField.groupByCondInTopN()
                             ));
                     }
                 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Endpoint.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Endpoint.java
index ac42420571..9e7933e0ab 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Endpoint.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Endpoint.java
@@ -54,7 +54,7 @@ public class Endpoint extends Source {
     @ScopeDefaultColumn.DefinedByField(columnName = "name", requireDynamicActive = true)
     private String name;
     @Getter
-    @ScopeDefaultColumn.DefinedByField(columnName = "service_id")
+    @ScopeDefaultColumn.DefinedByField(columnName = "service_id", groupByCondInTopN = true)
     private String serviceId;
     @Getter
     @Setter
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ScopeDefaultColumn.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ScopeDefaultColumn.java
index 3ebef71fe1..5d2986e102 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ScopeDefaultColumn.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ScopeDefaultColumn.java
@@ -36,13 +36,15 @@ public class ScopeDefaultColumn {
     private Class<?> type;
     private boolean isID;
     private int length;
+    private final boolean groupByCondInTopN;
 
-    public ScopeDefaultColumn(String fieldName, String columnName, Class<?> type, boolean isID, int length) {
+    public ScopeDefaultColumn(String fieldName, String columnName, Class<?> type, boolean isID, int length, boolean groupByCondInTopN) {
         this.fieldName = fieldName;
         this.columnName = columnName;
         this.type = type;
         this.isID = isID;
         this.length = length;
+        this.groupByCondInTopN = groupByCondInTopN;
     }
 
     @Target({ElementType.FIELD})
@@ -54,7 +56,7 @@ public class ScopeDefaultColumn {
          * Dynamic active means this column is only activated through core setting explicitly.
          *
          * @return FALSE: this column is not going to be added to the final generated metric as a column.
-         *         TRUE: this column could be added as a column if core/activeExtraModelColumns == true.
+         * TRUE: this column could be added as a column if core/activeExtraModelColumns == true.
          */
         boolean requireDynamicActive() default false;
 
@@ -62,6 +64,13 @@ public class ScopeDefaultColumn {
          * Define column length, only effective when the type is String.
          */
         int length() default 256;
+
+        /**
+         * Indicate whether this column is a condition for groupBy in the TopN Aggregation.
+         *
+         * @since 9.5.0
+         */
+        boolean groupByCondInTopN() default false;
     }
 
     @Target({ElementType.TYPE})
@@ -76,7 +85,7 @@ public class ScopeDefaultColumn {
         /**
          * Declare this virtual column is representing an entity ID of this source and generated metrics.
          * Typically, metric ID = timestamp + entity ID
-         *
+         * <p>
          * This takes {@link ISource#getEntityId()}'s return as the value.
          *
          * @return TRUE if this is an ID column.
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/BanyanDB.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/BanyanDB.java
index a306cd8c5d..cb9ad293b8 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/BanyanDB.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/BanyanDB.java
@@ -19,9 +19,11 @@
 package org.apache.skywalking.oap.server.core.storage.annotation;
 
 import java.lang.annotation.ElementType;
+import java.lang.annotation.Inherited;
 import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
+
 import org.apache.skywalking.oap.server.core.analysis.record.Record;
 
 /**
@@ -47,7 +49,7 @@ public @interface BanyanDB {
 
     /**
      * Series key is used to group time series data per metric of one entity in one place.
-     *
+     * <p>
      * For example,
      * ServiceA's traffic gauge, service call per minute, includes following timestamp values, then it should be sharded
      * by service ID
@@ -74,7 +76,7 @@ public @interface BanyanDB {
     @interface SeriesID {
         /**
          * Relative entity tag
-         *
+         * <p>
          * The index number determines the order of the column placed in the SeriesID.
          * BanyanDB SeriesID searching procedure uses a prefix-scanning strategy.
          * Searching series against a prefix could improve the performance.
@@ -131,6 +133,7 @@ public @interface BanyanDB {
     /**
      * timestampColumn is to identify which column in {@link Record} is providing the timestamp(millisecond) for BanyanDB.
      * BanyanDB stream requires a timestamp in milliseconds.
+     *
      * @since 9.3.0
      */
     @Target({ElementType.TYPE})
@@ -141,13 +144,14 @@ public @interface BanyanDB {
 
     /**
      * MeasureField defines a column as a measure's field.
-     *
+     * <p>
      * Annotated: the column is a measure field.
      * Unannotated: the column is a measure tag.
-     *   storageOnly=true: the column is a measure tag that is not indexed.
-     *   storageOnly=false: the column is a measure tag that is indexed.
-     *   indexOnly=true: the column is a measure tag that is indexed, but not stored.
-     *   indexOnly=false: the column is a measure tag that is indexed and stored.
+     * storageOnly=true: the column is a measure tag that is not indexed.
+     * storageOnly=false: the column is a measure tag that is indexed.
+     * indexOnly=true: the column is a measure tag that is indexed, but not stored.
+     * indexOnly=false: the column is a measure tag that is indexed and stored.
+     *
      * @since 9.4.0
      */
     @Target({ElementType.FIELD})
@@ -157,10 +161,36 @@ public @interface BanyanDB {
 
     /**
      * StoreIDTag indicates a metric store its ID as a tag for searching.
+     *
      * @since 9.4.0
      */
     @Target({ElementType.TYPE})
     @Retention(RetentionPolicy.RUNTIME)
     @interface StoreIDAsTag {
     }
+
+    /**
+     * Generate a TopN aggregation and use the annotated column as a groupBy tag.
+     * It also carries the parameters of that TopN aggregation.
+     *
+     * @since 9.5.0
+     */
+    @Target({ElementType.FIELD})
+    @Retention(RetentionPolicy.RUNTIME)
+    @Inherited
+    @interface TopNAggregation {
+        /**
+         * The LRU size determines the maximum tolerated time range.
+         * The buffers within that time range are kept in memory so that
+         * data in [T - lruSize * n, T] is accepted by the pre-aggregation process.
+         * T = the current time in the current dimensionality.
+         * n = the interval in the current dimensionality.
+         */
+        int lruSize() default 2;
+
+        /**
+         * The max size of entries in a time window for the pre-aggregation.
+         */
+        int countersNumber() default 1000;
+    }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/BanyanDBModelExtension.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/BanyanDBModelExtension.java
index 73b1819b29..ae9c7d657c 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/BanyanDBModelExtension.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/BanyanDBModelExtension.java
@@ -22,6 +22,8 @@ import lombok.Getter;
 import lombok.Setter;
 import org.apache.skywalking.oap.server.core.analysis.record.Record;
 
+import java.util.List;
+
 /**
  * BanyanDBExtension represents extra metadata for models, but specific for BanyanDB usages.
  *
@@ -31,6 +33,7 @@ public class BanyanDBModelExtension {
     /**
      * timestampColumn is to identify which column in {@link Record} is providing the timestamp(millisecond) for BanyanDB.
      * BanyanDB stream requires a timestamp in milliseconds
+     *
      * @since 9.3.0
      */
     @Getter
@@ -45,4 +48,34 @@ public class BanyanDBModelExtension {
     @Setter
     private boolean storeIDTag;
 
+    @Getter
+    @Setter
+    private TopN topN;
+
+    public static class TopN {
+        /**
+         * lru_size defines how many time_buckets are held in the memory.
+         * For example, "2" means data points belonging to the latest "2" time_buckets will be persisted.
+         * The default value is 2 in the BanyanDB if not set.
+         *
+         * @since 9.5.0
+         */
+        @Getter
+        @Setter
+        private int lruSize;
+
+        /**
+         * counters_number defines the max number of entries to be tracked during the pre-aggregation.
+         * The default value is 1000 in the BanyanDB if not set.
+         *
+         * @since 9.5.0
+         */
+        @Getter
+        @Setter
+        private int countersNumber;
+
+        @Setter
+        @Getter
+        private List<String> groupByTagNames;
+    }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
index c76d820ae1..27ad875886 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
@@ -62,7 +62,7 @@ public class StorageModels implements IModelManager, ModelCreator, ModelManipula
         SQLDatabaseModelExtension sqlDBModelExtension = new SQLDatabaseModelExtension();
         BanyanDBModelExtension banyanDBModelExtension = new BanyanDBModelExtension();
         ElasticSearchModelExtension elasticSearchModelExtension = new ElasticSearchModelExtension();
-        retrieval(aClass, storage.getModelName(), modelColumns, scopeId, checker, sqlDBModelExtension);
+        retrieval(aClass, storage.getModelName(), modelColumns, scopeId, checker, sqlDBModelExtension, banyanDBModelExtension);
         // Add extra column for additional entities
         if (aClass.isAnnotationPresent(SQLDatabase.ExtraColumn4AdditionalEntity.class)
             || aClass.isAnnotationPresent(SQLDatabase.MultipleExtraColumn4AdditionalEntity.class)) {
@@ -98,9 +98,10 @@ public class StorageModels implements IModelManager, ModelCreator, ModelManipula
         if (aClass.isAnnotationPresent(BanyanDB.StoreIDAsTag.class)) {
             banyanDBModelExtension.setStoreIDTag(true);
         }
+
         // Set routing rules for ElasticSearch
         elasticSearchModelExtension.setRouting(storage.getModelName(), modelColumns);
-        
+
         checker.check(storage.getModelName());
 
         Model model = new Model(
@@ -149,7 +150,8 @@ public class StorageModels implements IModelManager, ModelCreator, ModelManipula
                            final List<ModelColumn> modelColumns,
                            final int scopeId,
                            ShardingKeyChecker checker,
-                           final SQLDatabaseModelExtension sqlDBModelExtension) {
+                           final SQLDatabaseModelExtension sqlDBModelExtension,
+                           final BanyanDBModelExtension banyanDBModelExtension) {
         if (log.isDebugEnabled()) {
             log.debug("Analysis {} to generate Model.", clazz.getName());
         }
@@ -212,7 +214,9 @@ public class StorageModels implements IModelManager, ModelCreator, ModelManipula
                 final BanyanDB.IndexRule banyanDBIndexRule = field.getAnnotation(
                     BanyanDB.IndexRule.class);
                 final BanyanDB.MeasureField banyanDBMeasureField = field.getAnnotation(
-                        BanyanDB.MeasureField.class);
+                    BanyanDB.MeasureField.class);
+                final BanyanDB.TopNAggregation topNAggregation = field.getAnnotation(
+                    BanyanDB.TopNAggregation.class);
                 BanyanDBExtension banyanDBExtension = new BanyanDBExtension(
                     banyanDBSeriesID == null ? -1 : banyanDBSeriesID.index(),
                     banyanDBGlobalIndex != null,
@@ -221,6 +225,14 @@ public class StorageModels implements IModelManager, ModelCreator, ModelManipula
                     banyanDBMeasureField != null
                 );
 
+                if (topNAggregation != null) {
+                    BanyanDBModelExtension.TopN topN = new BanyanDBModelExtension.TopN();
+                    topN.setLruSize(topNAggregation.lruSize());
+                    topN.setCountersNumber(topNAggregation.countersNumber());
+                    topN.setGroupByTagNames(Collections.singletonList(column.name()));
+                    banyanDBModelExtension.setTopN(topN);
+                }
+
                 final ModelColumn modelColumn = new ModelColumn(
                     new ColumnName(column),
                     field.getType(),
@@ -262,7 +274,7 @@ public class StorageModels implements IModelManager, ModelCreator, ModelManipula
         }
 
         if (Objects.nonNull(clazz.getSuperclass())) {
-            retrieval(clazz.getSuperclass(), modelName, modelColumns, scopeId, checker, sqlDBModelExtension);
+            retrieval(clazz.getSuperclass(), modelName, modelColumns, scopeId, checker, sqlDBModelExtension, banyanDBModelExtension);
         }
     }
 
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBAggregationQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBAggregationQueryDAO.java
index 396cb47202..a83d0e68df 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBAggregationQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBAggregationQueryDAO.java
@@ -23,6 +23,7 @@ import org.apache.skywalking.banyandb.v1.client.DataPoint;
 import org.apache.skywalking.banyandb.v1.client.MeasureQuery;
 import org.apache.skywalking.banyandb.v1.client.MeasureQueryResponse;
 import org.apache.skywalking.banyandb.v1.client.TimestampRange;
+import org.apache.skywalking.banyandb.v1.client.TopNQueryResponse;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.query.enumeration.Order;
 import org.apache.skywalking.oap.server.core.query.input.Duration;
@@ -39,6 +40,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 public class BanyanDBAggregationQueryDAO extends AbstractBanyanDBDAO implements IAggregationQueryDAO {
     private static final Set<String> TAGS = ImmutableSet.of(Metrics.ENTITY_ID);
@@ -51,7 +53,59 @@ public class BanyanDBAggregationQueryDAO extends AbstractBanyanDBDAO implements
     public List<SelectedRecord> sortMetrics(TopNCondition condition, String valueColumnName, Duration duration, List<KeyValue> additionalConditions) throws IOException {
         final String modelName = condition.getName();
         final TimestampRange timestampRange = new TimestampRange(duration.getStartTimestamp(), duration.getEndTimestamp());
-        MeasureQueryResponse resp = query(modelName, TAGS, Collections.singleton(valueColumnName),
+        MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(modelName, duration.getStep());
+        if (schema == null) {
+            throw new IOException("schema is not registered");
+        }
+
+        MetadataRegistry.ColumnSpec spec = schema.getSpec(valueColumnName);
+        if (spec == null) {
+            throw new IOException("field spec is not registered");
+        }
+
+        // BanyanDB server-side TopN support for metrics pre-aggregation.
+        if (schema.getTopNSpec() != null) {
+            // 1) no additional conditions
+            // 2) additional conditions are all group by tags
+            if (CollectionUtils.isEmpty(additionalConditions) ||
+                    additionalConditions.stream().map(KeyValue::getKey).collect(Collectors.toSet())
+                            .equals(ImmutableSet.copyOf(schema.getTopNSpec().getGroupByTagNames()))) {
+                return serverSideTopN(condition, schema, spec, timestampRange, additionalConditions);
+            }
+        }
+
+        return directMetricsTopN(condition, valueColumnName, spec, timestampRange, additionalConditions);
+    }
+
+    List<SelectedRecord> serverSideTopN(TopNCondition condition, MetadataRegistry.Schema schema, MetadataRegistry.ColumnSpec valueColumnSpec,
+                                        TimestampRange timestampRange, List<KeyValue> additionalConditions) throws IOException {
+        TopNQueryResponse resp = null;
+        if (condition.getOrder() == Order.DES) {
+            resp = topN(schema, timestampRange, condition.getTopN(), additionalConditions);
+        } else {
+            resp = bottomN(schema, timestampRange, condition.getTopN(), additionalConditions);
+        }
+
+        if (resp.size() == 0) {
+            return Collections.emptyList();
+        } else if (resp.size() > 1) { // since we have done aggregation, i.e. MEAN
+            throw new IOException("invalid TopN response");
+        }
+
+        final List<SelectedRecord> topNList = new ArrayList<>();
+        for (TopNQueryResponse.Item item : resp.getTopNLists().get(0).getItems()) {
+            SelectedRecord record = new SelectedRecord();
+            record.setId((String) item.getTagValuesMap().get(Metrics.ENTITY_ID).getValue());
+            record.setValue(extractFieldValueAsString(valueColumnSpec, item.getValue()));
+            topNList.add(record);
+        }
+
+        return topNList;
+    }
+
+    List<SelectedRecord> directMetricsTopN(TopNCondition condition, String valueColumnName, MetadataRegistry.ColumnSpec valueColumnSpec,
+                                           TimestampRange timestampRange, List<KeyValue> additionalConditions) throws IOException {
+        MeasureQueryResponse resp = query(condition.getName(), TAGS, Collections.singleton(valueColumnName),
                 timestampRange, new QueryBuilder<MeasureQuery>() {
                     @Override
                     protected void apply(MeasureQuery query) {
@@ -75,34 +129,24 @@ public class BanyanDBAggregationQueryDAO extends AbstractBanyanDBDAO implements
             return Collections.emptyList();
         }
 
-        MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(modelName, duration.getStep());
-        if (schema == null) {
-            throw new IOException("schema is not registered");
-        }
-
-        MetadataRegistry.ColumnSpec spec = schema.getSpec(valueColumnName);
-        if (spec == null) {
-            throw new IOException("field spec is not registered");
-        }
-
         final List<SelectedRecord> topNList = new ArrayList<>();
-        for (DataPoint dataPoint : resp.getDataPoints()) {
-            SelectedRecord record = new SelectedRecord();
+        for (final DataPoint dataPoint : resp.getDataPoints()) {
+            final SelectedRecord record = new SelectedRecord();
             record.setId(dataPoint.getTagValue(Metrics.ENTITY_ID));
-            record.setValue(extractFieldValueAsString(spec, valueColumnName, dataPoint));
+            record.setValue(extractFieldValueAsString(valueColumnSpec, dataPoint.getFieldValue(valueColumnName)));
             topNList.add(record);
         }
 
         return topNList;
     }
 
-    private String extractFieldValueAsString(MetadataRegistry.ColumnSpec spec, String fieldName, DataPoint dataPoint) throws IOException {
+    private static String extractFieldValueAsString(MetadataRegistry.ColumnSpec spec, Object fieldValue) {
         if (double.class.equals(spec.getColumnClass())) {
-            return String.valueOf(ByteUtil.bytes2Double(dataPoint.getFieldValue(fieldName)).longValue());
+            return String.valueOf(ByteUtil.bytes2Double((byte[]) fieldValue).longValue());
         } else if (String.class.equals(spec.getColumnClass())) {
-            return dataPoint.getFieldValue(fieldName);
+            return (String) fieldValue;
         } else {
-            return String.valueOf(((Number) dataPoint.getFieldValue(fieldName)).longValue());
+            return String.valueOf(((Number) fieldValue).longValue());
         }
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java
index b765dd4f63..0ea91671b1 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java
@@ -88,12 +88,16 @@ public class BanyanDBIndexInstaller extends ModelInstaller {
             } else { // measure
                 Measure measure = MetadataRegistry.INSTANCE.registerMeasureModel(model, config, configService);
                 if (measure != null) {
-                    log.info("install measure schema {}", model.getName());
+                    log.info("install measure schema {}", measure.name());
                     ((BanyanDBStorageClient) client).define(measure);
+                    final BanyanDBClient c = ((BanyanDBStorageClient) this.client).client;
+                    MetadataRegistry.INSTANCE.findMetadata(model).installTopNAggregation(c);
                 }
             }
         } catch (IOException ex) {
             throw new StorageException("fail to install schema", ex);
+        } catch (BanyanDBException ex) {
+            throw new StorageException("fail to install TopN schema", ex);
         }
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
index a9a55b40f1..81c2b0c0e0 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
@@ -28,11 +28,14 @@ import org.apache.skywalking.banyandb.v1.client.StreamQuery;
 import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
 import org.apache.skywalking.banyandb.v1.client.StreamWrite;
 import org.apache.skywalking.banyandb.v1.client.grpc.exception.AlreadyExistsException;
+import org.apache.skywalking.banyandb.v1.client.TopNQuery;
+import org.apache.skywalking.banyandb.v1.client.TopNQueryResponse;
 import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
 import org.apache.skywalking.banyandb.v1.client.metadata.Group;
 import org.apache.skywalking.banyandb.v1.client.metadata.Measure;
 import org.apache.skywalking.banyandb.v1.client.metadata.Property;
 import org.apache.skywalking.banyandb.v1.client.metadata.Stream;
+import org.apache.skywalking.banyandb.v1.client.metadata.TopNAggregation;
 import org.apache.skywalking.oap.server.library.client.Client;
 import org.apache.skywalking.oap.server.library.client.healthcheck.DelegatedHealthChecker;
 import org.apache.skywalking.oap.server.library.client.healthcheck.HealthCheckable;
@@ -118,6 +121,17 @@ public class BanyanDBStorageClient implements Client, HealthCheckable {
         }
     }
 
+    /**
+     * Executes a TopN query through the underlying BanyanDB client and reports the outcome
+     * to the health checker.
+     *
+     * @param q the TopN query to execute
+     * @return the response produced by the BanyanDB client
+     * @throws IOException wrapping the {@link BanyanDBException} raised by the client
+     */
+    public TopNQueryResponse query(TopNQuery q) throws IOException {
+        try {
+            final TopNQueryResponse topNResponse = this.client.query(q);
+            // reaching this point means the client call did not throw
+            this.healthChecker.health();
+            return topNResponse;
+        } catch (BanyanDBException cause) {
+            this.healthChecker.unHealth(cause);
+            throw new IOException("fail to query topn", cause);
+        }
+    }
+
     public void define(Property property) throws IOException {
         try {
             this.client.apply(property);
@@ -161,6 +175,16 @@ public class BanyanDBStorageClient implements Client, HealthCheckable {
         }
     }
 
+    /**
+     * Defines a TopN aggregation through the underlying BanyanDB client and reports the outcome
+     * to the health checker.
+     *
+     * @param topNAggregation the TopN aggregation schema to define
+     * @throws IOException wrapping the {@link BanyanDBException} raised by the client
+     */
+    public void define(TopNAggregation topNAggregation) throws IOException {
+        try {
+            this.client.define(topNAggregation);
+            // reaching this point means the client call did not throw
+            this.healthChecker.health();
+        } catch (BanyanDBException cause) {
+            this.healthChecker.unHealth(cause);
+            throw new IOException("fail to define TopNAggregation", cause);
+        }
+    }
+
     public void write(StreamWrite streamWrite) {
         this.client.write(streamWrite);
     }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
index 5756bd710e..70860cda8a 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
@@ -36,6 +36,7 @@ import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import lombok.Builder;
 import lombok.Data;
@@ -47,6 +48,7 @@ import lombok.Setter;
 import lombok.Singular;
 import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.banyandb.v1.client.AbstractQuery;
 import org.apache.skywalking.banyandb.v1.client.BanyanDBClient;
 import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
 import org.apache.skywalking.banyandb.v1.client.metadata.Catalog;
@@ -59,6 +61,7 @@ import org.apache.skywalking.banyandb.v1.client.metadata.NamedSchema;
 import org.apache.skywalking.banyandb.v1.client.metadata.ResourceExist;
 import org.apache.skywalking.banyandb.v1.client.metadata.Stream;
 import org.apache.skywalking.banyandb.v1.client.metadata.TagFamilySpec;
+import org.apache.skywalking.banyandb.v1.client.metadata.TopNAggregation;
 import org.apache.skywalking.oap.server.core.analysis.DownSampling;
 import org.apache.skywalking.oap.server.core.analysis.metrics.IntList;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
@@ -72,6 +75,7 @@ import org.apache.skywalking.oap.server.core.storage.annotation.ValueColumnMetad
 import org.apache.skywalking.oap.server.core.storage.model.Model;
 import org.apache.skywalking.oap.server.core.storage.model.ModelColumn;
 import org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObject;
+import org.apache.skywalking.oap.server.library.util.CollectionUtils;
 import org.apache.skywalking.oap.server.library.util.StringUtil;
 
 @Slf4j
@@ -132,7 +136,7 @@ public enum MetadataRegistry {
         // parse and set sharding keys
         List<String> shardingColumns = parseEntityNames(modelColumnMap);
         if (shardingColumns.isEmpty()) {
-           throw new StorageException("model " + model.getName() + " doesn't contain series id");
+            throw new StorageException("model " + model.getName() + " doesn't contain series id");
         }
         // parse tag metadata
         // this can be used to build both
@@ -167,10 +171,38 @@ public enum MetadataRegistry {
             builder.addField(field);
             schemaBuilder.field(field.getName());
         }
+        // parse TopN
+        schemaBuilder.topNSpec(parseTopNSpec(model, schemaMetadata.name()));
+
         registry.put(schemaMetadata.name(), schemaBuilder.build());
         return builder.build();
     }
 
+    /**
+     * Derives the TopN aggregation spec of a measure model from its BanyanDB model extension.
+     *
+     * @param model       the model whose BanyanDB extension may carry TopN settings
+     * @param measureName the measure name; the TopN schema name is this name suffixed with "_topn"
+     * @return the parsed spec, or {@code null} when the model has no TopN settings or its value
+     *         column is missing or not a {@code COMMON_VALUE}
+     * @throws StorageException if TopN settings are present but carry no groupBy tag names
+     */
+    private TopNSpec parseTopNSpec(final Model model, final String measureName)
+            throws StorageException {
+        if (model.getBanyanDBModelExtension().getTopN() == null) {
+            return null;
+        }
+
+        // skip non-single valued metrics: keep the value column only when it is a COMMON_VALUE
+        final Optional<ValueColumnMetadata.ValueColumn> singleValueColumn = ValueColumnMetadata.INSTANCE
+                .readValueColumnDefinition(model.getName())
+                .filter(column -> column.getDataType() == Column.ValueDataType.COMMON_VALUE);
+        if (!singleValueColumn.isPresent()) {
+            return null;
+        }
+
+        if (CollectionUtils.isEmpty(model.getBanyanDBModelExtension().getTopN().getGroupByTagNames())) {
+            throw new StorageException("invalid groupBy tags: " + model.getBanyanDBModelExtension().getTopN().getGroupByTagNames());
+        }
+        return TopNSpec.builder()
+                .name(measureName + "_topn")
+                .fieldName(singleValueColumn.get().getValueCName())
+                .groupByTagNames(model.getBanyanDBModelExtension().getTopN().getGroupByTagNames())
+                .sort(AbstractQuery.Sort.UNSPECIFIED) // include both TopN and BottomN
+                .lruSize(model.getBanyanDBModelExtension().getTopN().getLruSize())
+                .countersNumber(model.getBanyanDBModelExtension().getTopN().getCountersNumber())
+                .build();
+    }
+
     public Schema findMetadata(final Model model) {
         if (model.isRecord()) {
             return findRecordMetadata(model.getName());
@@ -339,8 +371,6 @@ public enum MetadataRegistry {
      */
     MeasureMetadata parseTagAndFieldMetadata(Model model, Schema.SchemaBuilder builder, List<String> shardingColumns) {
         // skip metric
-        Optional<ValueColumnMetadata.ValueColumn> valueColumnOpt = ValueColumnMetadata.INSTANCE
-                .readValueColumnDefinition(model.getName());
         MeasureMetadata.MeasureMetadataBuilder result = MeasureMetadata.builder();
         for (final ModelColumn col : model.getColumns()) {
             final String columnStorageName = col.getColumnName().getStorageName();
@@ -575,6 +605,9 @@ public enum MetadataRegistry {
             }
         }
 
+        /**
+         * @return name of the Stream/Measure in the BanyanDB
+         */
         public String name() {
             if (this.kind == Kind.MEASURE) {
                 return formatName(this.modelName, this.downSampling);
@@ -638,9 +671,44 @@ public enum MetadataRegistry {
         @Getter
         private final String timestampColumn4Stream;
 
+        @Getter
+        @Nullable
+        private final TopNSpec topNSpec;
+
         public ColumnSpec getSpec(String columnName) {
             return this.specs.get(columnName);
         }
+
+        /**
+         * Defines the TopN aggregation of this schema through the given client.
+         * Does nothing when this schema carries no TopN spec; for a measure that case is
+         * logged at debug level.
+         *
+         * @param client the BanyanDB client used to define the aggregation
+         * @throws BanyanDBException if the client fails to define the aggregation
+         */
+        public void installTopNAggregation(BanyanDBClient client) throws BanyanDBException {
+            final TopNSpec spec = this.getTopNSpec();
+            if (spec == null) {
+                if (this.metadata.kind == Kind.MEASURE) {
+                    log.debug("skip null TopN Schema for [{}]", metadata.getModelName());
+                }
+                return;
+            }
+
+            // the source measure name is used for both the definition and the log line below
+            final String sourceMeasure = getMetadata().name();
+            client.define(TopNAggregation.create(getMetadata().getGroup(), spec.getName())
+                    .setSourceMeasureName(sourceMeasure)
+                    .setFieldValueSort(spec.getSort())
+                    .setFieldName(spec.getFieldName())
+                    .setGroupByTagNames(spec.getGroupByTagNames())
+                    .setCountersNumber(spec.getCountersNumber())
+                    .setLruSize(spec.getLruSize())
+                    .build());
+            log.info("installed TopN schema for measure {}", sourceMeasure);
+        }
+    }
+
+    /**
+     * Settings of the TopN aggregation attached to a measure schema.
+     * Instances are produced by {@code parseTopNSpec} and read by
+     * {@code Schema#installTopNAggregation} when the aggregation is defined in BanyanDB.
+     * All fields are final; accessors and the builder are generated by Lombok.
+     */
+    @Builder
+    @EqualsAndHashCode
+    @Getter
+    public static class TopNSpec {
+        // name of the TopN aggregation schema (the measure name suffixed with "_topn" in parseTopNSpec)
+        private final String name;
+        // tag names the aggregation is grouped by
+        @Singular
+        private final List<String> groupByTagNames;
+        // name of the measure field the aggregation reads its value from
+        private final String fieldName;
+        // sort passed as the field value sort of the aggregation
+        private final AbstractQuery.Sort sort;
+        // LRU size passed to the aggregation definition
+        private final int lruSize;
+        // counters number passed to the aggregation definition
+        private final int countersNumber;
     }
 
     @RequiredArgsConstructor
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java
index fd53ab114d..2f66ee5069 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java
@@ -28,13 +28,18 @@ import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
 import org.apache.skywalking.banyandb.v1.client.StreamQuery;
 import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
 import org.apache.skywalking.banyandb.v1.client.TimestampRange;
+import org.apache.skywalking.banyandb.v1.client.TopNQuery;
+import org.apache.skywalking.banyandb.v1.client.TopNQueryResponse;
 import org.apache.skywalking.oap.server.core.analysis.DownSampling;
+import org.apache.skywalking.oap.server.core.query.type.KeyValue;
 import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
+import org.apache.skywalking.oap.server.library.util.CollectionUtils;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry;
 
 import java.io.IOException;
 import java.time.Instant;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 import java.util.function.BiFunction;
@@ -75,6 +80,32 @@ public abstract class AbstractBanyanDBDAO extends AbstractDAO<BanyanDBStorageCli
         return this.query(measureModelName, tags, fields, null, builder);
     }
 
+    /**
+     * Runs a TopN query with descending sort against the TopN aggregation of the given schema.
+     *
+     * @param schema               the measure schema carrying the TopN spec
+     * @param timestampRange       the time range to query
+     * @param number               the number of entries requested
+     * @param additionalConditions optional key/value pairs applied as string equality conditions
+     * @return the TopN query response
+     * @throws IOException if the query fails
+     */
+    protected TopNQueryResponse topN(MetadataRegistry.Schema schema, TimestampRange timestampRange, int number,
+                                     List<KeyValue> additionalConditions) throws IOException {
+        final AbstractQuery.Sort descending = AbstractQuery.Sort.DESC;
+        return topNQuery(schema, timestampRange, number, descending, additionalConditions);
+    }
+
+    /**
+     * Runs a TopN query with ascending sort against the TopN aggregation of the given schema.
+     *
+     * @param schema               the measure schema carrying the TopN spec
+     * @param timestampRange       the time range to query
+     * @param number               the number of entries requested
+     * @param additionalConditions optional key/value pairs applied as string equality conditions
+     * @return the TopN query response
+     * @throws IOException if the query fails
+     */
+    protected TopNQueryResponse bottomN(MetadataRegistry.Schema schema, TimestampRange timestampRange, int number,
+                                        List<KeyValue> additionalConditions) throws IOException {
+        final AbstractQuery.Sort ascending = AbstractQuery.Sort.ASC;
+        return topNQuery(schema, timestampRange, number, ascending, additionalConditions);
+    }
+
+    /**
+     * Builds and executes a query against the TopN aggregation of the given measure schema.
+     * The query uses MEAN as its aggregation type.
+     *
+     * @param schema               the measure schema; it must carry a TopN spec
+     * @param timestampRange       the time range to query
+     * @param number               the number of entries requested
+     * @param sort                 the sort direction of the result
+     * @param additionalConditions optional key/value pairs, each applied as a string equality
+     *                             condition; may be {@code null} or empty
+     * @return the TopN query response
+     * @throws IOException if the schema has no TopN spec, or if the query fails
+     */
+    private TopNQueryResponse topNQuery(MetadataRegistry.Schema schema, TimestampRange timestampRange, int number,
+                                        AbstractQuery.Sort sort, List<KeyValue> additionalConditions) throws IOException {
+        final MetadataRegistry.TopNSpec topNSpec = schema.getTopNSpec();
+        if (topNSpec == null) {
+            // the spec is declared @Nullable on the schema: report which measure lacks it
+            // instead of failing with a bare NullPointerException
+            throw new IOException("TopN aggregation is not defined for measure " + schema.getMetadata().name());
+        }
+        final TopNQuery q = new TopNQuery(schema.getMetadata().getGroup(), topNSpec.getName(),
+                timestampRange,
+                number, sort);
+        q.setAggregationType(MeasureQuery.Aggregation.Type.MEAN);
+        if (CollectionUtils.isNotEmpty(additionalConditions)) {
+            final List<PairQueryCondition<?>> conditions = new ArrayList<>(additionalConditions.size());
+            for (final KeyValue kv : additionalConditions) {
+                conditions.add(PairQueryCondition.StringQueryCondition.eq(kv.getKey(), kv.getValue()));
+            }
+            q.setConditions(conditions);
+        }
+        return getClient().query(q);
+    }
+
     protected MeasureQueryResponse query(String measureModelName, Set<String> tags, Set<String> fields,
                                          TimestampRange timestampRange, QueryBuilder<MeasureQuery> builder) throws IOException {
         MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(measureModelName, DownSampling.Minute);
diff --git a/test/e2e-v2/cases/storage/banyandb/e2e.yaml b/test/e2e-v2/cases/storage/banyandb/e2e.yaml
index 833e1957dc..aac323269a 100644
--- a/test/e2e-v2/cases/storage/banyandb/e2e.yaml
+++ b/test/e2e-v2/cases/storage/banyandb/e2e.yaml
@@ -30,8 +30,8 @@ setup:
 
 trigger:
   action: http
-  interval: 3s
-  times: 10
+  interval: 5s
+  times: 40
   url: http://${consumer_host}:${consumer_9092}/users
   method: POST
   body: '{"id":"123","name":"skywalking"}'
diff --git a/test/e2e-v2/script/env b/test/e2e-v2/cases/storage/expected/metrics-top-endpoint.yml
similarity index 50%
copy from test/e2e-v2/script/env
copy to test/e2e-v2/cases/storage/expected/metrics-top-endpoint.yml
index 338a875304..a89a98d72e 100644
--- a/test/e2e-v2/script/env
+++ b/test/e2e-v2/cases/storage/expected/metrics-top-endpoint.yml
@@ -13,16 +13,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-SW_AGENT_JAVA_COMMIT=3f88d735ba2bfd1196aff946502447d4b14450c8
-SW_AGENT_SATELLITE_COMMIT=ea27a3f4e126a24775fe12e2aa2695bcb23d99c3
-SW_AGENT_NGINX_LUA_COMMIT=c3cee4841798a147d83b96a10914d4ac0e11d0aa
-SW_AGENT_NODEJS_COMMIT=2e7560518aff846befd4d6bc815fe5e38c704a11
-SW_AGENT_GO_COMMIT=4af380c2db6243106b0fc650b6003ce3b3eb82a0
-SW_AGENT_PYTHON_COMMIT=c76a6ec51a478ac91abb20ec8f22a99b8d4d6a58
-SW_AGENT_CLIENT_JS_COMMIT=af0565a67d382b683c1dbd94c379b7080db61449
-SW_AGENT_CLIENT_JS_TEST_COMMIT=4f1eb1dcdbde3ec4a38534bf01dded4ab5d2f016
-SW_KUBERNETES_COMMIT_SHA=b670c41d94a82ddefcf466d54bab5c492d88d772
-SW_ROVER_COMMIT=fc8d074c6d34ecfee585a7097cbd5aef1ca680a5
-SW_BANYANDB_COMMIT=adbd3e87df7f84e5d1904fcf40476d2e81842058
-
-SW_CTL_COMMIT=23debb3b77426edd70192095a5fe9b0fc9031068
+{{- contains . }}
+- name: {{ notEmpty .name }}
+  id: ""
+  value: '{{- gt .value "0" }}'
+  refid: ~
+{{- end }}
diff --git a/test/e2e-v2/cases/storage/storage-cases.yaml b/test/e2e-v2/cases/storage/storage-cases.yaml
index 65b53eb71b..2e7d968753 100644
--- a/test/e2e-v2/cases/storage/storage-cases.yaml
+++ b/test/e2e-v2/cases/storage/storage-cases.yaml
@@ -145,7 +145,9 @@ cases:
     expected: expected/metrics-has-value-percentile.yml
   - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics multiple-linear --name=endpoint_percentile --endpoint-name=POST:/users --service-name=e2e-service-consumer |yq e 'to_entries | with(.[] ; .value=(.value | to_entries))' -
     expected: expected/metrics-has-value-percentile.yml
-
+  # Endpoint TopN with service_id
+  - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics top --name endpoint_cpm --service-name e2e-service-provider 5
+    expected: expected/metrics-top-endpoint.yml
   # native event: event list
   - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql event list
     expected: expected/event-list.yml
diff --git a/test/e2e-v2/script/env b/test/e2e-v2/script/env
index 338a875304..ebdbddcbd2 100644
--- a/test/e2e-v2/script/env
+++ b/test/e2e-v2/script/env
@@ -23,6 +23,6 @@ SW_AGENT_CLIENT_JS_COMMIT=af0565a67d382b683c1dbd94c379b7080db61449
 SW_AGENT_CLIENT_JS_TEST_COMMIT=4f1eb1dcdbde3ec4a38534bf01dded4ab5d2f016
 SW_KUBERNETES_COMMIT_SHA=b670c41d94a82ddefcf466d54bab5c492d88d772
 SW_ROVER_COMMIT=fc8d074c6d34ecfee585a7097cbd5aef1ca680a5
-SW_BANYANDB_COMMIT=adbd3e87df7f84e5d1904fcf40476d2e81842058
+SW_BANYANDB_COMMIT=dea8c1e37d4dc19fe18397deb576151a22e2fad8
 
 SW_CTL_COMMIT=23debb3b77426edd70192095a5fe9b0fc9031068