You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2023/01/04 14:08:57 UTC
[skywalking] branch banyandb updated: Add the ID tag to process_traffic for searching
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch banyandb
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/banyandb by this push:
new 4c16930af0 Add the ID tag to process_traffic for searching
4c16930af0 is described below
commit 4c16930af0ad9eb1158945b1ec401403e48d9a39
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Wed Jan 4 22:07:51 2023 +0800
Add the ID tag to process_traffic for searching
Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
.../core/analysis/manual/process/ProcessTraffic.java | 1 +
.../oap/server/core/storage/annotation/BanyanDB.java | 9 +++++++++
.../core/storage/model/BanyanDBModelExtension.java | 8 ++++++++
.../oap/server/core/storage/model/StorageModels.java | 4 ++++
.../storage/plugin/banyandb/BanyanDBConverter.java | 11 +++++++++++
.../storage/plugin/banyandb/MetadataRegistry.java | 17 +++++++++++++----
.../banyandb/measure/BanyanDBMetadataQueryDAO.java | 8 ++++----
.../plugin/banyandb/measure/BanyanDBMetricsDAO.java | 6 ++++++
.../plugin/banyandb/stream/AbstractBanyanDBDAO.java | 5 -----
9 files changed, 56 insertions(+), 13 deletions(-)
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/process/ProcessTraffic.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/process/ProcessTraffic.java
index f763fe12c7..f55973201a 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/process/ProcessTraffic.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/process/ProcessTraffic.java
@@ -52,6 +52,7 @@ import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.PR
"name",
})
@SQLDatabase.Sharding(shardingAlgorithm = ShardingAlgorithm.NO_SHARDING)
+@BanyanDB.StoreIDTag
public class ProcessTraffic extends Metrics {
public static final String INDEX_NAME = "process_traffic";
public static final String SERVICE_ID = "service_id";
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/BanyanDB.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/BanyanDB.java
index b2878ba665..92ad4595b9 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/BanyanDB.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/BanyanDB.java
@@ -140,4 +140,13 @@ public @interface BanyanDB {
@Retention(RetentionPolicy.RUNTIME)
@interface MeasureField {
}
+
+ /**
+ * StoreIDTag indicates that a metric stores its ID as a tag for searching.
+ * @since 9.4.0
+ */
+ @Target({ElementType.TYPE})
+ @Retention(RetentionPolicy.RUNTIME)
+ @interface StoreIDTag {
+ }
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/BanyanDBModelExtension.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/BanyanDBModelExtension.java
index dc8b60ad9c..bf9887d606 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/BanyanDBModelExtension.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/BanyanDBModelExtension.java
@@ -37,4 +37,12 @@ public class BanyanDBModelExtension {
@Setter
private String timestampColumn;
+ /**
+ * shouldStoreIDTag indicates whether a metric stores its ID as a tag.
+ * The installer will create a virtual string ID tag with a tree index rule.
+ */
+ @Getter
+ @Setter
+ private boolean shouldStoreIDTag;
+
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
index 2d600b422c..e231fbbb10 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
@@ -93,6 +93,10 @@ public class StorageModels implements IModelManager, ModelCreator, ModelManipula
banyanDBModelExtension.setTimestampColumn(timestampColumn);
}
+ if (aClass.isAnnotationPresent(BanyanDB.StoreIDTag.class)) {
+ banyanDBModelExtension.setShouldStoreIDTag(true);
+ }
+
checker.check(storage.getModelName());
Model model = new Model(
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java
index 55c519237c..270fca5135 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java
@@ -40,6 +40,9 @@ import org.apache.skywalking.oap.server.storage.plugin.banyandb.util.ByteUtil;
import java.util.List;
public class BanyanDBConverter {
+
+ public static final String ID = "id";
+
public static class StorageToStream implements Convert2Entity {
private final MetadataRegistry.Schema schema;
private final RowEntity rowEntity;
@@ -154,6 +157,14 @@ public class BanyanDBConverter {
}
}
+ public void acceptID(String id) {
+ try {
+ this.measureWrite.tag(ID, TagAndValue.stringTagValue(id));
+ } catch (BanyanDBException ex) {
+ log.error("fail to add ID tag", ex);
+ }
+ }
+
@Override
public void accept(String fieldName, byte[] fieldValue) {
MetadataRegistry.ColumnSpec columnSpec = this.schema.getSpec(fieldName);
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
index 00f1d7d995..11addacc60 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
@@ -94,7 +94,7 @@ public enum MetadataRegistry {
// 1) a list of TagFamilySpec,
// 2) a list of IndexRule,
List<TagMetadata> tags = parseTagMetadata(model, schemaBuilder);
- List<TagFamilySpec> tagFamilySpecs = schemaMetadata.extractTagFamilySpec(tags);
+ List<TagFamilySpec> tagFamilySpecs = schemaMetadata.extractTagFamilySpec(tags, false);
// iterate over tagFamilySpecs to save tag names
for (final TagFamilySpec tagFamilySpec : tagFamilySpecs) {
for (final TagFamilySpec.TagSpec tagSpec : tagFamilySpec.tagSpecs()) {
@@ -138,7 +138,7 @@ public enum MetadataRegistry {
// 1) a list of TagFamilySpec,
// 2) a list of IndexRule,
MeasureMetadata tagsAndFields = parseTagAndFieldMetadata(model, schemaBuilder);
- List<TagFamilySpec> tagFamilySpecs = schemaMetadata.extractTagFamilySpec(tagsAndFields.tags);
+ List<TagFamilySpec> tagFamilySpecs = schemaMetadata.extractTagFamilySpec(tagsAndFields.tags, model.getBanyanDBModelExtension().isShouldStoreIDTag());
// iterate over tagFamilySpecs to save tag names
for (final TagFamilySpec tagFamilySpec : tagFamilySpecs) {
for (final TagFamilySpec.TagSpec tagSpec : tagFamilySpec.tagSpecs()) {
@@ -150,6 +150,10 @@ public enum MetadataRegistry {
.filter(Objects::nonNull)
.collect(Collectors.toList());
+ if (model.getBanyanDBModelExtension().isShouldStoreIDTag()) {
+ indexRules.add(IndexRule.create(BanyanDBConverter.ID, IndexRule.IndexType.TREE, IndexRule.IndexLocation.SERIES));
+ }
+
final Measure.Builder builder = Measure.create(schemaMetadata.getGroup(), schemaMetadata.name(),
downSamplingDuration(model.getDownsampling()));
builder.setEntityRelativeTags(shardingColumns);
@@ -514,14 +518,19 @@ public enum MetadataRegistry {
}
}
- private List<TagFamilySpec> extractTagFamilySpec(List<TagMetadata> tagMetadataList) {
+ private List<TagFamilySpec> extractTagFamilySpec(List<TagMetadata> tagMetadataList, boolean shouldAddID) {
+ final String indexFamily = SchemaMetadata.this.indexFamily();
+ final String nonIndexFamily = SchemaMetadata.this.nonIndexFamily();
Map<String, List<TagMetadata>> tagMetadataMap = tagMetadataList.stream()
- .collect(Collectors.groupingBy(tagMetadata -> tagMetadata.isIndex() ? SchemaMetadata.this.indexFamily() : SchemaMetadata.this.nonIndexFamily()));
+ .collect(Collectors.groupingBy(tagMetadata -> tagMetadata.isIndex() ? indexFamily : nonIndexFamily));
final List<TagFamilySpec> tagFamilySpecs = new ArrayList<>(tagMetadataMap.size());
for (final Map.Entry<String, List<TagMetadata>> entry : tagMetadataMap.entrySet()) {
final TagFamilySpec.Builder b = TagFamilySpec.create(entry.getKey())
.addTagSpecs(entry.getValue().stream().map(TagMetadata::getTagSpec).collect(Collectors.toList()));
+ if (shouldAddID && indexFamily.equals(entry.getKey())) {
+ b.addTagSpec(TagFamilySpec.TagSpec.newStringTag(BanyanDBConverter.ID));
+ }
tagFamilySpecs.add(b.build());
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java
index 5d5c18fc55..3231a506fc 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java
@@ -165,15 +165,15 @@ public class BanyanDBMetadataQueryDAO extends AbstractBanyanDBDAO implements IMe
@Override
public ServiceInstance getInstance(String instanceId) throws IOException {
+ IDManager.ServiceInstanceID.InstanceIDDefinition id = IDManager.ServiceInstanceID.analysisId(instanceId);
MeasureQueryResponse resp = query(InstanceTraffic.INDEX_NAME,
INSTANCE_TRAFFIC_COMPACT_TAGS,
Collections.emptySet(),
new QueryBuilder<MeasureQuery>() {
@Override
protected void apply(MeasureQuery query) {
- if (StringUtil.isNotEmpty(instanceId)) {
- query.and(id(instanceId));
- }
+ query.and(eq(InstanceTraffic.SERVICE_ID, id.getServiceId()))
+ .and(eq(InstanceTraffic.NAME, id.getName()));
}
});
MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(InstanceTraffic.INDEX_NAME, DownSampling.Minute);
@@ -327,7 +327,7 @@ public class BanyanDBMetadataQueryDAO extends AbstractBanyanDBDAO implements IMe
@Override
protected void apply(MeasureQuery query) {
if (StringUtil.isNotEmpty(processId)) {
- query.and(id(processId));
+ query.and(eq(BanyanDBConverter.ID, processId));
}
}
});
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java
index 53acda1118..3b9e6fc998 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java
@@ -138,6 +138,9 @@ public class BanyanDBMetricsDAO extends AbstractBanyanDBDAO implements IMetricsD
TimeBucket.getTimestamp(metrics.getTimeBucket(), model.getDownsampling())); // timestamp
final BanyanDBConverter.MeasureToStorage toStorage = new BanyanDBConverter.MeasureToStorage(schema, measureWrite);
storageBuilder.entity2Storage(metrics, toStorage);
+ if (model.getBanyanDBModelExtension().isShouldStoreIDTag()) {
+ toStorage.acceptID(metrics.id().build());
+ }
return new BanyanDBMeasureInsertRequest(toStorage.obtain(), callback);
}
@@ -153,6 +156,9 @@ public class BanyanDBMetricsDAO extends AbstractBanyanDBDAO implements IMetricsD
TimeBucket.getTimestamp(metrics.getTimeBucket(), model.getDownsampling())); // timestamp
final BanyanDBConverter.MeasureToStorage toStorage = new BanyanDBConverter.MeasureToStorage(schema, measureWrite);
storageBuilder.entity2Storage(metrics, toStorage);
+ if (model.getBanyanDBModelExtension().isShouldStoreIDTag()) {
+ toStorage.acceptID(metrics.id().build());
+ }
return new BanyanDBMeasureUpdateRequest(toStorage.obtain());
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java
index 62f3facf0c..fd53ab114d 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java
@@ -29,7 +29,6 @@ import org.apache.skywalking.banyandb.v1.client.StreamQuery;
import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
import org.apache.skywalking.banyandb.v1.client.TimestampRange;
import org.apache.skywalking.oap.server.core.analysis.DownSampling;
-import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry;
@@ -146,10 +145,6 @@ public abstract class AbstractBanyanDBDAO extends AbstractDAO<BanyanDBStorageCli
return PairQueryCondition.LongQueryCondition.ne(name, value);
}
- protected PairQueryCondition<String> id(String value) {
- return PairQueryCondition.IDQueryCondition.eq(Metrics.ID, value);
- }
-
protected AbstractQuery.OrderBy desc(String name) {
return new AbstractQuery.OrderBy(name, AbstractQuery.Sort.DESC);
}