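You are viewing a plain-text version of this content. The canonical link to the original message was not preserved in this plain-text copy; see the notifications@skywalking.apache.org mailing-list archive.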
Posted to notifications@skywalking.apache.org by lu...@apache.org on 2023/02/25 14:52:55 UTC

[skywalking] 02/02: support topn query

This is an automated email from the ASF dual-hosted git repository.

lujiajing pushed a commit to branch banyandb-topn
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit 307e13eebdc8724a27b4b609c9ccbf92edf85774
Author: Megrez Lu <lu...@gmail.com>
AuthorDate: Sat Feb 25 22:51:10 2023 +0800

    support topn query
---
 .../banyandb/BanyanDBAggregationQueryDAO.java      | 60 +++++++++++++++++-----
 .../plugin/banyandb/BanyanDBStorageClient.java     | 13 +++++
 .../storage/plugin/banyandb/MetadataRegistry.java  | 11 +++-
 .../banyandb/stream/AbstractBanyanDBDAO.java       | 18 +++++++
 4 files changed, 86 insertions(+), 16 deletions(-)

diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBAggregationQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBAggregationQueryDAO.java
index 396cb47202..4c52bcec7b 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBAggregationQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBAggregationQueryDAO.java
@@ -23,6 +23,7 @@ import org.apache.skywalking.banyandb.v1.client.DataPoint;
 import org.apache.skywalking.banyandb.v1.client.MeasureQuery;
 import org.apache.skywalking.banyandb.v1.client.MeasureQueryResponse;
 import org.apache.skywalking.banyandb.v1.client.TimestampRange;
+import org.apache.skywalking.banyandb.v1.client.TopNQueryResponse;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.query.enumeration.Order;
 import org.apache.skywalking.oap.server.core.query.input.Duration;
@@ -51,6 +52,43 @@ public class BanyanDBAggregationQueryDAO extends AbstractBanyanDBDAO implements
     public List<SelectedRecord> sortMetrics(TopNCondition condition, String valueColumnName, Duration duration, List<KeyValue> additionalConditions) throws IOException {
         final String modelName = condition.getName();
         final TimestampRange timestampRange = new TimestampRange(duration.getStartTimestamp(), duration.getEndTimestamp());
+        // fast-path: BanyanDB server-side TopN support
+        MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(modelName, duration.getStep());
+        if (schema == null) {
+            throw new IOException("schema is not registered");
+        }
+
+        MetadataRegistry.ColumnSpec spec = schema.getSpec(valueColumnName);
+        if (spec == null) {
+            throw new IOException("field spec is not registered");
+        }
+
+        if (schema.hasTopNAggregation()) {
+            TopNQueryResponse resp = null;
+            if (condition.getOrder() == Order.DES) {
+                resp = topN(schema, timestampRange, condition.getTopN());
+            } else {
+                resp = bottomN(schema, timestampRange, condition.getTopN());
+            }
+
+            if (resp.getTopNLists().isEmpty()) {
+                return Collections.emptyList();
+            } else if (resp.getTopNLists().size() > 1) { // since we have done aggregation, i.e. MEAN
+                throw new IOException("invalid TopN response");
+            }
+
+            final List<SelectedRecord> topNList = new ArrayList<>();
+            for (TopNQueryResponse.Item item : resp.getTopNLists().get(0).getItems()) {
+                SelectedRecord record = new SelectedRecord();
+                record.setId(item.getName());
+                record.setValue(extractFieldValueAsString(spec, item.getValue()));
+                topNList.add(record);
+            }
+
+            return topNList;
+        }
+
+        // slow-path: TopN using vanilla Measure query
         MeasureQueryResponse resp = query(modelName, TAGS, Collections.singleton(valueColumnName),
                 timestampRange, new QueryBuilder<MeasureQuery>() {
                     @Override
@@ -75,16 +113,6 @@ public class BanyanDBAggregationQueryDAO extends AbstractBanyanDBDAO implements
             return Collections.emptyList();
         }
 
-        MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(modelName, duration.getStep());
-        if (schema == null) {
-            throw new IOException("schema is not registered");
-        }
-
-        MetadataRegistry.ColumnSpec spec = schema.getSpec(valueColumnName);
-        if (spec == null) {
-            throw new IOException("field spec is not registered");
-        }
-
         final List<SelectedRecord> topNList = new ArrayList<>();
         for (DataPoint dataPoint : resp.getDataPoints()) {
             SelectedRecord record = new SelectedRecord();
@@ -96,13 +124,17 @@ public class BanyanDBAggregationQueryDAO extends AbstractBanyanDBDAO implements
         return topNList;
     }
 
-    private String extractFieldValueAsString(MetadataRegistry.ColumnSpec spec, String fieldName, DataPoint dataPoint) throws IOException {
+    private static String extractFieldValueAsString(MetadataRegistry.ColumnSpec spec, String fieldName, DataPoint dataPoint) throws IOException {
+        return extractFieldValueAsString(spec, dataPoint.getFieldValue(fieldName));
+    }
+
+    private static String extractFieldValueAsString(MetadataRegistry.ColumnSpec spec, Object fieldValue) throws IOException {
         if (double.class.equals(spec.getColumnClass())) {
-            return String.valueOf(ByteUtil.bytes2Double(dataPoint.getFieldValue(fieldName)).longValue());
+            return String.valueOf(ByteUtil.bytes2Double((byte[]) fieldValue).longValue());
         } else if (String.class.equals(spec.getColumnClass())) {
-            return dataPoint.getFieldValue(fieldName);
+            return (String) fieldValue;
         } else {
-            return String.valueOf(((Number) dataPoint.getFieldValue(fieldName)).longValue());
+            return String.valueOf(((Number) fieldValue).longValue());
         }
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
index 73b527872d..0b460eead7 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
@@ -27,6 +27,8 @@ import org.apache.skywalking.banyandb.v1.client.StreamBulkWriteProcessor;
 import org.apache.skywalking.banyandb.v1.client.StreamQuery;
 import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
 import org.apache.skywalking.banyandb.v1.client.StreamWrite;
+import org.apache.skywalking.banyandb.v1.client.TopNQuery;
+import org.apache.skywalking.banyandb.v1.client.TopNQueryResponse;
 import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
 import org.apache.skywalking.banyandb.v1.client.metadata.Measure;
 import org.apache.skywalking.banyandb.v1.client.metadata.Property;
@@ -117,6 +119,17 @@ public class BanyanDBStorageClient implements Client, HealthCheckable {
         }
     }
 
+    public TopNQueryResponse query(TopNQuery q) throws IOException {
+        try {
+            TopNQueryResponse response = this.client.query(q);
+            this.healthChecker.health();
+            return response;
+        } catch (BanyanDBException ex) {
+            healthChecker.unHealth(ex);
+            throw new IOException("fail to query topn", ex);
+        }
+    }
+
     public void define(Property property) throws IOException {
         try {
             this.client.apply(property);
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
index ed8bc0b42d..fb01e338a7 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
@@ -175,18 +175,19 @@ public enum MetadataRegistry {
             schemaBuilder.field(field.getName());
         }
         // parse TopN
-        schemaBuilder.topNSpec(parseTopNSpec(model, tagsAndFields));
+        schemaBuilder.topNSpec(parseTopNSpec(schemaMetadata.name(), tagsAndFields));
 
         registry.put(schemaMetadata.name(), schemaBuilder.build());
         return builder.build();
     }
 
-    private TopNSpec parseTopNSpec(final Model model, final MeasureMetadata tagsAndFields) {
+    private TopNSpec parseTopNSpec(final String measureName, final MeasureMetadata tagsAndFields) {
         if (CollectionUtils.isEmpty(tagsAndFields.fields)) {
             return null;
         }
         // TODO: how to configure parameters?
         return TopNSpec.builder()
+                .name(measureName + "_topn")
                 .lruSize(5)
                 .countersNumber(10)
                 .fieldName(tagsAndFields.fields.get(0).getName())
@@ -670,6 +671,10 @@ public enum MetadataRegistry {
             return this.specs.get(columnName);
         }
 
+        public boolean hasTopNAggregation() {
+            return topNSpec != null;
+        }
+
         public void installTopNAggregation(BanyanDBClient client) throws BanyanDBException {
             if (this.topNSpec == null) {
                 if (this.metadata.kind == Kind.MEASURE) {
@@ -690,7 +695,9 @@ public enum MetadataRegistry {
 
     @Builder
     @EqualsAndHashCode
+    @Getter
     public static class TopNSpec {
+        private final String name;
         @Singular
         private final List<String> groupByTagNames;
         private final String fieldName;
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java
index fd53ab114d..b27eade3cc 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java
@@ -28,6 +28,8 @@ import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
 import org.apache.skywalking.banyandb.v1.client.StreamQuery;
 import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
 import org.apache.skywalking.banyandb.v1.client.TimestampRange;
+import org.apache.skywalking.banyandb.v1.client.TopNQuery;
+import org.apache.skywalking.banyandb.v1.client.TopNQueryResponse;
 import org.apache.skywalking.oap.server.core.analysis.DownSampling;
 import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
@@ -75,6 +77,22 @@ public abstract class AbstractBanyanDBDAO extends AbstractDAO<BanyanDBStorageCli
         return this.query(measureModelName, tags, fields, null, builder);
     }
 
+    protected TopNQueryResponse topN(MetadataRegistry.Schema schema, TimestampRange timestampRange, int number) throws IOException {
+        final TopNQuery q = new TopNQuery(schema.getMetadata().getGroup(), schema.getTopNSpec().getName(),
+                timestampRange,
+                number, AbstractQuery.Sort.DESC);
+        q.setAggregationType(MeasureQuery.Aggregation.Type.MEAN);
+        return getClient().query(q);
+    }
+
+    protected TopNQueryResponse bottomN(MetadataRegistry.Schema schema, TimestampRange timestampRange, int number) throws IOException {
+        final TopNQuery q = new TopNQuery(schema.getMetadata().getGroup(), schema.getTopNSpec().getName(),
+                timestampRange,
+                number, AbstractQuery.Sort.ASC);
+        q.setAggregationType(MeasureQuery.Aggregation.Type.MEAN);
+        return getClient().query(q);
+    }
+
     protected MeasureQueryResponse query(String measureModelName, Set<String> tags, Set<String> fields,
                                          TimestampRange timestampRange, QueryBuilder<MeasureQuery> builder) throws IOException {
         MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(measureModelName, DownSampling.Minute);