You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2023/04/16 08:42:15 UTC
[skywalking-banyandb-java-client] branch main updated: Support TopNAggregation and TopN Query (#37)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb-java-client.git
The following commit(s) were added to refs/heads/main by this push:
new 4a5ff35 Support TopNAggregation and TopN Query (#37)
4a5ff35 is described below
commit 4a5ff35b37bdb65478719b47f3137e2e854eac7e
Author: Jiajing LU <lu...@gmail.com>
AuthorDate: Sun Apr 16 16:42:10 2023 +0800
Support TopNAggregation and TopN Query (#37)
---
.../banyandb/v1/client/AbstractCriteria.java | 2 +-
.../banyandb/v1/client/AbstractQuery.java | 2 +-
.../apache/skywalking/banyandb/v1/client/And.java | 2 +-
.../banyandb/v1/client/BanyanDBClient.java | 56 ++++++++
.../skywalking/banyandb/v1/client/DataPoint.java | 4 +-
.../banyandb/v1/client/MeasureQuery.java | 2 +-
.../apache/skywalking/banyandb/v1/client/Or.java | 2 +-
.../banyandb/v1/client/PairQueryCondition.java | 2 +-
.../skywalking/banyandb/v1/client/TopNQuery.java | 74 +++++++++++
.../banyandb/v1/client/TopNQueryResponse.java | 80 ++++++++++++
.../v1/client/metadata/TopNAggregation.java | 143 +++++++++++++++++++++
.../metadata/TopNAggregationMetadataRegistry.java | 90 +++++++++++++
src/main/proto/banyandb/v1/banyandb-measure.proto | 4 +-
.../v1/client/AbstractBanyanDBClientTest.java | 60 +++++++++
.../TopNAggregationMetadataRegistryTest.java | 86 +++++++++++++
15 files changed, 599 insertions(+), 10 deletions(-)
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractCriteria.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractCriteria.java
index 718c38b..68f808f 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractCriteria.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractCriteria.java
@@ -21,7 +21,7 @@ package org.apache.skywalking.banyandb.v1.client;
import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
public abstract class AbstractCriteria {
- abstract BanyandbModel.Criteria build();
+ public abstract BanyandbModel.Criteria build();
}
abstract class LogicalExpression extends AbstractCriteria {
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractQuery.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractQuery.java
index 99c7675..afe74eb 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractQuery.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractQuery.java
@@ -186,7 +186,7 @@ public abstract class AbstractQuery<T> {
@RequiredArgsConstructor
@Getter(AccessLevel.PROTECTED)
public enum Sort {
- ASC, DESC;
+ UNSPECIFIED, ASC, DESC;
}
@AutoValue
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/And.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/And.java
index cdf45b4..e8d7ab2 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/And.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/And.java
@@ -29,7 +29,7 @@ public abstract class And extends LogicalExpression {
}
@Override
- BanyandbModel.Criteria build() {
+ public BanyandbModel.Criteria build() {
return BanyandbModel.Criteria.newBuilder()
.setLe(BanyandbModel.LogicalExpression.newBuilder()
.setLeft(left().build())
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java
index c6abc80..4fbe8ea 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java
@@ -50,6 +50,8 @@ import org.apache.skywalking.banyandb.v1.client.metadata.PropertyStore;
import org.apache.skywalking.banyandb.v1.client.metadata.ResourceExist;
import org.apache.skywalking.banyandb.v1.client.metadata.Stream;
import org.apache.skywalking.banyandb.v1.client.metadata.StreamMetadataRegistry;
+import org.apache.skywalking.banyandb.v1.client.metadata.TopNAggregation;
+import org.apache.skywalking.banyandb.v1.client.metadata.TopNAggregationMetadataRegistry;
import java.io.Closeable;
import java.io.IOException;
@@ -265,6 +267,22 @@ public class BanyanDBClient implements Closeable {
return new StreamQueryResponse(response);
}
+ /**
+ * Query TopN according to given conditions.
+ * <p>
+ * The request is produced by {@code topNQuery.build()} and sent through the blocking
+ * measure stub, with a deadline of {@code getOptions().getDeadline()} seconds.
+ *
+ * @param topNQuery condition for query
+ * @return the server's TopN response wrapped in a {@link TopNQueryResponse}
+ * @throws BanyanDBException declared by this method and by {@code TopNQuery#build()};
+ * presumably also the translated form of a failed gRPC call - TODO confirm
+ * against {@code HandleExceptionsWith.callAndTranslateApiException}
+ */
+ public TopNQueryResponse query(TopNQuery topNQuery) throws BanyanDBException {
+ checkState(this.measureServiceStub != null, "measure service is null"); // NOTE(review): this guards measureServiceStub, but the call below uses measureServiceBlockingStub - confirm both are always initialised together
+
+ final BanyandbMeasure.TopNResponse response = HandleExceptionsWith.callAndTranslateApiException(() ->
+ this.measureServiceBlockingStub
+ .withDeadlineAfter(this.getOptions().getDeadline(), TimeUnit.SECONDS)
+ .topN(topNQuery.build()));
+ return new TopNQueryResponse(response);
+ }
+
/**
* Query measures according to given conditions
*
@@ -317,6 +335,16 @@ public class BanyanDBClient implements Closeable {
MetadataCache.INSTANCE.register(measure);
}
+ /**
+ * Define a new TopNAggregation.
+ * <p>
+ * A {@link TopNAggregationMetadataRegistry} is created on this client's channel for
+ * every call and its {@code create} method is invoked with the given rule.
+ *
+ * @param topNAggregation the topN rule to be created
+ * @throws BanyanDBException declared by the registry's {@code create}; presumably raised
+ * when the remote call fails - TODO confirm
+ */
+ public void define(TopNAggregation topNAggregation) throws BanyanDBException {
+ TopNAggregationMetadataRegistry registry = new TopNAggregationMetadataRegistry(checkNotNull(this.channel)); // fails fast if the channel is null
+ registry.create(topNAggregation);
+ }
+
/**
* Apply(Create or update) the property with {@link PropertyStore.Strategy#MERGE}
*
@@ -484,6 +512,20 @@ public class BanyanDBClient implements Closeable {
}
}
+ /**
+ * Try to find the TopNAggregation from the BanyanDB with given group and name.
+ * <p>
+ * The lookup is delegated to the {@code get} method of a
+ * {@link TopNAggregationMetadataRegistry} created on this client's channel.
+ *
+ * @param group group of the TopNAggregation; rejected when null or empty
+ * @param name name of the TopNAggregation; rejected when null or empty
+ * @return TopNAggregation if found. The original contract states that null is returned
+ * otherwise, but this method contains no null-returning path of its own -
+ * TODO confirm the not-found behaviour against the registry's {@code get}
+ * @throws BanyanDBException declared by the registry's {@code get}; presumably raised
+ * when the remote call fails - TODO confirm
+ */
+ public TopNAggregation findTopNAggregation(String group, String name) throws BanyanDBException {
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(group));
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(name));
+
+ return new TopNAggregationMetadataRegistry(checkNotNull(this.channel)).get(group, name);
+ }
+
/**
* Try to find the stream from the BanyanDB with given group and name.
*
@@ -579,6 +621,20 @@ public class BanyanDBClient implements Closeable {
return new MeasureMetadataRegistry(checkNotNull(this.channel)).exist(group, name);
}
+ /**
+ * Check if the given TopNAggregation exists.
+ * <p>
+ * The check is delegated to the {@code exist} method of a
+ * {@link TopNAggregationMetadataRegistry} created on this client's channel.
+ *
+ * @param group group of the TopNAggregation; rejected when null or empty
+ * @param name name of the TopNAggregation; rejected when null or empty
+ * @return ResourceExist which indicates whether group and TopNAggregation exist
+ * @throws BanyanDBException declared by the registry's {@code exist}; presumably raised
+ * when the remote call fails - TODO confirm
+ */
+ public ResourceExist existTopNAggregation(String group, String name) throws BanyanDBException {
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(group));
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(name));
+
+ return new TopNAggregationMetadataRegistry(checkNotNull(this.channel)).exist(group, name);
+ }
+
@Override
public void close() throws IOException {
connectionEstablishLock.lock();
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/DataPoint.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/DataPoint.java
index 74b299f..fd65532 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/DataPoint.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/DataPoint.java
@@ -40,7 +40,7 @@ public class DataPoint extends RowEntity {
super(dataPoint.getTimestamp(), dataPoint.getTagFamiliesList());
this.fields = new HashMap<>(dataPoint.getFieldsCount());
for (BanyandbMeasure.DataPoint.Field f : dataPoint.getFieldsList()) {
- this.fields.put(f.getName(), convertToJavaType(f.getValue()));
+ this.fields.put(f.getName(), convertFileValueToJavaType(f.getValue()));
}
}
@@ -48,7 +48,7 @@ public class DataPoint extends RowEntity {
return (T) this.fields.get(fieldName);
}
- private Object convertToJavaType(BanyandbModel.FieldValue fieldValue) {
+ static Object convertFileValueToJavaType(BanyandbModel.FieldValue fieldValue) {
switch (fieldValue.getValueCase()) {
case INT:
return fieldValue.getInt().getValue();
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/MeasureQuery.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/MeasureQuery.java
index 6f3799e..4ae04af 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/MeasureQuery.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/MeasureQuery.java
@@ -217,7 +217,7 @@ public class MeasureQuery extends AbstractQuery<BanyandbMeasure.QueryRequest> {
MIN(BanyandbModel.AggregationFunction.AGGREGATION_FUNCTION_MIN),
COUNT(BanyandbModel.AggregationFunction.AGGREGATION_FUNCTION_COUNT),
SUM(BanyandbModel.AggregationFunction.AGGREGATION_FUNCTION_SUM);
- private final BanyandbModel.AggregationFunction function;
+ final BanyandbModel.AggregationFunction function;
}
}
}
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/Or.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/Or.java
index ccfe3eb..f4e0ab3 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/Or.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/Or.java
@@ -29,7 +29,7 @@ public abstract class Or extends LogicalExpression {
}
@Override
- BanyandbModel.Criteria build() {
+ public BanyandbModel.Criteria build() {
return BanyandbModel.Criteria.newBuilder()
.setLe(BanyandbModel.LogicalExpression.newBuilder()
.setLeft(left().build())
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/PairQueryCondition.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/PairQueryCondition.java
index 455f385..4f8a9bb 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/PairQueryCondition.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/PairQueryCondition.java
@@ -35,7 +35,7 @@ public abstract class PairQueryCondition<T> extends AbstractCriteria {
}
@Override
- BanyandbModel.Criteria build() {
+ public BanyandbModel.Criteria build() {
return BanyandbModel.Criteria.newBuilder()
.setCondition(BanyandbModel.Condition.newBuilder()
.setName(this.tagAndValue.getTagName())
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/TopNQuery.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/TopNQuery.java
new file mode 100644
index 0000000..9330330
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/TopNQuery.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import com.google.common.base.Preconditions;
+import lombok.Setter;
+import org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
+import org.apache.skywalking.banyandb.measure.v1.BanyandbMeasure;
+import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
+import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
+
+import java.util.List;
+
+@Setter // Lombok; expected to generate setters only for the non-final fields (aggregationType, conditions) - TODO confirm
+public class TopNQuery {
+ private final String group;
+ private final String name;
+ private final TimestampRange timestampRange;
+ private final int number; // sent via setTopN(...); the constructor requires number > 0
+ private final AbstractQuery.Sort sort; // the constructor rejects Sort.UNSPECIFIED
+ private MeasureQuery.Aggregation.Type aggregationType = MeasureQuery.Aggregation.Type.UNSPECIFIED;
+ /**
+ * Query conditions.
+ * <p>
+ * May be null or empty, in which case no condition is added to the request.
+ * Only conditions whose operator is {@code BINARY_OP_EQ} are accepted;
+ * {@code build()} throws {@link UnsupportedOperationException} for any other operator.
+ */
+ private List<PairQueryCondition<?>> conditions;
+
+ public TopNQuery(String group, String name, TimestampRange timestampRange, int number, AbstractQuery.Sort sort) {
+ Preconditions.checkArgument(sort != AbstractQuery.Sort.UNSPECIFIED); // a null sort passes this check
+ Preconditions.checkArgument(number > 0);
+ this.group = group;
+ this.name = name;
+ this.timestampRange = timestampRange;
+ this.number = number;
+ this.sort = sort;
+ }
+
+ BanyandbMeasure.TopNRequest build() throws BanyanDBException {
+ BanyandbMeasure.TopNRequest.Builder bld = BanyandbMeasure.TopNRequest.newBuilder()
+ .setMetadata(BanyandbCommon.Metadata.newBuilder().setGroup(group).setName(name).build())
+ .setTimeRange(timestampRange.build())
+ .setTopN(number)
+ .setFieldValueSort(AbstractQuery.Sort.DESC == sort ? BanyandbModel.Sort.SORT_DESC : BanyandbModel.Sort.SORT_ASC); // anything other than DESC (including null) is sent as SORT_ASC
+ if (aggregationType == null) { // the field defaults to Type.UNSPECIFIED, so this branch covers an explicitly assigned null
+ bld.setAgg(BanyandbModel.AggregationFunction.AGGREGATION_FUNCTION_UNSPECIFIED);
+ } else {
+ bld.setAgg(aggregationType.function);
+ }
+ if (conditions != null && !conditions.isEmpty()) {
+ for (final PairQueryCondition<?> expr : conditions) {
+ if (expr.op != BanyandbModel.Condition.BinaryOp.BINARY_OP_EQ) {
+ throw new UnsupportedOperationException("only equality is supported");
+ }
+ bld.addConditions(expr.build().getCondition()); // expr.build() yields a Criteria; only its Condition part is attached
+ }
+ }
+ return bld.build();
+ }
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/TopNQueryResponse.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/TopNQueryResponse.java
new file mode 100644
index 0000000..e23cc92
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/TopNQueryResponse.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client;
+
+import com.google.protobuf.Timestamp;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.skywalking.banyandb.measure.v1.BanyandbMeasure;
+import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class TopNQueryResponse {
+ @Getter
+ private final List<TopNList> topNLists; // one entry per TopNList in the protobuf response, in response order
+
+ TopNQueryResponse(BanyandbMeasure.TopNResponse response) {
+ final List<BanyandbMeasure.TopNList> timelines = response.getListsList();
+ topNLists = new ArrayList<>(timelines.size());
+ for (final BanyandbMeasure.TopNList topNList : timelines) {
+ topNLists.add(new TopNList(topNList.getTimestamp(), topNList.getItemsList()));
+ }
+ }
+
+ public int size() {
+ return this.topNLists == null ? 0 : this.topNLists.size(); // topNLists is always assigned by the constructor; the null branch is purely defensive
+ }
+
+ @Getter
+ public static class TopNList {
+ /**
+ * Timestamp of this list in milliseconds, computed from the protobuf
+ * {@code Timestamp} as {@code seconds * 1000 + nanos / 1_000_000}
+ * (sub-millisecond nanos are truncated by the integer division).
+ */
+ private final long timestamp;
+ private final List<Item> items; // converted items, in the order of the protobuf item list
+
+ private TopNList(Timestamp ts, List<BanyandbMeasure.TopNList.Item> itemsList) {
+ this.timestamp = ts.getSeconds() * 1000 + ts.getNanos() / 1_000_000;
+ this.items = new ArrayList<>(itemsList.size());
+ for (final BanyandbMeasure.TopNList.Item item : itemsList) {
+ this.items.add(Item.parseFrom(item));
+ }
+ }
+ }
+
+ @RequiredArgsConstructor
+ @Getter
+ public static class Item {
+ private final Map<String, TagAndValue<?>> tagValuesMap; // entity tags keyed by tag key
+ private final Object value; // result of DataPoint.convertFileValueToJavaType on the item's field value
+
+ static Item parseFrom(BanyandbMeasure.TopNList.Item item) {
+ final Object fieldValue = DataPoint.convertFileValueToJavaType(item.getValue());
+ final Map<String, TagAndValue<?>> map = new HashMap<>(item.getEntityCount());
+ for (final BanyandbModel.Tag tag : item.getEntityList()) {
+ map.put(tag.getKey(), TagAndValue.fromProtobuf(tag)); // a repeated tag key would overwrite the earlier entry
+ }
+ return new Item(map, fieldValue);
+ }
+ }
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/metadata/TopNAggregation.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/metadata/TopNAggregation.java
new file mode 100644
index 0000000..956d52c
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/metadata/TopNAggregation.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client.metadata;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.ImmutableList;
+import org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
+import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase;
+import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
+import org.apache.skywalking.banyandb.v1.client.AbstractCriteria;
+import org.apache.skywalking.banyandb.v1.client.AbstractQuery;
+import org.apache.skywalking.banyandb.v1.client.util.TimeUtils;
+
+import javax.annotation.Nullable;
+import java.time.ZonedDateTime;
+import java.util.Collections;
+import java.util.List;
+
+@AutoValue
+public abstract class TopNAggregation extends NamedSchema<BanyandbDatabase.TopNAggregation> {
+ public abstract String sourceMeasureName();
+
+ public abstract String fieldName();
+
+ @Nullable
+ public abstract AbstractQuery.Sort fieldValueSort();
+
+ @Nullable
+ public abstract ImmutableList<String> groupByTagNames();
+
+ @Nullable
+ abstract AbstractCriteria criteria(); // package-private, unlike the public accessors above
+
+ abstract int countersNumber(); // package-private
+
+ abstract int lruSize(); // package-private
+
+ abstract TopNAggregation.Builder toBuilder();
+
+ public static TopNAggregation.Builder create(String group, String name) {
+ return new AutoValue_TopNAggregation.Builder().setGroup(group).setName(name);
+ }
+
+ @AutoValue.Builder
+ public abstract static class Builder {
+ abstract TopNAggregation.Builder setGroup(String group); // package-private; group and name are supplied through create(group, name)
+
+ abstract TopNAggregation.Builder setName(String name);
+
+ abstract TopNAggregation.Builder setUpdatedAt(ZonedDateTime updatedAt); // package-private; only fromProtobuf sets it in this class
+
+ public abstract TopNAggregation.Builder setLruSize(int lruSize);
+
+ public abstract TopNAggregation.Builder setCriteria(AbstractCriteria criteria);
+
+ public abstract TopNAggregation.Builder setFieldValueSort(AbstractQuery.Sort sort);
+
+ public abstract TopNAggregation.Builder setFieldName(String fieldName);
+
+ public abstract TopNAggregation.Builder setCountersNumber(int countersNumber);
+
+ public abstract TopNAggregation.Builder setGroupByTagNames(String... groupByTagNames);
+
+ public abstract TopNAggregation.Builder setGroupByTagNames(List<String> groupByTagNames);
+
+ public abstract TopNAggregation.Builder setSourceMeasureName(String sourceMeasureName);
+
+ public abstract TopNAggregation build();
+ }
+
+ @Override
+ public BanyandbDatabase.TopNAggregation serialize() {
+ BanyandbDatabase.TopNAggregation.Builder bld = BanyandbDatabase.TopNAggregation.newBuilder()
+ .setMetadata(buildMetadata());
+
+ bld.setFieldName(this.fieldName())
+ .setSourceMeasure(BanyandbCommon.Metadata.newBuilder().setGroup(group()).setName(this.sourceMeasureName()).build()) // the source measure is addressed in this aggregation's own group()
+ .setCountersNumber(this.countersNumber())
+ .setLruSize(this.lruSize());
+
+ if (this.criteria() != null) {
+ bld.setCriteria(this.criteria().build());
+ }
+
+ if (this.groupByTagNames() != null) {
+ bld.addAllGroupByTagNames(this.groupByTagNames());
+ } else {
+ bld.addAllGroupByTagNames(Collections.emptyList()); // adds nothing; the repeated field stays empty
+ }
+
+ if (this.fieldValueSort() == null || this.fieldValueSort() == AbstractQuery.Sort.UNSPECIFIED) {
+ bld.setFieldValueSort(BanyandbModel.Sort.SORT_UNSPECIFIED);
+ } else {
+ bld.setFieldValueSort(AbstractQuery.Sort.DESC == this.fieldValueSort() ? BanyandbModel.Sort.SORT_DESC : BanyandbModel.Sort.SORT_ASC);
+ }
+
+ if (this.updatedAt() != null) {
+ bld.setUpdatedAt(TimeUtils.buildTimestamp(updatedAt()));
+ }
+ return bld.build();
+ }
+
+ public static TopNAggregation fromProtobuf(final BanyandbDatabase.TopNAggregation pb) {
+ TopNAggregation.Builder bld = TopNAggregation.create(pb.getMetadata().getGroup(), pb.getMetadata().getName())
+ .setUpdatedAt(TimeUtils.parseTimestamp(pb.getUpdatedAt()))
+ .setCountersNumber(pb.getCountersNumber())
+ .setLruSize(pb.getLruSize())
+ .setFieldName(pb.getFieldName())
+ .setSourceMeasureName(pb.getSourceMeasure().getName()) // only the name is kept; the source measure's group is not read back
+ .setGroupByTagNames(pb.getGroupByTagNamesList());
+
+ switch (pb.getFieldValueSort()) {
+ case SORT_ASC:
+ bld.setFieldValueSort(AbstractQuery.Sort.ASC);
+ break;
+ case SORT_DESC:
+ bld.setFieldValueSort(AbstractQuery.Sort.DESC);
+ break;
+ default: // SORT_UNSPECIFIED and any other value
+ bld.setFieldValueSort(AbstractQuery.Sort.UNSPECIFIED);
+ }
+
+ // TODO: deserialize Criteria. Until then setCriteria is never called here, so any criteria carried by pb is dropped from the returned object.
+
+ return bld.build();
+ }
+}
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/metadata/TopNAggregationMetadataRegistry.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/metadata/TopNAggregationMetadataRegistry.java
new file mode 100644
index 0000000..d10e5d7
--- /dev/null
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/metadata/TopNAggregationMetadataRegistry.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client.metadata;
+
+import io.grpc.Channel;
+import org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
+import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase;
+import org.apache.skywalking.banyandb.database.v1.TopNAggregationRegistryServiceGrpc;
+import org.apache.skywalking.banyandb.v1.client.grpc.MetadataClient;
+import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class TopNAggregationMetadataRegistry extends MetadataClient<TopNAggregationRegistryServiceGrpc.TopNAggregationRegistryServiceBlockingStub,
+ BanyandbDatabase.TopNAggregation, TopNAggregation> { // create/update/delete/get/exist/list for TopNAggregation schemas over the blocking TopNAggregationRegistryService stub
+ public TopNAggregationMetadataRegistry(Channel channel) {
+ super(TopNAggregationRegistryServiceGrpc.newBlockingStub(channel));
+ }
+
+ @Override
+ public void create(TopNAggregation payload) throws BanyanDBException {
+ execute(() ->
+ stub.create(BanyandbDatabase.TopNAggregationRegistryServiceCreateRequest.newBuilder()
+ .setTopNAggregation(payload.serialize())
+ .build())); // the create response is discarded
+ }
+
+ @Override
+ public void update(TopNAggregation payload) throws BanyanDBException {
+ execute(() ->
+ stub.update(BanyandbDatabase.TopNAggregationRegistryServiceUpdateRequest.newBuilder()
+ .setTopNAggregation(payload.serialize())
+ .build())); // the update response is discarded
+ }
+
+ @Override
+ public boolean delete(String group, String name) throws BanyanDBException {
+ BanyandbDatabase.TopNAggregationRegistryServiceDeleteResponse resp = execute(() ->
+ stub.delete(BanyandbDatabase.TopNAggregationRegistryServiceDeleteRequest.newBuilder()
+ .setMetadata(BanyandbCommon.Metadata.newBuilder().setGroup(group).setName(name).build())
+ .build()));
+ return resp != null && resp.getDeleted(); // a null response is reported as "not deleted"
+ }
+
+ @Override
+ public TopNAggregation get(String group, String name) throws BanyanDBException {
+ BanyandbDatabase.TopNAggregationRegistryServiceGetResponse resp = execute(() ->
+ stub.get(BanyandbDatabase.TopNAggregationRegistryServiceGetRequest.newBuilder()
+ .setMetadata(BanyandbCommon.Metadata.newBuilder().setGroup(group).setName(name).build())
+ .build()));
+
+ return TopNAggregation.fromProtobuf(resp.getTopNAggregation()); // NOTE(review): resp is dereferenced without a null check, while delete() guards against a null resp - confirm execute() never returns null
+ }
+
+ @Override
+ public ResourceExist exist(String group, String name) throws BanyanDBException {
+ BanyandbDatabase.TopNAggregationRegistryServiceExistResponse resp = execute(() ->
+ stub.exist(BanyandbDatabase.TopNAggregationRegistryServiceExistRequest.newBuilder()
+ .setMetadata(BanyandbCommon.Metadata.newBuilder().setGroup(group).setName(name).build())
+ .build()));
+ return ResourceExist.create(resp.getHasGroup(), resp.getHasTopNAggregation()); // group flag first, resource flag second
+ }
+
+ @Override
+ public List<TopNAggregation> list(String group) throws BanyanDBException {
+ BanyandbDatabase.TopNAggregationRegistryServiceListResponse resp = execute(() ->
+ stub.list(BanyandbDatabase.TopNAggregationRegistryServiceListRequest.newBuilder()
+ .setGroup(group)
+ .build()));
+
+ return resp.getTopNAggregationList().stream().map(TopNAggregation::fromProtobuf).collect(Collectors.toList()); // each protobuf entry is converted with fromProtobuf
+ }
+}
diff --git a/src/main/proto/banyandb/v1/banyandb-measure.proto b/src/main/proto/banyandb/v1/banyandb-measure.proto
index 7c41eb4..1d89baa 100644
--- a/src/main/proto/banyandb/v1/banyandb-measure.proto
+++ b/src/main/proto/banyandb/v1/banyandb-measure.proto
@@ -100,12 +100,12 @@ message QueryRequest {
model.v1.QueryOrder order_by = 12;
}
-//TopNList contains a series of topN items
+// TopNList contains a series of topN items
message TopNList {
// timestamp is in the timeunit of milliseconds.
google.protobuf.Timestamp timestamp = 1;
message Item {
- string name = 1;
+ repeated model.v1.Tag entity = 1;
model.v1.FieldValue value = 2;
}
// items contains top-n items in a list
diff --git a/src/test/java/org/apache/skywalking/banyandb/v1/client/AbstractBanyanDBClientTest.java b/src/test/java/org/apache/skywalking/banyandb/v1/client/AbstractBanyanDBClientTest.java
index 44b467f..a53e97e 100644
--- a/src/test/java/org/apache/skywalking/banyandb/v1/client/AbstractBanyanDBClientTest.java
+++ b/src/test/java/org/apache/skywalking/banyandb/v1/client/AbstractBanyanDBClientTest.java
@@ -32,6 +32,7 @@ import org.apache.skywalking.banyandb.database.v1.IndexRuleBindingRegistryServic
import org.apache.skywalking.banyandb.database.v1.IndexRuleRegistryServiceGrpc;
import org.apache.skywalking.banyandb.database.v1.MeasureRegistryServiceGrpc;
import org.apache.skywalking.banyandb.database.v1.StreamRegistryServiceGrpc;
+import org.apache.skywalking.banyandb.database.v1.TopNAggregationRegistryServiceGrpc;
import org.apache.skywalking.banyandb.v1.client.util.TimeUtils;
import org.junit.Rule;
@@ -253,6 +254,58 @@ public class AbstractBanyanDBClientTest {
}
}));
+ // TopNAggregation registry: in-memory store, keyed by aggregation name, backing the mocked service below; assigned in bindTopNAggregationRegistry(). The mock overrides create/update/delete/get/list only; exist is not overridden.
+ protected Map<String, BanyandbDatabase.TopNAggregation> topNAggregationRegistry;
+
+ private final TopNAggregationRegistryServiceGrpc.TopNAggregationRegistryServiceImplBase topNAggregationRegistryServiceImpl =
+ mock(TopNAggregationRegistryServiceGrpc.TopNAggregationRegistryServiceImplBase.class, delegatesTo(
+ new TopNAggregationRegistryServiceGrpc.TopNAggregationRegistryServiceImplBase() {
+ @Override
+ public void create(BanyandbDatabase.TopNAggregationRegistryServiceCreateRequest request, StreamObserver<BanyandbDatabase.TopNAggregationRegistryServiceCreateResponse> responseObserver) {
+ BanyandbDatabase.TopNAggregation aggr = request.getTopNAggregation().toBuilder()
+ .setUpdatedAt(TimeUtils.buildTimestamp(ZonedDateTime.now())) // stamps the stored copy with the current time
+ .build();
+ topNAggregationRegistry.put(aggr.getMetadata().getName(), aggr); // keyed by name only; the group is not part of the key
+ responseObserver.onNext(BanyandbDatabase.TopNAggregationRegistryServiceCreateResponse.newBuilder().build());
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void update(BanyandbDatabase.TopNAggregationRegistryServiceUpdateRequest request, StreamObserver<BanyandbDatabase.TopNAggregationRegistryServiceUpdateResponse> responseObserver) {
+ BanyandbDatabase.TopNAggregation aggr = request.getTopNAggregation().toBuilder()
+ .setUpdatedAt(TimeUtils.buildTimestamp(ZonedDateTime.now()))
+ .build();
+ topNAggregationRegistry.put(aggr.getMetadata().getName(), aggr); // same storage path as create: insert or overwrite
+ responseObserver.onNext(BanyandbDatabase.TopNAggregationRegistryServiceUpdateResponse.newBuilder().build());
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void delete(BanyandbDatabase.TopNAggregationRegistryServiceDeleteRequest request, StreamObserver<BanyandbDatabase.TopNAggregationRegistryServiceDeleteResponse> responseObserver) {
+ BanyandbDatabase.TopNAggregation oldMeasure = topNAggregationRegistry.remove(request.getMetadata().getName()); // despite the variable name, this is the removed TopNAggregation (null if absent)
+ responseObserver.onNext(BanyandbDatabase.TopNAggregationRegistryServiceDeleteResponse.newBuilder()
+ .setDeleted(oldMeasure != null)
+ .build());
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void get(BanyandbDatabase.TopNAggregationRegistryServiceGetRequest request, StreamObserver<BanyandbDatabase.TopNAggregationRegistryServiceGetResponse> responseObserver) {
+ responseObserver.onNext(BanyandbDatabase.TopNAggregationRegistryServiceGetResponse.newBuilder()
+ .setTopNAggregation(topNAggregationRegistry.get(request.getMetadata().getName())) // NOTE(review): Map#get yields null for an unknown name, which is then handed to the builder setter - confirm the intended behaviour on a miss
+ .build());
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void list(BanyandbDatabase.TopNAggregationRegistryServiceListRequest request, StreamObserver<BanyandbDatabase.TopNAggregationRegistryServiceListResponse> responseObserver) {
+ responseObserver.onNext(BanyandbDatabase.TopNAggregationRegistryServiceListResponse.newBuilder()
+ .addAllTopNAggregation(topNAggregationRegistry.values()) // returns every stored aggregation; the request's group is not used as a filter
+ .build());
+ responseObserver.onCompleted();
+ }
+ }));
+
protected final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry();
protected BanyanDBClient client;
@@ -309,4 +362,11 @@ public class AbstractBanyanDBClientTest {
serviceRegistry.addService(measureRegistryServiceImpl);
};
}
+
+ protected SetupFunction bindTopNAggregationRegistry() { // setup hook for tests that need the mocked TopNAggregation registry service
+ return b -> {
+ AbstractBanyanDBClientTest.this.topNAggregationRegistry = new HashMap<>(); // fresh, empty backing store each time the hook runs
+ serviceRegistry.addService(topNAggregationRegistryServiceImpl); // registers the mock service on the shared handler registry
+ };
+ }
}
diff --git a/src/test/java/org/apache/skywalking/banyandb/v1/client/metadata/TopNAggregationMetadataRegistryTest.java b/src/test/java/org/apache/skywalking/banyandb/v1/client/metadata/TopNAggregationMetadataRegistryTest.java
new file mode 100644
index 0000000..f238ffc
--- /dev/null
+++ b/src/test/java/org/apache/skywalking/banyandb/v1/client/metadata/TopNAggregationMetadataRegistryTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.banyandb.v1.client.metadata;
+
+import org.apache.skywalking.banyandb.v1.client.AbstractBanyanDBClientTest;
+import org.apache.skywalking.banyandb.v1.client.AbstractQuery;
+import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+
+public class TopNAggregationMetadataRegistryTest extends AbstractBanyanDBClientTest { // exercises create+get, create+list and create+delete against the mocked registry service
+ @Before
+ public void setUp() throws IOException {
+ this.setUp(bindTopNAggregationRegistry()); // binds the mocked TopNAggregation registry and resets its backing map
+ }
+
+ @Test
+ public void testTopNAggregationRegistry_createAndGet() throws BanyanDBException {
+ TopNAggregation expectedTopNAggregation = TopNAggregation.create("sw_metric", "service_cpm_minute_topn")
+ .setFieldValueSort(AbstractQuery.Sort.DESC)
+ .setFieldName("value")
+ .setSourceMeasureName("service_cpm_minute")
+ .setLruSize(10)
+ .setCountersNumber(1000)
+ .setGroupByTagNames("service_id")
+ .build();
+ this.client.define(expectedTopNAggregation);
+ Assert.assertTrue(topNAggregationRegistry.containsKey("service_cpm_minute_topn")); // the mock stores entries under the aggregation name
+ TopNAggregation actualTopNAggregation = client.findTopNAggregation("sw_metric", "service_cpm_minute_topn");
+ Assert.assertNotNull(actualTopNAggregation);
+ Assert.assertEquals(expectedTopNAggregation, actualTopNAggregation); // NOTE(review): expected was built without updatedAt while the mock sets one; this presumably relies on updatedAt being excluded from equals - TODO confirm in NamedSchema
+ Assert.assertNotNull(actualTopNAggregation.updatedAt());
+ }
+
+ @Test
+ public void testTopNAggregationRegistry_createAndList() throws BanyanDBException {
+ TopNAggregation expectedTopNAggregation = TopNAggregation.create("sw_metric", "service_cpm_minute_topn")
+ .setFieldValueSort(AbstractQuery.Sort.DESC)
+ .setFieldName("value")
+ .setSourceMeasureName("service_cpm_minute")
+ .setLruSize(10)
+ .setCountersNumber(1000)
+ .setGroupByTagNames("service_id")
+ .build();
+ this.client.define(expectedTopNAggregation);
+ List<TopNAggregation> actualTopNAggregations = new TopNAggregationMetadataRegistry(this.channel).list("sw_metric"); // uses the registry directly rather than a client method
+ Assert.assertNotNull(actualTopNAggregations);
+ Assert.assertEquals(1, actualTopNAggregations.size());
+ }
+
+ @Test
+ public void testTopNAggregationRegistry_createAndDelete() throws BanyanDBException {
+ TopNAggregation expectedTopNAggregation = TopNAggregation.create("sw_metric", "service_cpm_minute_topn")
+ .setFieldValueSort(AbstractQuery.Sort.DESC)
+ .setFieldName("value")
+ .setSourceMeasureName("service_cpm_minute")
+ .setLruSize(10)
+ .setCountersNumber(1000)
+ .setGroupByTagNames("service_id")
+ .build();
+ this.client.define(expectedTopNAggregation);
+ boolean deleted = new TopNAggregationMetadataRegistry(this.channel).delete("sw_metric", "service_cpm_minute_topn"); // uses the registry directly rather than a client method
+ Assert.assertTrue(deleted);
+ Assert.assertEquals(0, topNAggregationRegistry.size()); // the mock's backing map must be empty after the delete
+ }
+}