You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by lu...@apache.org on 2022/04/30 12:56:50 UTC
[skywalking] 24/25: add measure id and check sharding keys
This is an automated email from the ASF dual-hosted git repository.
lujiajing pushed a commit to branch banyandb-integration-stream
in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit 6b211cefa29c515340b59e11257e49a79cc048ab
Author: Megrez Lu <lu...@gmail.com>
AuthorDate: Sat Apr 30 16:55:24 2022 +0800
add measure id and check sharding keys
---
.../storage/plugin/banyandb/BanyanDBConverter.java | 2 +-
.../storage/plugin/banyandb/MetadataRegistry.java | 42 ++++++++++++++++------
2 files changed, 32 insertions(+), 12 deletions(-)
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java
index c512b5653c..c3c936ef84 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java
@@ -45,7 +45,7 @@ public class BanyanDBConverter {
@Override
public Object get(String fieldName) {
MetadataRegistry.ColumnSpec spec = schema.getSpec(fieldName);
- if (double.class.equals(spec.getModelColumn().getType())) {
+ if (double.class.equals(spec.getColumnClass())) {
return ByteUtil.bytes2Double(rowEntity.getTagValue(fieldName));
} else {
return rowEntity.getTagValue(fieldName);
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
index 6c6fbb7658..3ff4cb2833 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
@@ -67,6 +67,7 @@ import java.util.stream.Collectors;
public enum MetadataRegistry {
INSTANCE;
+ private static final String ID = "id";
private final Map<String, Schema> registry = new ConcurrentHashMap<>();
public NamedSchema<?> registerModel(Model model, ConfigService configService) {
@@ -87,25 +88,33 @@ public enum MetadataRegistry {
.filter(Objects::nonNull)
.collect(Collectors.toList());
-
if (partialMetadata.getKind() == Kind.STREAM) {
final Stream.Builder builder = Stream.create(partialMetadata.getGroup(), partialMetadata.getName());
+ if (entities.isEmpty()) {
+ log.warn("sharding keys of model[stream.{}] must not be empty", model.getName());
+// throw new IllegalStateException("sharding keys of model[" + model.getName() + "] must not be empty");
+ }
builder.setEntityRelativeTags(entities);
builder.addTagFamilies(tagFamilySpecs);
builder.addIndexes(indexRules);
-
registry.put(model.getName(), schemaBuilder.build());
return builder.build();
} else {
final Measure.Builder builder = Measure.create(partialMetadata.getGroup(), partialMetadata.getName(),
downSamplingDuration(model.getDownsampling()));
- builder.setEntityRelativeTags(entities);
+ if (entities.isEmpty()) { // if shardingKeys is empty, for measure, we can use ID as a single sharding key.
+ builder.setEntityRelativeTags(ID);
+ } else {
+ builder.setEntityRelativeTags(entities);
+ }
builder.addTagFamilies(tagFamilySpecs);
builder.addIndexes(indexRules);
// parse and set field
Optional<ValueColumnMetadata.ValueColumn> valueColumnOpt = ValueColumnMetadata.INSTANCE
.readValueColumnDefinition(model.getName());
valueColumnOpt.ifPresent(valueColumn -> builder.addField(parseFieldSpec(modelColumnMap.get(valueColumn.getValueCName()), valueColumn)));
+ // register ID
+ schemaBuilder.spec(ID, new ColumnSpec(ColumnType.TAG, String.class));
registry.put(model.getName(), schemaBuilder.build());
return builder.build();
@@ -194,14 +203,14 @@ public enum MetadataRegistry {
.readValueColumnDefinition(model.getName());
for (final ModelColumn col : model.getColumns()) {
if (valueColumnOpt.isPresent() && valueColumnOpt.get().getValueCName().equals(col.getColumnName().getStorageName())) {
- builder.spec(col.getColumnName().getStorageName(), new ColumnSpec(ColumnType.FIELD, col));
+ builder.spec(col.getColumnName().getStorageName(), new ColumnSpec(ColumnType.FIELD, col.getType()));
continue;
}
final TagFamilySpec.TagSpec tagSpec = parseTagSpec(col);
if (tagSpec == null) {
continue;
}
- builder.spec(col.getColumnName().getStorageName(), new ColumnSpec(ColumnType.TAG, col));
+ builder.spec(col.getColumnName().getStorageName(), new ColumnSpec(ColumnType.TAG, col.getType()));
if (col.shouldIndex()) {
// build indexRule
IndexRule indexRule = parseIndexRule(tagSpec.getTagName(), col);
@@ -316,11 +325,22 @@ public enum MetadataRegistry {
}
private List<TagFamilySpec> extractTagFamilySpec(List<TagMetadata> tagMetadataList) {
- return tagMetadataList.stream().collect(Collectors.groupingBy(tagMetadata -> tagMetadata.isIndex() ? PartialMetadata.this.indexFamily() : PartialMetadata.this.nonIndexFamily()))
- .entrySet().stream()
- .map(entry -> TagFamilySpec.create(entry.getKey())
- .addTagSpecs(entry.getValue().stream().map(TagMetadata::getTagSpec).collect(Collectors.toList())).build())
- .collect(Collectors.toList());
+ Map<String, List<TagMetadata>> tagMetadataMap = tagMetadataList.stream()
+ .collect(Collectors.groupingBy(tagMetadata -> tagMetadata.isIndex() ? PartialMetadata.this.indexFamily() : PartialMetadata.this.nonIndexFamily()));
+
+ final List<TagFamilySpec> tagFamilySpecs = new ArrayList<>(tagMetadataMap.size());
+ for (final Map.Entry<String, List<TagMetadata>> entry : tagMetadataMap.entrySet()) {
+ final TagFamilySpec.Builder b = TagFamilySpec.create(entry.getKey())
+ .addTagSpecs(entry.getValue().stream().map(TagMetadata::getTagSpec).collect(Collectors.toList()));
+ if (this.getKind() == Kind.MEASURE && entry.getKey().equals(this.indexFamily())) {
+ // append measure ID, but it should not generate an index in the client side.
+ // BanyanDB will take care of the ID index registration.
+ b.addTagSpec(TagFamilySpec.TagSpec.newIDTag(ID));
+ }
+ tagFamilySpecs.add(b.build());
+ }
+
+ return tagFamilySpecs;
}
public Group getOrCreateGroup(BanyanDBClient client) throws BanyanDBException {
@@ -392,7 +412,7 @@ public enum MetadataRegistry {
@Getter
public static class ColumnSpec {
private final ColumnType columnType;
- private final ModelColumn modelColumn;
+ private final Class<?> columnClass;
}
public enum ColumnType {