You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2023/01/04 14:08:57 UTC

[skywalking] branch banyandb updated: Add the ID tag to process_traffic for searching

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch banyandb
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/banyandb by this push:
     new 4c16930af0 Add the ID tag to process_traffic for searching
4c16930af0 is described below

commit 4c16930af0ad9eb1158945b1ec401403e48d9a39
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Wed Jan 4 22:07:51 2023 +0800

    Add the ID tag to process_traffic for searching
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 .../core/analysis/manual/process/ProcessTraffic.java    |  1 +
 .../oap/server/core/storage/annotation/BanyanDB.java    |  9 +++++++++
 .../core/storage/model/BanyanDBModelExtension.java      |  8 ++++++++
 .../oap/server/core/storage/model/StorageModels.java    |  4 ++++
 .../storage/plugin/banyandb/BanyanDBConverter.java      | 11 +++++++++++
 .../storage/plugin/banyandb/MetadataRegistry.java       | 17 +++++++++++++----
 .../banyandb/measure/BanyanDBMetadataQueryDAO.java      |  8 ++++----
 .../plugin/banyandb/measure/BanyanDBMetricsDAO.java     |  6 ++++++
 .../plugin/banyandb/stream/AbstractBanyanDBDAO.java     |  5 -----
 9 files changed, 56 insertions(+), 13 deletions(-)

diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/process/ProcessTraffic.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/process/ProcessTraffic.java
index f763fe12c7..f55973201a 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/process/ProcessTraffic.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/process/ProcessTraffic.java
@@ -52,6 +52,7 @@ import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.PR
     "name",
 })
 @SQLDatabase.Sharding(shardingAlgorithm = ShardingAlgorithm.NO_SHARDING)
+@BanyanDB.StoreIDTag
 public class ProcessTraffic extends Metrics {
     public static final String INDEX_NAME = "process_traffic";
     public static final String SERVICE_ID = "service_id";
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/BanyanDB.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/BanyanDB.java
index b2878ba665..92ad4595b9 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/BanyanDB.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/BanyanDB.java
@@ -140,4 +140,13 @@ public @interface BanyanDB {
     @Retention(RetentionPolicy.RUNTIME)
     @interface MeasureField {
     }
+
+    /**
+     * StoreIDTag indicates a metric store its ID as a tag for searching.
+     * @Since 9.4.0
+     */
+    @Target({ElementType.TYPE})
+    @Retention(RetentionPolicy.RUNTIME)
+    @interface StoreIDTag {
+    }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/BanyanDBModelExtension.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/BanyanDBModelExtension.java
index dc8b60ad9c..bf9887d606 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/BanyanDBModelExtension.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/BanyanDBModelExtension.java
@@ -37,4 +37,12 @@ public class BanyanDBModelExtension {
     @Setter
     private String timestampColumn;
 
+    /**
+     * shouldStoreIDTag indicates whether a metric store its ID as a tag.
+     * The installer will create a virtual string ID tag with a tree index rule.
+     */
+    @Getter
+    @Setter
+    private boolean shouldStoreIDTag;
+
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
index 2d600b422c..e231fbbb10 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
@@ -93,6 +93,10 @@ public class StorageModels implements IModelManager, ModelCreator, ModelManipula
             banyanDBModelExtension.setTimestampColumn(timestampColumn);
         }
 
+        if (aClass.isAnnotationPresent(BanyanDB.StoreIDTag.class)) {
+            banyanDBModelExtension.setShouldStoreIDTag(true);
+        }
+
         checker.check(storage.getModelName());
 
         Model model = new Model(
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java
index 55c519237c..270fca5135 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java
@@ -40,6 +40,9 @@ import org.apache.skywalking.oap.server.storage.plugin.banyandb.util.ByteUtil;
 import java.util.List;
 
 public class BanyanDBConverter {
+
+    public static final String ID = "id";
+
     public static class StorageToStream implements Convert2Entity {
         private final MetadataRegistry.Schema schema;
         private final RowEntity rowEntity;
@@ -154,6 +157,14 @@ public class BanyanDBConverter {
             }
         }
 
+        public void acceptID(String id) {
+            try {
+                this.measureWrite.tag(ID, TagAndValue.stringTagValue(id));
+            } catch (BanyanDBException ex) {
+                log.error("fail to add ID tag", ex);
+            }
+        }
+
         @Override
         public void accept(String fieldName, byte[] fieldValue) {
             MetadataRegistry.ColumnSpec columnSpec = this.schema.getSpec(fieldName);
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
index 00f1d7d995..11addacc60 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
@@ -94,7 +94,7 @@ public enum MetadataRegistry {
         // 1) a list of TagFamilySpec,
         // 2) a list of IndexRule,
         List<TagMetadata> tags = parseTagMetadata(model, schemaBuilder);
-        List<TagFamilySpec> tagFamilySpecs = schemaMetadata.extractTagFamilySpec(tags);
+        List<TagFamilySpec> tagFamilySpecs = schemaMetadata.extractTagFamilySpec(tags, false);
         // iterate over tagFamilySpecs to save tag names
         for (final TagFamilySpec tagFamilySpec : tagFamilySpecs) {
             for (final TagFamilySpec.TagSpec tagSpec : tagFamilySpec.tagSpecs()) {
@@ -138,7 +138,7 @@ public enum MetadataRegistry {
         // 1) a list of TagFamilySpec,
         // 2) a list of IndexRule,
         MeasureMetadata tagsAndFields = parseTagAndFieldMetadata(model, schemaBuilder);
-        List<TagFamilySpec> tagFamilySpecs = schemaMetadata.extractTagFamilySpec(tagsAndFields.tags);
+        List<TagFamilySpec> tagFamilySpecs = schemaMetadata.extractTagFamilySpec(tagsAndFields.tags, model.getBanyanDBModelExtension().isShouldStoreIDTag());
         // iterate over tagFamilySpecs to save tag names
         for (final TagFamilySpec tagFamilySpec : tagFamilySpecs) {
             for (final TagFamilySpec.TagSpec tagSpec : tagFamilySpec.tagSpecs()) {
@@ -150,6 +150,10 @@ public enum MetadataRegistry {
                 .filter(Objects::nonNull)
                 .collect(Collectors.toList());
 
+        if (model.getBanyanDBModelExtension().isShouldStoreIDTag()) {
+            indexRules.add(IndexRule.create(BanyanDBConverter.ID, IndexRule.IndexType.TREE, IndexRule.IndexLocation.SERIES));
+        }
+
         final Measure.Builder builder = Measure.create(schemaMetadata.getGroup(), schemaMetadata.name(),
                 downSamplingDuration(model.getDownsampling()));
         builder.setEntityRelativeTags(shardingColumns);
@@ -514,14 +518,19 @@ public enum MetadataRegistry {
             }
         }
 
-        private List<TagFamilySpec> extractTagFamilySpec(List<TagMetadata> tagMetadataList) {
+        private List<TagFamilySpec> extractTagFamilySpec(List<TagMetadata> tagMetadataList, boolean shouldAddID) {
+            final String indexFamily = SchemaMetadata.this.indexFamily();
+            final String nonIndexFamily = SchemaMetadata.this.nonIndexFamily();
             Map<String, List<TagMetadata>> tagMetadataMap = tagMetadataList.stream()
-                    .collect(Collectors.groupingBy(tagMetadata -> tagMetadata.isIndex() ? SchemaMetadata.this.indexFamily() : SchemaMetadata.this.nonIndexFamily()));
+                    .collect(Collectors.groupingBy(tagMetadata -> tagMetadata.isIndex() ? indexFamily : nonIndexFamily));
 
             final List<TagFamilySpec> tagFamilySpecs = new ArrayList<>(tagMetadataMap.size());
             for (final Map.Entry<String, List<TagMetadata>> entry : tagMetadataMap.entrySet()) {
                 final TagFamilySpec.Builder b = TagFamilySpec.create(entry.getKey())
                         .addTagSpecs(entry.getValue().stream().map(TagMetadata::getTagSpec).collect(Collectors.toList()));
+                if (shouldAddID && indexFamily.equals(entry.getKey())) {
+                    b.addTagSpec(TagFamilySpec.TagSpec.newStringTag(BanyanDBConverter.ID));
+                }
                 tagFamilySpecs.add(b.build());
             }
 
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java
index 5d5c18fc55..3231a506fc 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java
@@ -165,15 +165,15 @@ public class BanyanDBMetadataQueryDAO extends AbstractBanyanDBDAO implements IMe
 
     @Override
     public ServiceInstance getInstance(String instanceId) throws IOException {
+        IDManager.ServiceInstanceID.InstanceIDDefinition id = IDManager.ServiceInstanceID.analysisId(instanceId);
         MeasureQueryResponse resp = query(InstanceTraffic.INDEX_NAME,
                 INSTANCE_TRAFFIC_COMPACT_TAGS,
                 Collections.emptySet(),
                 new QueryBuilder<MeasureQuery>() {
                     @Override
                     protected void apply(MeasureQuery query) {
-                        if (StringUtil.isNotEmpty(instanceId)) {
-                            query.and(id(instanceId));
-                        }
+                            query.and(eq(InstanceTraffic.SERVICE_ID, id.getServiceId()))
+                                    .and(eq(InstanceTraffic.NAME, id.getName()));
                     }
                 });
         MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(InstanceTraffic.INDEX_NAME, DownSampling.Minute);
@@ -327,7 +327,7 @@ public class BanyanDBMetadataQueryDAO extends AbstractBanyanDBDAO implements IMe
                     @Override
                     protected void apply(MeasureQuery query) {
                         if (StringUtil.isNotEmpty(processId)) {
-                            query.and(id(processId));
+                            query.and(eq(BanyanDBConverter.ID, processId));
                         }
                     }
                 });
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java
index 53acda1118..3b9e6fc998 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java
@@ -138,6 +138,9 @@ public class BanyanDBMetricsDAO extends AbstractBanyanDBDAO implements IMetricsD
                 TimeBucket.getTimestamp(metrics.getTimeBucket(), model.getDownsampling())); // timestamp
         final BanyanDBConverter.MeasureToStorage toStorage = new BanyanDBConverter.MeasureToStorage(schema, measureWrite);
         storageBuilder.entity2Storage(metrics, toStorage);
+        if (model.getBanyanDBModelExtension().isShouldStoreIDTag()) {
+            toStorage.acceptID(metrics.id().build());
+        }
         return new BanyanDBMeasureInsertRequest(toStorage.obtain(), callback);
     }
 
@@ -153,6 +156,9 @@ public class BanyanDBMetricsDAO extends AbstractBanyanDBDAO implements IMetricsD
                 TimeBucket.getTimestamp(metrics.getTimeBucket(), model.getDownsampling())); // timestamp
         final BanyanDBConverter.MeasureToStorage toStorage = new BanyanDBConverter.MeasureToStorage(schema, measureWrite);
         storageBuilder.entity2Storage(metrics, toStorage);
+        if (model.getBanyanDBModelExtension().isShouldStoreIDTag()) {
+            toStorage.acceptID(metrics.id().build());
+        }
         return new BanyanDBMeasureUpdateRequest(toStorage.obtain());
     }
 
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java
index 62f3facf0c..fd53ab114d 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java
@@ -29,7 +29,6 @@ import org.apache.skywalking.banyandb.v1.client.StreamQuery;
 import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
 import org.apache.skywalking.banyandb.v1.client.TimestampRange;
 import org.apache.skywalking.oap.server.core.analysis.DownSampling;
-import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry;
@@ -146,10 +145,6 @@ public abstract class AbstractBanyanDBDAO extends AbstractDAO<BanyanDBStorageCli
             return PairQueryCondition.LongQueryCondition.ne(name, value);
         }
 
-        protected PairQueryCondition<String> id(String value) {
-            return PairQueryCondition.IDQueryCondition.eq(Metrics.ID, value);
-        }
-
         protected AbstractQuery.OrderBy desc(String name) {
             return new AbstractQuery.OrderBy(name, AbstractQuery.Sort.DESC);
         }