You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by lu...@apache.org on 2022/04/30 09:03:43 UTC

[skywalking] 24/24: add measure id and check sharding keys

This is an automated email from the ASF dual-hosted git repository.

lujiajing pushed a commit to branch banyandb-integration-stream
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit cd13a7372ac6c0a9b74698c07780a6022c453039
Author: Megrez Lu <lu...@gmail.com>
AuthorDate: Sat Apr 30 16:55:24 2022 +0800

    add measure id and check sharding keys
---
 .../storage/plugin/banyandb/BanyanDBConverter.java |  2 +-
 .../storage/plugin/banyandb/MetadataRegistry.java  | 42 ++++++++++++++++------
 2 files changed, 32 insertions(+), 12 deletions(-)

diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java
index c512b5653c..c3c936ef84 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java
@@ -45,7 +45,7 @@ public class BanyanDBConverter {
         @Override
         public Object get(String fieldName) {
             MetadataRegistry.ColumnSpec spec = schema.getSpec(fieldName);
-            if (double.class.equals(spec.getModelColumn().getType())) {
+            if (double.class.equals(spec.getColumnClass())) {
                 return ByteUtil.bytes2Double(rowEntity.getTagValue(fieldName));
             } else {
                 return rowEntity.getTagValue(fieldName);
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
index 6c6fbb7658..3ff4cb2833 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
@@ -67,6 +67,7 @@ import java.util.stream.Collectors;
 public enum MetadataRegistry {
     INSTANCE;
 
+    private static final String ID = "id";
     private final Map<String, Schema> registry = new ConcurrentHashMap<>();
 
     public NamedSchema<?> registerModel(Model model, ConfigService configService) {
@@ -87,25 +88,33 @@ public enum MetadataRegistry {
                 .filter(Objects::nonNull)
                 .collect(Collectors.toList());
 
-
         if (partialMetadata.getKind() == Kind.STREAM) {
             final Stream.Builder builder = Stream.create(partialMetadata.getGroup(), partialMetadata.getName());
+            if (entities.isEmpty()) {
+                log.warn("sharding keys of model[stream.{}] must not be empty", model.getName());
+//            throw new IllegalStateException("sharding keys of model[" + model.getName() + "] must not be empty");
+            }
             builder.setEntityRelativeTags(entities);
             builder.addTagFamilies(tagFamilySpecs);
             builder.addIndexes(indexRules);
-
             registry.put(model.getName(), schemaBuilder.build());
             return builder.build();
         } else {
             final Measure.Builder builder = Measure.create(partialMetadata.getGroup(), partialMetadata.getName(),
                     downSamplingDuration(model.getDownsampling()));
-            builder.setEntityRelativeTags(entities);
+            if (entities.isEmpty()) { // if shardingKeys is empty, for measure, we can use ID as a single sharding key.
+                builder.setEntityRelativeTags(ID);
+            } else {
+                builder.setEntityRelativeTags(entities);
+            }
             builder.addTagFamilies(tagFamilySpecs);
             builder.addIndexes(indexRules);
             // parse and set field
             Optional<ValueColumnMetadata.ValueColumn> valueColumnOpt = ValueColumnMetadata.INSTANCE
                     .readValueColumnDefinition(model.getName());
             valueColumnOpt.ifPresent(valueColumn -> builder.addField(parseFieldSpec(modelColumnMap.get(valueColumn.getValueCName()), valueColumn)));
+            // register ID
+            schemaBuilder.spec(ID, new ColumnSpec(ColumnType.TAG, String.class));
 
             registry.put(model.getName(), schemaBuilder.build());
             return builder.build();
@@ -194,14 +203,14 @@ public enum MetadataRegistry {
                 .readValueColumnDefinition(model.getName());
         for (final ModelColumn col : model.getColumns()) {
             if (valueColumnOpt.isPresent() && valueColumnOpt.get().getValueCName().equals(col.getColumnName().getStorageName())) {
-                builder.spec(col.getColumnName().getStorageName(), new ColumnSpec(ColumnType.FIELD, col));
+                builder.spec(col.getColumnName().getStorageName(), new ColumnSpec(ColumnType.FIELD, col.getType()));
                 continue;
             }
             final TagFamilySpec.TagSpec tagSpec = parseTagSpec(col);
             if (tagSpec == null) {
                 continue;
             }
-            builder.spec(col.getColumnName().getStorageName(), new ColumnSpec(ColumnType.TAG, col));
+            builder.spec(col.getColumnName().getStorageName(), new ColumnSpec(ColumnType.TAG, col.getType()));
             if (col.shouldIndex()) {
                 // build indexRule
                 IndexRule indexRule = parseIndexRule(tagSpec.getTagName(), col);
@@ -316,11 +325,22 @@ public enum MetadataRegistry {
         }
 
         private List<TagFamilySpec> extractTagFamilySpec(List<TagMetadata> tagMetadataList) {
-            return tagMetadataList.stream().collect(Collectors.groupingBy(tagMetadata -> tagMetadata.isIndex() ? PartialMetadata.this.indexFamily() : PartialMetadata.this.nonIndexFamily()))
-                    .entrySet().stream()
-                    .map(entry -> TagFamilySpec.create(entry.getKey())
-                            .addTagSpecs(entry.getValue().stream().map(TagMetadata::getTagSpec).collect(Collectors.toList())).build())
-                    .collect(Collectors.toList());
+            Map<String, List<TagMetadata>> tagMetadataMap = tagMetadataList.stream()
+                    .collect(Collectors.groupingBy(tagMetadata -> tagMetadata.isIndex() ? PartialMetadata.this.indexFamily() : PartialMetadata.this.nonIndexFamily()));
+
+            final List<TagFamilySpec> tagFamilySpecs = new ArrayList<>(tagMetadataMap.size());
+            for (final Map.Entry<String, List<TagMetadata>> entry : tagMetadataMap.entrySet()) {
+                final TagFamilySpec.Builder b = TagFamilySpec.create(entry.getKey())
+                        .addTagSpecs(entry.getValue().stream().map(TagMetadata::getTagSpec).collect(Collectors.toList()));
+                if (this.getKind() == Kind.MEASURE && entry.getKey().equals(this.indexFamily())) {
+                    // append measure ID, but it should not generate an index in the client side.
+                    // BanyanDB will take care of the ID index registration.
+                    b.addTagSpec(TagFamilySpec.TagSpec.newIDTag(ID));
+                }
+                tagFamilySpecs.add(b.build());
+            }
+
+            return tagFamilySpecs;
         }
 
         public Group getOrCreateGroup(BanyanDBClient client) throws BanyanDBException {
@@ -392,7 +412,7 @@ public enum MetadataRegistry {
     @Getter
     public static class ColumnSpec {
         private final ColumnType columnType;
-        private final ModelColumn modelColumn;
+        private final Class<?> columnClass;
     }
 
     public enum ColumnType {