You are viewing a plain-text version of this message; the link to its canonical version was not preserved in this copy.
Posted to notifications@skywalking.apache.org by lu...@apache.org on 2023/02/25 14:52:55 UTC
[skywalking] 02/02: support topn query
This is an automated email from the ASF dual-hosted git repository.
lujiajing pushed a commit to branch banyandb-topn
in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit 307e13eebdc8724a27b4b609c9ccbf92edf85774
Author: Megrez Lu <lu...@gmail.com>
AuthorDate: Sat Feb 25 22:51:10 2023 +0800
support topn query
---
.../banyandb/BanyanDBAggregationQueryDAO.java | 60 +++++++++++++++++-----
.../plugin/banyandb/BanyanDBStorageClient.java | 13 +++++
.../storage/plugin/banyandb/MetadataRegistry.java | 11 +++-
.../banyandb/stream/AbstractBanyanDBDAO.java | 18 +++++++
4 files changed, 86 insertions(+), 16 deletions(-)
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBAggregationQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBAggregationQueryDAO.java
index 396cb47202..4c52bcec7b 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBAggregationQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBAggregationQueryDAO.java
@@ -23,6 +23,7 @@ import org.apache.skywalking.banyandb.v1.client.DataPoint;
import org.apache.skywalking.banyandb.v1.client.MeasureQuery;
import org.apache.skywalking.banyandb.v1.client.MeasureQueryResponse;
import org.apache.skywalking.banyandb.v1.client.TimestampRange;
+import org.apache.skywalking.banyandb.v1.client.TopNQueryResponse;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.query.enumeration.Order;
import org.apache.skywalking.oap.server.core.query.input.Duration;
@@ -51,6 +52,43 @@ public class BanyanDBAggregationQueryDAO extends AbstractBanyanDBDAO implements
public List<SelectedRecord> sortMetrics(TopNCondition condition, String valueColumnName, Duration duration, List<KeyValue> additionalConditions) throws IOException {
final String modelName = condition.getName();
final TimestampRange timestampRange = new TimestampRange(duration.getStartTimestamp(), duration.getEndTimestamp());
+ // fast-path: BanyanDB server-side TopN support
+ MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(modelName, duration.getStep());
+ if (schema == null) {
+ throw new IOException("schema is not registered");
+ }
+
+ MetadataRegistry.ColumnSpec spec = schema.getSpec(valueColumnName);
+ if (spec == null) {
+ throw new IOException("field spec is not registered");
+ }
+
+ if (schema.hasTopNAggregation()) {
+ TopNQueryResponse resp = null;
+ if (condition.getOrder() == Order.DES) {
+ resp = topN(schema, timestampRange, condition.getTopN());
+ } else {
+ resp = bottomN(schema, timestampRange, condition.getTopN());
+ }
+
+ if (resp.getTopNLists().isEmpty()) {
+ return Collections.emptyList();
+ } else if (resp.getTopNLists().size() > 1) { // since we have done aggregation, i.e. MEAN
+ throw new IOException("invalid TopN response");
+ }
+
+ final List<SelectedRecord> topNList = new ArrayList<>();
+ for (TopNQueryResponse.Item item : resp.getTopNLists().get(0).getItems()) {
+ SelectedRecord record = new SelectedRecord();
+ record.setId(item.getName());
+ record.setValue(extractFieldValueAsString(spec, item.getValue()));
+ topNList.add(record);
+ }
+
+ return topNList;
+ }
+
+ // slow-path: TopN using vanilla Measure query
MeasureQueryResponse resp = query(modelName, TAGS, Collections.singleton(valueColumnName),
timestampRange, new QueryBuilder<MeasureQuery>() {
@Override
@@ -75,16 +113,6 @@ public class BanyanDBAggregationQueryDAO extends AbstractBanyanDBDAO implements
return Collections.emptyList();
}
- MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(modelName, duration.getStep());
- if (schema == null) {
- throw new IOException("schema is not registered");
- }
-
- MetadataRegistry.ColumnSpec spec = schema.getSpec(valueColumnName);
- if (spec == null) {
- throw new IOException("field spec is not registered");
- }
-
final List<SelectedRecord> topNList = new ArrayList<>();
for (DataPoint dataPoint : resp.getDataPoints()) {
SelectedRecord record = new SelectedRecord();
@@ -96,13 +124,17 @@ public class BanyanDBAggregationQueryDAO extends AbstractBanyanDBDAO implements
return topNList;
}
- private String extractFieldValueAsString(MetadataRegistry.ColumnSpec spec, String fieldName, DataPoint dataPoint) throws IOException {
+ private static String extractFieldValueAsString(MetadataRegistry.ColumnSpec spec, String fieldName, DataPoint dataPoint) throws IOException {
+ return extractFieldValueAsString(spec, dataPoint.getFieldValue(fieldName));
+ }
+
+ private static String extractFieldValueAsString(MetadataRegistry.ColumnSpec spec, Object fieldValue) throws IOException {
if (double.class.equals(spec.getColumnClass())) {
- return String.valueOf(ByteUtil.bytes2Double(dataPoint.getFieldValue(fieldName)).longValue());
+ return String.valueOf(ByteUtil.bytes2Double((byte[]) fieldValue).longValue());
} else if (String.class.equals(spec.getColumnClass())) {
- return dataPoint.getFieldValue(fieldName);
+ return (String) fieldValue;
} else {
- return String.valueOf(((Number) dataPoint.getFieldValue(fieldName)).longValue());
+ return String.valueOf(((Number) fieldValue).longValue());
}
}
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
index 73b527872d..0b460eead7 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
@@ -27,6 +27,8 @@ import org.apache.skywalking.banyandb.v1.client.StreamBulkWriteProcessor;
import org.apache.skywalking.banyandb.v1.client.StreamQuery;
import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
import org.apache.skywalking.banyandb.v1.client.StreamWrite;
+import org.apache.skywalking.banyandb.v1.client.TopNQuery;
+import org.apache.skywalking.banyandb.v1.client.TopNQueryResponse;
import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
import org.apache.skywalking.banyandb.v1.client.metadata.Measure;
import org.apache.skywalking.banyandb.v1.client.metadata.Property;
@@ -117,6 +119,17 @@ public class BanyanDBStorageClient implements Client, HealthCheckable {
}
}
+ public TopNQueryResponse query(TopNQuery q) throws IOException {
+ try {
+ TopNQueryResponse response = this.client.query(q);
+ this.healthChecker.health();
+ return response;
+ } catch (BanyanDBException ex) {
+ healthChecker.unHealth(ex);
+ throw new IOException("fail to query topn", ex);
+ }
+ }
+
public void define(Property property) throws IOException {
try {
this.client.apply(property);
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
index ed8bc0b42d..fb01e338a7 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
@@ -175,18 +175,19 @@ public enum MetadataRegistry {
schemaBuilder.field(field.getName());
}
// parse TopN
- schemaBuilder.topNSpec(parseTopNSpec(model, tagsAndFields));
+ schemaBuilder.topNSpec(parseTopNSpec(schemaMetadata.name(), tagsAndFields));
registry.put(schemaMetadata.name(), schemaBuilder.build());
return builder.build();
}
- private TopNSpec parseTopNSpec(final Model model, final MeasureMetadata tagsAndFields) {
+ private TopNSpec parseTopNSpec(final String measureName, final MeasureMetadata tagsAndFields) {
if (CollectionUtils.isEmpty(tagsAndFields.fields)) {
return null;
}
// TODO: how to configure parameters?
return TopNSpec.builder()
+ .name(measureName + "_topn")
.lruSize(5)
.countersNumber(10)
.fieldName(tagsAndFields.fields.get(0).getName())
@@ -670,6 +671,10 @@ public enum MetadataRegistry {
return this.specs.get(columnName);
}
+ public boolean hasTopNAggregation() {
+ return topNSpec != null;
+ }
+
public void installTopNAggregation(BanyanDBClient client) throws BanyanDBException {
if (this.topNSpec == null) {
if (this.metadata.kind == Kind.MEASURE) {
@@ -690,7 +695,9 @@ public enum MetadataRegistry {
@Builder
@EqualsAndHashCode
+ @Getter
public static class TopNSpec {
+ private final String name;
@Singular
private final List<String> groupByTagNames;
private final String fieldName;
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java
index fd53ab114d..b27eade3cc 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java
@@ -28,6 +28,8 @@ import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
import org.apache.skywalking.banyandb.v1.client.StreamQuery;
import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
import org.apache.skywalking.banyandb.v1.client.TimestampRange;
+import org.apache.skywalking.banyandb.v1.client.TopNQuery;
+import org.apache.skywalking.banyandb.v1.client.TopNQueryResponse;
import org.apache.skywalking.oap.server.core.analysis.DownSampling;
import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
@@ -75,6 +77,22 @@ public abstract class AbstractBanyanDBDAO extends AbstractDAO<BanyanDBStorageCli
return this.query(measureModelName, tags, fields, null, builder);
}
+ protected TopNQueryResponse topN(MetadataRegistry.Schema schema, TimestampRange timestampRange, int number) throws IOException {
+ final TopNQuery q = new TopNQuery(schema.getMetadata().getGroup(), schema.getTopNSpec().getName(),
+ timestampRange,
+ number, AbstractQuery.Sort.DESC);
+ q.setAggregationType(MeasureQuery.Aggregation.Type.MEAN);
+ return getClient().query(q);
+ }
+
+ protected TopNQueryResponse bottomN(MetadataRegistry.Schema schema, TimestampRange timestampRange, int number) throws IOException {
+ final TopNQuery q = new TopNQuery(schema.getMetadata().getGroup(), schema.getTopNSpec().getName(),
+ timestampRange,
+ number, AbstractQuery.Sort.ASC);
+ q.setAggregationType(MeasureQuery.Aggregation.Type.MEAN);
+ return getClient().query(q);
+ }
+
protected MeasureQueryResponse query(String measureModelName, Set<String> tags, Set<String> fields,
TimestampRange timestampRange, QueryBuilder<MeasureQuery> builder) throws IOException {
MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(measureModelName, DownSampling.Minute);