You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2023/04/16 08:42:15 UTC

[skywalking-banyandb-java-client] branch main updated: Support TopNAggregation and TopN Query (#37)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb-java-client.git


The following commit(s) were added to refs/heads/main by this push:
     new 4a5ff35  Support TopNAggregation and TopN Query (#37)
4a5ff35 is described below

commit 4a5ff35b37bdb65478719b47f3137e2e854eac7e
Author: Jiajing LU <lu...@gmail.com>
AuthorDate: Sun Apr 16 16:42:10 2023 +0800

    Support TopNAggregation and TopN Query (#37)
---
 .../banyandb/v1/client/AbstractCriteria.java       |   2 +-
 .../banyandb/v1/client/AbstractQuery.java          |   2 +-
 .../apache/skywalking/banyandb/v1/client/And.java  |   2 +-
 .../banyandb/v1/client/BanyanDBClient.java         |  56 ++++++++
 .../skywalking/banyandb/v1/client/DataPoint.java   |   4 +-
 .../banyandb/v1/client/MeasureQuery.java           |   2 +-
 .../apache/skywalking/banyandb/v1/client/Or.java   |   2 +-
 .../banyandb/v1/client/PairQueryCondition.java     |   2 +-
 .../skywalking/banyandb/v1/client/TopNQuery.java   |  74 +++++++++++
 .../banyandb/v1/client/TopNQueryResponse.java      |  80 ++++++++++++
 .../v1/client/metadata/TopNAggregation.java        | 143 +++++++++++++++++++++
 .../metadata/TopNAggregationMetadataRegistry.java  |  90 +++++++++++++
 src/main/proto/banyandb/v1/banyandb-measure.proto  |   4 +-
 .../v1/client/AbstractBanyanDBClientTest.java      |  60 +++++++++
 .../TopNAggregationMetadataRegistryTest.java       |  86 +++++++++++++
 15 files changed, 599 insertions(+), 10 deletions(-)

diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractCriteria.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractCriteria.java
index 718c38b..68f808f 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractCriteria.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractCriteria.java
@@ -21,7 +21,7 @@ package org.apache.skywalking.banyandb.v1.client;
 import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
 
 public abstract class AbstractCriteria {
-    abstract BanyandbModel.Criteria build();
+    public abstract BanyandbModel.Criteria build();
 }
 
 abstract class LogicalExpression extends AbstractCriteria {
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractQuery.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractQuery.java
index 99c7675..afe74eb 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractQuery.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractQuery.java
@@ -186,7 +186,7 @@ public abstract class AbstractQuery<T> {
     @RequiredArgsConstructor
     @Getter(AccessLevel.PROTECTED)
     public enum Sort {
-        ASC, DESC;
+        UNSPECIFIED, ASC, DESC;
     }
 
     @AutoValue
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/And.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/And.java
index cdf45b4..e8d7ab2 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/And.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/And.java
@@ -29,7 +29,7 @@ public abstract class And extends LogicalExpression {
     }
 
     @Override
-    BanyandbModel.Criteria build() {
+    public BanyandbModel.Criteria build() {
         return BanyandbModel.Criteria.newBuilder()
                 .setLe(BanyandbModel.LogicalExpression.newBuilder()
                         .setLeft(left().build())
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java
index c6abc80..4fbe8ea 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java
@@ -50,6 +50,8 @@ import org.apache.skywalking.banyandb.v1.client.metadata.PropertyStore;
 import org.apache.skywalking.banyandb.v1.client.metadata.ResourceExist;
 import org.apache.skywalking.banyandb.v1.client.metadata.Stream;
 import org.apache.skywalking.banyandb.v1.client.metadata.StreamMetadataRegistry;
+import org.apache.skywalking.banyandb.v1.client.metadata.TopNAggregation;
+import org.apache.skywalking.banyandb.v1.client.metadata.TopNAggregationMetadataRegistry;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -265,6 +267,22 @@ public class BanyanDBClient implements Closeable {
         return new StreamQueryResponse(response);
     }
 
+    /**
+     * Query TopN according to given conditions through the blocking measure stub.
+     *
+     * @param topNQuery condition for query; it is serialized with {@code build()} before being sent
+     * @return the server response wrapped in a {@link TopNQueryResponse}
+     */
+    public TopNQueryResponse query(TopNQuery topNQuery) throws BanyanDBException {
+        checkState(this.measureServiceBlockingStub != null, "measure service is null"); // guard the stub that is actually used below
+
+        final BanyandbMeasure.TopNResponse response = HandleExceptionsWith.callAndTranslateApiException(() ->
+                this.measureServiceBlockingStub
+                        .withDeadlineAfter(this.getOptions().getDeadline(), TimeUnit.SECONDS) // the deadline option is applied as seconds
+                        .topN(topNQuery.build()));
+        return new TopNQueryResponse(response);
+    }
+
     /**
      * Query measures according to given conditions
      *
@@ -317,6 +335,16 @@ public class BanyanDBClient implements Closeable {
         MetadataCache.INSTANCE.register(measure);
     }
 
+    /**
+     * Define a new TopNAggregation by sending a create request through a registry client.
+     *
+     * @param topNAggregation the topN rule to be created
+     */
+    public void define(TopNAggregation topNAggregation) throws BanyanDBException {
+        TopNAggregationMetadataRegistry registry = new TopNAggregationMetadataRegistry(checkNotNull(this.channel)); // a registry client is built per call on the current channel
+        registry.create(topNAggregation);
+    }
+
     /**
      * Apply(Create or update) the property with {@link PropertyStore.Strategy#MERGE}
      *
@@ -484,6 +512,20 @@ public class BanyanDBClient implements Closeable {
         }
     }
 
+    /**
+     * Try to find the TopNAggregation from the BanyanDB with given group and name.
+     *
+     * @param group group of the TopNAggregation
+     * @param name  name of the TopNAggregation
+     * @return TopNAggregation if found. NOTE(review): whether null can come back depends on the registry's get(); confirm.
+     */
+    public TopNAggregation findTopNAggregation(String group, String name) throws BanyanDBException {
+        Preconditions.checkArgument(!Strings.isNullOrEmpty(group)); // group must be neither null nor empty
+        Preconditions.checkArgument(!Strings.isNullOrEmpty(name)); // name must be neither null nor empty
+
+        return new TopNAggregationMetadataRegistry(checkNotNull(this.channel)).get(group, name);
+    }
+
     /**
      * Try to find the stream from the BanyanDB with given group and name.
      *
@@ -579,6 +621,20 @@ public class BanyanDBClient implements Closeable {
         return new MeasureMetadataRegistry(checkNotNull(this.channel)).exist(group, name);
     }
 
+    /**
+     * Check if the given TopNAggregation exists.
+     *
+     * @param group group of the TopNAggregation
+     * @param name  name of the TopNAggregation
+     * @return ResourceExist which indicates whether group and TopNAggregation exist
+     */
+    public ResourceExist existTopNAggregation(String group, String name) throws BanyanDBException {
+        Preconditions.checkArgument(!Strings.isNullOrEmpty(group)); // group must be neither null nor empty
+        Preconditions.checkArgument(!Strings.isNullOrEmpty(name)); // name must be neither null nor empty
+
+        return new TopNAggregationMetadataRegistry(checkNotNull(this.channel)).exist(group, name); // a registry client is built per call on the current channel
+    }
+
     @Override
     public void close() throws IOException {
         connectionEstablishLock.lock();
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/DataPoint.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/DataPoint.java
index 74b299f..fd65532 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/DataPoint.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/DataPoint.java
@@ -40,7 +40,7 @@ public class DataPoint extends RowEntity {
         super(dataPoint.getTimestamp(), dataPoint.getTagFamiliesList());
         this.fields = new HashMap<>(dataPoint.getFieldsCount());
         for (BanyandbMeasure.DataPoint.Field f : dataPoint.getFieldsList()) {
-            this.fields.put(f.getName(), convertToJavaType(f.getValue()));
+            this.fields.put(f.getName(), convertFileValueToJavaType(f.getValue()));
         }
     }
 
@@ -48,7 +48,7 @@ public class DataPoint extends RowEntity {
         return (T) this.fields.get(fieldName);
     }
 
-    private Object convertToJavaType(BanyandbModel.FieldValue fieldValue) {
+    static Object convertFileValueToJavaType(BanyandbModel.FieldValue fieldValue) {
         switch (fieldValue.getValueCase()) {
             case INT:
                 return fieldValue.getInt().getValue();
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/MeasureQuery.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/MeasureQuery.java
index 6f3799e..4ae04af 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/MeasureQuery.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/MeasureQuery.java
@@ -217,7 +217,7 @@ public class MeasureQuery extends AbstractQuery<BanyandbMeasure.QueryRequest> {
             MIN(BanyandbModel.AggregationFunction.AGGREGATION_FUNCTION_MIN),
             COUNT(BanyandbModel.AggregationFunction.AGGREGATION_FUNCTION_COUNT),
             SUM(BanyandbModel.AggregationFunction.AGGREGATION_FUNCTION_SUM);
-            private final BanyandbModel.AggregationFunction function;
+            final BanyandbModel.AggregationFunction function;
         }
     }
 }
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/Or.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/Or.java
index ccfe3eb..f4e0ab3 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/Or.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/Or.java
@@ -29,7 +29,7 @@ public abstract class Or extends LogicalExpression {
     }
 
     @Override
-    BanyandbModel.Criteria build() {
+    public BanyandbModel.Criteria build() {
         return BanyandbModel.Criteria.newBuilder()
                 .setLe(BanyandbModel.LogicalExpression.newBuilder()
                         .setLeft(left().build())
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/PairQueryCondition.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/PairQueryCondition.java
index 455f385..4f8a9bb 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/PairQueryCondition.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/PairQueryCondition.java
@@ -35,7 +35,7 @@ public abstract class PairQueryCondition<T> extends AbstractCriteria {
     }
 
     @Override
-    BanyandbModel.Criteria build() {
+    public BanyandbModel.Criteria build() {
         return BanyandbModel.Criteria.newBuilder()
                 .setCondition(BanyandbModel.Condition.newBuilder()
                 .setName(this.tagAndValue.getTagName())
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/TopNQuery.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/TopNQuery.java
new file mode 100644
index 0000000..9330330
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/TopNQuery.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import com.google.common.base.Preconditions;
+import lombok.Setter;
+import org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
+import org.apache.skywalking.banyandb.measure.v1.BanyandbMeasure;
+import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
+import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
+
+import java.util.List;
+
+@Setter // Lombok generates setters only for the non-final fields (aggregationType, conditions)
+public class TopNQuery { // describes one TopN request; build() turns it into a BanyandbMeasure.TopNRequest
+    private final String group;
+    private final String name;
+    private final TimestampRange timestampRange;
+    private final int number; // sent as the request's topN; must be positive
+    private final AbstractQuery.Sort sort; // ASC or DESC; null and UNSPECIFIED are rejected by the constructor
+    private MeasureQuery.Aggregation.Type aggregationType = MeasureQuery.Aggregation.Type.UNSPECIFIED;
+    /**
+     * Query conditions. Optional; {@link #build()} only accepts equality (BINARY_OP_EQ) conditions.
+     */
+    private List<PairQueryCondition<?>> conditions;
+
+    public TopNQuery(String group, String name, TimestampRange timestampRange, int number, AbstractQuery.Sort sort) {
+        Preconditions.checkArgument(sort != null && sort != AbstractQuery.Sort.UNSPECIFIED, "sort must be ASC or DESC"); // a null sort would otherwise be sent as SORT_ASC
+        Preconditions.checkArgument(number > 0, "number must be positive, but got %s", number);
+        this.group = group;
+        this.name = name;
+        this.timestampRange = timestampRange;
+        this.number = number;
+        this.sort = sort;
+    }
+
+    BanyandbMeasure.TopNRequest build() throws BanyanDBException { // package-private: serializes this query into the protobuf request
+        BanyandbMeasure.TopNRequest.Builder bld = BanyandbMeasure.TopNRequest.newBuilder()
+                .setMetadata(BanyandbCommon.Metadata.newBuilder().setGroup(group).setName(name).build())
+                .setTimeRange(timestampRange.build())
+                .setTopN(number)
+                .setFieldValueSort(AbstractQuery.Sort.DESC == sort ? BanyandbModel.Sort.SORT_DESC : BanyandbModel.Sort.SORT_ASC);
+        if (aggregationType == null) { // the generated setter accepts null; fall back to UNSPECIFIED
+            bld.setAgg(BanyandbModel.AggregationFunction.AGGREGATION_FUNCTION_UNSPECIFIED);
+        } else {
+            bld.setAgg(aggregationType.function);
+        }
+        if (conditions != null) { // an empty list simply contributes no conditions
+            for (final PairQueryCondition<?> expr : conditions) {
+                if (expr.op != BanyandbModel.Condition.BinaryOp.BINARY_OP_EQ) {
+                    throw new UnsupportedOperationException("only equality is supported");
+                }
+                bld.addConditions(expr.build().getCondition());
+            }
+        }
+        return bld.build();
+    }
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/TopNQueryResponse.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/TopNQueryResponse.java
new file mode 100644
index 0000000..e23cc92
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/TopNQueryResponse.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import com.google.protobuf.Timestamp;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.skywalking.banyandb.measure.v1.BanyandbMeasure;
+import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class TopNQueryResponse { // client-side view of a BanyandbMeasure.TopNResponse
+    @Getter
+    private final List<TopNList> topNLists; // one entry per list in the response, in response order
+
+    TopNQueryResponse(BanyandbMeasure.TopNResponse response) {
+        final List<BanyandbMeasure.TopNList> timelines = response.getListsList();
+        topNLists = new ArrayList<>(timelines.size()); // pre-sized to the number of lists in the response
+        for (final BanyandbMeasure.TopNList topNList : timelines) {
+            topNLists.add(new TopNList(topNList.getTimestamp(), topNList.getItemsList()));
+        }
+    }
+
+    public int size() { // number of TopN lists held by this response
+        return this.topNLists == null ? 0 : this.topNLists.size();
+    }
+
+    @Getter
+    public static class TopNList { // the items reported for a single timestamp
+        /**
+         * timestamp of the entity in the timeunit of milliseconds.
+         */
+        private final long timestamp;
+        private final List<Item> items;
+
+        private TopNList(Timestamp ts, List<BanyandbMeasure.TopNList.Item> itemsList) {
+            this.timestamp = ts.getSeconds() * 1000 + ts.getNanos() / 1_000_000; // seconds and nanos folded into milliseconds
+            this.items = new ArrayList<>(itemsList.size());
+            for (final BanyandbMeasure.TopNList.Item item : itemsList) {
+                this.items.add(Item.parseFrom(item));
+            }
+        }
+    }
+
+    @RequiredArgsConstructor
+    @Getter
+    public static class Item { // one ranked entry: its entity tags plus the converted field value
+        private final Map<String, TagAndValue<?>> tagValuesMap; // keyed by tag key
+        private final Object value; // result of DataPoint.convertFileValueToJavaType on the protobuf value
+
+        static Item parseFrom(BanyandbMeasure.TopNList.Item item) { // converts a protobuf item into its client-side form
+            final Object fieldValue = DataPoint.convertFileValueToJavaType(item.getValue());
+            final Map<String, TagAndValue<?>> map = new HashMap<>(item.getEntityCount());
+            for (final BanyandbModel.Tag tag : item.getEntityList()) {
+                map.put(tag.getKey(), TagAndValue.fromProtobuf(tag)); // a repeated key overwrites the earlier entry
+            }
+            return new Item(map, fieldValue);
+        }
+    }
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/metadata/TopNAggregation.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/metadata/TopNAggregation.java
new file mode 100644
index 0000000..956d52c
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/metadata/TopNAggregation.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client.metadata;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.ImmutableList;
+import org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
+import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase;
+import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
+import org.apache.skywalking.banyandb.v1.client.AbstractCriteria;
+import org.apache.skywalking.banyandb.v1.client.AbstractQuery;
+import org.apache.skywalking.banyandb.v1.client.util.TimeUtils;
+
+import javax.annotation.Nullable;
+import java.time.ZonedDateTime;
+import java.util.Collections;
+import java.util.List;
+
+@AutoValue
+public abstract class TopNAggregation extends NamedSchema<BanyandbDatabase.TopNAggregation> { // schema of a TopN rule, convertible to/from BanyandbDatabase.TopNAggregation
+    public abstract String sourceMeasureName(); // name of the source measure; serialize() pairs it with this rule's own group
+
+    public abstract String fieldName();
+
+    @Nullable
+    public abstract AbstractQuery.Sort fieldValueSort(); // null is serialized as SORT_UNSPECIFIED
+
+    @Nullable
+    public abstract ImmutableList<String> groupByTagNames(); // null is serialized as an empty list
+
+    @Nullable
+    abstract AbstractCriteria criteria(); // optional; left out of the protobuf when null
+
+    abstract int countersNumber();
+
+    abstract int lruSize();
+
+    abstract TopNAggregation.Builder toBuilder();
+
+    public static TopNAggregation.Builder create(String group, String name) { // starts a builder with group and name already set
+        return new AutoValue_TopNAggregation.Builder().setGroup(group).setName(name);
+    }
+
+    @AutoValue.Builder
+    public abstract static class Builder {
+        abstract TopNAggregation.Builder setGroup(String group); // package-private: set through create(group, name)
+
+        abstract TopNAggregation.Builder setName(String name); // package-private: set through create(group, name)
+
+        abstract TopNAggregation.Builder setUpdatedAt(ZonedDateTime updatedAt); // package-private: filled in by fromProtobuf
+
+        public abstract TopNAggregation.Builder setLruSize(int lruSize);
+
+        public abstract TopNAggregation.Builder setCriteria(AbstractCriteria criteria);
+
+        public abstract TopNAggregation.Builder setFieldValueSort(AbstractQuery.Sort sort);
+
+        public abstract TopNAggregation.Builder setFieldName(String fieldName);
+
+        public abstract TopNAggregation.Builder setCountersNumber(int countersNumber);
+
+        public abstract TopNAggregation.Builder setGroupByTagNames(String... groupByTagNames);
+
+        public abstract TopNAggregation.Builder setGroupByTagNames(List<String> groupByTagNames);
+
+        public abstract TopNAggregation.Builder setSourceMeasureName(String sourceMeasureName);
+
+        public abstract TopNAggregation build();
+    }
+
+    @Override
+    public BanyandbDatabase.TopNAggregation serialize() { // converts this schema into its protobuf message
+        BanyandbDatabase.TopNAggregation.Builder bld = BanyandbDatabase.TopNAggregation.newBuilder()
+                .setMetadata(buildMetadata());
+
+        bld.setFieldName(this.fieldName())
+                .setSourceMeasure(BanyandbCommon.Metadata.newBuilder().setGroup(group()).setName(this.sourceMeasureName()).build()) // the source measure reuses this rule's group
+                .setCountersNumber(this.countersNumber())
+                .setLruSize(this.lruSize());
+
+        if (this.criteria() != null) {
+            bld.setCriteria(this.criteria().build());
+        }
+
+        if (this.groupByTagNames() != null) {
+            bld.addAllGroupByTagNames(this.groupByTagNames());
+        } else {
+            bld.addAllGroupByTagNames(Collections.emptyList()); // adds nothing; the repeated field stays empty
+        }
+
+        if (this.fieldValueSort() == null || this.fieldValueSort() == AbstractQuery.Sort.UNSPECIFIED) {
+            bld.setFieldValueSort(BanyandbModel.Sort.SORT_UNSPECIFIED);
+        } else {
+            bld.setFieldValueSort(AbstractQuery.Sort.DESC == this.fieldValueSort() ? BanyandbModel.Sort.SORT_DESC : BanyandbModel.Sort.SORT_ASC);
+        }
+
+        if (this.updatedAt() != null) { // updatedAt is only written when it is set
+            bld.setUpdatedAt(TimeUtils.buildTimestamp(updatedAt()));
+        }
+        return bld.build();
+    }
+
+    public static TopNAggregation fromProtobuf(final BanyandbDatabase.TopNAggregation pb) { // rebuilds the schema from its protobuf message (criteria excluded, see TODO below)
+        TopNAggregation.Builder bld = TopNAggregation.create(pb.getMetadata().getGroup(), pb.getMetadata().getName())
+                .setUpdatedAt(TimeUtils.parseTimestamp(pb.getUpdatedAt()))
+                .setCountersNumber(pb.getCountersNumber())
+                .setLruSize(pb.getLruSize())
+                .setFieldName(pb.getFieldName())
+                .setSourceMeasureName(pb.getSourceMeasure().getName()) // only the name is kept; the source measure's group is dropped
+                .setGroupByTagNames(pb.getGroupByTagNamesList());
+
+        switch (pb.getFieldValueSort()) {
+            case SORT_ASC:
+                bld.setFieldValueSort(AbstractQuery.Sort.ASC);
+                break;
+            case SORT_DESC:
+                bld.setFieldValueSort(AbstractQuery.Sort.DESC);
+                break;
+            default: // every other value maps to UNSPECIFIED
+                bld.setFieldValueSort(AbstractQuery.Sort.UNSPECIFIED);
+        }
+
+        // TODO: deserialize Criteria (until then criteria() is never set on objects built here)
+
+        return bld.build();
+    }
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/metadata/TopNAggregationMetadataRegistry.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/metadata/TopNAggregationMetadataRegistry.java
new file mode 100644
index 0000000..d10e5d7
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/metadata/TopNAggregationMetadataRegistry.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client.metadata;
+
+import io.grpc.Channel;
+import org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
+import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase;
+import org.apache.skywalking.banyandb.database.v1.TopNAggregationRegistryServiceGrpc;
+import org.apache.skywalking.banyandb.v1.client.grpc.MetadataClient;
+import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class TopNAggregationMetadataRegistry extends MetadataClient<TopNAggregationRegistryServiceGrpc.TopNAggregationRegistryServiceBlockingStub,
+        BanyandbDatabase.TopNAggregation, TopNAggregation> { // CRUD client for TopNAggregation schemas over the blocking registry stub
+    public TopNAggregationMetadataRegistry(Channel channel) {
+        super(TopNAggregationRegistryServiceGrpc.newBlockingStub(channel));
+    }
+
+    @Override
+    public void create(TopNAggregation payload) throws BanyanDBException { // sends the serialized payload in a create request
+        execute(() ->
+                stub.create(BanyandbDatabase.TopNAggregationRegistryServiceCreateRequest.newBuilder()
+                        .setTopNAggregation(payload.serialize())
+                        .build()));
+    }
+
+    @Override
+    public void update(TopNAggregation payload) throws BanyanDBException { // sends the serialized payload in an update request
+        execute(() ->
+                stub.update(BanyandbDatabase.TopNAggregationRegistryServiceUpdateRequest.newBuilder()
+                        .setTopNAggregation(payload.serialize())
+                        .build()));
+    }
+
+    @Override
+    public boolean delete(String group, String name) throws BanyanDBException {
+        BanyandbDatabase.TopNAggregationRegistryServiceDeleteResponse resp = execute(() ->
+                stub.delete(BanyandbDatabase.TopNAggregationRegistryServiceDeleteRequest.newBuilder()
+                        .setMetadata(BanyandbCommon.Metadata.newBuilder().setGroup(group).setName(name).build())
+                        .build()));
+        return resp != null && resp.getDeleted(); // a null response counts as "not deleted"
+    }
+
+    @Override
+    public TopNAggregation get(String group, String name) throws BanyanDBException {
+        BanyandbDatabase.TopNAggregationRegistryServiceGetResponse resp = execute(() ->
+                stub.get(BanyandbDatabase.TopNAggregationRegistryServiceGetRequest.newBuilder()
+                        .setMetadata(BanyandbCommon.Metadata.newBuilder().setGroup(group).setName(name).build())
+                        .build()));
+
+        return TopNAggregation.fromProtobuf(resp.getTopNAggregation()); // NOTE(review): resp is not null-checked here, unlike delete(); confirm execute() never returns null
+    }
+
+    @Override
+    public ResourceExist exist(String group, String name) throws BanyanDBException {
+        BanyandbDatabase.TopNAggregationRegistryServiceExistResponse resp = execute(() ->
+                stub.exist(BanyandbDatabase.TopNAggregationRegistryServiceExistRequest.newBuilder()
+                        .setMetadata(BanyandbCommon.Metadata.newBuilder().setGroup(group).setName(name).build())
+                        .build()));
+        return ResourceExist.create(resp.getHasGroup(), resp.getHasTopNAggregation()); // reports group and aggregation existence separately
+    }
+
+    @Override
+    public List<TopNAggregation> list(String group) throws BanyanDBException {
+        BanyandbDatabase.TopNAggregationRegistryServiceListResponse resp = execute(() ->
+                stub.list(BanyandbDatabase.TopNAggregationRegistryServiceListRequest.newBuilder()
+                        .setGroup(group)
+                        .build()));
+
+        return resp.getTopNAggregationList().stream().map(TopNAggregation::fromProtobuf).collect(Collectors.toList()); // each protobuf entry is converted with fromProtobuf
+    }
+}
diff --git a/src/main/proto/banyandb/v1/banyandb-measure.proto b/src/main/proto/banyandb/v1/banyandb-measure.proto
index 7c41eb4..1d89baa 100644
--- a/src/main/proto/banyandb/v1/banyandb-measure.proto
+++ b/src/main/proto/banyandb/v1/banyandb-measure.proto
@@ -100,12 +100,12 @@ message QueryRequest {
   model.v1.QueryOrder order_by = 12;
 }
 
-//TopNList contains a series of topN items
+// TopNList contains a series of topN items
 message TopNList {
   // timestamp is in the timeunit of milliseconds.
   google.protobuf.Timestamp timestamp = 1;
   message Item {
-    string name = 1;
+    repeated model.v1.Tag entity = 1;
     model.v1.FieldValue value = 2;
   }
   // items contains top-n items in a list
diff --git a/src/test/java/org/apache/skywalking/banyandb/v1/client/AbstractBanyanDBClientTest.java b/src/test/java/org/apache/skywalking/banyandb/v1/client/AbstractBanyanDBClientTest.java
index 44b467f..a53e97e 100644
--- a/src/test/java/org/apache/skywalking/banyandb/v1/client/AbstractBanyanDBClientTest.java
+++ b/src/test/java/org/apache/skywalking/banyandb/v1/client/AbstractBanyanDBClientTest.java
@@ -32,6 +32,7 @@ import org.apache.skywalking.banyandb.database.v1.IndexRuleBindingRegistryServic
 import org.apache.skywalking.banyandb.database.v1.IndexRuleRegistryServiceGrpc;
 import org.apache.skywalking.banyandb.database.v1.MeasureRegistryServiceGrpc;
 import org.apache.skywalking.banyandb.database.v1.StreamRegistryServiceGrpc;
+import org.apache.skywalking.banyandb.database.v1.TopNAggregationRegistryServiceGrpc;
 import org.apache.skywalking.banyandb.v1.client.util.TimeUtils;
 import org.junit.Rule;
 
@@ -253,6 +254,58 @@ public class AbstractBanyanDBClientTest {
                         }
                     }));
 
+    // topN aggregation registry
+    protected Map<String, BanyandbDatabase.TopNAggregation> topNAggregationRegistry;
+
+    private final TopNAggregationRegistryServiceGrpc.TopNAggregationRegistryServiceImplBase topNAggregationRegistryServiceImpl =
+            mock(TopNAggregationRegistryServiceGrpc.TopNAggregationRegistryServiceImplBase.class, delegatesTo(
+                    new TopNAggregationRegistryServiceGrpc.TopNAggregationRegistryServiceImplBase() {
+                        @Override
+                        public void create(BanyandbDatabase.TopNAggregationRegistryServiceCreateRequest request, StreamObserver<BanyandbDatabase.TopNAggregationRegistryServiceCreateResponse> responseObserver) {
+                            // Stamp the submitted schema with the current time and store it under its metadata name.
+                            BanyandbDatabase.TopNAggregation.Builder stamped = request.getTopNAggregation().toBuilder();
+                            stamped.setUpdatedAt(TimeUtils.buildTimestamp(ZonedDateTime.now()));
+                            topNAggregationRegistry.put(stamped.getMetadata().getName(), stamped.build());
+                            responseObserver.onNext(BanyandbDatabase.TopNAggregationRegistryServiceCreateResponse.newBuilder().build());
+                            responseObserver.onCompleted();
+                        }
+
+                        @Override
+                        public void update(BanyandbDatabase.TopNAggregationRegistryServiceUpdateRequest request, StreamObserver<BanyandbDatabase.TopNAggregationRegistryServiceUpdateResponse> responseObserver) {
+                            // Same bookkeeping as a create: refresh updatedAt, then overwrite the entry for that name.
+                            BanyandbDatabase.TopNAggregation.Builder refreshed = request.getTopNAggregation().toBuilder();
+                            refreshed.setUpdatedAt(TimeUtils.buildTimestamp(ZonedDateTime.now()));
+                            topNAggregationRegistry.put(refreshed.getMetadata().getName(), refreshed.build());
+                            responseObserver.onNext(BanyandbDatabase.TopNAggregationRegistryServiceUpdateResponse.newBuilder().build());
+                            responseObserver.onCompleted();
+                        }
+
+                        @Override
+                        public void delete(BanyandbDatabase.TopNAggregationRegistryServiceDeleteRequest request, StreamObserver<BanyandbDatabase.TopNAggregationRegistryServiceDeleteResponse> responseObserver) {
+                            // remove() hands back the previous entry, or null when nothing was stored under that name.
+                            boolean existed = topNAggregationRegistry.remove(request.getMetadata().getName()) != null;
+                            responseObserver.onNext(BanyandbDatabase.TopNAggregationRegistryServiceDeleteResponse.newBuilder()
+                                    .setDeleted(existed).build());
+                            responseObserver.onCompleted();
+                        }
+
+                        @Override
+                        public void get(BanyandbDatabase.TopNAggregationRegistryServiceGetRequest request, StreamObserver<BanyandbDatabase.TopNAggregationRegistryServiceGetResponse> responseObserver) {
+                            // Look up by metadata name. NOTE(review): an unregistered name hands null to setTopNAggregation - confirm intended.
+                            BanyandbDatabase.TopNAggregation found = topNAggregationRegistry.get(request.getMetadata().getName());
+                            responseObserver.onNext(BanyandbDatabase.TopNAggregationRegistryServiceGetResponse.newBuilder().setTopNAggregation(found).build());
+                            responseObserver.onCompleted();
+                        }
+
+                        @Override
+                        public void list(BanyandbDatabase.TopNAggregationRegistryServiceListRequest request, StreamObserver<BanyandbDatabase.TopNAggregationRegistryServiceListResponse> responseObserver) {
+                            // The request is not consulted here: every stored schema is returned in a single response.
+                            BanyandbDatabase.TopNAggregationRegistryServiceListResponse.Builder all = BanyandbDatabase.TopNAggregationRegistryServiceListResponse.newBuilder();
+                            responseObserver.onNext(all.addAllTopNAggregation(topNAggregationRegistry.values()).build());
+                            responseObserver.onCompleted();
+                        }
+                    }));
+
     protected final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry();
 
     protected BanyanDBClient client;
@@ -309,4 +362,11 @@ public class AbstractBanyanDBClientTest {
             serviceRegistry.addService(measureRegistryServiceImpl);
         };
     }
+
+    protected SetupFunction bindTopNAggregationRegistry() {
+        return ignored -> { // assigns a fresh backing map, then adds the mock service to the handler registry
+            topNAggregationRegistry = new HashMap<>();
+            serviceRegistry.addService(topNAggregationRegistryServiceImpl);
+        };
+    }
 }
diff --git a/src/test/java/org/apache/skywalking/banyandb/v1/client/metadata/TopNAggregationMetadataRegistryTest.java b/src/test/java/org/apache/skywalking/banyandb/v1/client/metadata/TopNAggregationMetadataRegistryTest.java
new file mode 100644
index 0000000..f238ffc
--- /dev/null
+++ b/src/test/java/org/apache/skywalking/banyandb/v1/client/metadata/TopNAggregationMetadataRegistryTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client.metadata;
+
+import org.apache.skywalking.banyandb.v1.client.AbstractBanyanDBClientTest;
+import org.apache.skywalking.banyandb.v1.client.AbstractQuery;
+import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+
+public class TopNAggregationMetadataRegistryTest extends AbstractBanyanDBClientTest {
+    // Group and name of the single schema that every test below defines.
+    private static final String GROUP = "sw_metric";
+    private static final String NAME = "service_cpm_minute_topn";
+
+    @Before
+    public void setUp() throws IOException {
+        this.setUp(bindTopNAggregationRegistry());
+    }
+
+    /**
+     * Builds the schema used by all tests, applying the same builder settings each time
+     * so that every test starts from an identical definition.
+     */
+    private static TopNAggregation newTopNAggregation() {
+        return TopNAggregation.create(GROUP, NAME)
+                .setFieldValueSort(AbstractQuery.Sort.DESC)
+                .setFieldName("value")
+                .setSourceMeasureName("service_cpm_minute")
+                .setLruSize(10)
+                .setCountersNumber(1000)
+                .setGroupByTagNames("service_id")
+                .build();
+    }
+
+    /** Defines a schema, then expects the client to read back an equal one that carries an updatedAt value. */
+    @Test
+    public void testTopNAggregationRegistry_createAndGet() throws BanyanDBException {
+        TopNAggregation defined = newTopNAggregation();
+        this.client.define(defined);
+        Assert.assertTrue(topNAggregationRegistry.containsKey(NAME));
+        TopNAggregation fetched = client.findTopNAggregation(GROUP, NAME);
+        Assert.assertNotNull(fetched);
+        Assert.assertEquals(defined, fetched);
+        Assert.assertNotNull(fetched.updatedAt());
+    }
+
+    /** Defines a schema, then expects listing the group to return exactly one entry. */
+    @Test
+    public void testTopNAggregationRegistry_createAndList() throws BanyanDBException {
+        this.client.define(newTopNAggregation());
+        List<TopNAggregation> listed = new TopNAggregationMetadataRegistry(this.channel).list(GROUP);
+        Assert.assertNotNull(listed);
+        Assert.assertEquals(1, listed.size());
+    }
+
+    /** Defines a schema, then expects delete to report success and leave the mock registry empty. */
+    @Test
+    public void testTopNAggregationRegistry_createAndDelete() throws BanyanDBException {
+        this.client.define(newTopNAggregation());
+        boolean deleted = new TopNAggregationMetadataRegistry(this.channel).delete(GROUP, NAME);
+        Assert.assertTrue(deleted);
+        Assert.assertEquals(0, topNAggregationRegistry.size());
+    }
+}