You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by lu...@apache.org on 2022/04/22 03:24:42 UTC
[skywalking] 21/22: make code compiled
This is an automated email from the ASF dual-hosted git repository.
lujiajing pushed a commit to branch banyandb-integration-stream
in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit 6aac11e47cc66974b228b39da5f5739204f0c6a9
Author: Megrez Lu <lu...@gmail.com>
AuthorDate: Thu Apr 21 21:23:55 2022 +0800
make code compiled
---
.../storage-banyandb-plugin/pom.xml | 2 +-
.../storage/plugin/banyandb/BanyanDBConverter.java | 55 ++--
.../plugin/banyandb/BanyanDBIndexInstaller.java | 47 +--
.../plugin/banyandb/BanyanDBMetricsDAO.java | 1 -
.../plugin/banyandb/BanyanDBStorageClient.java | 30 +-
.../plugin/banyandb/BanyanDBStorageProvider.java | 21 +-
.../storage/plugin/banyandb/MetadataRegistry.java | 319 +++++++++++++--------
.../storage/plugin/banyandb/StreamMetadata.java | 71 -----
.../banyandb/measure/BanyanDBMetadataQueryDAO.java | 7 +-
.../measure/BanyanDBProfileTaskQueryDAO.java | 2 +-
...kQueryDAO.java => BanyanDBServiceLabelDAO.java} | 12 +-
.../banyandb/stream/AbstractBanyanDBDAO.java | 30 +-
.../banyandb/stream/BanyanDBAlarmQueryDAO.java | 13 +-
.../plugin/banyandb/stream/BanyanDBBatchDAO.java | 1 -
.../stream/BanyanDBBrowserLogQueryDAO.java | 16 +-
.../BanyanDBEBPFProfilingDataDAO.java} | 15 +-
.../BanyanDBEBPFProfilingScheduleQueryDAO.java} | 15 +-
.../BanyanDBEBPFProfilingTaskDAO.java} | 16 +-
.../banyandb/stream/BanyanDBLogQueryDAO.java | 32 +--
.../stream/BanyanDBProfileTaskLogQueryDAO.java | 28 +-
.../BanyanDBProfileThreadSnapshotQueryDAO.java | 70 +++--
.../plugin/banyandb/stream/BanyanDBRecordDAO.java | 9 +-
.../banyandb/stream/BanyanDBTraceQueryDAO.java | 27 +-
23 files changed, 396 insertions(+), 443 deletions(-)
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/pom.xml b/oap-server/server-storage-plugin/storage-banyandb-plugin/pom.xml
index e2c9ae7817..7546099e4b 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/pom.xml
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/pom.xml
@@ -21,7 +21,7 @@
<parent>
<artifactId>server-storage-plugin</artifactId>
<groupId>org.apache.skywalking</groupId>
- <version>9.0.0-SNAPSHOT</version>
+ <version>9.1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java
index 565245ad55..41237f1f3b 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java
@@ -19,11 +19,13 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb;
import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
import org.apache.skywalking.banyandb.v1.client.RowEntity;
-import org.apache.skywalking.banyandb.v1.client.SerializableTag;
import org.apache.skywalking.banyandb.v1.client.StreamWrite;
import org.apache.skywalking.banyandb.v1.client.TagAndValue;
+import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
+import org.apache.skywalking.banyandb.v1.client.metadata.Serializable;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity;
import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
@@ -35,16 +37,11 @@ import java.util.function.Function;
public class BanyanDBConverter {
@RequiredArgsConstructor
public static class StreamToEntity implements Convert2Entity {
- private final StreamMetadata metadata;
private final RowEntity rowEntity;
@Override
public Object get(String fieldName) {
- final StreamMetadata.TagMetadata metadata = this.metadata.getTagDefinition().get(fieldName);
- if (metadata == null) {
- return null;
- }
- return rowEntity.getValue(metadata.getTagFamilyName(), metadata.getTagSpec().getTagName());
+ return rowEntity.getTagValue(fieldName);
}
@Override
@@ -53,9 +50,9 @@ public class BanyanDBConverter {
}
}
+ @Slf4j
@RequiredArgsConstructor
public static class StreamToStorage implements Convert2Storage<StreamWrite> {
- private final StreamMetadata metadata;
private final StreamWrite streamWrite;
@Override
@@ -64,41 +61,28 @@ public class BanyanDBConverter {
if (Record.TIME_BUCKET.equals(fieldName)) {
return;
}
- final StreamMetadata.TagMetadata metadata = this.metadata.getTagDefinition().get(fieldName);
- if (metadata == null) {
- return;
- }
- switch (metadata.getTagFamilyName()) {
- case StreamMetadata.TAG_FAMILY_DATA:
- this.streamWrite.dataTag(metadata.getTagIndex(), buildTag(fieldValue));
- break;
- case StreamMetadata.TAG_FAMILY_SEARCHABLE:
- this.streamWrite.searchableTag(metadata.getTagIndex(), buildTag(fieldValue));
- break;
- default:
- throw new IllegalStateException("tag family is not supported");
+ try {
+ this.streamWrite.tag(fieldName, buildTag(fieldValue));
+ } catch (BanyanDBException ex) {
+ log.error("fail to add tag", ex);
}
}
- private SerializableTag<BanyandbModel.TagValue> buildTag(Object value) {
+ private Serializable<BanyandbModel.TagValue> buildTag(Object value) {
if (Integer.class.equals(value.getClass()) || Long.class.equals(value.getClass())) {
- return TagAndValue.longField((long) value);
+ return TagAndValue.longTagValue((long) value);
} else if (String.class.equals(value.getClass())) {
- return TagAndValue.stringField((String) value);
+ return TagAndValue.stringTagValue((String) value);
}
throw new IllegalStateException(value.getClass() + " is not supported");
}
@Override
public void accept(String fieldName, byte[] fieldValue) {
- final StreamMetadata.TagMetadata metadata = this.metadata.getTagDefinition().get(fieldName);
- if (metadata == null) {
- return;
- }
- if (StreamMetadata.TAG_FAMILY_SEARCHABLE.equals(metadata.getTagFamilyName())) {
- this.streamWrite.searchableTag(metadata.getTagIndex(), TagAndValue.binaryField((fieldValue)));
- } else {
- throw new IllegalStateException("binary tag should not be store in the `data` family");
+ try {
+ this.streamWrite.tag(fieldName, TagAndValue.binaryTagValue(fieldValue));
+ } catch (BanyanDBException ex) {
+ log.error("fail to add tag", ex);
}
}
@@ -120,19 +104,12 @@ public class BanyanDBConverter {
@Override
public Object get(String fieldName) {
- final StreamMetadata.TagMetadata metadata = this.metadata.getTagDefinition().get(fieldName);
- if (metadata == null) {
- return null;
- }
// TODO: get an unmodifiable view of tag
return null;
}
@Override
public StreamWrite obtain() {
- if (metadata.isUseIdAsEntity()) {
- this.accept(StreamMetadata.ID, this.streamWrite.getElementID());
- }
return this.streamWrite;
}
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java
index f4483b0a14..d894c9990a 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java
@@ -19,9 +19,10 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb;
import lombok.extern.slf4j.Slf4j;
-import org.apache.skywalking.banyandb.v1.client.metadata.Catalog;
-import org.apache.skywalking.banyandb.v1.client.metadata.Duration;
+import org.apache.skywalking.banyandb.v1.client.BanyanDBClient;
+import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
import org.apache.skywalking.banyandb.v1.client.metadata.Group;
+import org.apache.skywalking.banyandb.v1.client.metadata.Stream;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.config.ConfigService;
import org.apache.skywalking.oap.server.core.storage.StorageException;
@@ -43,25 +44,39 @@ public class BanyanDBIndexInstaller extends ModelInstaller {
@Override
protected boolean isExists(Model model) throws StorageException {
- // TODO: get from BanyanDB and make a diff?
- return false;
+ final MetadataRegistry.PartialMetadata metadata = MetadataRegistry.INSTANCE.parseMetadata(model);
+ try {
+ final BanyanDBClient c = ((BanyanDBStorageClient) this.client).client;
+ // first check group
+ Group g = metadata.getOrCreateGroup(c);
+ if (g == null) {
+ throw new StorageException("fail to create group " + metadata.getGroup());
+ }
+ log.info("group {} created", g.name());
+ // then check entity schema
+ return metadata.findRemoteSchema(c).isPresent();
+ } catch (BanyanDBException ex) {
+ throw new StorageException("fail to check existence", ex);
+ }
}
@Override
protected void createTable(Model model) throws StorageException {
- if (model.isTimeSeries() && model.isRecord()) { // stream
- StreamMetadata metaInfo = MetadataRegistry.INSTANCE.registerModel(model, this.configService);
- if (metaInfo != null) {
- log.info("install index {}", model.getName());
- ((BanyanDBStorageClient) client).define(
- new Group(metaInfo.getGroup(), Catalog.STREAM, 2, 10, Duration.ofDays(7))
- );
- ((BanyanDBStorageClient) client).define(metaInfo);
+ try {
+ if (model.isTimeSeries() && model.isRecord()) { // stream
+ Stream stream = (Stream) MetadataRegistry.INSTANCE.registerModel(model, this.configService);
+ if (stream != null) {
+ log.info("install stream schema {}", model.getName());
+ ((BanyanDBStorageClient) client).define(stream);
+ }
+ } else if (model.isTimeSeries() && !model.isRecord()) { // measure
+ // TODO: dynamically register Measure
+ log.info("skip measure index {}", model.getName());
+ } else if (!model.isTimeSeries()) { // UITemplate
+ log.info("skip property index {}", model.getName());
}
- } else if (model.isTimeSeries() && !model.isRecord()) { // measure
- log.info("skip measure index {}", model.getName());
- } else if (!model.isTimeSeries()) { // UITemplate
- log.info("skip property index {}", model.getName());
+ } catch (BanyanDBException ex) {
+ throw new StorageException("fail to install schema", ex);
}
}
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBMetricsDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBMetricsDAO.java
index af54dd350f..4e84979498 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBMetricsDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBMetricsDAO.java
@@ -2,7 +2,6 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb;
import lombok.RequiredArgsConstructor;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
-import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
index b1ca206110..dd92c6fd08 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
@@ -23,8 +23,7 @@ import org.apache.skywalking.banyandb.v1.client.StreamBulkWriteProcessor;
import org.apache.skywalking.banyandb.v1.client.StreamQuery;
import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
import org.apache.skywalking.banyandb.v1.client.StreamWrite;
-import org.apache.skywalking.banyandb.v1.client.metadata.Group;
-import org.apache.skywalking.banyandb.v1.client.metadata.IndexRule;
+import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
import org.apache.skywalking.banyandb.v1.client.metadata.Stream;
import org.apache.skywalking.oap.server.library.client.Client;
import org.apache.skywalking.oap.server.library.client.healthcheck.DelegatedHealthChecker;
@@ -32,25 +31,17 @@ import org.apache.skywalking.oap.server.library.client.healthcheck.HealthCheckab
import org.apache.skywalking.oap.server.library.util.HealthChecker;
import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
/**
* BanyanDBStorageClient is a simple wrapper for the underlying {@link BanyanDBClient},
* which implements {@link Client} and {@link HealthCheckable}.
*/
public class BanyanDBStorageClient implements Client, HealthCheckable {
- private final BanyanDBClient client;
- private final Map<String, Group> groupMap;
+ final BanyanDBClient client;
private final DelegatedHealthChecker healthChecker = new DelegatedHealthChecker();
public BanyanDBStorageClient(String host, int port) {
this.client = new BanyanDBClient(host, port);
- this.groupMap = new ConcurrentHashMap<>();
- }
-
- public Group define(Group group) {
- return groupMap.computeIfAbsent(group.getName(), s -> client.define(group));
}
@Override
@@ -63,22 +54,19 @@ public class BanyanDBStorageClient implements Client, HealthCheckable {
this.client.close();
}
- public StreamQueryResponse query(StreamQuery streamQuery) {
+ public StreamQueryResponse query(StreamQuery streamQuery) throws BanyanDBException {
try {
- StreamQueryResponse response = this.client.queryStreams(streamQuery);
+ StreamQueryResponse response = this.client.query(streamQuery);
this.healthChecker.health();
return response;
- } catch (Throwable t) {
- healthChecker.unHealth(t);
- throw t;
+ } catch (BanyanDBException ex) {
+ healthChecker.unHealth(ex);
+ throw ex;
}
}
- public void define(StreamMetadata streamMetadata) {
- Stream stream = this.client.define(streamMetadata.getStream());
- if (stream != null) {
- this.client.defineIndexRules(stream, streamMetadata.getIndexRules().toArray(new IndexRule[]{}));
- }
+ public void define(Stream stream) throws BanyanDBException {
+ this.client.define(stream);
}
public void write(StreamWrite streamWrite) {
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java
index 922bf2296a..88a81d3372 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java
@@ -28,9 +28,13 @@ import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO;
import org.apache.skywalking.oap.server.core.storage.management.UITemplateManagementDAO;
import org.apache.skywalking.oap.server.core.storage.model.ModelCreator;
-import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO;
-import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO;
-import org.apache.skywalking.oap.server.core.storage.profile.IProfileThreadSnapshotQueryDAO;
+import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfilingDataDAO;
+import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfilingScheduleDAO;
+import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfilingTaskDAO;
+import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IServiceLabelDAO;
+import org.apache.skywalking.oap.server.core.storage.profiling.trace.IProfileTaskLogQueryDAO;
+import org.apache.skywalking.oap.server.core.storage.profiling.trace.IProfileTaskQueryDAO;
+import org.apache.skywalking.oap.server.core.storage.profiling.trace.IProfileThreadSnapshotQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IAggregationQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IAlarmQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IBrowserLogQueryDAO;
@@ -50,11 +54,15 @@ import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDB
import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBMetadataQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBNetworkAddressAliasDAO;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBProfileTaskQueryDAO;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBServiceLabelDAO;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBTopologyQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBUITemplateManagementDAO;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBAlarmQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBBatchDAO;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBBrowserLogQueryDAO;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBEBPFProfilingDataDAO;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBEBPFProfilingScheduleQueryDAO;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBEBPFProfilingTaskDAO;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBHistoryDeleteDAO;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBLogQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBProfileTaskLogQueryDAO;
@@ -106,13 +114,16 @@ public class BanyanDBStorageProvider extends ModuleProvider {
this.registerServiceImplementation(ILogQueryDAO.class, new BanyanDBLogQueryDAO(client));
this.registerServiceImplementation(IProfileTaskQueryDAO.class, new BanyanDBProfileTaskQueryDAO());
this.registerServiceImplementation(IProfileTaskLogQueryDAO.class, new BanyanDBProfileTaskLogQueryDAO(client, this.config.getFetchTaskLogMaxSize()));
- this.registerServiceImplementation(
- IProfileThreadSnapshotQueryDAO.class, new BanyanDBProfileThreadSnapshotQueryDAO(client));
+ this.registerServiceImplementation(IProfileThreadSnapshotQueryDAO.class, new BanyanDBProfileThreadSnapshotQueryDAO(client));
this.registerServiceImplementation(UITemplateManagementDAO.class, new BanyanDBUITemplateManagementDAO());
this.registerServiceImplementation(IEventQueryDAO.class, new BanyanDBEventQueryDAO());
this.registerServiceImplementation(ITopologyQueryDAO.class, new BanyanDBTopologyQueryDAO());
+ this.registerServiceImplementation(IEBPFProfilingTaskDAO.class, new BanyanDBEBPFProfilingTaskDAO());
+ this.registerServiceImplementation(IEBPFProfilingDataDAO.class, new BanyanDBEBPFProfilingDataDAO());
+ this.registerServiceImplementation(IEBPFProfilingScheduleDAO.class, new BanyanDBEBPFProfilingScheduleQueryDAO());
// TODO: metrics
+ this.registerServiceImplementation(IServiceLabelDAO.class, new BanyanDBServiceLabelDAO());
this.registerServiceImplementation(IHistoryDeleteDAO.class, new BanyanDBHistoryDeleteDAO());
this.registerServiceImplementation(IMetricsQueryDAO.class, new BanyanDBMetricsQueryDAO());
this.registerServiceImplementation(IAggregationQueryDAO.class, new BanyanDBAggregationQueryDAO());
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
index 83af6da867..b8925c7529 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
@@ -18,10 +18,17 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb;
-import com.google.common.collect.ImmutableSet;
-import org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
-import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase;
+import io.grpc.Status;
+import lombok.Data;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.banyandb.v1.client.BanyanDBClient;
+import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
+import org.apache.skywalking.banyandb.v1.client.metadata.Catalog;
+import org.apache.skywalking.banyandb.v1.client.metadata.Duration;
+import org.apache.skywalking.banyandb.v1.client.metadata.Group;
import org.apache.skywalking.banyandb.v1.client.metadata.IndexRule;
+import org.apache.skywalking.banyandb.v1.client.metadata.NamedSchema;
import org.apache.skywalking.banyandb.v1.client.metadata.Stream;
import org.apache.skywalking.banyandb.v1.client.metadata.TagFamilySpec;
import org.apache.skywalking.oap.server.core.alarm.AlarmRecord;
@@ -33,6 +40,7 @@ import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.storage.model.ModelColumn;
import org.apache.skywalking.oap.server.library.util.StringUtil;
+import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -40,173 +48,230 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
import java.util.stream.Collectors;
+@Slf4j
public enum MetadataRegistry {
INSTANCE;
- private final Map<String, StreamMetadata> streams = new HashMap<>();
+ private final Map<String, PartialMetadata> registry = new ConcurrentHashMap<>();
- public StreamMetadata registerModel(Model model, ConfigService configService) {
- BanyandbDatabase.Stream pbStream = parseStreamFromModel(model, configService);
+ public NamedSchema<?> registerModel(Model model, ConfigService configService) {
+ PartialMetadata partialMetadata = parseMetadata(model);
+ final Stream.Builder builder = Stream.create(partialMetadata.getGroup(), partialMetadata.getName());
+ Map<String, ModelColumn> modelColumnMap = model.getColumns().stream()
+ .collect(Collectors.toMap(modelColumn -> modelColumn.getColumnName().getStorageName(), Function.identity()));
+ // parse and set sharding keys
+ builder.setEntityRelativeTags(parseEntityNames(modelColumnMap));
+ // parse and set tag families, which contains tag specs
+ List<TagFamilySpec> specs = parseTagFamilySpecs(model, partialMetadata, configService);
+ builder.addTagFamilies(specs);
+ // parse and add index definition
+ builder.addIndexes(parseIndexRules(specs, partialMetadata.indexFamily(), modelColumnMap));
- final boolean useIdAsEntity = pbStream.getEntity().getTagNamesCount() == 1 &&
- StreamMetadata.ID.equals(pbStream.getEntity().getTagNames(0));
+ registry.put(model.getName(), partialMetadata);
+ return builder.build();
+ }
- final Stream stream = new Stream(pbStream.getMetadata().getGroup(), pbStream.getMetadata().getName());
+ public PartialMetadata findSchema(final String name) {
+ return this.registry.get(name);
+ }
+ List<IndexRule> parseIndexRules(List<TagFamilySpec> specs, String indexTagFamily, Map<String, ModelColumn> modelColumnMap) {
List<IndexRule> indexRules = new ArrayList<>();
- Set<String> entityNameSet = ImmutableSet.copyOf(pbStream.getEntity().getTagNamesList());
- stream.setEntityTagNames(pbStream.getEntity().getTagNamesList());
-
- Map<String, StreamMetadata.TagMetadata> tagDefinition = new HashMap<>();
-
- for (BanyandbDatabase.TagFamilySpec pbTagFamilySpec : pbStream.getTagFamiliesList()) {
- final TagFamilySpec tagFamilySpec = TagFamilySpec.fromProtobuf(pbTagFamilySpec);
- stream.addTagFamilySpec(tagFamilySpec);
-
- int tagIndex = 0;
- for (final TagFamilySpec.TagSpec tagSpec : tagFamilySpec.getTagSpecs()) {
- // register tag
- tagDefinition.put(tagSpec.getTagName(), new StreamMetadata.TagMetadata(tagFamilySpec.getTagFamilyName(), tagSpec, tagIndex++));
-
- // if the tag family equals to "searchable", build index rules
- if (tagFamilySpec.getTagFamilyName().equals(StreamMetadata.TAG_FAMILY_SEARCHABLE)) {
- // check if this spec exists in the entity names
- if (entityNameSet.contains(tagSpec.getTagName())) {
- continue;
- }
- BanyandbDatabase.IndexRule pbIndexRule = parseIndexRuleFromTagSpec(pbStream.getMetadata(), tagSpec);
- IndexRule indexRule = IndexRule.fromProtobuf(pbIndexRule);
- indexRules.add(indexRule);
+ for (final TagFamilySpec spec : specs) {
+ if (!indexTagFamily.equals(spec.tagFamilyName())) {
+ continue;
+ }
+ for (final TagFamilySpec.TagSpec tagSpec : spec.tagSpecs()) {
+ final String tagName = tagSpec.getTagName();
+ // TODO: we need to add support for index types in the OAP core.
+ // Currently, we only register the INVERTED type.
+ final ModelColumn modelColumn = modelColumnMap.get(tagName);
+ // if it is null, it must be a user-defined tag
+ if (modelColumn == null) {
+ indexRules.add(IndexRule.create(tagName, IndexRule.IndexType.INVERTED, IndexRule.IndexLocation.SERIES));
+ continue;
+ }
+ if (modelColumn.getBanyanDBExtension().isGlobalIndexing()) {
+ indexRules.add(IndexRule.create(tagName, IndexRule.IndexType.INVERTED, IndexRule.IndexLocation.GLOBAL));
+ } else {
+ indexRules.add(IndexRule.create(tagName, IndexRule.IndexType.INVERTED, IndexRule.IndexLocation.SERIES));
}
}
}
-
- StreamMetadata streamMetadata = StreamMetadata.builder().model(model).stream(stream)
- .tagDefinition(tagDefinition)
- .indexRules(indexRules)
- .group(pbStream.getMetadata().getGroup())
- .useIdAsEntity(useIdAsEntity)
- .build();
- streams.put(model.getName(), streamMetadata);
- return streamMetadata;
- }
-
- public StreamMetadata findStreamMetadata(final String name) {
- return this.streams.get(name);
+ return indexRules;
}
- private BanyandbDatabase.Stream parseStreamFromModel(Model model, ConfigService configService) {
+ /**
+ * Parse sharding keys from the {@link Model}
+ *
+ * @param modelColumnMap the mapping between column storageName and {@link ModelColumn}
+ * @return a list of column names in strict order
+ */
+ List<String> parseEntityNames(Map<String, ModelColumn> modelColumnMap) {
List<ModelColumn> shardingColumns = new ArrayList<>();
+ for (final ModelColumn col : modelColumnMap.values()) {
+ if (col.getBanyanDBExtension().isShardingKey()) {
+ shardingColumns.add(col);
+ }
+ }
+ return shardingColumns.stream()
+ .sorted(Comparator.comparingInt(col -> col.getBanyanDBExtension().getShardingKeyIdx()))
+ .map(col -> col.getColumnName().getName())
+ .collect(Collectors.toList());
+ }
- List<BanyandbDatabase.TagSpec> searchableTagsSpecs = new ArrayList<>();
- List<BanyandbDatabase.TagSpec> dataTagsSpecs = new ArrayList<>();
- for (final ModelColumn modelColumn : model.getColumns()) {
- if (modelColumn.getShardingKeyIdx() > -1) {
- shardingColumns.add(modelColumn);
+ List<TagFamilySpec> parseTagFamilySpecs(Model model, PartialMetadata metadata, ConfigService configService) {
+ Map<String, TagFamilySpec.Builder> builderMap = new HashMap<>();
+ for (final ModelColumn col : model.getColumns()) {
+ final TagFamilySpec.TagSpec tagSpec = parseTagSpec(col);
+ if (tagSpec == null) {
+ continue;
}
- if (modelColumn.isIndexOnly()) {
- // skip
- } else if (modelColumn.isStorageOnly()) {
- dataTagsSpecs.add(parseTagSpecFromModelColumn(modelColumn));
+ if (col.shouldIndex()) {
+ builderMap.computeIfAbsent(metadata.indexFamily(), TagFamilySpec::create).addTagSpec(tagSpec);
} else {
- searchableTagsSpecs.add(parseTagSpecFromModelColumn(modelColumn));
+ builderMap.computeIfAbsent(metadata.nonIndexFamily(), TagFamilySpec::create).addTagSpec(tagSpec);
}
}
- Set<String> entities = shardingColumns.stream()
- .sorted(Comparator.comparingInt(ModelColumn::getShardingKeyIdx))
- .map(modelColumn -> modelColumn.getColumnName().getStorageName())
- .collect(Collectors.toSet());
-
- if (entities.isEmpty()) {
- // if sharding keys are not defined, we have to use ID
- entities = Collections.singleton(StreamMetadata.ID);
- // append ID
- searchableTagsSpecs.add(BanyandbDatabase.TagSpec.newBuilder()
- .setName(StreamMetadata.ID)
- .setType(BanyandbDatabase.TagType.TAG_TYPE_STRING).build());
- }
-
// add all user-defined indexed tags to the end of the "searchable" family
if (SegmentRecord.INDEX_NAME.equals(model.getName())) {
- searchableTagsSpecs.addAll(parseTagSpecsFromConfiguration(configService.getSearchableTracesTags()));
+ builderMap.computeIfAbsent(metadata.indexFamily(), TagFamilySpec::create).addTagSpecs(parseExtraTagSpecs(configService.getSearchableTracesTags()));
} else if (LogRecord.INDEX_NAME.equals(model.getName())) {
- searchableTagsSpecs.addAll(parseTagSpecsFromConfiguration(configService.getSearchableLogsTags()));
+ builderMap.computeIfAbsent(metadata.indexFamily(), TagFamilySpec::create).addTagSpecs(parseExtraTagSpecs(configService.getSearchableLogsTags()));
} else if (AlarmRecord.INDEX_NAME.equals(model.getName())) {
- searchableTagsSpecs.addAll(parseTagSpecsFromConfiguration(configService.getSearchableAlarmTags()));
+ builderMap.computeIfAbsent(metadata.indexFamily(), TagFamilySpec::create).addTagSpecs(parseExtraTagSpecs(configService.getSearchableAlarmTags()));
}
- String group = "default-stream";
- if (model.isSuperDataset()) {
- // for superDataset, we should use separate group
- group = model.getName() + "-stream";
- }
+ return builderMap.values().stream().map(TagFamilySpec.Builder::build).collect(Collectors.toList());
+ }
- return BanyandbDatabase.Stream.newBuilder()
- .addTagFamilies(BanyandbDatabase.TagFamilySpec.newBuilder()
- .setName(StreamMetadata.TAG_FAMILY_DATA)
- .addAllTags(dataTagsSpecs)
- .build())
- .addTagFamilies(BanyandbDatabase.TagFamilySpec.newBuilder()
- .setName(StreamMetadata.TAG_FAMILY_SEARCHABLE)
- .addAllTags(searchableTagsSpecs)
- .build())
- .setEntity(BanyandbDatabase.Entity.newBuilder()
- .addAllTagNames(entities)
- .build())
- .setMetadata(BanyandbCommon.Metadata.newBuilder()
- .setGroup(group)
- .setName(model.getName())
- .build())
- .build();
+ /**
+ * Extract extra tag specs from the configuration.
+ * These are the additional searchable tags configured for {@link SegmentRecord}, {@link LogRecord} and {@link AlarmRecord}.
+ *
+ * @param tags a series of tags joined by commas
+ * @return a list of {@link org.apache.skywalking.banyandb.v1.client.metadata.TagFamilySpec.TagSpec} generated from input
+ */
+ private List<TagFamilySpec.TagSpec> parseExtraTagSpecs(String tags) {
+ if (StringUtil.isEmpty(tags)) {
+ return Collections.emptyList();
+ }
+ String[] tagsArray = tags.split(",");
+ if (tagsArray.length == 0) {
+ return Collections.emptyList();
+ }
+ return Arrays.stream(tagsArray)
+ .map(TagFamilySpec.TagSpec::newStringTag)
+ .collect(Collectors.toList());
}
- private BanyandbDatabase.TagSpec parseTagSpecFromModelColumn(ModelColumn modelColumn) {
+ /**
+ * Parse TagSpec from {@link ModelColumn}
+ *
+ * @param modelColumn the column in the model to be parsed
+ * @return a typed tag spec
+ */
+ @Nullable
+ private TagFamilySpec.TagSpec parseTagSpec(ModelColumn modelColumn) {
final Class<?> clazz = modelColumn.getType();
if (String.class.equals(clazz)) {
- return BanyandbDatabase.TagSpec.newBuilder().setName(modelColumn.getColumnName().getStorageName())
- .setType(BanyandbDatabase.TagType.TAG_TYPE_STRING).build();
+ return TagFamilySpec.TagSpec.newStringTag(modelColumn.getColumnName().getStorageName());
} else if (int.class.equals(clazz) || long.class.equals(clazz)) {
- return BanyandbDatabase.TagSpec.newBuilder().setName(modelColumn.getColumnName().getStorageName())
- .setType(BanyandbDatabase.TagType.TAG_TYPE_INT).build();
+ return TagFamilySpec.TagSpec.newIntTag(modelColumn.getColumnName().getStorageName());
} else if (byte[].class.equals(clazz) || DataTable.class.equals(clazz)) {
- return BanyandbDatabase.TagSpec.newBuilder().setName(modelColumn.getColumnName().getStorageName())
- .setType(BanyandbDatabase.TagType.TAG_TYPE_DATA_BINARY).build();
+ return TagFamilySpec.TagSpec.newBinaryTag(modelColumn.getColumnName().getStorageName());
} else {
+ // TODO: tags of type List<String> are not supported yet; for now only the "tags" column is skipped, any other unsupported type fails below
+ if ("tags".equals(modelColumn.getColumnName().getStorageName())) {
+ return null;
+ }
throw new IllegalStateException("type " + modelColumn.getType().toString() + " is not supported");
}
}
- private List<BanyandbDatabase.TagSpec> parseTagSpecsFromConfiguration(String tags) {
- if (StringUtil.isEmpty(tags)) {
- return Collections.emptyList();
+ public PartialMetadata parseMetadata(Model model) {
+ if (model.isRecord()) {
+ String group = "stream-default";
+ if (model.isSuperDataset()) {
+ // for superDataset, we should use separate group
+ group = "stream-" + model.getName();
+ }
+ return new PartialMetadata(group, model.getName(), Kind.STREAM);
}
- String[] tagsArray = tags.split(",");
- if (tagsArray.length == 0) {
- return Collections.emptyList();
+ return new PartialMetadata("measure-default", model.getName(), Kind.MEASURE);
+ }
+
+ @RequiredArgsConstructor
+ @Data
+ public static class PartialMetadata {
+ private final String group;
+ private final String name;
+ private final Kind kind;
+
+ public Optional<NamedSchema<?>> findRemoteSchema(BanyanDBClient client) throws BanyanDBException {
+ try {
+ switch (kind) {
+ case STREAM:
+ return Optional.ofNullable(client.findStream(this.group, this.name));
+ case MEASURE:
+ return Optional.ofNullable(client.findMeasure(this.group, this.name));
+ default:
+ throw new IllegalStateException("should not reach here");
+ }
+ } catch (BanyanDBException ex) {
+ if (ex.getStatus().equals(Status.Code.NOT_FOUND)) {
+ return Optional.empty();
+ }
+
+ throw ex;
+ }
+ }
+
+ public Group getOrCreateGroup(BanyanDBClient client) throws BanyanDBException {
+ Group g = client.findGroup(this.group);
+ if (g != null) {
+ return g;
+ }
+ switch (kind) {
+ case STREAM:
+ return client.define(Group.create(this.group, Catalog.STREAM, 2, 0, Duration.ofDays(7)));
+ case MEASURE:
+ return client.define(Group.create(this.group, Catalog.MEASURE, 2, 12, Duration.ofDays(7)));
+ default:
+ throw new IllegalStateException("should not reach here");
+ }
+ }
+
+ public String indexFamily() {
+ switch (kind) {
+ case MEASURE:
+ return "default";
+ case STREAM:
+ return "searchable";
+ default:
+ throw new IllegalStateException("should not reach here");
+ }
+ }
+
+ public String nonIndexFamily() {
+ switch (kind) {
+ case MEASURE:
+ return null;
+ case STREAM:
+ return "binary";
+ default:
+ throw new IllegalStateException("should not reach here");
+ }
}
- return Arrays.stream(tagsArray)
- .map(tagName -> BanyandbDatabase.TagSpec.newBuilder().setName(tagName)
- .setType(BanyandbDatabase.TagType.TAG_TYPE_STRING).build())
- .collect(Collectors.toList());
}
- private BanyandbDatabase.IndexRule parseIndexRuleFromTagSpec(BanyandbCommon.Metadata metadata, TagFamilySpec.TagSpec tagSpec) {
- // In SkyWalking, only "trace_id" should be stored as a global index
- BanyandbDatabase.IndexRule.Location loc = "trace_id".equals(tagSpec.getTagName()) ?
- BanyandbDatabase.IndexRule.Location.LOCATION_GLOBAL :
- BanyandbDatabase.IndexRule.Location.LOCATION_SERIES;
-
- return BanyandbDatabase.IndexRule.newBuilder()
- .setMetadata(BanyandbCommon.Metadata.newBuilder()
- .setName(tagSpec.getTagName()).setGroup(metadata.getGroup()))
- .setLocation(loc)
- .addTags(tagSpec.getTagName())
- // TODO: support TYPE_TREE
- .setType(BanyandbDatabase.IndexRule.Type.TYPE_INVERTED)
- .build();
+ public enum Kind {
+ MEASURE, STREAM;
}
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/StreamMetadata.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/StreamMetadata.java
deleted file mode 100644
index 814f5f62e2..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/StreamMetadata.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb;
-
-import lombok.Builder;
-import lombok.Data;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.skywalking.banyandb.v1.client.metadata.IndexRule;
-import org.apache.skywalking.banyandb.v1.client.metadata.Stream;
-import org.apache.skywalking.banyandb.v1.client.metadata.TagFamilySpec;
-import org.apache.skywalking.oap.server.core.storage.model.Model;
-
-import java.util.List;
-import java.util.Map;
-
-@Getter
-@Builder
-@Slf4j
-public class StreamMetadata {
- public static final String TAG_FAMILY_SEARCHABLE = "searchable";
- public static final String TAG_FAMILY_DATA = "data";
-
- public static final String ID = "id";
-
- private final Model model;
-
- private final Map<String, TagMetadata> tagDefinition;
-
- /**
- * Group of the stream
- */
- private final String group;
- /**
- * Spec of the stream
- */
- private final Stream stream;
- /**
- * Index rules attached to the stream
- */
- private final List<IndexRule> indexRules;
-
- private final int dataFamilySize;
- private final int searchableFamilySize;
-
- private final boolean useIdAsEntity;
-
- @Getter
- @Data
- public static class TagMetadata {
- private final String tagFamilyName;
- private final TagFamilySpec.TagSpec tagSpec;
- private final int tagIndex;
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java
index 5833c98c56..bff4fc00be 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java
@@ -55,10 +55,15 @@ public class BanyanDBMetadataQueryDAO implements IMetadataQueryDAO {
}
@Override
- public List<Process> listProcesses(String serviceId, String instanceId) throws IOException {
+ public List<Process> listProcesses(String serviceId, String instanceId, String agentId, long lastPingStartTimeBucket, long lastPingEndTimeBucket) throws IOException {
return Collections.emptyList();
}
+ @Override
+ public long getProcessesCount(String serviceId, String instanceId, String agentId, long lastPingStartTimeBucket, long lastPingEndTimeBucket) throws IOException {
+ return 0;
+ }
+
@Override
public Process getProcess(String processId) throws IOException {
return null;
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBProfileTaskQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBProfileTaskQueryDAO.java
index 0c6e1ebfd1..690bc6640d 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBProfileTaskQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBProfileTaskQueryDAO.java
@@ -19,7 +19,7 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure;
import org.apache.skywalking.oap.server.core.query.type.ProfileTask;
-import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO;
+import org.apache.skywalking.oap.server.core.storage.profiling.trace.IProfileTaskQueryDAO;
import java.io.IOException;
import java.util.Collections;
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBProfileTaskQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBServiceLabelDAO.java
similarity index 67%
copy from oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBProfileTaskQueryDAO.java
copy to oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBServiceLabelDAO.java
index 0c6e1ebfd1..295af6f75a 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBProfileTaskQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBServiceLabelDAO.java
@@ -18,21 +18,15 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure;
-import org.apache.skywalking.oap.server.core.query.type.ProfileTask;
-import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO;
+import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IServiceLabelDAO;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
-public class BanyanDBProfileTaskQueryDAO implements IProfileTaskQueryDAO {
+public class BanyanDBServiceLabelDAO implements IServiceLabelDAO {
@Override
- public List<ProfileTask> getTaskList(String serviceId, String endpointName, Long startTimeBucket, Long endTimeBucket, Integer limit) throws IOException {
+ public List<String> queryAllLabels(String serviceId) throws IOException {
return Collections.emptyList();
}
-
- @Override
- public ProfileTask getById(String id) throws IOException {
- return null;
- }
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java
index 0f093e396b..c40ac35924 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java
@@ -22,12 +22,14 @@ import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
import org.apache.skywalking.banyandb.v1.client.StreamQuery;
import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
import org.apache.skywalking.banyandb.v1.client.TimestampRange;
+import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.StreamMetadata;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry;
+import java.io.IOException;
import java.time.Instant;
-import java.util.List;
+import java.util.Set;
public abstract class AbstractBanyanDBDAO extends AbstractDAO<BanyanDBStorageClient> {
private static final Instant UPPER_BOUND = Instant.ofEpochSecond(0, Long.MAX_VALUE);
@@ -38,41 +40,45 @@ public abstract class AbstractBanyanDBDAO extends AbstractDAO<BanyanDBStorageCli
super(client);
}
- protected StreamQueryResponse query(StreamMetadata metadata, List<String> searchableTags, QueryBuilder builder) {
+ protected StreamQueryResponse query(MetadataRegistry.PartialMetadata metadata, Set<String> searchableTags, QueryBuilder builder) throws IOException {
return this.query(metadata, searchableTags, null, builder);
}
- protected StreamQueryResponse query(StreamMetadata metadata, List<String> searchableTags, TimestampRange timestampRange,
- QueryBuilder builder) {
+ protected StreamQueryResponse query(MetadataRegistry.PartialMetadata metadata, Set<String> searchableTags, TimestampRange timestampRange,
+ QueryBuilder builder) throws IOException {
final StreamQuery query;
if (timestampRange == null) {
- query = new StreamQuery(metadata.getGroup(), metadata.getModel().getName(), LARGEST_TIME_RANGE, searchableTags);
+ query = new StreamQuery(metadata.getGroup(), metadata.getName(), LARGEST_TIME_RANGE, searchableTags);
} else {
- query = new StreamQuery(metadata.getGroup(), metadata.getModel().getName(), timestampRange, searchableTags);
+ query = new StreamQuery(metadata.getGroup(), metadata.getName(), timestampRange, searchableTags);
}
builder.apply(query);
- return getClient().query(query);
+ try {
+ return getClient().query(query);
+ } catch (BanyanDBException ex) {
+ throw new IOException(ex);
+ }
}
protected abstract static class QueryBuilder {
abstract void apply(final StreamQuery query);
protected PairQueryCondition<Long> eq(String name, long value) {
- return PairQueryCondition.LongQueryCondition.eq(StreamMetadata.TAG_FAMILY_SEARCHABLE, name, value);
+ return PairQueryCondition.LongQueryCondition.eq(name, value);
}
protected PairQueryCondition<Long> lte(String name, long value) {
- return PairQueryCondition.LongQueryCondition.le(StreamMetadata.TAG_FAMILY_SEARCHABLE, name, value);
+ return PairQueryCondition.LongQueryCondition.le(name, value);
}
protected PairQueryCondition<Long> gte(String name, long value) {
- return PairQueryCondition.LongQueryCondition.ge(StreamMetadata.TAG_FAMILY_SEARCHABLE, name, value);
+ return PairQueryCondition.LongQueryCondition.ge(name, value);
}
protected PairQueryCondition<String> eq(String name, String value) {
- return PairQueryCondition.StringQueryCondition.eq(StreamMetadata.TAG_FAMILY_SEARCHABLE, name, value);
+ return PairQueryCondition.StringQueryCondition.eq(name, value);
}
}
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java
index 0b7693d16c..b2a43abff7 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java
@@ -18,7 +18,7 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
-import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
import org.apache.skywalking.banyandb.v1.client.RowEntity;
import org.apache.skywalking.banyandb.v1.client.StreamQuery;
import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
@@ -34,7 +34,6 @@ import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBConverter;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.StreamMetadata;
import java.io.IOException;
import java.util.List;
@@ -45,8 +44,8 @@ import java.util.Objects;
* which can be used to build a {@link org.apache.skywalking.oap.server.core.query.type.AlarmMessage}
*/
public class BanyanDBAlarmQueryDAO extends AbstractBanyanDBDAO implements IAlarmQueryDAO {
- private final StreamMetadata alarmRecordMetadata =
- MetadataRegistry.INSTANCE.findStreamMetadata(AlarmRecord.INDEX_NAME);
+ private final MetadataRegistry.PartialMetadata alarmRecordMetadata =
+ MetadataRegistry.INSTANCE.findSchema(AlarmRecord.INDEX_NAME);
public BanyanDBAlarmQueryDAO(BanyanDBStorageClient client) {
super(client);
@@ -60,13 +59,11 @@ public class BanyanDBAlarmQueryDAO extends AbstractBanyanDBDAO implements IAlarm
}
StreamQueryResponse resp = query(alarmRecordMetadata,
- ImmutableList.of(AlarmRecord.SCOPE, AlarmRecord.START_TIME),
+ ImmutableSet.of(AlarmRecord.SCOPE, AlarmRecord.START_TIME, AlarmRecord.ID0, AlarmRecord.ID1, AlarmRecord.ALARM_MESSAGE, AlarmRecord.TAGS_RAW_DATA),
tsRange,
new QueryBuilder() {
@Override
public void apply(StreamQuery query) {
- query.setDataProjections(ImmutableList.of(AlarmRecord.ID0, AlarmRecord.ID1, AlarmRecord.ALARM_MESSAGE, AlarmRecord.TAGS_RAW_DATA));
-
if (Objects.nonNull(scopeId)) {
query.appendCondition(eq(AlarmRecord.SCOPE, (long) scopeId));
}
@@ -90,7 +87,7 @@ public class BanyanDBAlarmQueryDAO extends AbstractBanyanDBDAO implements IAlarm
for (final RowEntity rowEntity : resp.getElements()) {
AlarmRecord.Builder builder = new AlarmRecord.Builder();
AlarmRecord alarmRecord = builder.storage2Entity(
- new BanyanDBConverter.StreamToEntity(this.alarmRecordMetadata, rowEntity)
+ new BanyanDBConverter.StreamToEntity(rowEntity)
);
AlarmMessage message = new AlarmMessage();
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBatchDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBatchDAO.java
index 040b031828..e17b8b66ac 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBatchDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBatchDAO.java
@@ -31,7 +31,6 @@ import java.util.concurrent.CompletableFuture;
public class BanyanDBBatchDAO extends AbstractDAO<BanyanDBStorageClient> implements IBatchDAO {
private StreamBulkWriteProcessor bulkProcessor;
-
private final int maxBulkSize;
private final int flushInterval;
private final int concurrency;
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java
index d384210cd9..2c412cf7dc 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java
@@ -18,7 +18,7 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
-import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.skywalking.banyandb.v1.client.RowEntity;
import org.apache.skywalking.banyandb.v1.client.StreamQuery;
@@ -34,18 +34,16 @@ import org.apache.skywalking.oap.server.core.storage.query.IBrowserLogQueryDAO;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.StreamMetadata;
import java.io.IOException;
-import java.util.Collections;
import java.util.Objects;
/**
* {@link org.apache.skywalking.oap.server.core.browser.manual.errorlog.BrowserErrorLogRecord} is a stream
*/
public class BanyanDBBrowserLogQueryDAO extends AbstractBanyanDBDAO implements IBrowserLogQueryDAO {
- private final StreamMetadata browserErrorLogRecordMetadata =
- MetadataRegistry.INSTANCE.findStreamMetadata(BrowserErrorLogRecord.INDEX_NAME);
+ private final MetadataRegistry.PartialMetadata browserErrorLogRecordMetadata =
+ MetadataRegistry.INSTANCE.findSchema(BrowserErrorLogRecord.INDEX_NAME);
public BanyanDBBrowserLogQueryDAO(BanyanDBStorageClient client) {
super(client);
@@ -58,13 +56,12 @@ public class BanyanDBBrowserLogQueryDAO extends AbstractBanyanDBDAO implements I
tsRange = new TimestampRange(TimeBucket.getTimestamp(startSecondTB), TimeBucket.getTimestamp(endSecondTB));
}
- StreamQueryResponse resp = query(browserErrorLogRecordMetadata, ImmutableList.of(BrowserErrorLogRecord.SERVICE_ID,
+ StreamQueryResponse resp = query(browserErrorLogRecordMetadata, ImmutableSet.of(BrowserErrorLogRecord.SERVICE_ID,
BrowserErrorLogRecord.SERVICE_VERSION_ID,
BrowserErrorLogRecord.PAGE_PATH_ID,
- BrowserErrorLogRecord.ERROR_CATEGORY), tsRange, new QueryBuilder() {
+ BrowserErrorLogRecord.ERROR_CATEGORY, BrowserErrorLogRecord.DATA_BINARY), tsRange, new QueryBuilder() {
@Override
public void apply(StreamQuery query) {
- query.setDataProjections(Collections.singletonList(BrowserErrorLogRecord.DATA_BINARY));
query.appendCondition(eq(BrowserErrorLogRecord.SERVICE_ID, serviceId));
if (StringUtil.isNotEmpty(serviceVersionId)) {
@@ -88,8 +85,7 @@ public class BanyanDBBrowserLogQueryDAO extends AbstractBanyanDBDAO implements I
logs.setTotal(resp.size());
for (final RowEntity rowEntity : resp.getElements()) {
- final byte[] dataBinary =
- rowEntity.getValue(StreamMetadata.TAG_FAMILY_DATA, BrowserErrorLogRecord.DATA_BINARY);
+ final byte[] dataBinary = rowEntity.getTagValue(BrowserErrorLogRecord.DATA_BINARY);
if (dataBinary != null && dataBinary.length > 0) {
BrowserErrorLog log = parserDataBinary(dataBinary);
logs.getLogs().add(log);
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBProfileTaskQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBEBPFProfilingDataDAO.java
similarity index 67%
copy from oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBProfileTaskQueryDAO.java
copy to oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBEBPFProfilingDataDAO.java
index 0c6e1ebfd1..4036fa771c 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBProfileTaskQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBEBPFProfilingDataDAO.java
@@ -16,23 +16,18 @@
*
*/
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure;
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
-import org.apache.skywalking.oap.server.core.query.type.ProfileTask;
-import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO;
+import org.apache.skywalking.oap.server.core.profiling.ebpf.storage.EBPFProfilingDataRecord;
+import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfilingDataDAO;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
-public class BanyanDBProfileTaskQueryDAO implements IProfileTaskQueryDAO {
+public class BanyanDBEBPFProfilingDataDAO implements IEBPFProfilingDataDAO {
@Override
- public List<ProfileTask> getTaskList(String serviceId, String endpointName, Long startTimeBucket, Long endTimeBucket, Integer limit) throws IOException {
+ public List<EBPFProfilingDataRecord> queryData(List<String> scheduleIdList, long beginTime, long endTime) throws IOException {
return Collections.emptyList();
}
-
- @Override
- public ProfileTask getById(String id) throws IOException {
- return null;
- }
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBProfileTaskQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBEBPFProfilingScheduleQueryDAO.java
similarity index 67%
copy from oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBProfileTaskQueryDAO.java
copy to oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBEBPFProfilingScheduleQueryDAO.java
index 0c6e1ebfd1..e6e4d529e4 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBProfileTaskQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBEBPFProfilingScheduleQueryDAO.java
@@ -16,23 +16,18 @@
*
*/
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure;
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
-import org.apache.skywalking.oap.server.core.query.type.ProfileTask;
-import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO;
+import org.apache.skywalking.oap.server.core.query.type.EBPFProfilingSchedule;
+import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfilingScheduleDAO;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
-public class BanyanDBProfileTaskQueryDAO implements IProfileTaskQueryDAO {
+public class BanyanDBEBPFProfilingScheduleQueryDAO implements IEBPFProfilingScheduleDAO {
@Override
- public List<ProfileTask> getTaskList(String serviceId, String endpointName, Long startTimeBucket, Long endTimeBucket, Integer limit) throws IOException {
+ public List<EBPFProfilingSchedule> querySchedules(String taskId, long startTimeBucket, long endTimeBucket) throws IOException {
return Collections.emptyList();
}
-
- @Override
- public ProfileTask getById(String id) throws IOException {
- return null;
- }
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBProfileTaskQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBEBPFProfilingTaskDAO.java
similarity index 66%
copy from oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBProfileTaskQueryDAO.java
copy to oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBEBPFProfilingTaskDAO.java
index 0c6e1ebfd1..b0a59f3251 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBProfileTaskQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBEBPFProfilingTaskDAO.java
@@ -16,23 +16,19 @@
*
*/
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure;
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
-import org.apache.skywalking.oap.server.core.query.type.ProfileTask;
-import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO;
+import org.apache.skywalking.oap.server.core.profiling.ebpf.storage.EBPFProfilingTargetType;
+import org.apache.skywalking.oap.server.core.query.type.EBPFProfilingTask;
+import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfilingTaskDAO;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
-public class BanyanDBProfileTaskQueryDAO implements IProfileTaskQueryDAO {
+public class BanyanDBEBPFProfilingTaskDAO implements IEBPFProfilingTaskDAO {
@Override
- public List<ProfileTask> getTaskList(String serviceId, String endpointName, Long startTimeBucket, Long endTimeBucket, Integer limit) throws IOException {
+ public List<EBPFProfilingTask> queryTasks(List<String> serviceIdList, EBPFProfilingTargetType targetType, long taskStartTime, long latestUpdateTime) throws IOException {
return Collections.emptyList();
}
-
- @Override
- public ProfileTask getById(String id) throws IOException {
- return null;
- }
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java
index 5cef08a726..c6097539c9 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java
@@ -18,7 +18,7 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
-import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.skywalking.apm.network.logging.v3.LogTags;
import org.apache.skywalking.banyandb.v1.client.RowEntity;
@@ -41,7 +41,6 @@ import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.StreamMetadata;
import java.io.IOException;
import java.util.List;
@@ -51,8 +50,8 @@ import java.util.Objects;
* {@link org.apache.skywalking.oap.server.core.analysis.manual.log.LogRecord} is a stream
*/
public class BanyanDBLogQueryDAO extends AbstractBanyanDBDAO implements ILogQueryDAO {
- private final StreamMetadata logRecordMetadata =
- MetadataRegistry.INSTANCE.findStreamMetadata(LogRecord.INDEX_NAME);
+ private final MetadataRegistry.PartialMetadata logRecordMetadata =
+ MetadataRegistry.INSTANCE.findSchema(LogRecord.INDEX_NAME);
public BanyanDBLogQueryDAO(BanyanDBStorageClient client) {
super(client);
@@ -66,8 +65,6 @@ public class BanyanDBLogQueryDAO extends AbstractBanyanDBDAO implements ILogQuer
final QueryBuilder query = new QueryBuilder() {
@Override
public void apply(StreamQuery query) {
- query.setDataProjections(ImmutableList.of(AbstractLogRecord.CONTENT_TYPE, AbstractLogRecord.CONTENT, AbstractLogRecord.TAGS_RAW_DATA));
-
if (StringUtil.isNotEmpty(serviceId)) {
query.appendCondition(eq(AbstractLogRecord.SERVICE_ID, serviceId));
}
@@ -105,32 +102,31 @@ public class BanyanDBLogQueryDAO extends AbstractBanyanDBDAO implements ILogQuer
}
StreamQueryResponse resp = query(logRecordMetadata,
- ImmutableList.of(AbstractLogRecord.SERVICE_ID, AbstractLogRecord.SERVICE_INSTANCE_ID,
+ ImmutableSet.of(AbstractLogRecord.SERVICE_ID, AbstractLogRecord.SERVICE_INSTANCE_ID,
AbstractLogRecord.ENDPOINT_ID, AbstractLogRecord.TRACE_ID, AbstractLogRecord.TRACE_SEGMENT_ID,
- AbstractLogRecord.SPAN_ID), tsRange, query);
+ AbstractLogRecord.SPAN_ID, AbstractLogRecord.CONTENT_TYPE, AbstractLogRecord.CONTENT,
+ AbstractLogRecord.TAGS_RAW_DATA), tsRange, query);
Logs logs = new Logs();
logs.setTotal(resp.size());
for (final RowEntity rowEntity : resp.getElements()) {
Log log = new Log();
- log.setServiceId(rowEntity.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, AbstractLogRecord.SERVICE_ID));
+ log.setServiceId(rowEntity.getTagValue(AbstractLogRecord.SERVICE_ID));
log.setServiceInstanceId(
- rowEntity.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, AbstractLogRecord.SERVICE_INSTANCE_ID));
+ rowEntity.getTagValue(AbstractLogRecord.SERVICE_INSTANCE_ID));
log.setEndpointId(
- rowEntity.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, AbstractLogRecord.ENDPOINT_ID));
+ rowEntity.getTagValue(AbstractLogRecord.ENDPOINT_ID));
if (log.getEndpointId() != null) {
log.setEndpointName(
IDManager.EndpointID.analysisId(log.getEndpointId()).getEndpointName());
}
- log.setTraceId(rowEntity.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, AbstractLogRecord.TRACE_ID));
- log.setTimestamp(((Number) rowEntity.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE,
- AbstractLogRecord.TIMESTAMP)).longValue());
+ log.setTraceId(rowEntity.getTagValue(AbstractLogRecord.TRACE_ID));
+ log.setTimestamp(((Number) rowEntity.getTagValue(AbstractLogRecord.TIMESTAMP)).longValue());
log.setContentType(ContentType.instanceOf(
- ((Number) rowEntity.getValue(StreamMetadata.TAG_FAMILY_DATA,
- AbstractLogRecord.CONTENT_TYPE)).intValue()));
- log.setContent(rowEntity.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, AbstractLogRecord.CONTENT));
- byte[] dataBinary = rowEntity.getValue(StreamMetadata.TAG_FAMILY_DATA, AbstractLogRecord.TAGS_RAW_DATA);
+ ((Number) rowEntity.getTagValue(AbstractLogRecord.CONTENT_TYPE)).intValue()));
+ log.setContent(rowEntity.getTagValue(AbstractLogRecord.CONTENT));
+ byte[] dataBinary = rowEntity.getTagValue(AbstractLogRecord.TAGS_RAW_DATA);
if (dataBinary != null && dataBinary.length > 0) {
parserDataBinary(dataBinary, log.getTags());
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java
index 4411957f69..6749118a25 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java
@@ -18,28 +18,27 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
-import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
import org.apache.skywalking.banyandb.v1.client.RowEntity;
import org.apache.skywalking.banyandb.v1.client.StreamQuery;
import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
-import org.apache.skywalking.oap.server.core.profile.ProfileTaskLogRecord;
+import org.apache.skywalking.oap.server.core.profiling.trace.ProfileTaskLogRecord;
import org.apache.skywalking.oap.server.core.query.type.ProfileTaskLog;
import org.apache.skywalking.oap.server.core.query.type.ProfileTaskLogOperationType;
-import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO;
+import org.apache.skywalking.oap.server.core.storage.profiling.trace.IProfileTaskLogQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.StreamMetadata;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
/**
- * {@link org.apache.skywalking.oap.server.core.profile.ProfileTaskLogRecord} is a stream
+ * {@link ProfileTaskLogRecord} is a stream
*/
public class BanyanDBProfileTaskLogQueryDAO extends AbstractBanyanDBDAO implements IProfileTaskLogQueryDAO {
- private final StreamMetadata profileTaskLogRecord =
- MetadataRegistry.INSTANCE.findStreamMetadata(ProfileTaskLogRecord.INDEX_NAME);
+ private final MetadataRegistry.PartialMetadata profileTaskLogRecord =
+ MetadataRegistry.INSTANCE.findSchema(ProfileTaskLogRecord.INDEX_NAME);
private final int queryMaxSize;
@@ -51,12 +50,11 @@ public class BanyanDBProfileTaskLogQueryDAO extends AbstractBanyanDBDAO implemen
@Override
public List<ProfileTaskLog> getTaskLogList() throws IOException {
StreamQueryResponse resp = query(profileTaskLogRecord,
- ImmutableList.of(ProfileTaskLogRecord.OPERATION_TIME, ProfileTaskLogRecord.INSTANCE_ID),
+ ImmutableSet.of(ProfileTaskLogRecord.OPERATION_TIME, ProfileTaskLogRecord.INSTANCE_ID,
+ ProfileTaskLogRecord.TASK_ID, ProfileTaskLogRecord.OPERATION_TYPE),
new QueryBuilder() {
@Override
public void apply(StreamQuery query) {
- query.setDataProjections(ImmutableList.of(ProfileTaskLogRecord.TASK_ID,
- ProfileTaskLogRecord.OPERATION_TYPE));
query.setLimit(BanyanDBProfileTaskLogQueryDAO.this.queryMaxSize);
}
});
@@ -72,15 +70,13 @@ public class BanyanDBProfileTaskLogQueryDAO extends AbstractBanyanDBDAO implemen
private ProfileTaskLog parseTaskLog(RowEntity data) {
return ProfileTaskLog.builder()
.id(data.getId())
- .taskId(data.getValue(StreamMetadata.TAG_FAMILY_DATA, ProfileTaskLogRecord.TASK_ID))
+ .taskId(data.getTagValue(ProfileTaskLogRecord.TASK_ID))
.instanceId(
- data.getValue(StreamMetadata.TAG_FAMILY_DATA, ProfileTaskLogRecord.INSTANCE_ID))
+ data.getTagValue(ProfileTaskLogRecord.INSTANCE_ID))
.operationType(ProfileTaskLogOperationType.parse(
- ((Number) data.getValue(StreamMetadata.TAG_FAMILY_DATA,
- ProfileTaskLogRecord.OPERATION_TYPE)).intValue()))
+ ((Number) data.getTagValue(ProfileTaskLogRecord.OPERATION_TYPE)).intValue()))
.operationTime(
- ((Number) data.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE,
- ProfileTaskLogRecord.OPERATION_TIME)).longValue())
+ ((Number) data.getTagValue(ProfileTaskLogRecord.OPERATION_TIME)).longValue())
.build();
}
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java
index 4c6ac7ddcd..a0cacd0c39 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java
@@ -18,20 +18,19 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
-import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
import org.apache.skywalking.banyandb.v1.client.RowEntity;
import org.apache.skywalking.banyandb.v1.client.StreamQuery;
import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
import org.apache.skywalking.oap.server.core.analysis.IDManager;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
-import org.apache.skywalking.oap.server.core.profile.ProfileThreadSnapshotRecord;
+import org.apache.skywalking.oap.server.core.profiling.trace.ProfileThreadSnapshotRecord;
import org.apache.skywalking.oap.server.core.query.type.BasicTrace;
-import org.apache.skywalking.oap.server.core.storage.profile.IProfileThreadSnapshotQueryDAO;
+import org.apache.skywalking.oap.server.core.storage.profiling.trace.IProfileThreadSnapshotQueryDAO;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBConverter;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.StreamMetadata;
import java.io.IOException;
import java.util.ArrayList;
@@ -49,11 +48,11 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractBanyanDBDAO i
protected final ProfileThreadSnapshotRecord.Builder builder =
new ProfileThreadSnapshotRecord.Builder();
- private final StreamMetadata profileThreadSnapshotMetadata =
- MetadataRegistry.INSTANCE.findStreamMetadata(ProfileThreadSnapshotRecord.INDEX_NAME);
+ private final MetadataRegistry.PartialMetadata profileThreadSnapshotMetadata =
+ MetadataRegistry.INSTANCE.findSchema(ProfileThreadSnapshotRecord.INDEX_NAME);
- private final StreamMetadata segmentRecordMetadata =
- MetadataRegistry.INSTANCE.findStreamMetadata(SegmentRecord.INDEX_NAME);
+ private final MetadataRegistry.PartialMetadata segmentRecordMetadata =
+ MetadataRegistry.INSTANCE.findSchema(SegmentRecord.INDEX_NAME);
public BanyanDBProfileThreadSnapshotQueryDAO(BanyanDBStorageClient client) {
super(client);
@@ -62,7 +61,7 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractBanyanDBDAO i
@Override
public List<BasicTrace> queryProfiledSegments(String taskId) throws IOException {
StreamQueryResponse resp = query(profileThreadSnapshotMetadata,
- ImmutableList.of(ProfileThreadSnapshotRecord.TASK_ID, ProfileThreadSnapshotRecord.SEGMENT_ID,
+ ImmutableSet.of(ProfileThreadSnapshotRecord.TASK_ID, ProfileThreadSnapshotRecord.SEGMENT_ID,
ProfileThreadSnapshotRecord.DUMP_TIME, ProfileThreadSnapshotRecord.SEQUENCE),
new QueryBuilder() {
@Override
@@ -78,14 +77,14 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractBanyanDBDAO i
final List<String> segmentIds = new LinkedList<>();
for (final RowEntity rowEntity : resp.getElements()) {
- segmentIds.add(rowEntity.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, ProfileThreadSnapshotRecord.SEGMENT_ID));
+ segmentIds.add(rowEntity.getTagValue(ProfileThreadSnapshotRecord.SEGMENT_ID));
}
// TODO: support `IN` or `OR` logic operation in BanyanDB
List<BasicTrace> basicTraces = new ArrayList<>();
for (String segmentID : segmentIds) {
final StreamQueryResponse segmentRecordResp = query(segmentRecordMetadata,
- ImmutableList.of(SegmentRecord.TRACE_ID, SegmentRecord.IS_ERROR, SegmentRecord.ENDPOINT_ID, SegmentRecord.LATENCY, SegmentRecord.START_TIME),
+ ImmutableSet.of(SegmentRecord.TRACE_ID, SegmentRecord.IS_ERROR, SegmentRecord.ENDPOINT_ID, SegmentRecord.LATENCY, SegmentRecord.START_TIME),
new QueryBuilder() {
@Override
public void apply(StreamQuery traceQuery) {
@@ -97,15 +96,15 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractBanyanDBDAO i
BasicTrace basicTrace = new BasicTrace();
basicTrace.setSegmentId(row.getId());
- basicTrace.setStart(String.valueOf(row.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.START_TIME)));
+ basicTrace.setStart(String.valueOf(row.getTagValue(SegmentRecord.START_TIME)));
basicTrace.getEndpointNames().add(IDManager.EndpointID.analysisId(
- row.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.ENDPOINT_ID)
+ row.getTagValue(SegmentRecord.ENDPOINT_ID)
).getEndpointName());
- basicTrace.setDuration(((Number) row.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.LATENCY)).intValue());
+ basicTrace.setDuration(((Number) row.getTagValue(SegmentRecord.LATENCY)).intValue());
basicTrace.setError(BooleanUtils.valueToBoolean(
- ((Number) row.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.IS_ERROR)).intValue()
+ ((Number) row.getTagValue(SegmentRecord.IS_ERROR)).intValue()
));
- basicTrace.getTraceIds().add(row.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.TRACE_ID));
+ basicTrace.getTraceIds().add(row.getTagValue(SegmentRecord.TRACE_ID));
basicTraces.add(basicTrace);
}
@@ -134,13 +133,12 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractBanyanDBDAO i
@Override
public List<ProfileThreadSnapshotRecord> queryRecords(String segmentId, int minSequence, int maxSequence) throws IOException {
StreamQueryResponse resp = query(profileThreadSnapshotMetadata,
- ImmutableList.of(ProfileThreadSnapshotRecord.TASK_ID, ProfileThreadSnapshotRecord.SEGMENT_ID,
- ProfileThreadSnapshotRecord.DUMP_TIME, ProfileThreadSnapshotRecord.SEQUENCE),
+ ImmutableSet.of(ProfileThreadSnapshotRecord.TASK_ID, ProfileThreadSnapshotRecord.SEGMENT_ID,
+ ProfileThreadSnapshotRecord.DUMP_TIME, ProfileThreadSnapshotRecord.SEQUENCE,
+ ProfileThreadSnapshotRecord.STACK_BINARY),
new QueryBuilder() {
@Override
public void apply(StreamQuery query) {
- query.setDataProjections(Collections.singletonList(ProfileThreadSnapshotRecord.STACK_BINARY));
-
query.appendCondition(eq(ProfileThreadSnapshotRecord.SEGMENT_ID, segmentId))
.appendCondition(lte(ProfileThreadSnapshotRecord.SEQUENCE, maxSequence))
.appendCondition(gte(ProfileThreadSnapshotRecord.SEQUENCE, minSequence));
@@ -150,7 +148,7 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractBanyanDBDAO i
List<ProfileThreadSnapshotRecord> result = new ArrayList<>(maxSequence - minSequence);
for (final RowEntity rowEntity : resp.getElements()) {
ProfileThreadSnapshotRecord record = this.builder.storage2Entity(
- new BanyanDBConverter.StreamToEntity(profileThreadSnapshotMetadata, rowEntity));
+ new BanyanDBConverter.StreamToEntity(rowEntity));
result.add(record);
}
@@ -160,11 +158,12 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractBanyanDBDAO i
@Override
public SegmentRecord getProfiledSegment(String segmentId) throws IOException {
StreamQueryResponse resp = query(segmentRecordMetadata,
- ImmutableList.of(SegmentRecord.TRACE_ID, SegmentRecord.IS_ERROR, SegmentRecord.SERVICE_ID, SegmentRecord.SERVICE_INSTANCE_ID, SegmentRecord.ENDPOINT_ID, SegmentRecord.LATENCY, SegmentRecord.START_TIME),
+ ImmutableSet.of(SegmentRecord.TRACE_ID, SegmentRecord.IS_ERROR, SegmentRecord.SERVICE_ID,
+ SegmentRecord.SERVICE_INSTANCE_ID, SegmentRecord.ENDPOINT_ID, SegmentRecord.LATENCY,
+ SegmentRecord.START_TIME, SegmentRecord.DATA_BINARY),
new QueryBuilder() {
@Override
public void apply(StreamQuery query) {
- query.setDataProjections(Collections.singletonList(SegmentRecord.DATA_BINARY));
query.appendCondition(eq(SegmentRecord.INDEX_NAME, segmentId));
}
});
@@ -175,31 +174,30 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractBanyanDBDAO i
final RowEntity rowEntity = resp.getElements().iterator().next();
final SegmentRecord segmentRecord = new SegmentRecord();
- segmentRecord.setSegmentId(rowEntity.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.SEGMENT_ID));
- segmentRecord.setTraceId(rowEntity.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.TRACE_ID));
- segmentRecord.setServiceId(rowEntity.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.SERVICE_ID));
+ segmentRecord.setSegmentId(rowEntity.getTagValue(SegmentRecord.SEGMENT_ID));
+ segmentRecord.setTraceId(rowEntity.getTagValue(SegmentRecord.TRACE_ID));
+ segmentRecord.setServiceId(rowEntity.getTagValue(SegmentRecord.SERVICE_ID));
segmentRecord.setStartTime(
- ((Number) rowEntity.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.START_TIME)).longValue());
+ ((Number) rowEntity.getTagValue(SegmentRecord.START_TIME)).longValue());
segmentRecord.setLatency(
- ((Number) rowEntity.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.LATENCY)).intValue());
+ ((Number) rowEntity.getTagValue(SegmentRecord.LATENCY)).intValue());
segmentRecord.setIsError(
- ((Number) rowEntity.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.IS_ERROR)).intValue());
- byte[] dataBinary = rowEntity.getValue(StreamMetadata.TAG_FAMILY_DATA, SegmentRecord.DATA_BINARY);
+ ((Number) rowEntity.getTagValue(SegmentRecord.IS_ERROR)).intValue());
+ byte[] dataBinary = rowEntity.getTagValue(SegmentRecord.DATA_BINARY);
if (dataBinary != null && dataBinary.length > 0) {
segmentRecord.setDataBinary(dataBinary);
}
return segmentRecord;
}
- private int querySequenceWithAgg(AggType aggType, String segmentId, long start, long end) {
+ private int querySequenceWithAgg(AggType aggType, String segmentId, long start, long end) throws IOException {
StreamQueryResponse resp = query(profileThreadSnapshotMetadata,
- ImmutableList.of(ProfileThreadSnapshotRecord.TASK_ID, ProfileThreadSnapshotRecord.SEGMENT_ID,
- ProfileThreadSnapshotRecord.DUMP_TIME, ProfileThreadSnapshotRecord.SEQUENCE),
+ ImmutableSet.of(ProfileThreadSnapshotRecord.TASK_ID, ProfileThreadSnapshotRecord.SEGMENT_ID,
+ ProfileThreadSnapshotRecord.DUMP_TIME, ProfileThreadSnapshotRecord.SEQUENCE,
+ ProfileThreadSnapshotRecord.STACK_BINARY),
new QueryBuilder() {
@Override
public void apply(StreamQuery query) {
- query.setDataProjections(Collections.singletonList(ProfileThreadSnapshotRecord.STACK_BINARY));
-
query.appendCondition(eq(ProfileThreadSnapshotRecord.SEGMENT_ID, segmentId))
.appendCondition(lte(ProfileThreadSnapshotRecord.DUMP_TIME, end))
.appendCondition(gte(ProfileThreadSnapshotRecord.DUMP_TIME, start));
@@ -209,7 +207,7 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractBanyanDBDAO i
List<ProfileThreadSnapshotRecord> records = new ArrayList<>();
for (final RowEntity rowEntity : resp.getElements()) {
ProfileThreadSnapshotRecord record = this.builder.storage2Entity(
- new BanyanDBConverter.StreamToEntity(profileThreadSnapshotMetadata, rowEntity));
+ new BanyanDBConverter.StreamToEntity(rowEntity));
records.add(record);
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java
index a1cf73edcd..3fe0e03ef7 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java
@@ -29,7 +29,6 @@ import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBConverter;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.StreamMetadata;
import java.io.IOException;
@@ -39,17 +38,15 @@ public class BanyanDBRecordDAO implements IRecordDAO {
@Override
public InsertRequest prepareBatchInsert(Model model, Record record) throws IOException {
- StreamMetadata metadata = MetadataRegistry.INSTANCE.findStreamMetadata(model.getName());
+ MetadataRegistry.PartialMetadata metadata = MetadataRegistry.INSTANCE.findSchema(model.getName());
if (metadata == null) {
throw new IOException(model.getName() + " is not registered");
}
StreamWrite streamWrite = new StreamWrite(metadata.getGroup(), // group name
model.getName(), // index-name
record.id(), // identity
- TimeBucket.getTimestamp(record.getTimeBucket(), model.getDownsampling()), // timestamp
- metadata.getDataFamilySize(), // length of the "data" tag family
- metadata.getSearchableFamilySize()); // length of the "searchable" tag family
- Convert2Storage<StreamWrite> convert2Storage = new BanyanDBConverter.StreamToStorage(metadata, streamWrite);
+ TimeBucket.getTimestamp(record.getTimeBucket(), model.getDownsampling())); // timestamp
+ Convert2Storage<StreamWrite> convert2Storage = new BanyanDBConverter.StreamToStorage(streamWrite);
storageBuilder.entity2Storage(record, convert2Storage);
return new BanyanDBStreamInsertRequest(convert2Storage.obtain());
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java
index fc0d458c29..e149b5ab27 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java
@@ -19,7 +19,7 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
import org.apache.skywalking.banyandb.v1.client.RowEntity;
import org.apache.skywalking.banyandb.v1.client.StreamQuery;
import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
@@ -39,7 +39,6 @@ import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBConverter;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.StreamMetadata;
import java.io.IOException;
import java.util.ArrayList;
@@ -47,8 +46,8 @@ import java.util.Collections;
import java.util.List;
public class BanyanDBTraceQueryDAO extends AbstractBanyanDBDAO implements ITraceQueryDAO {
- private final StreamMetadata segmentRecordMetadata =
- MetadataRegistry.INSTANCE.findStreamMetadata(SegmentRecord.INDEX_NAME);
+ private final MetadataRegistry.PartialMetadata segmentRecordMetadata =
+ MetadataRegistry.INSTANCE.findSchema(SegmentRecord.INDEX_NAME);
public BanyanDBTraceQueryDAO(BanyanDBStorageClient client) {
super(client);
@@ -117,7 +116,7 @@ public class BanyanDBTraceQueryDAO extends AbstractBanyanDBDAO implements ITrace
}
StreamQueryResponse resp = query(segmentRecordMetadata,
- ImmutableList.of(SegmentRecord.TRACE_ID, // 0 - trace_id
+ ImmutableSet.of(SegmentRecord.TRACE_ID, // 0 - trace_id
SegmentRecord.IS_ERROR, // 1 - is_error
SegmentRecord.SERVICE_ID, // 2 - service_id
SegmentRecord.SERVICE_INSTANCE_ID, // 3 - service_instance_id
@@ -133,15 +132,15 @@ public class BanyanDBTraceQueryDAO extends AbstractBanyanDBDAO implements ITrace
BasicTrace basicTrace = new BasicTrace();
basicTrace.setSegmentId(row.getId());
- basicTrace.setStart(String.valueOf(row.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.START_TIME)));
+ basicTrace.setStart(String.valueOf(row.getTagValue(SegmentRecord.START_TIME)));
basicTrace.getEndpointNames().add(IDManager.EndpointID.analysisId(
- row.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.ENDPOINT_ID)
+ row.getTagValue(SegmentRecord.ENDPOINT_ID)
).getEndpointName());
- basicTrace.setDuration(((Number) row.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.LATENCY)).intValue());
+ basicTrace.setDuration(((Number) row.getTagValue(SegmentRecord.LATENCY)).intValue());
basicTrace.setError(BooleanUtils.valueToBoolean(
- ((Number) row.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.IS_ERROR)).intValue()
+ ((Number) row.getTagValue(SegmentRecord.IS_ERROR)).intValue()
));
- basicTrace.getTraceIds().add(row.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.TRACE_ID));
+ basicTrace.getTraceIds().add(row.getTagValue(SegmentRecord.TRACE_ID));
traceBrief.getTraces().add(basicTrace);
}
@@ -152,17 +151,17 @@ public class BanyanDBTraceQueryDAO extends AbstractBanyanDBDAO implements ITrace
@Override
public List<SegmentRecord> queryByTraceId(String traceId) throws IOException {
StreamQueryResponse resp = query(segmentRecordMetadata,
- ImmutableList.of(SegmentRecord.TRACE_ID,
+ ImmutableSet.of(SegmentRecord.TRACE_ID,
SegmentRecord.IS_ERROR,
SegmentRecord.SERVICE_ID,
SegmentRecord.SERVICE_INSTANCE_ID,
SegmentRecord.ENDPOINT_ID,
SegmentRecord.LATENCY,
- SegmentRecord.START_TIME),
+ SegmentRecord.START_TIME,
+ SegmentRecord.DATA_BINARY),
new QueryBuilder() {
@Override
public void apply(StreamQuery query) {
- query.setDataProjections(Collections.singletonList(SegmentRecord.DATA_BINARY));
query.appendCondition(eq(SegmentRecord.TRACE_ID, traceId));
}
});
@@ -171,7 +170,7 @@ public class BanyanDBTraceQueryDAO extends AbstractBanyanDBDAO implements ITrace
for (final RowEntity rowEntity : resp.getElements()) {
SegmentRecord segmentRecord = new SegmentRecord.Builder().storage2Entity(
- new BanyanDBConverter.StreamToEntity(segmentRecordMetadata, rowEntity));
+ new BanyanDBConverter.StreamToEntity(rowEntity));
segmentRecords.add(segmentRecord);
}