You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by lu...@apache.org on 2023/02/25 13:26:56 UTC

[skywalking-banyandb-java-client] 01/02: add TopNAggr metadata mgnt

This is an automated email from the ASF dual-hosted git repository.

lujiajing pushed a commit to branch support-topn
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb-java-client.git

commit 4e423ad9a02e5db2c1452f8539640484accdc312
Author: Megrez Lu <lu...@gmail.com>
AuthorDate: Sat Feb 25 20:17:01 2023 +0800

    add TopNAggr metadata mgnt
---
 .../banyandb/v1/client/AbstractCriteria.java       |   2 +-
 .../banyandb/v1/client/AbstractQuery.java          |   2 +-
 .../apache/skywalking/banyandb/v1/client/And.java  |   2 +-
 .../banyandb/v1/client/BanyanDBClient.java         |  40 ++++++
 .../apache/skywalking/banyandb/v1/client/Or.java   |   2 +-
 .../banyandb/v1/client/PairQueryCondition.java     |   2 +-
 .../v1/client/metadata/TopNAggregation.java        | 143 +++++++++++++++++++++
 .../metadata/TopNAggregationMetadataRegistry.java  |  90 +++++++++++++
 .../v1/client/AbstractBanyanDBClientTest.java      |  60 +++++++++
 .../TopNAggregationMetadataRegistryTest.java       |  86 +++++++++++++
 10 files changed, 424 insertions(+), 5 deletions(-)

diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractCriteria.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractCriteria.java
index 718c38b..68f808f 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractCriteria.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractCriteria.java
@@ -21,7 +21,7 @@ package org.apache.skywalking.banyandb.v1.client;
 import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
 
 public abstract class AbstractCriteria {
-    abstract BanyandbModel.Criteria build();
+    public abstract BanyandbModel.Criteria build();
 }
 
 abstract class LogicalExpression extends AbstractCriteria {
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractQuery.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractQuery.java
index 99c7675..afe74eb 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractQuery.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractQuery.java
@@ -186,7 +186,7 @@ public abstract class AbstractQuery<T> {
     @RequiredArgsConstructor
     @Getter(AccessLevel.PROTECTED)
     public enum Sort {
-        ASC, DESC;
+        UNSPECIFIED, ASC, DESC;
     }
 
     @AutoValue
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/And.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/And.java
index cdf45b4..e8d7ab2 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/And.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/And.java
@@ -29,7 +29,7 @@ public abstract class And extends LogicalExpression {
     }
 
     @Override
-    BanyandbModel.Criteria build() {
+    public BanyandbModel.Criteria build() {
         return BanyandbModel.Criteria.newBuilder()
                 .setLe(BanyandbModel.LogicalExpression.newBuilder()
                         .setLeft(left().build())
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java
index 5c5e902..6dcd253 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java
@@ -50,6 +50,8 @@ import org.apache.skywalking.banyandb.v1.client.metadata.PropertyStore;
 import org.apache.skywalking.banyandb.v1.client.metadata.ResourceExist;
 import org.apache.skywalking.banyandb.v1.client.metadata.Stream;
 import org.apache.skywalking.banyandb.v1.client.metadata.StreamMetadataRegistry;
+import org.apache.skywalking.banyandb.v1.client.metadata.TopNAggregation;
+import org.apache.skywalking.banyandb.v1.client.metadata.TopNAggregationMetadataRegistry;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -317,6 +319,16 @@ public class BanyanDBClient implements Closeable {
         MetadataCache.INSTANCE.register(measure);
     }
 
+    /**
+     * Define a new TopNAggregation
+     *
+     * @param topNAggregation the topN rule to be created
+     */
+    public void define(TopNAggregation topNAggregation) throws BanyanDBException {
+        TopNAggregationMetadataRegistry registry = new TopNAggregationMetadataRegistry(checkNotNull(this.channel));
+        registry.create(topNAggregation);
+    }
+
     /**
      * Apply(Create or update) the property with {@link PropertyStore.Strategy#MERGE}
      *
@@ -470,6 +482,20 @@ public class BanyanDBClient implements Closeable {
         }
     }
 
+    /**
+     * Try to find the TopNAggregation from the BanyanDB with given group and name.
+     *
+     * @param group group of the TopNAggregation
+     * @param name  name of the TopNAggregation
+     * @return TopNAggregation if found. Otherwise, null is returned.
+     */
+    public TopNAggregation findTopNAggregation(String group, String name) throws BanyanDBException {
+        Preconditions.checkArgument(!Strings.isNullOrEmpty(group));
+        Preconditions.checkArgument(!Strings.isNullOrEmpty(name));
+
+        return new TopNAggregationMetadataRegistry(checkNotNull(this.channel)).get(group, name);
+    }
+
     /**
      * Try to find the stream from the BanyanDB with given group and name.
      *
@@ -565,6 +591,20 @@ public class BanyanDBClient implements Closeable {
         return new MeasureMetadataRegistry(checkNotNull(this.channel)).exist(group, name);
     }
 
+    /**
+     * Check if the given TopNAggregation exists.
+     *
+     * @param group group of the TopNAggregation
+     * @param name  name of the TopNAggregation
+     * @return ResourceExist which indicates whether group and TopNAggregation exist
+     */
+    public ResourceExist existTopNAggregation(String group, String name) throws BanyanDBException {
+        Preconditions.checkArgument(!Strings.isNullOrEmpty(group));
+        Preconditions.checkArgument(!Strings.isNullOrEmpty(name));
+
+        return new TopNAggregationMetadataRegistry(checkNotNull(this.channel)).exist(group, name);
+    }
+
     @Override
     public void close() throws IOException {
         connectionEstablishLock.lock();
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/Or.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/Or.java
index ccfe3eb..f4e0ab3 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/Or.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/Or.java
@@ -29,7 +29,7 @@ public abstract class Or extends LogicalExpression {
     }
 
     @Override
-    BanyandbModel.Criteria build() {
+    public BanyandbModel.Criteria build() {
         return BanyandbModel.Criteria.newBuilder()
                 .setLe(BanyandbModel.LogicalExpression.newBuilder()
                         .setLeft(left().build())
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/PairQueryCondition.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/PairQueryCondition.java
index 455f385..4f8a9bb 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/PairQueryCondition.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/PairQueryCondition.java
@@ -35,7 +35,7 @@ public abstract class PairQueryCondition<T> extends AbstractCriteria {
     }
 
     @Override
-    BanyandbModel.Criteria build() {
+    public BanyandbModel.Criteria build() {
         return BanyandbModel.Criteria.newBuilder()
                 .setCondition(BanyandbModel.Condition.newBuilder()
                 .setName(this.tagAndValue.getTagName())
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/metadata/TopNAggregation.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/metadata/TopNAggregation.java
new file mode 100644
index 0000000..2d8bf6c
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/metadata/TopNAggregation.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client.metadata;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.ImmutableList;
+import org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
+import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase;
+import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
+import org.apache.skywalking.banyandb.v1.client.AbstractCriteria;
+import org.apache.skywalking.banyandb.v1.client.AbstractQuery;
+import org.apache.skywalking.banyandb.v1.client.util.TimeUtils;
+
+import javax.annotation.Nullable;
+import java.time.ZonedDateTime;
+import java.util.Collections;
+import java.util.List;
+
+@AutoValue
+public abstract class TopNAggregation extends NamedSchema<BanyandbDatabase.TopNAggregation> {
+    public abstract String sourceMeasureName();
+
+    public abstract String fieldName();
+
+    @Nullable
+    public abstract AbstractQuery.Sort fieldValueSort();
+
+    @Nullable
+    public abstract ImmutableList<String> groupByTagNames();
+
+    @Nullable
+    abstract AbstractCriteria criteria();
+
+    abstract int countersNumber();
+
+    abstract int lruSize();
+
+    abstract TopNAggregation.Builder toBuilder();
+
+    public static TopNAggregation.Builder create(String group, String name) {
+        return new AutoValue_TopNAggregation.Builder().setGroup(group).setName(name);
+    }
+
+    @AutoValue.Builder
+    public abstract static class Builder {
+        abstract TopNAggregation.Builder setGroup(String group);
+
+        abstract TopNAggregation.Builder setName(String name);
+
+        abstract TopNAggregation.Builder setUpdatedAt(ZonedDateTime updatedAt);
+
+        abstract TopNAggregation.Builder setLruSize(int lruSize);
+
+        public abstract TopNAggregation.Builder setCriteria(AbstractCriteria criteria);
+
+        public abstract TopNAggregation.Builder setFieldValueSort(AbstractQuery.Sort sort);
+
+        public abstract TopNAggregation.Builder setFieldName(String fieldName);
+
+        public abstract TopNAggregation.Builder setCountersNumber(int countersNumber);
+
+        public abstract TopNAggregation.Builder setGroupByTagNames(String... groupByTagNames);
+
+        public abstract TopNAggregation.Builder setGroupByTagNames(List<String> groupByTagNames);
+
+        abstract TopNAggregation.Builder setSourceMeasureName(String sourceMeasureName);
+
+        public abstract TopNAggregation build();
+    }
+
+    @Override
+    public BanyandbDatabase.TopNAggregation serialize() {
+        BanyandbDatabase.TopNAggregation.Builder bld = BanyandbDatabase.TopNAggregation.newBuilder()
+                .setMetadata(buildMetadata());
+
+        bld.setFieldName(this.fieldName())
+                .setSourceMeasure(BanyandbCommon.Metadata.newBuilder().setGroup(group()).setName(this.sourceMeasureName()).build())
+                .setCountersNumber(this.countersNumber())
+                .setLruSize(this.lruSize());
+
+        if (this.criteria() != null) {
+            bld.setCriteria(this.criteria().build());
+        }
+
+        if (this.groupByTagNames() != null) {
+            bld.addAllGroupByTagNames(this.groupByTagNames());
+        } else {
+            bld.addAllGroupByTagNames(Collections.emptyList());
+        }
+
+        if (this.fieldValueSort() == null || this.fieldValueSort() == AbstractQuery.Sort.UNSPECIFIED) {
+            bld.setFieldValueSort(BanyandbModel.Sort.SORT_UNSPECIFIED);
+        } else {
+            bld.setFieldValueSort(AbstractQuery.Sort.DESC == this.fieldValueSort() ? BanyandbModel.Sort.SORT_DESC : BanyandbModel.Sort.SORT_ASC);
+        }
+
+        if (this.updatedAt() != null) {
+            bld.setUpdatedAt(TimeUtils.buildTimestamp(updatedAt()));
+        }
+        return bld.build();
+    }
+
+    public static TopNAggregation fromProtobuf(final BanyandbDatabase.TopNAggregation pb) {
+        TopNAggregation.Builder bld = TopNAggregation.create(pb.getMetadata().getGroup(), pb.getMetadata().getName())
+                .setUpdatedAt(TimeUtils.parseTimestamp(pb.getUpdatedAt()))
+                .setCountersNumber(pb.getCountersNumber())
+                .setLruSize(pb.getLruSize())
+                .setFieldName(pb.getFieldName())
+                .setSourceMeasureName(pb.getSourceMeasure().getName())
+                .setGroupByTagNames(pb.getGroupByTagNamesList());
+
+        switch (pb.getFieldValueSort()) {
+            case SORT_ASC:
+                bld.setFieldValueSort(AbstractQuery.Sort.ASC);
+                break;
+            case SORT_DESC:
+                bld.setFieldValueSort(AbstractQuery.Sort.DESC);
+                break;
+            default:
+                bld.setFieldValueSort(AbstractQuery.Sort.UNSPECIFIED);
+        }
+
+        // TODO: deserialize Criteria
+
+        return bld.build();
+    }
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/metadata/TopNAggregationMetadataRegistry.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/metadata/TopNAggregationMetadataRegistry.java
new file mode 100644
index 0000000..d10e5d7
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/metadata/TopNAggregationMetadataRegistry.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client.metadata;
+
+import io.grpc.Channel;
+import org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
+import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase;
+import org.apache.skywalking.banyandb.database.v1.TopNAggregationRegistryServiceGrpc;
+import org.apache.skywalking.banyandb.v1.client.grpc.MetadataClient;
+import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class TopNAggregationMetadataRegistry extends MetadataClient<TopNAggregationRegistryServiceGrpc.TopNAggregationRegistryServiceBlockingStub,
+        BanyandbDatabase.TopNAggregation, TopNAggregation> {
+    public TopNAggregationMetadataRegistry(Channel channel) {
+        super(TopNAggregationRegistryServiceGrpc.newBlockingStub(channel));
+    }
+
+    @Override
+    public void create(TopNAggregation payload) throws BanyanDBException {
+        execute(() ->
+                stub.create(BanyandbDatabase.TopNAggregationRegistryServiceCreateRequest.newBuilder()
+                        .setTopNAggregation(payload.serialize())
+                        .build()));
+    }
+
+    @Override
+    public void update(TopNAggregation payload) throws BanyanDBException {
+        execute(() ->
+                stub.update(BanyandbDatabase.TopNAggregationRegistryServiceUpdateRequest.newBuilder()
+                        .setTopNAggregation(payload.serialize())
+                        .build()));
+    }
+
+    @Override
+    public boolean delete(String group, String name) throws BanyanDBException {
+        BanyandbDatabase.TopNAggregationRegistryServiceDeleteResponse resp = execute(() ->
+                stub.delete(BanyandbDatabase.TopNAggregationRegistryServiceDeleteRequest.newBuilder()
+                        .setMetadata(BanyandbCommon.Metadata.newBuilder().setGroup(group).setName(name).build())
+                        .build()));
+        return resp != null && resp.getDeleted();
+    }
+
+    @Override
+    public TopNAggregation get(String group, String name) throws BanyanDBException {
+        BanyandbDatabase.TopNAggregationRegistryServiceGetResponse resp = execute(() ->
+                stub.get(BanyandbDatabase.TopNAggregationRegistryServiceGetRequest.newBuilder()
+                        .setMetadata(BanyandbCommon.Metadata.newBuilder().setGroup(group).setName(name).build())
+                        .build()));
+
+        return TopNAggregation.fromProtobuf(resp.getTopNAggregation());
+    }
+
+    @Override
+    public ResourceExist exist(String group, String name) throws BanyanDBException {
+        BanyandbDatabase.TopNAggregationRegistryServiceExistResponse resp = execute(() ->
+                stub.exist(BanyandbDatabase.TopNAggregationRegistryServiceExistRequest.newBuilder()
+                        .setMetadata(BanyandbCommon.Metadata.newBuilder().setGroup(group).setName(name).build())
+                        .build()));
+        return ResourceExist.create(resp.getHasGroup(), resp.getHasTopNAggregation());
+    }
+
+    @Override
+    public List<TopNAggregation> list(String group) throws BanyanDBException {
+        BanyandbDatabase.TopNAggregationRegistryServiceListResponse resp = execute(() ->
+                stub.list(BanyandbDatabase.TopNAggregationRegistryServiceListRequest.newBuilder()
+                        .setGroup(group)
+                        .build()));
+
+        return resp.getTopNAggregationList().stream().map(TopNAggregation::fromProtobuf).collect(Collectors.toList());
+    }
+}
diff --git a/src/test/java/org/apache/skywalking/banyandb/v1/client/AbstractBanyanDBClientTest.java b/src/test/java/org/apache/skywalking/banyandb/v1/client/AbstractBanyanDBClientTest.java
index 44b467f..a53e97e 100644
--- a/src/test/java/org/apache/skywalking/banyandb/v1/client/AbstractBanyanDBClientTest.java
+++ b/src/test/java/org/apache/skywalking/banyandb/v1/client/AbstractBanyanDBClientTest.java
@@ -32,6 +32,7 @@ import org.apache.skywalking.banyandb.database.v1.IndexRuleBindingRegistryServic
 import org.apache.skywalking.banyandb.database.v1.IndexRuleRegistryServiceGrpc;
 import org.apache.skywalking.banyandb.database.v1.MeasureRegistryServiceGrpc;
 import org.apache.skywalking.banyandb.database.v1.StreamRegistryServiceGrpc;
+import org.apache.skywalking.banyandb.database.v1.TopNAggregationRegistryServiceGrpc;
 import org.apache.skywalking.banyandb.v1.client.util.TimeUtils;
 import org.junit.Rule;
 
@@ -253,6 +254,58 @@ public class AbstractBanyanDBClientTest {
                         }
                     }));
 
+    // TopNAggregation registry
+    protected Map<String, BanyandbDatabase.TopNAggregation> topNAggregationRegistry;
+
+    private final TopNAggregationRegistryServiceGrpc.TopNAggregationRegistryServiceImplBase topNAggregationRegistryServiceImpl =
+            mock(TopNAggregationRegistryServiceGrpc.TopNAggregationRegistryServiceImplBase.class, delegatesTo(
+                    new TopNAggregationRegistryServiceGrpc.TopNAggregationRegistryServiceImplBase() {
+                        @Override
+                        public void create(BanyandbDatabase.TopNAggregationRegistryServiceCreateRequest request, StreamObserver<BanyandbDatabase.TopNAggregationRegistryServiceCreateResponse> responseObserver) {
+                            BanyandbDatabase.TopNAggregation aggr = request.getTopNAggregation().toBuilder()
+                                    .setUpdatedAt(TimeUtils.buildTimestamp(ZonedDateTime.now()))
+                                    .build();
+                            topNAggregationRegistry.put(aggr.getMetadata().getName(), aggr);
+                            responseObserver.onNext(BanyandbDatabase.TopNAggregationRegistryServiceCreateResponse.newBuilder().build());
+                            responseObserver.onCompleted();
+                        }
+
+                        @Override
+                        public void update(BanyandbDatabase.TopNAggregationRegistryServiceUpdateRequest request, StreamObserver<BanyandbDatabase.TopNAggregationRegistryServiceUpdateResponse> responseObserver) {
+                            BanyandbDatabase.TopNAggregation aggr = request.getTopNAggregation().toBuilder()
+                                    .setUpdatedAt(TimeUtils.buildTimestamp(ZonedDateTime.now()))
+                                    .build();
+                            topNAggregationRegistry.put(aggr.getMetadata().getName(), aggr);
+                            responseObserver.onNext(BanyandbDatabase.TopNAggregationRegistryServiceUpdateResponse.newBuilder().build());
+                            responseObserver.onCompleted();
+                        }
+
+                        @Override
+                        public void delete(BanyandbDatabase.TopNAggregationRegistryServiceDeleteRequest request, StreamObserver<BanyandbDatabase.TopNAggregationRegistryServiceDeleteResponse> responseObserver) {
+                            BanyandbDatabase.TopNAggregation oldMeasure = topNAggregationRegistry.remove(request.getMetadata().getName());
+                            responseObserver.onNext(BanyandbDatabase.TopNAggregationRegistryServiceDeleteResponse.newBuilder()
+                                    .setDeleted(oldMeasure != null)
+                                    .build());
+                            responseObserver.onCompleted();
+                        }
+
+                        @Override
+                        public void get(BanyandbDatabase.TopNAggregationRegistryServiceGetRequest request, StreamObserver<BanyandbDatabase.TopNAggregationRegistryServiceGetResponse> responseObserver) {
+                            responseObserver.onNext(BanyandbDatabase.TopNAggregationRegistryServiceGetResponse.newBuilder()
+                                    .setTopNAggregation(topNAggregationRegistry.get(request.getMetadata().getName()))
+                                    .build());
+                            responseObserver.onCompleted();
+                        }
+
+                        @Override
+                        public void list(BanyandbDatabase.TopNAggregationRegistryServiceListRequest request, StreamObserver<BanyandbDatabase.TopNAggregationRegistryServiceListResponse> responseObserver) {
+                            responseObserver.onNext(BanyandbDatabase.TopNAggregationRegistryServiceListResponse.newBuilder()
+                                    .addAllTopNAggregation(topNAggregationRegistry.values())
+                                    .build());
+                            responseObserver.onCompleted();
+                        }
+                    }));
+
     protected final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry();
 
     protected BanyanDBClient client;
@@ -309,4 +362,11 @@ public class AbstractBanyanDBClientTest {
             serviceRegistry.addService(measureRegistryServiceImpl);
         };
     }
+
+    protected SetupFunction bindTopNAggregationRegistry() {
+        return b -> {
+            AbstractBanyanDBClientTest.this.topNAggregationRegistry = new HashMap<>();
+            serviceRegistry.addService(topNAggregationRegistryServiceImpl);
+        };
+    }
 }
diff --git a/src/test/java/org/apache/skywalking/banyandb/v1/client/metadata/TopNAggregationMetadataRegistryTest.java b/src/test/java/org/apache/skywalking/banyandb/v1/client/metadata/TopNAggregationMetadataRegistryTest.java
new file mode 100644
index 0000000..f238ffc
--- /dev/null
+++ b/src/test/java/org/apache/skywalking/banyandb/v1/client/metadata/TopNAggregationMetadataRegistryTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client.metadata;
+
+import org.apache.skywalking.banyandb.v1.client.AbstractBanyanDBClientTest;
+import org.apache.skywalking.banyandb.v1.client.AbstractQuery;
+import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+
+public class TopNAggregationMetadataRegistryTest extends AbstractBanyanDBClientTest {
+    @Before
+    public void setUp() throws IOException {
+        this.setUp(bindTopNAggregationRegistry());
+    }
+
+    @Test
+    public void testTopNAggregationRegistry_createAndGet() throws BanyanDBException {
+        TopNAggregation expectedTopNAggregation = TopNAggregation.create("sw_metric", "service_cpm_minute_topn")
+                .setFieldValueSort(AbstractQuery.Sort.DESC)
+                .setFieldName("value")
+                .setSourceMeasureName("service_cpm_minute")
+                .setLruSize(10)
+                .setCountersNumber(1000)
+                .setGroupByTagNames("service_id")
+                .build();
+        this.client.define(expectedTopNAggregation);
+        Assert.assertTrue(topNAggregationRegistry.containsKey("service_cpm_minute_topn"));
+        TopNAggregation actualTopNAggregation = client.findTopNAggregation("sw_metric", "service_cpm_minute_topn");
+        Assert.assertNotNull(actualTopNAggregation);
+        Assert.assertEquals(expectedTopNAggregation, actualTopNAggregation);
+        Assert.assertNotNull(actualTopNAggregation.updatedAt());
+    }
+
+    @Test
+    public void testTopNAggregationRegistry_createAndList() throws BanyanDBException {
+        TopNAggregation expectedTopNAggregation = TopNAggregation.create("sw_metric", "service_cpm_minute_topn")
+                .setFieldValueSort(AbstractQuery.Sort.DESC)
+                .setFieldName("value")
+                .setSourceMeasureName("service_cpm_minute")
+                .setLruSize(10)
+                .setCountersNumber(1000)
+                .setGroupByTagNames("service_id")
+                .build();
+        this.client.define(expectedTopNAggregation);
+        List<TopNAggregation> actualTopNAggregations = new TopNAggregationMetadataRegistry(this.channel).list("sw_metric");
+        Assert.assertNotNull(actualTopNAggregations);
+        Assert.assertEquals(1, actualTopNAggregations.size());
+    }
+
+    @Test
+    public void testTopNAggregationRegistry_createAndDelete() throws BanyanDBException {
+        TopNAggregation expectedTopNAggregation = TopNAggregation.create("sw_metric", "service_cpm_minute_topn")
+                .setFieldValueSort(AbstractQuery.Sort.DESC)
+                .setFieldName("value")
+                .setSourceMeasureName("service_cpm_minute")
+                .setLruSize(10)
+                .setCountersNumber(1000)
+                .setGroupByTagNames("service_id")
+                .build();
+        this.client.define(expectedTopNAggregation);
+        boolean deleted = new TopNAggregationMetadataRegistry(this.channel).delete("sw_metric", "service_cpm_minute_topn");
+        Assert.assertTrue(deleted);
+        Assert.assertEquals(0, topNAggregationRegistry.size());
+    }
+}