You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by lu...@apache.org on 2022/04/22 03:24:41 UTC
[skywalking] 20/22: remove all hardcoded builder
This is an automated email from the ASF dual-hosted git repository.
lujiajing pushed a commit to branch banyandb-integration-stream
in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit aff43b1be297526f609ce289a9d95946ecd3f21c
Author: Megrez Lu <lu...@gmail.com>
AuthorDate: Mon Mar 21 23:39:23 2022 +0800
remove all hardcoded builder
---
.../storage/plugin/banyandb/BanyanDBConverter.java | 139 +++++++++++
.../plugin/banyandb/BanyanDBIndexInstaller.java | 28 ++-
.../plugin/banyandb/BanyanDBManagementDAO.java | 20 ++
.../plugin/banyandb/BanyanDBMetricsDAO.java | 36 +++
.../{stream => }/BanyanDBNoneStreamDAO.java | 32 +--
.../banyandb/BanyanDBStorageBuilderFactory.java | 112 ---------
.../plugin/banyandb/BanyanDBStorageClient.java | 24 +-
.../plugin/banyandb/BanyanDBStorageProvider.java | 31 +--
.../storage/plugin/banyandb/MetadataRegistry.java | 212 ++++++++++++++++
.../storage/plugin/banyandb/StreamMetaInfo.java | 140 -----------
.../storage/plugin/banyandb/StreamMetadata.java | 71 ++++++
.../BanyanDBEventQueryDAO.java} | 27 +-
.../banyandb/measure/BanyanDBMetadataQueryDAO.java | 66 +++++
.../BanyanDBNetworkAddressAliasDAO.java} | 22 +-
.../BanyanDBProfileTaskQueryDAO.java} | 28 ++-
.../banyandb/measure/BanyanDBTopologyQueryDAO.java | 63 +++++
.../measure/BanyanDBUITemplateManagementDAO.java | 55 ++++
.../plugin/banyandb/schema/AlarmRecordBuilder.java | 55 ----
.../schema/BanyanDBStorageDataBuilder.java | 88 -------
.../schema/BrowserErrorLogRecordBuilder.java | 46 ----
.../EndpointRelationServerSideMetricsBuilder.java | 48 ----
.../plugin/banyandb/schema/EventBuilder.java | 52 ----
.../plugin/banyandb/schema/LogRecordBuilder.java | 58 -----
.../storage/plugin/banyandb/schema/Metadata.java | 100 --------
.../schema/NetworkAddressAliasBuilder.java | 49 ----
.../schema/ProfileTaskLogRecordBuilder.java | 45 ----
.../banyandb/schema/ProfileTaskRecordBuilder.java | 53 ----
.../schema/ProfileThreadSnapshotRecordBuilder.java | 45 ----
.../banyandb/schema/SegmentRecordBuilder.java | 68 -----
...ceInstanceRelationClientSideMetricsBuilder.java | 47 ----
...ceInstanceRelationServerSideMetricsBuilder.java | 47 ----
.../ServiceRelationClientSideMetricsBuilder.java | 44 ----
.../ServiceRelationServerSideMetricsBuilder.java | 44 ----
.../plugin/banyandb/schema/UITemplateBuilder.java | 47 ----
.../banyandb/stream/AbstractBanyanDBDAO.java | 25 +-
.../banyandb/stream/BanyanDBAlarmQueryDAO.java | 68 ++---
.../stream/BanyanDBBrowserLogQueryDAO.java | 77 +++---
.../banyandb/stream/BanyanDBEventQueryDAO.java | 148 -----------
.../banyandb/stream/BanyanDBLogQueryDAO.java | 83 ++++---
.../banyandb/stream/BanyanDBManagementDAO.java | 70 ------
.../banyandb/stream/BanyanDBMetadataQueryDAO.java | 214 ----------------
.../plugin/banyandb/stream/BanyanDBMetricsDAO.java | 57 -----
.../stream/BanyanDBNetworkAddressAliasDAO.java | 76 ------
.../stream/BanyanDBProfileTaskLogQueryDAO.java | 49 ++--
.../stream/BanyanDBProfileTaskQueryDAO.java | 120 ---------
.../BanyanDBProfileThreadSnapshotQueryDAO.java | 146 ++++++-----
.../plugin/banyandb/stream/BanyanDBRecordDAO.java | 27 +-
.../plugin/banyandb/stream/BanyanDBStorageDAO.java | 69 +-----
.../stream/BanyanDBStreamInsertRequest.java | 2 +-
.../banyandb/stream/BanyanDBTopologyQueryDAO.java | 276 ---------------------
.../banyandb/stream/BanyanDBTraceQueryDAO.java | 105 ++++----
.../stream/BanyanDBUITemplateManagementDAO.java | 119 ---------
52 files changed, 1073 insertions(+), 2700 deletions(-)
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java
new file mode 100644
index 0000000000..565245ad55
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.banyandb;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
+import org.apache.skywalking.banyandb.v1.client.RowEntity;
+import org.apache.skywalking.banyandb.v1.client.SerializableTag;
+import org.apache.skywalking.banyandb.v1.client.StreamWrite;
+import org.apache.skywalking.banyandb.v1.client.TagAndValue;
+import org.apache.skywalking.oap.server.core.analysis.record.Record;
+import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity;
+import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
+import org.apache.skywalking.oap.server.library.util.StringUtil;
+
+import java.util.List;
+import java.util.function.Function;
+
+public class BanyanDBConverter {
+ /**
+  * Read-side adapter that exposes a single BanyanDB {@link RowEntity} through the
+  * {@link Convert2Entity} contract, resolving field names via {@link StreamMetadata}.
+  */
+ @RequiredArgsConstructor
+ public static class StreamToEntity implements Convert2Entity {
+     private final StreamMetadata metadata;
+     private final RowEntity rowEntity;
+
+     /**
+      * Resolves the tag registered for {@code fieldName} and reads its value from the row.
+      *
+      * @param fieldName storage field name to resolve
+      * @return the value the row holds for the matching tag, or {@code null} when the
+      *         stream metadata has no tag definition for {@code fieldName}
+      */
+     @Override
+     public Object get(String fieldName) {
+         final StreamMetadata.TagMetadata tag = metadata.getTagDefinition().get(fieldName);
+         if (tag != null) {
+             return rowEntity.getValue(tag.getTagFamilyName(), tag.getTagSpec().getTagName());
+         }
+         return null;
+     }
+
+     /**
+      * Returns the same value as {@link #get(String)}, cast to {@code R}.
+      * The supplied {@code typeDecoder} is not applied to the value.
+      *
+      * @param fieldName   storage field name to resolve
+      * @param typeDecoder decoder requested by the caller; unused by this implementation
+      * @return the stored value cast to {@code R}, or {@code null} if the field is unknown
+      */
+     @Override
+     @SuppressWarnings("unchecked")
+     public <T, R> R getWith(String fieldName, Function<T, R> typeDecoder) {
+         final Object stored = get(fieldName);
+         return (R) stored;
+     }
+ }
+
+ /**
+  * Write-side adapter that copies entity fields into a BanyanDB {@link StreamWrite},
+  * using {@link StreamMetadata} to pick the tag family and tag index of every field.
+  * Fields that have no tag definition in the metadata are skipped without error.
+  */
+ @RequiredArgsConstructor
+ public static class StreamToStorage implements Convert2Storage<StreamWrite> {
+     private final StreamMetadata metadata;
+     private final StreamWrite streamWrite;
+
+     /**
+      * Writes a scalar field into the tag family declared for it.
+      *
+      * @param fieldName  storage field name; {@code Record.TIME_BUCKET} is always skipped
+      * @param fieldValue an {@link Integer}, {@link Long} or {@link String}
+      * @throws IllegalStateException if the declared tag family is neither the data nor the
+      *                               searchable family, or if the value type is unsupported
+      */
+     @Override
+     public void accept(String fieldName, Object fieldValue) {
+         // skip "time_bucket"
+         if (Record.TIME_BUCKET.equals(fieldName)) {
+             return;
+         }
+         final StreamMetadata.TagMetadata tag = this.metadata.getTagDefinition().get(fieldName);
+         if (tag == null) {
+             return;
+         }
+         switch (tag.getTagFamilyName()) {
+             case StreamMetadata.TAG_FAMILY_DATA:
+                 this.streamWrite.dataTag(tag.getTagIndex(), buildTag(fieldValue));
+                 break;
+             case StreamMetadata.TAG_FAMILY_SEARCHABLE:
+                 this.streamWrite.searchableTag(tag.getTagIndex(), buildTag(fieldValue));
+                 break;
+             default:
+                 throw new IllegalStateException("tag family is not supported");
+         }
+     }
+
+     /**
+      * Converts a boxed value into a serializable BanyanDB tag.
+      * {@link Integer} and {@link Long} become long tags, {@link String} becomes a string tag.
+      *
+      * @param value the value to convert
+      * @return the tag wrapping {@code value}
+      * @throws IllegalStateException if {@code value} is {@code null} or of any other type
+      */
+     private SerializableTag<BanyandbModel.TagValue> buildTag(Object value) {
+         if (value instanceof Integer || value instanceof Long) {
+             // Go through Number: casting an Object that holds an Integer with `(long) value`
+             // is a checked cast to Long and fails with ClassCastException.
+             return TagAndValue.longField(((Number) value).longValue());
+         }
+         if (value instanceof String) {
+             return TagAndValue.stringField((String) value);
+         }
+         // Report null explicitly instead of failing with a NullPointerException on getClass().
+         final String type = value == null ? "null" : value.getClass().toString();
+         throw new IllegalStateException(type + " is not supported");
+     }
+
+     /**
+      * Writes a binary field; only fields declared in the searchable family are accepted.
+      *
+      * @param fieldName  storage field name
+      * @param fieldValue raw bytes to store
+      * @throws IllegalStateException if the field is declared in any other tag family
+      */
+     @Override
+     public void accept(String fieldName, byte[] fieldValue) {
+         final StreamMetadata.TagMetadata tag = this.metadata.getTagDefinition().get(fieldName);
+         if (tag == null) {
+             return;
+         }
+         if (StreamMetadata.TAG_FAMILY_SEARCHABLE.equals(tag.getTagFamilyName())) {
+             this.streamWrite.searchableTag(tag.getTagIndex(), TagAndValue.binaryField((fieldValue)));
+         } else {
+             throw new IllegalStateException("binary tag should not be store in the `data` family");
+         }
+     }
+
+     /**
+      * Splits every {@code key=value} entry at the first {@code '='} and writes it as a
+      * string field named {@code key}. Empty entries and entries without {@code '='} are
+      * ignored; {@code fieldName} itself is not used.
+      *
+      * @param fieldName  name of the list field (unused)
+      * @param fieldValue entries in {@code key=value} form
+      */
+     @Override
+     public void accept(String fieldName, List<String> fieldValue) {
+         for (final String tagKeyAndValue : fieldValue) {
+             if (StringUtil.isEmpty(tagKeyAndValue)) {
+                 continue;
+             }
+             final int pos = tagKeyAndValue.indexOf("=");
+             if (pos == -1) {
+                 continue;
+             }
+             final String key = tagKeyAndValue.substring(0, pos);
+             final String value = tagKeyAndValue.substring(pos + 1);
+             this.accept(key, value);
+         }
+     }
+
+     /**
+      * Reading back a value that was already written is not implemented yet.
+      *
+      * @param fieldName storage field name
+      * @return always {@code null}
+      */
+     @Override
+     public Object get(String fieldName) {
+         final StreamMetadata.TagMetadata tag = this.metadata.getTagDefinition().get(fieldName);
+         if (tag == null) {
+             return null;
+         }
+         // TODO: get an unmodifiable view of tag
+         return null;
+     }
+
+     /**
+      * Finishes the write. When the metadata flags the stream as using the id as its entity,
+      * the element id of the write is first stored under {@code StreamMetadata.ID}.
+      *
+      * @return the underlying {@link StreamWrite}
+      */
+     @Override
+     public StreamWrite obtain() {
+         if (metadata.isUseIdAsEntity()) {
+             this.accept(StreamMetadata.ID, this.streamWrite.getElementID());
+         }
+         return this.streamWrite;
+     }
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java
index c9566b1549..f4483b0a14 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java
@@ -19,6 +19,11 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb;
import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.banyandb.v1.client.metadata.Catalog;
+import org.apache.skywalking.banyandb.v1.client.metadata.Duration;
+import org.apache.skywalking.banyandb.v1.client.metadata.Group;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.config.ConfigService;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.storage.model.ModelInstaller;
@@ -27,21 +32,36 @@ import org.apache.skywalking.oap.server.library.module.ModuleManager;
@Slf4j
public class BanyanDBIndexInstaller extends ModelInstaller {
+ private final ConfigService configService;
+
public BanyanDBIndexInstaller(Client client, ModuleManager moduleManager) {
super(client, moduleManager);
+ this.configService = moduleManager.find(CoreModule.NAME)
+ .provider()
+ .getService(ConfigService.class);
}
@Override
protected boolean isExists(Model model) throws StorageException {
+ // TODO: get from BanyanDB and make a diff?
return false;
}
@Override
protected void createTable(Model model) throws StorageException {
- StreamMetaInfo metaInfo = StreamMetaInfo.addModel(model);
- if (metaInfo != null) {
- log.info("install index {}", model.getName());
- ((BanyanDBStorageClient) client).createStream(metaInfo);
+ if (model.isTimeSeries() && model.isRecord()) { // stream
+ StreamMetadata metaInfo = MetadataRegistry.INSTANCE.registerModel(model, this.configService);
+ if (metaInfo != null) {
+ log.info("install index {}", model.getName());
+ ((BanyanDBStorageClient) client).define(
+ new Group(metaInfo.getGroup(), Catalog.STREAM, 2, 10, Duration.ofDays(7))
+ );
+ ((BanyanDBStorageClient) client).define(metaInfo);
+ }
+ } else if (model.isTimeSeries() && !model.isRecord()) { // measure
+ log.info("skip measure index {}", model.getName());
+ } else if (!model.isTimeSeries()) { // UITemplate
+ log.info("skip property index {}", model.getName());
}
}
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBManagementDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBManagementDAO.java
new file mode 100644
index 0000000000..080bf4577a
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBManagementDAO.java
@@ -0,0 +1,20 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.skywalking.oap.server.core.analysis.management.ManagementData;
+import org.apache.skywalking.oap.server.core.storage.IManagementDAO;
+import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
+
+import java.io.IOException;
+
+/**
+ * BanyanDB-backed {@link IManagementDAO}. In this revision it is a placeholder:
+ * the client and storage builder are kept as fields but no method visible here uses them.
+ */
+@RequiredArgsConstructor
+public class BanyanDBManagementDAO implements IManagementDAO {
+ private final BanyanDBStorageClient client;
+ private final StorageBuilder<ManagementData> storageBuilder;
+
+ /**
+  * Currently a no-op: the method body is empty, so {@code storageData} is not persisted.
+  *
+  * @param model       model describing the management data (ignored)
+  * @param storageData management data to insert (ignored)
+  * @throws IOException declared by the interface; never thrown by this body
+  */
+ @Override
+ public void insert(Model model, ManagementData storageData) throws IOException {
+
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBMetricsDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBMetricsDAO.java
new file mode 100644
index 0000000000..af54dd350f
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBMetricsDAO.java
@@ -0,0 +1,36 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
+import org.apache.skywalking.oap.server.core.analysis.record.Record;
+import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
+import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
+import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
+import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * BanyanDB-backed {@link IMetricsDAO}. In this revision every method is a stub:
+ * the storage builder is kept as a field but no method visible here uses it.
+ */
+@RequiredArgsConstructor
+public class BanyanDBMetricsDAO implements IMetricsDAO {
+ private final StorageBuilder<Metrics> storageBuilder;
+
+ /**
+  * Stub lookup that ignores both arguments.
+  *
+  * @return always an empty list
+  */
+ @Override
+ public List<Metrics> multiGet(Model model, List<Metrics> metrics) throws IOException {
+ return Collections.emptyList();
+ }
+
+ /**
+  * Stub that ignores both arguments.
+  *
+  * @return a new anonymous {@link InsertRequest} that is given neither the model nor the metrics
+  */
+ @Override
+ public InsertRequest prepareBatchInsert(Model model, Metrics metrics) throws IOException {
+ return new InsertRequest() {
+ };
+ }
+
+ /**
+  * Stub that ignores both arguments.
+  *
+  * @return a new anonymous {@link UpdateRequest} that is given neither the model nor the metrics
+  */
+ @Override
+ public UpdateRequest prepareBatchUpdate(Model model, Metrics metrics) throws IOException {
+ return new UpdateRequest() {
+ };
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBNoneStreamDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBNoneStreamDAO.java
similarity index 51%
rename from oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBNoneStreamDAO.java
rename to oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBNoneStreamDAO.java
index ae5c766897..bd4a5b0926 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBNoneStreamDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBNoneStreamDAO.java
@@ -16,36 +16,26 @@
*
*/
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
+package org.apache.skywalking.oap.server.storage.plugin.banyandb;
-import lombok.RequiredArgsConstructor;
-import org.apache.skywalking.banyandb.v1.client.StreamWrite;
-import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.config.NoneStream;
+import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
import org.apache.skywalking.oap.server.core.storage.INoneStreamDAO;
import org.apache.skywalking.oap.server.core.storage.model.Model;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.BanyanDBStorageDataBuilder;
+import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
import java.io.IOException;
-/**
- * DAO for NoneStream, specifically ProfileTaskRecord
- *
- * @param <T> For NoneStream, we only have {@link org.apache.skywalking.oap.server.core.profile.ProfileTaskRecord}
- */
-@RequiredArgsConstructor
-public class BanyanDBNoneStreamDAO<T extends NoneStream> implements INoneStreamDAO {
- private final BanyanDBStorageClient client;
- private final BanyanDBStorageDataBuilder<T> storageBuilder;
+public class BanyanDBNoneStreamDAO extends AbstractDAO<BanyanDBStorageClient> implements INoneStreamDAO {
+ private final StorageBuilder<NoneStream> storageBuilder;
+
+ public BanyanDBNoneStreamDAO(BanyanDBStorageClient client, StorageBuilder<NoneStream> storageBuilder) {
+ super(client);
+ this.storageBuilder = storageBuilder;
+ }
@Override
public void insert(Model model, NoneStream noneStream) throws IOException {
- final long timestamp = TimeBucket.getTimestamp(noneStream.getTimeBucket(), model.getDownsampling());
- StreamWrite.StreamWriteBuilder builder =
- this.storageBuilder.entity2Storage((T) noneStream)
- .name(model.getName())
- .timestamp(timestamp);
- this.client.write(builder.build());
+
}
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageBuilderFactory.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageBuilderFactory.java
deleted file mode 100644
index 0cb10763eb..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageBuilderFactory.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.skywalking.oap.server.core.alarm.AlarmRecord;
-import org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic;
-import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
-import org.apache.skywalking.oap.server.core.analysis.manual.log.LogRecord;
-import org.apache.skywalking.oap.server.core.analysis.manual.networkalias.NetworkAddressAlias;
-import org.apache.skywalking.oap.server.core.analysis.manual.relation.endpoint.EndpointRelationServerSideMetrics;
-import org.apache.skywalking.oap.server.core.analysis.manual.relation.instance.ServiceInstanceRelationClientSideMetrics;
-import org.apache.skywalking.oap.server.core.analysis.manual.relation.instance.ServiceInstanceRelationServerSideMetrics;
-import org.apache.skywalking.oap.server.core.analysis.manual.relation.service.ServiceRelationClientSideMetrics;
-import org.apache.skywalking.oap.server.core.analysis.manual.relation.service.ServiceRelationServerSideMetrics;
-import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
-import org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraffic;
-import org.apache.skywalking.oap.server.core.browser.manual.errorlog.BrowserErrorLogRecord;
-import org.apache.skywalking.oap.server.core.management.ui.template.UITemplate;
-import org.apache.skywalking.oap.server.core.profile.ProfileTaskLogRecord;
-import org.apache.skywalking.oap.server.core.profile.ProfileTaskRecord;
-import org.apache.skywalking.oap.server.core.profile.ProfileThreadSnapshotRecord;
-import org.apache.skywalking.oap.server.core.source.Event;
-import org.apache.skywalking.oap.server.core.storage.StorageBuilderFactory;
-import org.apache.skywalking.oap.server.core.storage.StorageData;
-import org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder;
-import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.AlarmRecordBuilder;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.BrowserErrorLogRecordBuilder;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.EndpointRelationServerSideMetricsBuilder;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.EventBuilder;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.LogRecordBuilder;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.Metadata;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.NetworkAddressAliasBuilder;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.ProfileTaskLogRecordBuilder;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.ProfileTaskRecordBuilder;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.ProfileThreadSnapshotRecordBuilder;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.SegmentRecordBuilder;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.ServiceInstanceRelationClientSideMetricsBuilder;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.ServiceInstanceRelationServerSideMetricsBuilder;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.ServiceRelationClientSideMetricsBuilder;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.ServiceRelationServerSideMetricsBuilder;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.UITemplateBuilder;
-
-@Slf4j
-public class BanyanDBStorageBuilderFactory implements StorageBuilderFactory {
- private static final StorageBuilderFactory FALLBACK = new StorageBuilderFactory.Default();
-
- @Override
- public BuilderTemplateDefinition builderTemplate() {
- return new BuilderTemplateDefinition(StorageHashMapBuilder.class.getName(), "metrics-builder");
- }
-
- @Override
- public Class<? extends StorageBuilder> builderOf(Class<? extends StorageData> dataType, Class<? extends StorageBuilder> defaultBuilder) {
- if (SegmentRecord.class.equals(dataType)) {
- return SegmentRecordBuilder.class;
- } else if (AlarmRecord.class.equals(dataType)) {
- return AlarmRecordBuilder.class;
- } else if (BrowserErrorLogRecord.class.equals(dataType)) {
- return BrowserErrorLogRecordBuilder.class;
- } else if (LogRecord.class.equals(dataType)) {
- return LogRecordBuilder.class;
- } else if (ProfileTaskLogRecord.class.equals(dataType)) {
- return ProfileTaskLogRecordBuilder.class;
- } else if (ProfileThreadSnapshotRecord.class.equals(dataType)) {
- return ProfileThreadSnapshotRecordBuilder.class;
- } else if (ProfileTaskRecord.class.equals(dataType)) {
- return ProfileTaskRecordBuilder.class;
- } else if (UITemplate.class.equals(dataType)) {
- return UITemplateBuilder.class;
- } else if (Event.class.equals(dataType)) {
- return EventBuilder.class;
- } else if (ServiceTraffic.class.equals(dataType)) {
- return Metadata.ServiceTrafficBuilder.class;
- } else if (InstanceTraffic.class.equals(dataType)) {
- return Metadata.InstanceTrafficBuilder.class;
- } else if (EndpointTraffic.class.equals(dataType)) {
- return Metadata.EndpointTrafficBuilder.class;
- } else if (NetworkAddressAlias.class.equals(dataType)) {
- return NetworkAddressAliasBuilder.class;
- } else if (EndpointRelationServerSideMetrics.class.equals(dataType)) {
- return EndpointRelationServerSideMetricsBuilder.class;
- } else if (ServiceRelationServerSideMetrics.class.equals(dataType)) {
- return ServiceRelationServerSideMetricsBuilder.class;
- } else if (ServiceRelationClientSideMetrics.class.equals(dataType)) {
- return ServiceRelationClientSideMetricsBuilder.class;
- } else if (ServiceInstanceRelationServerSideMetrics.class.equals(dataType)) {
- return ServiceInstanceRelationServerSideMetricsBuilder.class;
- } else if (ServiceInstanceRelationClientSideMetrics.class.equals(dataType)) {
- return ServiceInstanceRelationClientSideMetricsBuilder.class;
- }
-
- return FALLBACK.builderOf(dataType, defaultBuilder);
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
index 6b4780cad7..b1ca206110 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
@@ -19,7 +19,6 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb;
import org.apache.skywalking.banyandb.v1.client.BanyanDBClient;
-import org.apache.skywalking.banyandb.v1.client.GroupedBanyanDBClient;
import org.apache.skywalking.banyandb.v1.client.StreamBulkWriteProcessor;
import org.apache.skywalking.banyandb.v1.client.StreamQuery;
import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
@@ -33,6 +32,8 @@ import org.apache.skywalking.oap.server.library.client.healthcheck.HealthCheckab
import org.apache.skywalking.oap.server.library.util.HealthChecker;
import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
/**
* BanyanDBStorageClient is a simple wrapper for the underlying {@link BanyanDBClient},
@@ -40,15 +41,16 @@ import java.io.IOException;
*/
public class BanyanDBStorageClient implements Client, HealthCheckable {
private final BanyanDBClient client;
- private GroupedBanyanDBClient streamClient;
+ private final Map<String, Group> groupMap;
private final DelegatedHealthChecker healthChecker = new DelegatedHealthChecker();
public BanyanDBStorageClient(String host, int port) {
this.client = new BanyanDBClient(host, port);
+ this.groupMap = new ConcurrentHashMap<>();
}
- public void defineStreamGroup(Group group) {
- this.streamClient = this.client.attachGroup(group);
+ public Group define(Group group) {
+ return groupMap.computeIfAbsent(group.getName(), s -> client.define(group));
}
@Override
@@ -63,7 +65,7 @@ public class BanyanDBStorageClient implements Client, HealthCheckable {
public StreamQueryResponse query(StreamQuery streamQuery) {
try {
- StreamQueryResponse response = this.streamClient.queryStreams(streamQuery);
+ StreamQueryResponse response = this.client.queryStreams(streamQuery);
this.healthChecker.health();
return response;
} catch (Throwable t) {
@@ -72,19 +74,19 @@ public class BanyanDBStorageClient implements Client, HealthCheckable {
}
}
- public void createStream(StreamMetaInfo streamMetaInfo) {
- Stream stm = this.streamClient.define(streamMetaInfo.getStream());
- if (stm != null) {
- this.streamClient.defineIndexRules(stm, streamMetaInfo.getIndexRules().toArray(new IndexRule[]{}));
+ public void define(StreamMetadata streamMetadata) {
+ Stream stream = this.client.define(streamMetadata.getStream());
+ if (stream != null) {
+ this.client.defineIndexRules(stream, streamMetadata.getIndexRules().toArray(new IndexRule[]{}));
}
}
public void write(StreamWrite streamWrite) {
- this.streamClient.write(streamWrite);
+ this.client.write(streamWrite);
}
public StreamBulkWriteProcessor createBulkProcessor(int maxBulkSize, int flushInterval, int concurrency) {
- return this.streamClient.buildStreamWriteProcessor(maxBulkSize, flushInterval, concurrency);
+ return this.client.buildStreamWriteProcessor(maxBulkSize, flushInterval, concurrency);
}
@Override
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java
index b938ab1159..922bf2296a 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java
@@ -18,8 +18,6 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb;
-import org.apache.skywalking.banyandb.v1.client.metadata.Catalog;
-import org.apache.skywalking.banyandb.v1.client.metadata.Group;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.config.ConfigService;
import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
@@ -48,21 +46,21 @@ import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBEventQueryDAO;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBMetadataQueryDAO;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBNetworkAddressAliasDAO;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBProfileTaskQueryDAO;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBTopologyQueryDAO;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBUITemplateManagementDAO;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBAlarmQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBBatchDAO;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBBrowserLogQueryDAO;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBEventQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBHistoryDeleteDAO;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBLogQueryDAO;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBMetadataQueryDAO;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBNetworkAddressAliasDAO;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBProfileTaskLogQueryDAO;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBProfileTaskQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBProfileThreadSnapshotQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBStorageDAO;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBTopologyQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBTraceQueryDAO;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBUITemplateManagementDAO;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.HealthCheckMetrics;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
@@ -93,26 +91,26 @@ public class BanyanDBStorageProvider extends ModuleProvider {
@Override
public void prepare() throws ServiceNotProvidedException, ModuleStartException {
- this.registerServiceImplementation(StorageBuilderFactory.class, new BanyanDBStorageBuilderFactory());
+ this.registerServiceImplementation(StorageBuilderFactory.class, new StorageBuilderFactory.Default());
this.client = new BanyanDBStorageClient(config.getHost(), config.getPort());
// Stream
this.registerServiceImplementation(IBatchDAO.class, new BanyanDBBatchDAO(client, config.getMaxBulkSize(), config.getFlushInterval(), config.getConcurrentWriteThreads()));
this.registerServiceImplementation(StorageDAO.class, new BanyanDBStorageDAO(client));
- this.registerServiceImplementation(INetworkAddressAliasDAO.class, new BanyanDBNetworkAddressAliasDAO(client));
+ this.registerServiceImplementation(INetworkAddressAliasDAO.class, new BanyanDBNetworkAddressAliasDAO());
this.registerServiceImplementation(ITraceQueryDAO.class, new BanyanDBTraceQueryDAO(client));
this.registerServiceImplementation(IBrowserLogQueryDAO.class, new BanyanDBBrowserLogQueryDAO(client));
- this.registerServiceImplementation(IMetadataQueryDAO.class, new BanyanDBMetadataQueryDAO(client));
+ this.registerServiceImplementation(IMetadataQueryDAO.class, new BanyanDBMetadataQueryDAO());
this.registerServiceImplementation(IAlarmQueryDAO.class, new BanyanDBAlarmQueryDAO(client));
this.registerServiceImplementation(ILogQueryDAO.class, new BanyanDBLogQueryDAO(client));
- this.registerServiceImplementation(IProfileTaskQueryDAO.class, new BanyanDBProfileTaskQueryDAO(client));
+ this.registerServiceImplementation(IProfileTaskQueryDAO.class, new BanyanDBProfileTaskQueryDAO());
this.registerServiceImplementation(IProfileTaskLogQueryDAO.class, new BanyanDBProfileTaskLogQueryDAO(client, this.config.getFetchTaskLogMaxSize()));
this.registerServiceImplementation(
IProfileThreadSnapshotQueryDAO.class, new BanyanDBProfileThreadSnapshotQueryDAO(client));
- this.registerServiceImplementation(UITemplateManagementDAO.class, new BanyanDBUITemplateManagementDAO(client));
- this.registerServiceImplementation(IEventQueryDAO.class, new BanyanDBEventQueryDAO(client));
- this.registerServiceImplementation(ITopologyQueryDAO.class, new BanyanDBTopologyQueryDAO(client));
+ this.registerServiceImplementation(UITemplateManagementDAO.class, new BanyanDBUITemplateManagementDAO());
+ this.registerServiceImplementation(IEventQueryDAO.class, new BanyanDBEventQueryDAO());
+ this.registerServiceImplementation(ITopologyQueryDAO.class, new BanyanDBTopologyQueryDAO());
// TODO: metrics
this.registerServiceImplementation(IHistoryDeleteDAO.class, new BanyanDBHistoryDeleteDAO());
@@ -135,9 +133,6 @@ public class BanyanDBStorageProvider extends ModuleProvider {
this.client.registerChecker(healthChecker);
try {
this.client.connect();
- // create stream group
- final Group streamGroup = new Group("default-stream", Catalog.STREAM, 2);
- this.client.defineStreamGroup(streamGroup);
BanyanDBIndexInstaller installer = new BanyanDBIndexInstaller(client, getManager());
getManager().find(CoreModule.NAME).provider().getService(ModelCreator.class).addModelListener(installer);
} catch (Exception e) {
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
new file mode 100644
index 0000000000..83af6da867
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.banyandb;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
+import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase;
+import org.apache.skywalking.banyandb.v1.client.metadata.IndexRule;
+import org.apache.skywalking.banyandb.v1.client.metadata.Stream;
+import org.apache.skywalking.banyandb.v1.client.metadata.TagFamilySpec;
+import org.apache.skywalking.oap.server.core.alarm.AlarmRecord;
+import org.apache.skywalking.oap.server.core.analysis.manual.log.LogRecord;
+import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
+import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
+import org.apache.skywalking.oap.server.core.config.ConfigService;
+import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.core.storage.model.ModelColumn;
+import org.apache.skywalking.oap.server.library.util.StringUtil;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public enum MetadataRegistry {
+ INSTANCE;
+
+ private final Map<String, StreamMetadata> streams = new HashMap<>();
+
+ /**
+  * Builds the BanyanDB stream metadata for the given model and caches it under the model name.
+  * <p>
+  * The stream schema comes from {@link #parseStreamFromModel(Model, ConfigService)}. Every tag is
+  * recorded with the name of its tag family and its position inside that family. An index rule is
+  * created for each tag of the "searchable" family that is not one of the entity tags.
+  * <p>
+  * NOTE(review): {@code dataFamilySize} and {@code searchableFamilySize} of {@link StreamMetadata}
+  * are not passed to the builder below, so they keep the builder default - confirm this is intended.
+  *
+  * @param model         the storage model to register
+  * @param configService passed through to the schema derivation
+  * @return the metadata stored under {@code model.getName()}
+  */
+ public StreamMetadata registerModel(Model model, ConfigService configService) {
+     final BanyandbDatabase.Stream schema = parseStreamFromModel(model, configService);
+     final BanyandbCommon.Metadata schemaMetadata = schema.getMetadata();
+     final List<String> entityTagNames = schema.getEntity().getTagNamesList();
+
+     final Stream stream = new Stream(schemaMetadata.getGroup(), schemaMetadata.getName());
+     stream.setEntityTagNames(entityTagNames);
+
+     final Set<String> entityTags = ImmutableSet.copyOf(entityTagNames);
+     final List<IndexRule> rules = new ArrayList<>();
+     final Map<String, StreamMetadata.TagMetadata> tagsByName = new HashMap<>();
+
+     for (BanyandbDatabase.TagFamilySpec pbFamily : schema.getTagFamiliesList()) {
+         final TagFamilySpec family = TagFamilySpec.fromProtobuf(pbFamily);
+         stream.addTagFamilySpec(family);
+
+         // the position restarts at zero for every tag family
+         int position = 0;
+         for (final TagFamilySpec.TagSpec tag : family.getTagSpecs()) {
+             tagsByName.put(tag.getTagName(), new StreamMetadata.TagMetadata(family.getTagFamilyName(), tag, position));
+             position++;
+
+             // only tags of the "searchable" family that are not entity tags get an index rule
+             final boolean searchable = family.getTagFamilyName().equals(StreamMetadata.TAG_FAMILY_SEARCHABLE);
+             if (!searchable || entityTags.contains(tag.getTagName())) {
+                 continue;
+             }
+             rules.add(IndexRule.fromProtobuf(parseIndexRuleFromTagSpec(schemaMetadata, tag)));
+         }
+     }
+
+     // true when the entity consists of exactly one tag and that tag is the ID
+     final boolean idIsOnlyEntity = entityTagNames.size() == 1
+         && StreamMetadata.ID.equals(entityTagNames.get(0));
+
+     final StreamMetadata registered = StreamMetadata.builder()
+         .model(model)
+         .stream(stream)
+         .tagDefinition(tagsByName)
+         .indexRules(rules)
+         .group(schemaMetadata.getGroup())
+         .useIdAsEntity(idIsOnlyEntity)
+         .build();
+     streams.put(model.getName(), registered);
+     return registered;
+ }
+
+ /**
+  * Looks up metadata previously stored by {@link #registerModel(Model, ConfigService)}.
+  *
+  * @param name the model name that was used as the key on registration
+  * @return the registered metadata, or {@code null} when nothing is stored under {@code name}
+  */
+ public StreamMetadata findStreamMetadata(final String name) {
+     return streams.get(name);
+ }
+
+ /**
+  * Translates a storage model into the protobuf definition of a BanyanDB stream.
+  * <ul>
+  *   <li>Index-only columns are skipped, storage-only columns go to the "data" tag family and all
+  *       remaining columns go to the "searchable" tag family.</li>
+  *   <li>The entity is made of the columns that declare a sharding key index, ordered by that index.
+  *       When no column declares one, the ID tag is appended to the "searchable" family and used as
+  *       the only entity tag.</li>
+  *   <li>For the segment, log and alarm models the configured searchable tags are appended to the
+  *       "searchable" family.</li>
+  *   <li>Super datasets get their own group ({@code <model name>-stream}); every other model is put
+  *       into {@code default-stream}.</li>
+  * </ul>
+  *
+  * @param model         the storage model to translate
+  * @param configService source of the user-defined searchable tags
+  * @return the protobuf stream definition
+  * @throws IllegalStateException if a column has a type that cannot be mapped to a tag type
+  */
+ private BanyandbDatabase.Stream parseStreamFromModel(Model model, ConfigService configService) {
+     List<ModelColumn> shardingColumns = new ArrayList<>();
+
+     List<BanyandbDatabase.TagSpec> searchableTagsSpecs = new ArrayList<>();
+     List<BanyandbDatabase.TagSpec> dataTagsSpecs = new ArrayList<>();
+     for (final ModelColumn modelColumn : model.getColumns()) {
+         if (modelColumn.getShardingKeyIdx() > -1) {
+             shardingColumns.add(modelColumn);
+         }
+         if (modelColumn.isIndexOnly()) {
+             // skip
+         } else if (modelColumn.isStorageOnly()) {
+             dataTagsSpecs.add(parseTagSpecFromModelColumn(modelColumn));
+         } else {
+             searchableTagsSpecs.add(parseTagSpecFromModelColumn(modelColumn));
+         }
+     }
+
+     // Collect into a List rather than a Set: Collectors.toSet() gives no ordering guarantee, so
+     // the order established by sorted() would be lost before the names reach the entity.
+     List<String> entities = shardingColumns.stream()
+         .sorted(Comparator.comparingInt(ModelColumn::getShardingKeyIdx))
+         .map(modelColumn -> modelColumn.getColumnName().getStorageName())
+         .collect(Collectors.toList());
+
+     if (entities.isEmpty()) {
+         // if sharding keys are not defined, we have to use ID
+         entities = Collections.singletonList(StreamMetadata.ID);
+         // append ID
+         searchableTagsSpecs.add(BanyandbDatabase.TagSpec.newBuilder()
+             .setName(StreamMetadata.ID)
+             .setType(BanyandbDatabase.TagType.TAG_TYPE_STRING).build());
+     }
+
+     // add all user-defined indexed tags to the end of the "searchable" family
+     if (SegmentRecord.INDEX_NAME.equals(model.getName())) {
+         searchableTagsSpecs.addAll(parseTagSpecsFromConfiguration(configService.getSearchableTracesTags()));
+     } else if (LogRecord.INDEX_NAME.equals(model.getName())) {
+         searchableTagsSpecs.addAll(parseTagSpecsFromConfiguration(configService.getSearchableLogsTags()));
+     } else if (AlarmRecord.INDEX_NAME.equals(model.getName())) {
+         searchableTagsSpecs.addAll(parseTagSpecsFromConfiguration(configService.getSearchableAlarmTags()));
+     }
+
+     String group = "default-stream";
+     if (model.isSuperDataset()) {
+         // for superDataset, we should use separate group
+         group = model.getName() + "-stream";
+     }
+
+     return BanyandbDatabase.Stream.newBuilder()
+         .addTagFamilies(BanyandbDatabase.TagFamilySpec.newBuilder()
+             .setName(StreamMetadata.TAG_FAMILY_DATA)
+             .addAllTags(dataTagsSpecs)
+             .build())
+         .addTagFamilies(BanyandbDatabase.TagFamilySpec.newBuilder()
+             .setName(StreamMetadata.TAG_FAMILY_SEARCHABLE)
+             .addAllTags(searchableTagsSpecs)
+             .build())
+         .setEntity(BanyandbDatabase.Entity.newBuilder()
+             .addAllTagNames(entities)
+             .build())
+         .setMetadata(BanyandbCommon.Metadata.newBuilder()
+             .setGroup(group)
+             .setName(model.getName())
+             .build())
+         .build();
+ }
+
+ /**
+  * Maps a model column to a tag spec named after the column's storage name.
+  * <p>
+  * {@code String} becomes a string tag, {@code int}/{@code long} become an int tag, and
+  * {@code byte[]}/{@link DataTable} become a binary tag.
+  *
+  * @param modelColumn the column to translate
+  * @return the tag spec for the column
+  * @throws IllegalStateException if the column type is none of the types listed above
+  */
+ private BanyandbDatabase.TagSpec parseTagSpecFromModelColumn(ModelColumn modelColumn) {
+     final Class<?> columnType = modelColumn.getType();
+
+     // resolve the tag type first so the spec is assembled in a single place
+     final BanyandbDatabase.TagType tagType;
+     if (String.class.equals(columnType)) {
+         tagType = BanyandbDatabase.TagType.TAG_TYPE_STRING;
+     } else if (int.class.equals(columnType) || long.class.equals(columnType)) {
+         tagType = BanyandbDatabase.TagType.TAG_TYPE_INT;
+     } else if (byte[].class.equals(columnType) || DataTable.class.equals(columnType)) {
+         tagType = BanyandbDatabase.TagType.TAG_TYPE_DATA_BINARY;
+     } else {
+         throw new IllegalStateException("type " + modelColumn.getType().toString() + " is not supported");
+     }
+
+     return BanyandbDatabase.TagSpec.newBuilder()
+         .setName(modelColumn.getColumnName().getStorageName())
+         .setType(tagType)
+         .build();
+ }
+
+ /**
+  * Turns a comma-separated list of tag names into string-typed tag specs, one per name and in the
+  * order given. The names are used exactly as they result from the split.
+  *
+  * @param tags comma-separated tag names
+  * @return one tag spec per name; an empty list when {@code tags} is empty or yields no names
+  */
+ private List<BanyandbDatabase.TagSpec> parseTagSpecsFromConfiguration(String tags) {
+     if (StringUtil.isEmpty(tags)) {
+         return Collections.emptyList();
+     }
+     final String[] tagNames = tags.split(",");
+     if (tagNames.length == 0) {
+         return Collections.emptyList();
+     }
+
+     final List<BanyandbDatabase.TagSpec> specs = new ArrayList<>(tagNames.length);
+     for (final String tagName : tagNames) {
+         specs.add(BanyandbDatabase.TagSpec.newBuilder()
+             .setName(tagName)
+             .setType(BanyandbDatabase.TagType.TAG_TYPE_STRING)
+             .build());
+     }
+     return specs;
+ }
+
+ /**
+  * Builds an inverted index rule for a single tag. The rule is named after the tag and placed in
+  * the group of the given stream metadata.
+  *
+  * @param metadata metadata of the owning stream; only its group is read
+  * @param tagSpec  the tag to index
+  * @return the protobuf index rule
+  */
+ private BanyandbDatabase.IndexRule parseIndexRuleFromTagSpec(BanyandbCommon.Metadata metadata, TagFamilySpec.TagSpec tagSpec) {
+     final String tagName = tagSpec.getTagName();
+
+     // In SkyWalking, only "trace_id" should be stored as a global index
+     final BanyandbDatabase.IndexRule.Location location;
+     if ("trace_id".equals(tagName)) {
+         location = BanyandbDatabase.IndexRule.Location.LOCATION_GLOBAL;
+     } else {
+         location = BanyandbDatabase.IndexRule.Location.LOCATION_SERIES;
+     }
+
+     final BanyandbCommon.Metadata.Builder ruleMetadata = BanyandbCommon.Metadata.newBuilder()
+         .setName(tagName)
+         .setGroup(metadata.getGroup());
+
+     return BanyandbDatabase.IndexRule.newBuilder()
+         .setMetadata(ruleMetadata)
+         .setLocation(location)
+         .addTags(tagName)
+         // TODO: support TYPE_TREE
+         .setType(BanyandbDatabase.IndexRule.Type.TYPE_INVERTED)
+         .build();
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/StreamMetaInfo.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/StreamMetaInfo.java
deleted file mode 100644
index 0a2af94339..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/StreamMetaInfo.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb;
-
-import com.google.common.base.Charsets;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.io.CharStreams;
-import com.google.protobuf.util.JsonFormat;
-import lombok.Builder;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase;
-import org.apache.skywalking.banyandb.v1.client.metadata.IndexRule;
-import org.apache.skywalking.banyandb.v1.client.metadata.Stream;
-import org.apache.skywalking.banyandb.v1.client.metadata.TagFamilySpec;
-import org.apache.skywalking.oap.server.core.storage.model.Model;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-@Getter
-@Builder
-@Slf4j
-public class StreamMetaInfo {
- public static final String TAG_FAMILY_SEARCHABLE = "searchable";
- public static final String TAG_FAMILY_DATA = "data";
-
- public static final String ID = "id";
-
- private static final Map<String, StreamMetaInfo> STREAMS = new HashMap<>();
-
- private final Model model;
-
- /**
- * stream is the metadata to be used for schema creation,
- * 1. Read json from resources/metadata/{model.name}.json and deserialize to protobuf,
- * 2. Iterate over tag families,
- * 3. Iterate over tags in each tag family
- * 4.
- */
- private final Stream stream;
-
- private final List<IndexRule> indexRules;
-
- public static StreamMetaInfo addModel(Model model) {
- BanyandbDatabase.Stream pbStream = parseStreamFromJSON(model.getName());
- if (pbStream == null) {
- log.warn("fail to find stream schema {}", model.getName());
- return null;
- }
- final Stream stream = new Stream(pbStream.getMetadata().getName());
-
- List<IndexRule> indexRules = new ArrayList<>();
-
- Set<String> entityNameSet = ImmutableSet.copyOf(pbStream.getEntity().getTagNamesList());
-
- stream.setEntityTagNames(pbStream.getEntity().getTagNamesList());
-
- for (BanyandbDatabase.TagFamilySpec pbTagFamilySpec : pbStream.getTagFamiliesList()) {
- final TagFamilySpec tagFamilySpec = TagFamilySpec.fromProtobuf(pbTagFamilySpec);
- stream.addTagFamilySpec(tagFamilySpec);
-
- // if the tag family equals to "searchable", build index rules
- if (tagFamilySpec.getTagFamilyName().equals(TAG_FAMILY_SEARCHABLE)) {
- for (final TagFamilySpec.TagSpec tagSpec : tagFamilySpec.getTagSpecs()) {
- // check if this spec exists in the entity names
- if (entityNameSet.contains(tagSpec.getTagName())) {
- continue;
- }
- BanyandbDatabase.IndexRule pbIndexRule = parseIndexRulesFromJSON(model.getName(), tagSpec.getTagName());
- if (pbIndexRule == null) {
- log.warn("fail to find the index rule for {}", tagSpec.getTagName());
- continue;
- }
- IndexRule indexRule = IndexRule.fromProtobuf(pbIndexRule);
- indexRules.add(indexRule);
- }
- }
- }
-
- return StreamMetaInfo.builder().model(model).stream(stream).indexRules(indexRules).build();
- }
-
- private static BanyandbDatabase.Stream parseStreamFromJSON(String name) {
- try {
- InputStream is = StreamMetaInfo.class.getClassLoader().getResourceAsStream("metadata/" + name + ".json");
- if (is == null) {
- log.warn("fail to find definition for {}", name);
- return null;
- }
- String result = CharStreams.toString(new InputStreamReader(is, Charsets.UTF_8));
- BanyandbDatabase.Stream.Builder b = BanyandbDatabase.Stream.newBuilder();
- JsonFormat.parser().merge(result, b);
- return b.build();
- } catch (IOException ioEx) {
- log.error("fail to read json", ioEx);
- return null;
- }
- }
-
- private static BanyandbDatabase.IndexRule parseIndexRulesFromJSON(String streamName, String name) {
- try {
- InputStream is = StreamMetaInfo.class.getClassLoader().getResourceAsStream(String.join("/",
- new String[]{"metadata", "index_rules", name + ".json"}));
- if (is == null) {
- log.warn("fail to find index rules for {}", streamName);
- return null;
- }
- String result = CharStreams.toString(new InputStreamReader(is, Charsets.UTF_8));
- BanyandbDatabase.IndexRule.Builder b = BanyandbDatabase.IndexRule.newBuilder();
- JsonFormat.parser().merge(result, b);
- return b.build();
- } catch (IOException ioEx) {
- log.error("fail to read json", ioEx);
- return null;
- }
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/StreamMetadata.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/StreamMetadata.java
new file mode 100644
index 0000000000..814f5f62e2
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/StreamMetadata.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.banyandb;
+
+import lombok.Builder;
+import lombok.Data;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.banyandb.v1.client.metadata.IndexRule;
+import org.apache.skywalking.banyandb.v1.client.metadata.Stream;
+import org.apache.skywalking.banyandb.v1.client.metadata.TagFamilySpec;
+import org.apache.skywalking.oap.server.core.storage.model.Model;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Immutable description of a BanyanDB stream derived from a storage {@link Model}: the stream spec,
+ * its group, its index rules and a per-tag lookup table.
+ */
+@Getter
+@Builder
+@Slf4j
+public class StreamMetadata {
+    /** Name of the tag family that holds the tags for which index rules are built. */
+    public static final String TAG_FAMILY_SEARCHABLE = "searchable";
+    /** Name of the tag family that holds the storage-only tags. */
+    public static final String TAG_FAMILY_DATA = "data";
+
+    /** Name of the ID tag, used as the entity when a model declares no sharding key. */
+    public static final String ID = "id";
+
+    /**
+     * The storage model this metadata was derived from
+     */
+    private final Model model;
+
+    /**
+     * Tag lookup table, keyed by tag name
+     */
+    private final Map<String, TagMetadata> tagDefinition;
+
+    /**
+     * Group of the stream
+     */
+    private final String group;
+    /**
+     * Spec of the stream
+     */
+    private final Stream stream;
+    /**
+     * Index rules attached to the stream
+     */
+    private final List<IndexRule> indexRules;
+
+    // NOTE(review): presumably the number of tags in the "data" and "searchable" families.
+    // MetadataRegistry#registerModel does not set them on the builder - confirm they are populated.
+    private final int dataFamilySize;
+    private final int searchableFamilySize;
+
+    /**
+     * Whether the entity consists of the {@link #ID} tag only
+     */
+    private final boolean useIdAsEntity;
+
+    /**
+     * Location of a single tag: the family it belongs to, its spec and its position inside that
+     * family. {@code @Data} already generates the getters, so no separate {@code @Getter} is needed.
+     */
+    @Data
+    public static class TagMetadata {
+        private final String tagFamilyName;
+        private final TagFamilySpec.TagSpec tagSpec;
+        private final int tagIndex;
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStreamInsertRequest.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBEventQueryDAO.java
similarity index 59%
copy from oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStreamInsertRequest.java
copy to oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBEventQueryDAO.java
index f8287973bc..f06de0cff5 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStreamInsertRequest.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBEventQueryDAO.java
@@ -16,15 +16,22 @@
*
*/
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import org.apache.skywalking.banyandb.v1.client.StreamWrite;
-import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
+import org.apache.skywalking.oap.server.core.query.type.event.EventQueryCondition;
+import org.apache.skywalking.oap.server.core.query.type.event.Events;
+import org.apache.skywalking.oap.server.core.storage.query.IEventQueryDAO;
-@RequiredArgsConstructor
-public class BanyanDBStreamInsertRequest implements InsertRequest {
- @Getter
- private final StreamWrite streamWrite;
-}
\ No newline at end of file
+import java.util.List;
+
+/**
+ * Placeholder {@link IEventQueryDAO}: the query conditions are ignored and every call returns a
+ * newly created {@link Events} object.
+ */
+public class BanyanDBEventQueryDAO implements IEventQueryDAO {
+    /**
+     * @param condition ignored
+     * @return a newly created {@link Events}
+     */
+    @Override
+    public Events queryEvents(final EventQueryCondition condition) throws Exception {
+        return noEvents();
+    }
+
+    /**
+     * @param conditionList ignored
+     * @return a newly created {@link Events}
+     */
+    @Override
+    public Events queryEvents(final List<EventQueryCondition> conditionList) throws Exception {
+        return noEvents();
+    }
+
+    /** Creates the result shared by both query variants. */
+    private static Events noEvents() {
+        return new Events();
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java
new file mode 100644
index 0000000000..5833c98c56
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure;
+
+import org.apache.skywalking.oap.server.core.query.type.Endpoint;
+import org.apache.skywalking.oap.server.core.query.type.Process;
+import org.apache.skywalking.oap.server.core.query.type.Service;
+import org.apache.skywalking.oap.server.core.query.type.ServiceInstance;
+import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Placeholder {@link IMetadataQueryDAO}: no storage is queried. List queries return an empty list
+ * and single-item lookups return {@code null}; all arguments are ignored.
+ */
+public class BanyanDBMetadataQueryDAO implements IMetadataQueryDAO {
+    /** @return always an empty list */
+    @Override
+    public List<Service> listServices(final String layer, final String group) throws IOException {
+        return Collections.emptyList();
+    }
+
+    /** @return always an empty list */
+    @Override
+    public List<Service> getServices(final String serviceId) throws IOException {
+        return Collections.emptyList();
+    }
+
+    /** @return always an empty list */
+    @Override
+    public List<ServiceInstance> listInstances(final long startTimestamp, final long endTimestamp, final String serviceId) throws IOException {
+        return Collections.emptyList();
+    }
+
+    /** @return always {@code null} */
+    @Override
+    public ServiceInstance getInstance(final String instanceId) throws IOException {
+        return null;
+    }
+
+    /** @return always an empty list */
+    @Override
+    public List<Endpoint> findEndpoint(final String keyword, final String serviceId, final int limit) throws IOException {
+        return Collections.emptyList();
+    }
+
+    /** @return always an empty list */
+    @Override
+    public List<Process> listProcesses(final String serviceId, final String instanceId) throws IOException {
+        return Collections.emptyList();
+    }
+
+    /** @return always {@code null} */
+    @Override
+    public Process getProcess(final String processId) throws IOException {
+        return null;
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStreamInsertRequest.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBNetworkAddressAliasDAO.java
similarity index 65%
copy from oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStreamInsertRequest.java
copy to oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBNetworkAddressAliasDAO.java
index f8287973bc..685d7b8083 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStreamInsertRequest.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBNetworkAddressAliasDAO.java
@@ -16,15 +16,17 @@
*
*/
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import org.apache.skywalking.banyandb.v1.client.StreamWrite;
-import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
+import org.apache.skywalking.oap.server.core.analysis.manual.networkalias.NetworkAddressAlias;
+import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO;
-@RequiredArgsConstructor
-public class BanyanDBStreamInsertRequest implements InsertRequest {
- @Getter
- private final StreamWrite streamWrite;
-}
\ No newline at end of file
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Placeholder {@link INetworkAddressAliasDAO}: no storage is queried and no alias is ever returned.
+ */
+public class BanyanDBNetworkAddressAliasDAO implements INetworkAddressAliasDAO {
+    /**
+     * @param timeBucket ignored
+     * @return always an empty list
+     */
+    @Override
+    public List<NetworkAddressAlias> loadLastUpdate(final long timeBucket) {
+        return Collections.emptyList();
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStreamInsertRequest.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBProfileTaskQueryDAO.java
similarity index 57%
copy from oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStreamInsertRequest.java
copy to oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBProfileTaskQueryDAO.java
index f8287973bc..0c6e1ebfd1 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStreamInsertRequest.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBProfileTaskQueryDAO.java
@@ -16,15 +16,23 @@
*
*/
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import org.apache.skywalking.banyandb.v1.client.StreamWrite;
-import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
+import org.apache.skywalking.oap.server.core.query.type.ProfileTask;
+import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO;
-@RequiredArgsConstructor
-public class BanyanDBStreamInsertRequest implements InsertRequest {
- @Getter
- private final StreamWrite streamWrite;
-}
\ No newline at end of file
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Placeholder {@link IProfileTaskQueryDAO}: no storage is queried. The task list is always empty
+ * and a lookup by id always returns {@code null}.
+ */
+public class BanyanDBProfileTaskQueryDAO implements IProfileTaskQueryDAO {
+    /** @return always an empty list; all arguments are ignored */
+    @Override
+    public List<ProfileTask> getTaskList(final String serviceId, final String endpointName, final Long startTimeBucket, final Long endTimeBucket, final Integer limit) throws IOException {
+        return Collections.emptyList();
+    }
+
+    /** @return always {@code null}; the id is ignored */
+    @Override
+    public ProfileTask getById(final String id) throws IOException {
+        return null;
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBTopologyQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBTopologyQueryDAO.java
new file mode 100644
index 0000000000..9ce2282f56
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBTopologyQueryDAO.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure;
+
+import org.apache.skywalking.oap.server.core.query.type.Call;
+import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+public class BanyanDBTopologyQueryDAO implements ITopologyQueryDAO {
+ @Override
+ public List<Call.CallDetail> loadServiceRelationsDetectedAtServerSide(long startTB, long endTB, List<String> serviceIds) throws IOException {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public List<Call.CallDetail> loadServiceRelationDetectedAtClientSide(long startTB, long endTB, List<String> serviceIds) throws IOException {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public List<Call.CallDetail> loadServiceRelationsDetectedAtServerSide(long startTB, long endTB) throws IOException {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public List<Call.CallDetail> loadServiceRelationDetectedAtClientSide(long startTB, long endTB) throws IOException {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public List<Call.CallDetail> loadInstanceRelationDetectedAtServerSide(String clientServiceId, String serverServiceId, long startTB, long endTB) throws IOException {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public List<Call.CallDetail> loadInstanceRelationDetectedAtClientSide(String clientServiceId, String serverServiceId, long startTB, long endTB) throws IOException {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public List<Call.CallDetail> loadEndpointRelation(long startTB, long endTB, String destEndpointId) throws IOException {
+ return Collections.emptyList();
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBUITemplateManagementDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBUITemplateManagementDAO.java
new file mode 100644
index 0000000000..efbd9ce106
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBUITemplateManagementDAO.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure;
+
+import org.apache.skywalking.oap.server.core.query.input.DashboardSetting;
+import org.apache.skywalking.oap.server.core.query.type.DashboardConfiguration;
+import org.apache.skywalking.oap.server.core.query.type.TemplateChangeStatus;
+import org.apache.skywalking.oap.server.core.storage.management.UITemplateManagementDAO;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+public class BanyanDBUITemplateManagementDAO implements UITemplateManagementDAO {
+ @Override
+ public DashboardConfiguration getTemplate(String id) throws IOException {
+ return null;
+ }
+
+ @Override
+ public List<DashboardConfiguration> getAllTemplates(Boolean includingDisabled) throws IOException {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public TemplateChangeStatus addTemplate(DashboardSetting setting) throws IOException {
+ return null;
+ }
+
+ @Override
+ public TemplateChangeStatus changeTemplate(DashboardSetting setting) throws IOException {
+ return null;
+ }
+
+ @Override
+ public TemplateChangeStatus disableTemplate(String id) throws IOException {
+ return null;
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/AlarmRecordBuilder.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/AlarmRecordBuilder.java
deleted file mode 100644
index 260fc17664..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/AlarmRecordBuilder.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.schema;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
-import org.apache.skywalking.banyandb.v1.client.SerializableTag;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.alarm.AlarmRecord;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class AlarmRecordBuilder extends BanyanDBStorageDataBuilder<AlarmRecord> {
- public static final List<String> INDEXED_TAGS = ImmutableList.of(
- "level"
- );
-
- @Override
- protected List<SerializableTag<BanyandbModel.TagValue>> searchableTags(AlarmRecord entity) {
- List<SerializableTag<BanyandbModel.TagValue>> searchable = new ArrayList<>(2);
- searchable.add(TagAndValue.longField(entity.getScope()));
- searchable.add(TagAndValue.longField(entity.getStartTime()));
- searchable.addAll(filterSearchableTags(entity.getTags(), INDEXED_TAGS));
- return searchable;
- }
-
- @Override
- protected List<SerializableTag<BanyandbModel.TagValue>> dataTags(AlarmRecord entity) {
- List<SerializableTag<BanyandbModel.TagValue>> data = new ArrayList<>(6);
- data.add(TagAndValue.stringField(entity.getName()));
- data.add(TagAndValue.stringField(entity.getId0()));
- data.add(TagAndValue.stringField(entity.getId1()));
- data.add(TagAndValue.stringField(entity.getAlarmMessage()));
- data.add(TagAndValue.stringField(entity.getRuleName()));
- data.add(TagAndValue.binaryField(entity.getTagsRawData()));
- return data;
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/BanyanDBStorageDataBuilder.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/BanyanDBStorageDataBuilder.java
deleted file mode 100644
index 814f3149f7..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/BanyanDBStorageDataBuilder.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.schema;
-
-import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
-import org.apache.skywalking.banyandb.v1.client.SerializableTag;
-import org.apache.skywalking.banyandb.v1.client.StreamWrite;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
-import org.apache.skywalking.oap.server.core.storage.StorageData;
-import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public abstract class BanyanDBStorageDataBuilder<T extends StorageData> implements StorageBuilder<T, StreamWrite.StreamWriteBuilder> {
- @Override
- public T storage2Entity(StreamWrite.StreamWriteBuilder storageData) {
- return null;
- }
-
- @Override
- public StreamWrite.StreamWriteBuilder entity2Storage(T entity) {
- StreamWrite.StreamWriteBuilder b = StreamWrite.builder()
- .elementId(this.elementID(entity))
- .searchableTags(this.searchableTags(entity))
- .dataTags(this.dataTags(entity));
- Long ts = this.extractTimestamp(entity);
- if (ts != null) {
- b.timestamp(ts);
- }
- return b;
- }
-
- protected Long extractTimestamp(T entity) {
- return null;
- }
-
- protected List<SerializableTag<BanyandbModel.TagValue>> filterSearchableTags(List<Tag> rawTags, List<String> indexTags) {
- if (rawTags == null) {
- return Collections.emptyList();
- }
- Map<String, SerializableTag<BanyandbModel.TagValue>> map = new HashMap<>();
- for (final Tag tag : rawTags) {
- map.put(tag.getKey().toLowerCase(), TagAndValue.stringField(tag.getValue()));
- }
- final List<SerializableTag<BanyandbModel.TagValue>> tags = new ArrayList<>();
- for (String indexedTag : indexTags) {
- SerializableTag<BanyandbModel.TagValue> tag = map.get(indexedTag);
- if (tag == null) {
- tags.add(TagAndValue.nullField());
- } else {
- tags.add(tag);
- }
- }
-
- return tags;
- }
-
- protected String elementID(T entity) {
- return entity.id();
- }
-
- abstract protected List<SerializableTag<BanyandbModel.TagValue>> searchableTags(T entity);
-
- protected List<SerializableTag<BanyandbModel.TagValue>> dataTags(T entity) {
- return Collections.emptyList();
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/BrowserErrorLogRecordBuilder.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/BrowserErrorLogRecordBuilder.java
deleted file mode 100644
index 034565249b..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/BrowserErrorLogRecordBuilder.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.schema;
-
-import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
-import org.apache.skywalking.banyandb.v1.client.SerializableTag;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.browser.manual.errorlog.BrowserErrorLogRecord;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-public class BrowserErrorLogRecordBuilder extends BanyanDBStorageDataBuilder<BrowserErrorLogRecord> {
- @Override
- protected List<SerializableTag<BanyandbModel.TagValue>> searchableTags(BrowserErrorLogRecord entity) {
- List<SerializableTag<BanyandbModel.TagValue>> searchable = new ArrayList<>();
- searchable.add(TagAndValue.stringField(entity.getUniqueId()));
- searchable.add(TagAndValue.stringField(entity.getServiceId()));
- searchable.add(TagAndValue.stringField(entity.getServiceVersionId()));
- searchable.add(TagAndValue.stringField(entity.getPagePathId()));
- searchable.add(TagAndValue.longField(entity.getErrorCategory()));
- return searchable;
- }
-
- @Override
- protected List<SerializableTag<BanyandbModel.TagValue>> dataTags(BrowserErrorLogRecord entity) {
- return Collections.singletonList(TagAndValue.binaryField(entity.getDataBinary()));
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/EndpointRelationServerSideMetricsBuilder.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/EndpointRelationServerSideMetricsBuilder.java
deleted file mode 100644
index c6cef854e4..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/EndpointRelationServerSideMetricsBuilder.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.schema;
-
-import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
-import org.apache.skywalking.banyandb.v1.client.SerializableTag;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.analysis.manual.relation.endpoint.EndpointRelationServerSideMetrics;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-public class EndpointRelationServerSideMetricsBuilder extends BanyanDBStorageDataBuilder<EndpointRelationServerSideMetrics> {
- @Override
- protected List<SerializableTag<BanyandbModel.TagValue>> searchableTags(EndpointRelationServerSideMetrics entity) {
- List<SerializableTag<BanyandbModel.TagValue>> searchable = new ArrayList<>();
- // 0 - source_endpoint
- searchable.add(TagAndValue.stringField(entity.getSourceEndpoint()));
- // 1 - dest_endpoint
- searchable.add(TagAndValue.stringField(entity.getDestEndpoint()));
- // 2 - entity_id
- searchable.add(TagAndValue.stringField(entity.getEntityId()));
- return searchable;
- }
-
- @Override
- protected List<SerializableTag<BanyandbModel.TagValue>> dataTags(EndpointRelationServerSideMetrics entity) {
- // 0 - component_id
- return Collections.singletonList(TagAndValue.longField(entity.getComponentId()));
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/EventBuilder.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/EventBuilder.java
deleted file mode 100644
index 512fe2aa5c..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/EventBuilder.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.schema;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
-import org.apache.skywalking.banyandb.v1.client.SerializableTag;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.source.Event;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class EventBuilder extends BanyanDBStorageDataBuilder<Event> {
- @Override
- protected List<SerializableTag<BanyandbModel.TagValue>> searchableTags(Event entity) {
- List<SerializableTag<BanyandbModel.TagValue>> searchable = new ArrayList<>(8);
- searchable.add(TagAndValue.stringField(entity.getUuid()));
- searchable.add(TagAndValue.stringField(entity.getService()));
- searchable.add(TagAndValue.stringField(entity.getServiceInstance()));
- searchable.add(TagAndValue.stringField(entity.getEndpoint()));
- searchable.add(TagAndValue.stringField(entity.getName()));
- searchable.add(TagAndValue.stringField(entity.getType()));
- searchable.add(TagAndValue.longField(entity.getStartTime()));
- searchable.add(TagAndValue.longField(entity.getEndTime()));
- return searchable;
- }
-
- @Override
- protected List<SerializableTag<BanyandbModel.TagValue>> dataTags(Event entity) {
- return ImmutableList.of(
- TagAndValue.stringField(entity.getMessage()),
- TagAndValue.stringField(entity.getParameters())
- );
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/LogRecordBuilder.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/LogRecordBuilder.java
deleted file mode 100644
index 8fddd927a4..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/LogRecordBuilder.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.schema;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
-import org.apache.skywalking.banyandb.v1.client.SerializableTag;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.analysis.manual.log.LogRecord;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class LogRecordBuilder extends BanyanDBStorageDataBuilder<LogRecord> {
- public static final List<String> INDEXED_TAGS = ImmutableList.of(
- "level"
- );
-
- @Override
- protected List<SerializableTag<BanyandbModel.TagValue>> searchableTags(LogRecord entity) {
- List<SerializableTag<BanyandbModel.TagValue>> searchable = new ArrayList<>();
- searchable.add(TagAndValue.stringField(entity.getUniqueId()));
- searchable.add(TagAndValue.stringField(entity.getServiceId()));
- searchable.add(TagAndValue.stringField(entity.getServiceInstanceId()));
- searchable.add(TagAndValue.stringField(entity.getServiceId()));
- searchable.add(TagAndValue.stringField(entity.getEndpointId()));
- searchable.add(TagAndValue.stringField(entity.getTraceId()));
- searchable.add(TagAndValue.stringField(entity.getTraceSegmentId()));
- searchable.add(TagAndValue.longField(entity.getSpanId()));
- searchable.addAll(filterSearchableTags(entity.getTags(), INDEXED_TAGS));
- return searchable;
- }
-
- @Override
- protected List<SerializableTag<BanyandbModel.TagValue>> dataTags(LogRecord entity) {
- return ImmutableList.of(
- TagAndValue.stringField(entity.getContent()),
- TagAndValue.longField(entity.getContentType()),
- TagAndValue.binaryField(entity.getTagsRawData())
- );
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/Metadata.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/Metadata.java
deleted file mode 100644
index 9f0fda0ac4..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/Metadata.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.schema;
-
-import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
-import org.apache.skywalking.banyandb.v1.client.SerializableTag;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.analysis.Layer;
-import org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic;
-import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
-import org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraffic;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import static org.apache.skywalking.oap.server.core.Const.DOUBLE_COLONS_SPLIT;
-
-public class Metadata {
- public static class ServiceTrafficBuilder extends BanyanDBStorageDataBuilder<ServiceTraffic> {
- @Override
- protected List<SerializableTag<BanyandbModel.TagValue>> searchableTags(ServiceTraffic entity) {
- final String serviceName = entity.getName();
- entity.setShortName(serviceName);
- if (entity.isNormal()) {
- int groupIdx = serviceName.indexOf(DOUBLE_COLONS_SPLIT);
- if (groupIdx > 0) {
- entity.setGroup(serviceName.substring(0, groupIdx));
- entity.setShortName(serviceName.substring(groupIdx + 2));
- }
- }
- List<SerializableTag<BanyandbModel.TagValue>> searchable = new ArrayList<>(4);
- // 0 - serviceName
- searchable.add(TagAndValue.stringField(serviceName));
- // 1 - serviceID
- searchable.add(TagAndValue.stringField(entity.getServiceId()));
- // 2 - group
- searchable.add(TagAndValue.stringField(entity.getGroup()));
- // 3 - layer
- Layer layer = entity.getLayer();
- searchable.add(TagAndValue.longField(layer != null ? layer.value() : Layer.UNDEFINED.value()));
- return searchable;
- }
-
- @Override
- protected List<SerializableTag<BanyandbModel.TagValue>> dataTags(ServiceTraffic entity) {
- // 0 - shortName
- return Collections.singletonList(TagAndValue.stringField(entity.getShortName()));
- }
- }
-
- public static class EndpointTrafficBuilder extends BanyanDBStorageDataBuilder<EndpointTraffic> {
- @Override
- protected List<SerializableTag<BanyandbModel.TagValue>> searchableTags(EndpointTraffic entity) {
- List<SerializableTag<BanyandbModel.TagValue>> searchable = new ArrayList<>(2);
- // 0 - serviceID
- searchable.add(TagAndValue.stringField(entity.getServiceId()));
- // 1 - name
- searchable.add(TagAndValue.stringField(entity.getName()));
- return searchable;
- }
- }
-
- public static class InstanceTrafficBuilder extends BanyanDBStorageDataBuilder<InstanceTraffic> {
- @Override
- protected List<SerializableTag<BanyandbModel.TagValue>> searchableTags(InstanceTraffic entity) {
- List<SerializableTag<BanyandbModel.TagValue>> searchable = new ArrayList<>(3);
- // serviceID
- searchable.add(TagAndValue.stringField(entity.getServiceId()));
- // lastPingTimestamp
- searchable.add(TagAndValue.longField(entity.getLastPingTimestamp()));
- // ID: we have to duplicate "ID" here for query
- searchable.add(TagAndValue.stringField(entity.id()));
- return searchable;
- }
-
- @Override
- protected List<SerializableTag<BanyandbModel.TagValue>> dataTags(InstanceTraffic entity) {
- return Collections.singletonList(TagAndValue.binaryField(
- entity.serialize().build().toByteArray()
- ));
- }
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/NetworkAddressAliasBuilder.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/NetworkAddressAliasBuilder.java
deleted file mode 100644
index e429f80b9c..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/NetworkAddressAliasBuilder.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.schema;
-
-import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
-import org.apache.skywalking.banyandb.v1.client.SerializableTag;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.analysis.manual.networkalias.NetworkAddressAlias;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-public class NetworkAddressAliasBuilder extends BanyanDBStorageDataBuilder<NetworkAddressAlias> {
- @Override
- protected List<SerializableTag<BanyandbModel.TagValue>> searchableTags(NetworkAddressAlias entity) {
- return Collections.singletonList(TagAndValue.longField(entity.getLastUpdateTimeBucket()));
- }
-
- @Override
- protected List<SerializableTag<BanyandbModel.TagValue>> dataTags(NetworkAddressAlias entity) {
- List<SerializableTag<BanyandbModel.TagValue>> data = new ArrayList<>(4);
- // 0 - time_bucket
- data.add(TagAndValue.longField(entity.getTimeBucket()));
- // 1 - address
- data.add(TagAndValue.stringField(entity.getAddress()));
- // 2 - represent_service_id
- data.add(TagAndValue.stringField(entity.getRepresentServiceId()));
- // 3 - represent_service_instance_id
- data.add(TagAndValue.stringField(entity.getRepresentServiceInstanceId()));
- return data;
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/ProfileTaskLogRecordBuilder.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/ProfileTaskLogRecordBuilder.java
deleted file mode 100644
index 1f861ab4cd..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/ProfileTaskLogRecordBuilder.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.schema;
-
-import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
-import org.apache.skywalking.banyandb.v1.client.SerializableTag;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.profile.ProfileTaskLogRecord;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class ProfileTaskLogRecordBuilder extends BanyanDBStorageDataBuilder<ProfileTaskLogRecord> {
- @Override
- protected List<SerializableTag<BanyandbModel.TagValue>> searchableTags(ProfileTaskLogRecord entity) {
- List<SerializableTag<BanyandbModel.TagValue>> searchable = new ArrayList<>(2);
- searchable.add(TagAndValue.longField(entity.getOperationTime()));
- searchable.add(TagAndValue.stringField(entity.getInstanceId()));
- return searchable;
- }
-
- @Override
- protected List<SerializableTag<BanyandbModel.TagValue>> dataTags(ProfileTaskLogRecord entity) {
- List<SerializableTag<BanyandbModel.TagValue>> data = new ArrayList<>(2);
- data.add(TagAndValue.stringField(entity.getTaskId()));
- data.add(TagAndValue.longField(entity.getOperationType()));
- return data;
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/ProfileTaskRecordBuilder.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/ProfileTaskRecordBuilder.java
deleted file mode 100644
index 23a5bf8fdf..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/ProfileTaskRecordBuilder.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.schema;
-
-import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
-import org.apache.skywalking.banyandb.v1.client.SerializableTag;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.profile.ProfileTaskRecord;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class ProfileTaskRecordBuilder extends BanyanDBStorageDataBuilder<ProfileTaskRecord> {
- @Override
- protected List<SerializableTag<BanyandbModel.TagValue>> searchableTags(ProfileTaskRecord entity) {
- List<SerializableTag<BanyandbModel.TagValue>> searchable = new ArrayList<>(9);
- // 0 - id
- searchable.add(TagAndValue.stringField(entity.id()));
- // 1 - service_id
- searchable.add(TagAndValue.stringField(entity.getServiceId()));
- // 2 - endpoint_name
- searchable.add(TagAndValue.stringField(entity.getEndpointName()));
- // 3 - start_time
- searchable.add(TagAndValue.longField(entity.getStartTime()));
- // 4 - duration
- searchable.add(TagAndValue.longField(entity.getDuration()));
- // 5 - min_duration_threshold
- searchable.add(TagAndValue.longField(entity.getMinDurationThreshold()));
- // 6 - dump_period
- searchable.add(TagAndValue.longField(entity.getDumpPeriod()));
- // 7 - create_time
- searchable.add(TagAndValue.longField(entity.getCreateTime()));
- // 8 - max_sampling_count
- searchable.add(TagAndValue.longField(entity.getMaxSamplingCount()));
- return searchable;
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/ProfileThreadSnapshotRecordBuilder.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/ProfileThreadSnapshotRecordBuilder.java
deleted file mode 100644
index 28c0ba5f38..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/ProfileThreadSnapshotRecordBuilder.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.schema;
-
-import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
-import org.apache.skywalking.banyandb.v1.client.SerializableTag;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.profile.ProfileThreadSnapshotRecord;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-public class ProfileThreadSnapshotRecordBuilder extends BanyanDBStorageDataBuilder<ProfileThreadSnapshotRecord> {
- @Override
- protected List<SerializableTag<BanyandbModel.TagValue>> searchableTags(ProfileThreadSnapshotRecord entity) {
- List<SerializableTag<BanyandbModel.TagValue>> searchable = new ArrayList<>(4);
- searchable.add(TagAndValue.stringField(entity.getTaskId()));
- searchable.add(TagAndValue.stringField(entity.getSegmentId()));
- searchable.add(TagAndValue.longField(entity.getDumpTime()));
- searchable.add(TagAndValue.longField(entity.getSequence()));
- return searchable;
- }
-
- @Override
- protected List<SerializableTag<BanyandbModel.TagValue>> dataTags(ProfileThreadSnapshotRecord entity) {
- return Collections.singletonList(TagAndValue.binaryField(entity.getStackBinary()));
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/SegmentRecordBuilder.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/SegmentRecordBuilder.java
deleted file mode 100644
index 729ad90e15..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/SegmentRecordBuilder.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.schema;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
-import org.apache.skywalking.banyandb.v1.client.SerializableTag;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-public class SegmentRecordBuilder extends BanyanDBStorageDataBuilder<SegmentRecord> {
- public static final List<String> INDEXED_TAGS = ImmutableList.of(
- "http.method",
- "status_code",
- "db.type",
- "db.instance",
- "mq.queue",
- "mq.topic",
- "mq.broker"
- );
-
- @Override
- protected List<SerializableTag<BanyandbModel.TagValue>> searchableTags(SegmentRecord segmentRecord) {
- List<SerializableTag<BanyandbModel.TagValue>> searchable = new ArrayList<>(10);
- // 0 - trace_id
- searchable.add(TagAndValue.stringField(segmentRecord.getTraceId()));
- // 1 - is_error
- searchable.add(TagAndValue.longField(segmentRecord.getIsError()));
- // 2 - service_id
- searchable.add(TagAndValue.stringField(segmentRecord.getServiceId()));
- // 3 - service_instance_id
- searchable.add(TagAndValue.stringField(segmentRecord.getServiceInstanceId()));
- // 4 - endpoint_id
- searchable.add(TagAndValue.stringField(segmentRecord.getEndpointId()));
- // 5 - latency
- searchable.add(TagAndValue.longField(segmentRecord.getLatency()));
- // 6 - start_time
- searchable.add(TagAndValue.longField(segmentRecord.getStartTime()));
- // 7 ~ 13: indexed tags
- searchable.addAll(filterSearchableTags(segmentRecord.getTagsRawData(), INDEXED_TAGS));
- return searchable;
- }
-
- @Override
- protected List<SerializableTag<BanyandbModel.TagValue>> dataTags(SegmentRecord entity) {
- return Collections.singletonList(TagAndValue.binaryField(entity.getDataBinary()));
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/ServiceInstanceRelationClientSideMetricsBuilder.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/ServiceInstanceRelationClientSideMetricsBuilder.java
deleted file mode 100644
index dc4de6ddcb..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/ServiceInstanceRelationClientSideMetricsBuilder.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.schema;
-
-import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
-import org.apache.skywalking.banyandb.v1.client.SerializableTag;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.analysis.manual.relation.instance.ServiceInstanceRelationClientSideMetrics;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class ServiceInstanceRelationClientSideMetricsBuilder extends BanyanDBStorageDataBuilder<ServiceInstanceRelationClientSideMetrics> {
- @Override
- protected List<SerializableTag<BanyandbModel.TagValue>> searchableTags(ServiceInstanceRelationClientSideMetrics entity) {
- List<SerializableTag<BanyandbModel.TagValue>> searchable = new ArrayList<>(3);
- searchable.add(TagAndValue.stringField(entity.getSourceServiceId()));
- searchable.add(TagAndValue.stringField(entity.getDestServiceId()));
- searchable.add(TagAndValue.stringField(entity.getEntityId()));
- return searchable;
- }
-
- @Override
- protected List<SerializableTag<BanyandbModel.TagValue>> dataTags(ServiceInstanceRelationClientSideMetrics entity) {
- List<SerializableTag<BanyandbModel.TagValue>> data = new ArrayList<>(3);
- data.add(TagAndValue.longField(entity.getComponentId()));
- data.add(TagAndValue.stringField(entity.getSourceServiceInstanceId()));
- data.add(TagAndValue.stringField(entity.getDestServiceInstanceId()));
- return data;
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/ServiceInstanceRelationServerSideMetricsBuilder.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/ServiceInstanceRelationServerSideMetricsBuilder.java
deleted file mode 100644
index 7796265f8d..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/ServiceInstanceRelationServerSideMetricsBuilder.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.schema;
-
-import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
-import org.apache.skywalking.banyandb.v1.client.SerializableTag;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.analysis.manual.relation.instance.ServiceInstanceRelationServerSideMetrics;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class ServiceInstanceRelationServerSideMetricsBuilder extends BanyanDBStorageDataBuilder<ServiceInstanceRelationServerSideMetrics> {
- @Override
- protected List<SerializableTag<BanyandbModel.TagValue>> searchableTags(ServiceInstanceRelationServerSideMetrics entity) {
- List<SerializableTag<BanyandbModel.TagValue>> searchable = new ArrayList<>();
- searchable.add(TagAndValue.stringField(entity.getSourceServiceId()));
- searchable.add(TagAndValue.stringField(entity.getDestServiceId()));
- searchable.add(TagAndValue.stringField(entity.getEntityId()));
- return searchable;
- }
-
- @Override
- protected List<SerializableTag<BanyandbModel.TagValue>> dataTags(ServiceInstanceRelationServerSideMetrics entity) {
- List<SerializableTag<BanyandbModel.TagValue>> data = new ArrayList<>();
- data.add(TagAndValue.longField(entity.getComponentId()));
- data.add(TagAndValue.stringField(entity.getSourceServiceInstanceId()));
- data.add(TagAndValue.stringField(entity.getDestServiceInstanceId()));
- return data;
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/ServiceRelationClientSideMetricsBuilder.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/ServiceRelationClientSideMetricsBuilder.java
deleted file mode 100644
index cf8ae73fdf..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/ServiceRelationClientSideMetricsBuilder.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.schema;
-
-import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
-import org.apache.skywalking.banyandb.v1.client.SerializableTag;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.analysis.manual.relation.service.ServiceRelationClientSideMetrics;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-public class ServiceRelationClientSideMetricsBuilder extends BanyanDBStorageDataBuilder<ServiceRelationClientSideMetrics> {
- @Override
- protected List<SerializableTag<BanyandbModel.TagValue>> searchableTags(ServiceRelationClientSideMetrics entity) {
- List<SerializableTag<BanyandbModel.TagValue>> searchable = new ArrayList<>(3);
- searchable.add(TagAndValue.stringField(entity.getSourceServiceId()));
- searchable.add(TagAndValue.stringField(entity.getDestServiceId()));
- searchable.add(TagAndValue.stringField(entity.getEntityId()));
- return searchable;
- }
-
- @Override
- protected List<SerializableTag<BanyandbModel.TagValue>> dataTags(ServiceRelationClientSideMetrics entity) {
- return Collections.singletonList(TagAndValue.longField(entity.getComponentId()));
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/ServiceRelationServerSideMetricsBuilder.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/ServiceRelationServerSideMetricsBuilder.java
deleted file mode 100644
index 92a9a13fb9..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/ServiceRelationServerSideMetricsBuilder.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.schema;
-
-import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
-import org.apache.skywalking.banyandb.v1.client.SerializableTag;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.analysis.manual.relation.service.ServiceRelationServerSideMetrics;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-public class ServiceRelationServerSideMetricsBuilder extends BanyanDBStorageDataBuilder<ServiceRelationServerSideMetrics> {
- @Override
- protected List<SerializableTag<BanyandbModel.TagValue>> searchableTags(ServiceRelationServerSideMetrics entity) {
- List<SerializableTag<BanyandbModel.TagValue>> searchable = new ArrayList<>();
- searchable.add(TagAndValue.stringField(entity.getSourceServiceId()));
- searchable.add(TagAndValue.stringField(entity.getDestServiceId()));
- searchable.add(TagAndValue.stringField(entity.getEntityId()));
- return searchable;
- }
-
- @Override
- protected List<SerializableTag<BanyandbModel.TagValue>> dataTags(ServiceRelationServerSideMetrics entity) {
- return Collections.singletonList(TagAndValue.longField(entity.getComponentId()));
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/UITemplateBuilder.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/UITemplateBuilder.java
deleted file mode 100644
index 6b53a63174..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/UITemplateBuilder.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.schema;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
-import org.apache.skywalking.banyandb.v1.client.SerializableTag;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.management.ui.template.UITemplate;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class UITemplateBuilder extends BanyanDBStorageDataBuilder<UITemplate> {
- @Override
- protected List<SerializableTag<BanyandbModel.TagValue>> searchableTags(UITemplate entity) {
- return ImmutableList.of(
- TagAndValue.stringField(entity.getName()),
- TagAndValue.longField(entity.getDisabled())
- );
- }
-
- @Override
- protected List<SerializableTag<BanyandbModel.TagValue>> dataTags(UITemplate entity) {
- List<SerializableTag<BanyandbModel.TagValue>> data = new ArrayList<>();
- data.add(TagAndValue.stringField(entity.getType()));
- data.add(TagAndValue.stringField(entity.getConfiguration()));
- data.add(TagAndValue.longField(entity.getActivated()));
- return data;
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java
index 04690a30cb..0f093e396b 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java
@@ -19,17 +19,15 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
-import org.apache.skywalking.banyandb.v1.client.RowEntity;
import org.apache.skywalking.banyandb.v1.client.StreamQuery;
import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
import org.apache.skywalking.banyandb.v1.client.TimestampRange;
import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.StreamMetaInfo;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.StreamMetadata;
import java.time.Instant;
import java.util.List;
-import java.util.function.Function;
public abstract class AbstractBanyanDBDAO extends AbstractDAO<BanyanDBStorageClient> {
private static final Instant UPPER_BOUND = Instant.ofEpochSecond(0, Long.MAX_VALUE);
@@ -40,17 +38,17 @@ public abstract class AbstractBanyanDBDAO extends AbstractDAO<BanyanDBStorageCli
super(client);
}
- protected StreamQueryResponse query(String indexName, List<String> searchableTags, QueryBuilder builder) {
- return this.query(indexName, searchableTags, null, builder);
+ protected StreamQueryResponse query(StreamMetadata metadata, List<String> searchableTags, QueryBuilder builder) {
+ return this.query(metadata, searchableTags, null, builder);
}
- protected StreamQueryResponse query(String indexName, List<String> searchableTags, TimestampRange timestampRange,
+ protected StreamQueryResponse query(StreamMetadata metadata, List<String> searchableTags, TimestampRange timestampRange,
QueryBuilder builder) {
final StreamQuery query;
if (timestampRange == null) {
- query = new StreamQuery(indexName, LARGEST_TIME_RANGE, searchableTags);
+ query = new StreamQuery(metadata.getGroup(), metadata.getModel().getName(), LARGEST_TIME_RANGE, searchableTags);
} else {
- query = new StreamQuery(indexName, timestampRange, searchableTags);
+ query = new StreamQuery(metadata.getGroup(), metadata.getModel().getName(), timestampRange, searchableTags);
}
builder.apply(query);
@@ -62,22 +60,19 @@ public abstract class AbstractBanyanDBDAO extends AbstractDAO<BanyanDBStorageCli
abstract void apply(final StreamQuery query);
protected PairQueryCondition<Long> eq(String name, long value) {
- return PairQueryCondition.LongQueryCondition.eq(StreamMetaInfo.TAG_FAMILY_SEARCHABLE, name, value);
+ return PairQueryCondition.LongQueryCondition.eq(StreamMetadata.TAG_FAMILY_SEARCHABLE, name, value);
}
protected PairQueryCondition<Long> lte(String name, long value) {
- return PairQueryCondition.LongQueryCondition.le(StreamMetaInfo.TAG_FAMILY_SEARCHABLE, name, value);
+ return PairQueryCondition.LongQueryCondition.le(StreamMetadata.TAG_FAMILY_SEARCHABLE, name, value);
}
protected PairQueryCondition<Long> gte(String name, long value) {
- return PairQueryCondition.LongQueryCondition.ge(StreamMetaInfo.TAG_FAMILY_SEARCHABLE, name, value);
+ return PairQueryCondition.LongQueryCondition.ge(StreamMetadata.TAG_FAMILY_SEARCHABLE, name, value);
}
protected PairQueryCondition<String> eq(String name, String value) {
- return PairQueryCondition.StringQueryCondition.eq(StreamMetaInfo.TAG_FAMILY_SEARCHABLE, name, value);
+ return PairQueryCondition.StringQueryCondition.eq(StreamMetadata.TAG_FAMILY_SEARCHABLE, name, value);
}
}
-
- interface RowEntityDeserializer<T> extends Function<RowEntity, T> {
- }
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java
index 08225e51e1..0b7693d16c 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java
@@ -18,14 +18,10 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
-import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableList;
-import com.google.gson.reflect.TypeToken;
-import com.google.protobuf.ByteString;
import org.apache.skywalking.banyandb.v1.client.RowEntity;
import org.apache.skywalking.banyandb.v1.client.StreamQuery;
import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
import org.apache.skywalking.banyandb.v1.client.TimestampRange;
import org.apache.skywalking.oap.server.core.alarm.AlarmRecord;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
@@ -33,22 +29,25 @@ import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
import org.apache.skywalking.oap.server.core.query.enumeration.Scope;
import org.apache.skywalking.oap.server.core.query.type.AlarmMessage;
import org.apache.skywalking.oap.server.core.query.type.Alarms;
-import org.apache.skywalking.oap.server.core.query.type.KeyValue;
import org.apache.skywalking.oap.server.core.storage.query.IAlarmQueryDAO;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBConverter;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.AlarmRecordBuilder;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.StreamMetadata;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
-import java.util.stream.Collectors;
/**
* {@link org.apache.skywalking.oap.server.core.alarm.AlarmRecord} is a stream,
* which can be used to build a {@link org.apache.skywalking.oap.server.core.query.type.AlarmMessage}
*/
public class BanyanDBAlarmQueryDAO extends AbstractBanyanDBDAO implements IAlarmQueryDAO {
+ private final StreamMetadata alarmRecordMetadata =
+ MetadataRegistry.INSTANCE.findStreamMetadata(AlarmRecord.INDEX_NAME);
+
public BanyanDBAlarmQueryDAO(BanyanDBStorageClient client) {
super(client);
}
@@ -60,7 +59,7 @@ public class BanyanDBAlarmQueryDAO extends AbstractBanyanDBDAO implements IAlarm
tsRange = new TimestampRange(TimeBucket.getTimestamp(startTB), TimeBucket.getTimestamp(endTB));
}
- StreamQueryResponse resp = query(AlarmRecord.INDEX_NAME,
+ StreamQueryResponse resp = query(alarmRecordMetadata,
ImmutableList.of(AlarmRecord.SCOPE, AlarmRecord.START_TIME),
tsRange,
new QueryBuilder() {
@@ -76,9 +75,8 @@ public class BanyanDBAlarmQueryDAO extends AbstractBanyanDBDAO implements IAlarm
if (CollectionUtils.isNotEmpty(tags)) {
for (final Tag tag : tags) {
- if (AlarmRecordBuilder.INDEXED_TAGS.contains(tag.getKey())) {
- query.appendCondition(eq(tag.getKey(), tag.getValue()));
- }
+ // TODO: check whether tags in the alarm are indexed
+ query.appendCondition(eq(tag.getKey(), tag.getValue()));
}
}
query.setLimit(limit);
@@ -86,39 +84,27 @@ public class BanyanDBAlarmQueryDAO extends AbstractBanyanDBDAO implements IAlarm
}
});
- List<AlarmMessage> messages = resp.getElements().stream().map(new AlarmMessageDeserializer())
- .collect(Collectors.toList());
-
Alarms alarms = new Alarms();
- alarms.setTotal(messages.size());
- alarms.getMsgs().addAll(messages);
- return alarms;
- }
+ alarms.setTotal(resp.size());
- public static class AlarmMessageDeserializer implements RowEntityDeserializer<AlarmMessage> {
- @Override
- public AlarmMessage apply(RowEntity row) {
- AlarmMessage alarmMessage = new AlarmMessage();
- final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
- int scopeID = ((Number) searchable.get(0).getValue()).intValue();
- alarmMessage.setScopeId(scopeID);
- alarmMessage.setScope(Scope.Finder.valueOf(scopeID));
- alarmMessage.setStartTime(((Number) searchable.get(1).getValue()).longValue());
- final List<TagAndValue<?>> data = row.getTagFamilies().get(1);
- alarmMessage.setId((String) data.get(0).getValue());
- alarmMessage.setId1((String) data.get(1).getValue());
- alarmMessage.setMessage((String) data.get(2).getValue());
- Object o = data.get(3).getValue();
- if (o instanceof ByteString && !((ByteString) o).isEmpty()) {
- this.parseDataBinary(((ByteString) o).toByteArray(), alarmMessage.getTags());
- }
- return alarmMessage;
- }
+ for (final RowEntity rowEntity : resp.getElements()) {
+ AlarmRecord.Builder builder = new AlarmRecord.Builder();
+ AlarmRecord alarmRecord = builder.storage2Entity(
+ new BanyanDBConverter.StreamToEntity(this.alarmRecordMetadata, rowEntity)
+ );
- void parseDataBinary(byte[] dataBinary, List<KeyValue> tags) {
- List<Tag> tagList = GSON.fromJson(new String(dataBinary, Charsets.UTF_8), new TypeToken<List<Tag>>() {
- }.getType());
- tagList.forEach(pair -> tags.add(new KeyValue(pair.getKey(), pair.getValue())));
+ AlarmMessage message = new AlarmMessage();
+ message.setId(String.valueOf(alarmRecord.getId0()));
+ message.setId1(String.valueOf(alarmRecord.getId1()));
+ message.setMessage(alarmRecord.getAlarmMessage());
+ message.setStartTime(alarmRecord.getStartTime());
+ message.setScope(Scope.Finder.valueOf(alarmRecord.getScope()));
+ message.setScopeId(alarmRecord.getScope());
+ if (!CollectionUtils.isEmpty(alarmRecord.getTagsRawData())) {
+ parserDataBinary(alarmRecord.getTagsRawData(), message.getTags());
+ }
+ alarms.getMsgs().add(message);
}
+ return alarms;
}
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java
index c1bbc16941..d384210cd9 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java
@@ -19,12 +19,10 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
import com.google.common.collect.ImmutableList;
-import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.skywalking.banyandb.v1.client.RowEntity;
import org.apache.skywalking.banyandb.v1.client.StreamQuery;
import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
import org.apache.skywalking.banyandb.v1.client.TimestampRange;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.browser.manual.errorlog.BrowserErrorLogRecord;
@@ -35,17 +33,20 @@ import org.apache.skywalking.oap.server.core.query.type.ErrorCategory;
import org.apache.skywalking.oap.server.core.storage.query.IBrowserLogQueryDAO;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.StreamMetadata;
import java.io.IOException;
import java.util.Collections;
-import java.util.List;
import java.util.Objects;
-import java.util.stream.Collectors;
/**
* {@link org.apache.skywalking.oap.server.core.browser.manual.errorlog.BrowserErrorLogRecord} is a stream
*/
public class BanyanDBBrowserLogQueryDAO extends AbstractBanyanDBDAO implements IBrowserLogQueryDAO {
+ private final StreamMetadata browserErrorLogRecordMetadata =
+ MetadataRegistry.INSTANCE.findStreamMetadata(BrowserErrorLogRecord.INDEX_NAME);
+
public BanyanDBBrowserLogQueryDAO(BanyanDBStorageClient client) {
super(client);
}
@@ -57,8 +58,7 @@ public class BanyanDBBrowserLogQueryDAO extends AbstractBanyanDBDAO implements I
tsRange = new TimestampRange(TimeBucket.getTimestamp(startSecondTB), TimeBucket.getTimestamp(endSecondTB));
}
- final BrowserErrorLogs logs = new BrowserErrorLogs();
- StreamQueryResponse resp = query(BrowserErrorLogRecord.INDEX_NAME, ImmutableList.of(BrowserErrorLogRecord.SERVICE_ID,
+ StreamQueryResponse resp = query(browserErrorLogRecordMetadata, ImmutableList.of(BrowserErrorLogRecord.SERVICE_ID,
BrowserErrorLogRecord.SERVICE_VERSION_ID,
BrowserErrorLogRecord.PAGE_PATH_ID,
BrowserErrorLogRecord.ERROR_CATEGORY), tsRange, new QueryBuilder() {
@@ -83,40 +83,47 @@ public class BanyanDBBrowserLogQueryDAO extends AbstractBanyanDBDAO implements I
query.setLimit(limit);
}
});
- logs.getLogs().addAll(resp.getElements().stream().map(new BrowserErrorLogDeserializer()).collect(Collectors.toList()));
- logs.setTotal(logs.getLogs().size());
+
+ BrowserErrorLogs logs = new BrowserErrorLogs();
+ logs.setTotal(resp.size());
+
+ for (final RowEntity rowEntity : resp.getElements()) {
+ final byte[] dataBinary =
+ rowEntity.getValue(StreamMetadata.TAG_FAMILY_DATA, BrowserErrorLogRecord.DATA_BINARY);
+ if (dataBinary != null && dataBinary.length > 0) {
+ BrowserErrorLog log = parserDataBinary(dataBinary);
+ logs.getLogs().add(log);
+ }
+ }
return logs;
}
- public static class BrowserErrorLogDeserializer implements RowEntityDeserializer<BrowserErrorLog> {
- @Override
- public BrowserErrorLog apply(RowEntity row) {
- // FIXME: use protobuf directly
+ /**
+ * TODO: merge with the default method in the interface
+ */
+ private BrowserErrorLog parserDataBinary(
+ byte[] dataBinary) {
+ try {
BrowserErrorLog log = new BrowserErrorLog();
- final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
- log.setService((String) searchable.get(0).getValue());
- log.setServiceVersion((String) searchable.get(1).getValue());
- log.setPagePath((String) searchable.get(2).getValue());
- log.setCategory(ErrorCategory.valueOf((String) searchable.get(3).getValue()));
- log.setTime(row.getTimestamp());
- final List<TagAndValue<?>> data = row.getTagFamilies().get(1);
- Object o = data.get(0).getValue();
- if (o instanceof ByteString && !((ByteString) o).isEmpty()) {
- try {
- org.apache.skywalking.apm.network.language.agent.v3.BrowserErrorLog browserErrorLog = org.apache.skywalking.apm.network.language.agent.v3.BrowserErrorLog
- .parseFrom((ByteString) o);
- log.setGrade(browserErrorLog.getGrade());
- log.setCol(browserErrorLog.getCol());
- log.setLine(browserErrorLog.getLine());
- log.setMessage(browserErrorLog.getMessage());
- log.setErrorUrl(browserErrorLog.getErrorUrl());
- log.setStack(browserErrorLog.getStack());
- log.setFirstReportedError(browserErrorLog.getFirstReportedError());
- } catch (InvalidProtocolBufferException ex) {
- throw new RuntimeException("fail to parse proto buffer", ex);
- }
- }
+ org.apache.skywalking.apm.network.language.agent.v3.BrowserErrorLog browserErrorLog = org.apache.skywalking.apm.network.language.agent.v3.BrowserErrorLog
+ .parseFrom(dataBinary);
+
+ log.setService(browserErrorLog.getService());
+ log.setServiceVersion(browserErrorLog.getServiceVersion());
+ log.setTime(browserErrorLog.getTime());
+ log.setPagePath(browserErrorLog.getPagePath());
+ log.setCategory(ErrorCategory.valueOf(browserErrorLog.getCategory().name().toUpperCase()));
+ log.setGrade(browserErrorLog.getGrade());
+ log.setMessage(browserErrorLog.getMessage());
+ log.setLine(browserErrorLog.getLine());
+ log.setCol(browserErrorLog.getCol());
+ log.setStack(browserErrorLog.getStack());
+ log.setErrorUrl(browserErrorLog.getErrorUrl());
+ log.setFirstReportedError(browserErrorLog.getFirstReportedError());
+
return log;
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException(e);
}
}
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBEventQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBEventQueryDAO.java
deleted file mode 100644
index 44299c38e9..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBEventQueryDAO.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
-
-import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableList;
-import org.apache.skywalking.banyandb.v1.client.RowEntity;
-import org.apache.skywalking.banyandb.v1.client.StreamQuery;
-import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.query.PaginationUtils;
-import org.apache.skywalking.oap.server.core.query.input.Duration;
-import org.apache.skywalking.oap.server.core.query.type.event.EventQueryCondition;
-import org.apache.skywalking.oap.server.core.query.type.event.EventType;
-import org.apache.skywalking.oap.server.core.query.type.event.Events;
-import org.apache.skywalking.oap.server.core.query.type.event.Source;
-import org.apache.skywalking.oap.server.core.source.Event;
-import org.apache.skywalking.oap.server.core.storage.query.IEventQueryDAO;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
-
-import java.util.List;
-import java.util.stream.Collectors;
-
-/**
- * {@link org.apache.skywalking.oap.server.core.source.Event} is a stream
- */
-public class BanyanDBEventQueryDAO extends AbstractBanyanDBDAO implements IEventQueryDAO {
- public BanyanDBEventQueryDAO(BanyanDBStorageClient client) {
- super(client);
- }
-
- @Override
- public Events queryEvents(EventQueryCondition condition) throws Exception {
- StreamQueryResponse resp = query(Event.INDEX_NAME,
- ImmutableList.of(Event.UUID, Event.SERVICE, Event.SERVICE_INSTANCE, Event.ENDPOINT, Event.NAME, Event.TYPE, Event.START_TIME, Event.END_TIME),
- new QueryBuilder() {
- @Override
- public void apply(StreamQuery query) {
- query.setDataProjections(ImmutableList.of(Event.MESSAGE, Event.PARAMETERS));
-
- buildConditions(condition, query);
-
- PaginationUtils.Page page = PaginationUtils.INSTANCE.exchange(condition.getPaging());
- query.setLimit(page.getLimit());
- query.setOffset(page.getFrom());
- switch (condition.getOrder()) {
- case ASC:
- query.setOrderBy(new StreamQuery.OrderBy("start_time", StreamQuery.OrderBy.Type.ASC));
- break;
- case DES:
- query.setOrderBy(new StreamQuery.OrderBy("start_time", StreamQuery.OrderBy.Type.DESC));
- }
- }
-
- private void buildConditions(EventQueryCondition condition, final StreamQuery query) {
- if (!Strings.isNullOrEmpty(condition.getUuid())) {
- query.appendCondition(eq(Event.UUID, condition.getUuid()));
- }
- final Source source = condition.getSource();
- if (source != null) {
- if (!Strings.isNullOrEmpty(source.getService())) {
- query.appendCondition(eq(Event.SERVICE, source.getService()));
- }
- if (!Strings.isNullOrEmpty(source.getServiceInstance())) {
- query.appendCondition(eq(Event.SERVICE_INSTANCE, source.getServiceInstance()));
- }
- if (!Strings.isNullOrEmpty(source.getEndpoint())) {
- query.appendCondition(eq(Event.ENDPOINT, source.getEndpoint()));
- }
- }
- if (!Strings.isNullOrEmpty(condition.getName())) {
- query.appendCondition(eq(Event.NAME, condition.getName()));
- }
- if (condition.getType() != null) {
- query.appendCondition(eq(Event.TYPE, condition.getType().name()));
- }
- final Duration time = condition.getTime();
- if (time != null) {
- if (time.getStartTimestamp() > 0) {
- query.appendCondition(gte(Event.START_TIME, time.getStartTimestamp()));
- }
- if (time.getEndTimestamp() > 0) {
- query.appendCondition(lte(Event.END_TIME, time.getEndTimestamp()));
- }
- }
- }
- });
-
- List<org.apache.skywalking.oap.server.core.query.type.event.Event> eventList = resp.getElements().stream().map(new EventDeserializer()).collect(Collectors.toList());
-
- Events events = new Events();
- events.setEvents(eventList);
- events.setTotal(eventList.size());
- return events;
- }
-
- @Override
- public Events queryEvents(List<EventQueryCondition> conditionList) throws Exception {
- Events events = new Events();
- for (final EventQueryCondition condition : conditionList) {
- Events subEvents = this.queryEvents(condition);
- if (subEvents.getEvents().size() == 0) {
- continue;
- }
-
- events.getEvents().addAll(subEvents.getEvents());
- events.setTotal(events.getTotal() + subEvents.getTotal());
- }
-
- return events;
- }
-
- public static class EventDeserializer implements RowEntityDeserializer<org.apache.skywalking.oap.server.core.query.type.event.Event> {
- @Override
- public org.apache.skywalking.oap.server.core.query.type.event.Event apply(RowEntity row) {
- final org.apache.skywalking.oap.server.core.query.type.event.Event resultEvent = new org.apache.skywalking.oap.server.core.query.type.event.Event();
- // searchable
- final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
- resultEvent.setUuid((String) searchable.get(0).getValue());
- resultEvent.setSource(new Source((String) searchable.get(1).getValue(), (String) searchable.get(2).getValue(), (String) searchable.get(3).getValue()));
- resultEvent.setName((String) searchable.get(4).getValue());
- resultEvent.setType(EventType.parse((String) searchable.get(5).getValue()));
- resultEvent.setStartTime(((Number) searchable.get(6).getValue()).longValue());
- resultEvent.setEndTime(((Number) searchable.get(7).getValue()).longValue());
- // data
- final List<TagAndValue<?>> data = row.getTagFamilies().get(1);
- resultEvent.setMessage((String) data.get(0).getValue());
- resultEvent.setParameters((String) data.get(1).getValue());
- return resultEvent;
- }
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java
index 6bce5767e7..5cef08a726 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java
@@ -19,21 +19,20 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
import com.google.common.collect.ImmutableList;
-import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.skywalking.apm.network.common.v3.KeyStringValuePair;
import org.apache.skywalking.apm.network.logging.v3.LogTags;
import org.apache.skywalking.banyandb.v1.client.RowEntity;
import org.apache.skywalking.banyandb.v1.client.StreamQuery;
import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
import org.apache.skywalking.banyandb.v1.client.TimestampRange;
+import org.apache.skywalking.oap.server.core.analysis.IDManager;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.manual.log.AbstractLogRecord;
import org.apache.skywalking.oap.server.core.analysis.manual.log.LogRecord;
import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
import org.apache.skywalking.oap.server.core.query.enumeration.Order;
import org.apache.skywalking.oap.server.core.query.input.TraceScopeCondition;
+import org.apache.skywalking.oap.server.core.query.type.ContentType;
import org.apache.skywalking.oap.server.core.query.type.KeyValue;
import org.apache.skywalking.oap.server.core.query.type.Log;
import org.apache.skywalking.oap.server.core.query.type.Logs;
@@ -41,17 +40,20 @@ import org.apache.skywalking.oap.server.core.storage.query.ILogQueryDAO;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.LogRecordBuilder;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.StreamMetadata;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
-import java.util.stream.Collectors;
/**
* {@link org.apache.skywalking.oap.server.core.analysis.manual.log.LogRecord} is a stream
*/
public class BanyanDBLogQueryDAO extends AbstractBanyanDBDAO implements ILogQueryDAO {
+ private final StreamMetadata logRecordMetadata =
+ MetadataRegistry.INSTANCE.findStreamMetadata(LogRecord.INDEX_NAME);
+
public BanyanDBLogQueryDAO(BanyanDBStorageClient client) {
super(client);
}
@@ -90,9 +92,8 @@ public class BanyanDBLogQueryDAO extends AbstractBanyanDBDAO implements ILogQuer
if (CollectionUtils.isNotEmpty(tags)) {
for (final Tag tag : tags) {
- if (LogRecordBuilder.INDEXED_TAGS.contains(tag.getKey())) {
- query.appendCondition(eq(tag.getKey(), tag.getValue()));
- }
+ // TODO: check whether the log tags are indexed
+ query.appendCondition(eq(tag.getKey(), tag.getValue()));
}
}
}
@@ -103,45 +104,51 @@ public class BanyanDBLogQueryDAO extends AbstractBanyanDBDAO implements ILogQuer
tsRange = new TimestampRange(TimeBucket.getTimestamp(startTB), TimeBucket.getTimestamp(endTB));
}
- StreamQueryResponse resp = query(LogRecord.INDEX_NAME,
+ StreamQueryResponse resp = query(logRecordMetadata,
ImmutableList.of(AbstractLogRecord.SERVICE_ID, AbstractLogRecord.SERVICE_INSTANCE_ID,
AbstractLogRecord.ENDPOINT_ID, AbstractLogRecord.TRACE_ID, AbstractLogRecord.TRACE_SEGMENT_ID,
AbstractLogRecord.SPAN_ID), tsRange, query);
- List<Log> logEntities = resp.getElements().stream().map(new LogDeserializer()).collect(Collectors.toList());
-
Logs logs = new Logs();
- logs.getLogs().addAll(logEntities);
- logs.setTotal(logEntities.size());
+ logs.setTotal(resp.size());
+ for (final RowEntity rowEntity : resp.getElements()) {
+ Log log = new Log();
+ log.setServiceId(rowEntity.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, AbstractLogRecord.SERVICE_ID));
+ log.setServiceInstanceId(
+ rowEntity.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, AbstractLogRecord.SERVICE_INSTANCE_ID));
+ log.setEndpointId(
+ rowEntity.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, AbstractLogRecord.ENDPOINT_ID));
+ if (log.getEndpointId() != null) {
+ log.setEndpointName(
+ IDManager.EndpointID.analysisId(log.getEndpointId()).getEndpointName());
+ }
+ log.setTraceId(rowEntity.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, AbstractLogRecord.TRACE_ID));
+ log.setTimestamp(((Number) rowEntity.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE,
+ AbstractLogRecord.TIMESTAMP)).longValue());
+ log.setContentType(ContentType.instanceOf(
+ ((Number) rowEntity.getValue(StreamMetadata.TAG_FAMILY_DATA,
+ AbstractLogRecord.CONTENT_TYPE)).intValue()));
+ log.setContent(rowEntity.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, AbstractLogRecord.CONTENT));
+ byte[] dataBinary = rowEntity.getValue(StreamMetadata.TAG_FAMILY_DATA, AbstractLogRecord.TAGS_RAW_DATA);
+ if (dataBinary != null && dataBinary.length > 0) {
+ parserDataBinary(dataBinary, log.getTags());
+ }
+ logs.getLogs().add(log);
+ }
return logs;
}
- public static class LogDeserializer implements RowEntityDeserializer<Log> {
- @Override
- public Log apply(RowEntity row) {
- Log log = new Log();
- final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
- log.setServiceId((String) searchable.get(0).getValue());
- log.setServiceInstanceId((String) searchable.get(1).getValue());
- log.setEndpointId((String) searchable.get(2).getValue());
- log.setTraceId((String) searchable.get(3).getValue());
- log.setTimestamp(row.getTimestamp());
- final List<TagAndValue<?>> data = row.getTagFamilies().get(1);
- if (data.get(2).getValue() == null || ((ByteString) data.get(2).getValue()).isEmpty()) {
- log.setContent("");
- } else {
- try {
- // Don't read the tags as they have been in the data binary already.
- LogTags logTags = LogTags.parseFrom((ByteString) data.get(2).getValue());
- for (final KeyStringValuePair pair : logTags.getDataList()) {
- log.getTags().add(new KeyValue(pair.getKey(), pair.getValue()));
- }
- } catch (InvalidProtocolBufferException e) {
- throw new RuntimeException(e);
- }
- }
- return log;
+ /**
+ * Parse the raw tags.
+ * TODO: merge with the default method
+ */
+ private void parserDataBinary(byte[] dataBinary, List<KeyValue> tags) {
+ try {
+ LogTags logTags = LogTags.parseFrom(dataBinary);
+ logTags.getDataList().forEach(pair -> tags.add(new KeyValue(pair.getKey(), pair.getValue())));
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException(e);
}
}
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBManagementDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBManagementDAO.java
deleted file mode 100644
index b719090919..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBManagementDAO.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
-
-import org.apache.skywalking.banyandb.v1.client.StreamQuery;
-import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
-import org.apache.skywalking.banyandb.v1.client.StreamWrite;
-import org.apache.skywalking.oap.server.core.analysis.management.ManagementData;
-import org.apache.skywalking.oap.server.core.management.ui.template.UITemplate;
-import org.apache.skywalking.oap.server.core.storage.IManagementDAO;
-import org.apache.skywalking.oap.server.core.storage.model.Model;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.BanyanDBStorageDataBuilder;
-
-import java.io.IOException;
-import java.time.Instant;
-import java.util.Collections;
-
-/**
- * UITemplate insertion DAO
- *
- * @param <T> The only ManagementData we have now is {@link UITemplate}
- */
-public class BanyanDBManagementDAO<T extends ManagementData> extends AbstractBanyanDBDAO implements IManagementDAO {
- private final BanyanDBStorageDataBuilder<T> storageBuilder;
-
- public BanyanDBManagementDAO(BanyanDBStorageClient client, BanyanDBStorageDataBuilder<T> storageBuilder) {
- super(client);
- this.storageBuilder = storageBuilder;
- }
-
- @Override
- public void insert(Model model, ManagementData storageData) throws IOException {
- // ensure only insert once
- StreamQueryResponse resp = query(UITemplate.INDEX_NAME,
- Collections.singletonList(UITemplate.NAME),
- new QueryBuilder() {
- @Override
- public void apply(StreamQuery query) {
- query.appendCondition(eq(UITemplate.NAME, storageData.id()));
- }
- });
-
- if (resp != null && resp.getElements().size() > 0) {
- return;
- }
-
- StreamWrite.StreamWriteBuilder streamWrite = this.storageBuilder
- .entity2Storage((T) storageData)
- .name(model.getName())
- .timestamp(Instant.now().toEpochMilli());
- getClient().write(streamWrite.build());
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBMetadataQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBMetadataQueryDAO.java
deleted file mode 100644
index 85ea280b72..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBMetadataQueryDAO.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
-
-import com.google.common.collect.ImmutableList;
-import com.google.gson.JsonElement;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.skywalking.banyandb.v1.client.RowEntity;
-import org.apache.skywalking.banyandb.v1.client.StreamQuery;
-import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.analysis.Layer;
-import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
-import org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic;
-import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
-import org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraffic;
-import org.apache.skywalking.oap.server.core.query.enumeration.Language;
-import org.apache.skywalking.oap.server.core.query.type.Attribute;
-import org.apache.skywalking.oap.server.core.query.type.Endpoint;
-import org.apache.skywalking.oap.server.core.query.type.Service;
-import org.apache.skywalking.oap.server.core.query.type.ServiceInstance;
-import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
-import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
-import org.apache.skywalking.oap.server.library.util.StringUtil;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.StreamMetaInfo;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-/**
- * {@link org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraffic},
- * {@link org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic}
- * {@link org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic}
- * are all streams.
- */
-public class BanyanDBMetadataQueryDAO extends AbstractBanyanDBDAO implements IMetadataQueryDAO {
- public BanyanDBMetadataQueryDAO(BanyanDBStorageClient client) {
- super(client);
- }
-
- @Override
- public List<Service> listServices(final String layer, final String group) throws IOException {
- StreamQueryResponse resp = query(ServiceTraffic.INDEX_NAME,
- ImmutableList.of(ServiceTraffic.NAME, ServiceTraffic.SERVICE_ID, ServiceTraffic.GROUP, ServiceTraffic.LAYER),
- new QueryBuilder() {
- @Override
- public void apply(StreamQuery query) {
- query.setDataProjections(Collections.singletonList(ServiceTraffic.SHORT_NAME));
- if (StringUtil.isNotEmpty(layer)) {
- query.appendCondition(eq(ServiceTraffic.LAYER, Layer.valueOf(layer).value()));
- }
- if (StringUtil.isNotEmpty(group)) {
- query.appendCondition(eq(ServiceTraffic.GROUP, group));
- }
- }
- });
-
- return resp.getElements().stream().map(new ServiceDeserializer()).collect(Collectors.toList());
- }
-
- @Override
- public List<Service> getServices(String serviceId) throws IOException {
- StreamQueryResponse resp = query(ServiceTraffic.INDEX_NAME,
- ImmutableList.of(ServiceTraffic.NAME, ServiceTraffic.SERVICE_ID, ServiceTraffic.GROUP, ServiceTraffic.LAYER),
- new QueryBuilder() {
- @Override
- public void apply(StreamQuery query) {
- query.setDataProjections(Collections.singletonList(ServiceTraffic.SHORT_NAME));
-
- query.appendCondition(eq(ServiceTraffic.SERVICE_ID, serviceId));
- }
- });
-
- return resp.getElements().stream().map(new ServiceDeserializer()).collect(Collectors.toList());
- }
-
- @Override
- public List<ServiceInstance> listInstances(long startTimestamp, long endTimestamp, String serviceId) throws IOException {
- StreamQueryResponse resp = query(InstanceTraffic.INDEX_NAME,
- ImmutableList.of(InstanceTraffic.SERVICE_ID, InstanceTraffic.LAST_PING_TIME_BUCKET),
- new QueryBuilder() {
- @Override
- public void apply(StreamQuery query) {
- query.setDataProjections(Collections.singletonList("data_binary"));
-
- final long startMinuteTimeBucket = TimeBucket.getMinuteTimeBucket(startTimestamp);
- final long endMinuteTimeBucket = TimeBucket.getMinuteTimeBucket(endTimestamp);
-
- query.appendCondition(gte(InstanceTraffic.LAST_PING_TIME_BUCKET, startMinuteTimeBucket));
- query.appendCondition(lte(InstanceTraffic.LAST_PING_TIME_BUCKET, endMinuteTimeBucket));
- query.appendCondition(eq(InstanceTraffic.SERVICE_ID, serviceId));
- }
- });
-
- return resp.getElements().stream().map(new ServiceInstanceDeserializer()).collect(Collectors.toList());
- }
-
- @Override
- public ServiceInstance getInstance(String instanceId) throws IOException {
- StreamQueryResponse resp = query(InstanceTraffic.INDEX_NAME,
- ImmutableList.of(StreamMetaInfo.ID), new QueryBuilder() {
- @Override
- public void apply(StreamQuery query) {
- query.setDataProjections(Collections.singletonList("data_binary"));
-
- query.appendCondition(eq(StreamMetaInfo.ID, instanceId));
- }
- });
-
- return resp.getElements().stream().map(new ServiceInstanceDeserializer()).findFirst().orElse(null);
- }
-
- @Override
- public List<Endpoint> findEndpoint(String keyword, String serviceId, int limit) throws IOException {
- StreamQueryResponse resp = query(EndpointTraffic.INDEX_NAME,
- ImmutableList.of(EndpointTraffic.NAME, EndpointTraffic.SERVICE_ID), new QueryBuilder() {
- @Override
- public void apply(StreamQuery query) {
- query.appendCondition(eq(EndpointTraffic.SERVICE_ID, serviceId));
- }
- });
-
- return resp.getElements().stream().map(new EndpointDeserializer()).filter(e -> e.getName().contains(keyword))
- .limit(limit).collect(Collectors.toList());
- }
-
- public static class EndpointDeserializer implements RowEntityDeserializer<Endpoint> {
- @Override
- public Endpoint apply(RowEntity row) {
- Endpoint endpoint = new Endpoint();
- final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
- endpoint.setId(row.getId());
- endpoint.setName((String) searchable.get(0).getValue()); // 0 - name
- return endpoint;
- }
- }
-
- public static class ServiceDeserializer implements RowEntityDeserializer<Service> {
- @Override
- public Service apply(RowEntity row) {
- Service service = new Service();
- final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
- service.setName((String) searchable.get(0).getValue());
- service.setId((String) searchable.get(1).getValue());
- service.setGroup((String) searchable.get(2).getValue());
- String layerName = Layer.valueOf(((Number) searchable.get(3).getValue()).intValue()).name();
- service.getLayers().add(layerName);
- final List<TagAndValue<?>> data = row.getTagFamilies().get(1);
- service.setShortName((String) data.get(0).getValue());
- return service;
- }
- }
-
- public static class ServiceInstanceDeserializer implements RowEntityDeserializer<ServiceInstance> {
- @Override
- public ServiceInstance apply(RowEntity row) {
- InstanceTraffic instanceTraffic = new InstanceTraffic();
- final List<TagAndValue<?>> data = row.getTagFamilies().get(1);
- Object o = data.get(0).getValue();
- ServiceInstance serviceInstance = new ServiceInstance();
- if (o instanceof ByteString && !((ByteString) o).isEmpty()) {
- try {
- RemoteData remoteData = RemoteData.parseFrom((ByteString) o);
- instanceTraffic.deserialize(remoteData);
- serviceInstance.setName(instanceTraffic.getName());
- serviceInstance.setId(row.getId());
- serviceInstance.setInstanceUUID(serviceInstance.getId());
- serviceInstance.setLayer(instanceTraffic.getLayer().name());
-
- if (instanceTraffic.getProperties() != null) {
- for (Map.Entry<String, JsonElement> property : instanceTraffic.getProperties().entrySet()) {
- String key = property.getKey();
- String value = property.getValue().getAsString();
- if (key.equals(InstanceTraffic.PropertyUtil.LANGUAGE)) {
- serviceInstance.setLanguage(Language.value(value));
- } else {
- serviceInstance.getAttributes().add(new Attribute(key, value));
- }
- }
- } else {
- serviceInstance.setLanguage(Language.UNKNOWN);
- }
- } catch (InvalidProtocolBufferException ex) {
- throw new RuntimeException("fail to parse remote data", ex);
- }
- } else {
- throw new RuntimeException("unable to parse binary data");
- }
-
- return serviceInstance;
- }
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBMetricsDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBMetricsDAO.java
deleted file mode 100644
index f1ed5b8d61..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBMetricsDAO.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
-
-import lombok.RequiredArgsConstructor;
-import org.apache.skywalking.banyandb.v1.client.StreamWrite;
-import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
-import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
-import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
-import org.apache.skywalking.oap.server.core.storage.model.Model;
-import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
-import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.BanyanDBStorageDataBuilder;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-
-@RequiredArgsConstructor
-public class BanyanDBMetricsDAO<T extends Metrics> implements IMetricsDAO {
- private final BanyanDBStorageDataBuilder<T> storageBuilder;
-
- @Override
- public List<Metrics> multiGet(Model model, List<Metrics> metrics) throws IOException {
- return Collections.emptyList();
- }
-
- @Override
- public InsertRequest prepareBatchInsert(Model model, Metrics metrics) throws IOException {
- StreamWrite.StreamWriteBuilder builder = this.storageBuilder.entity2Storage((T) metrics)
- .name(model.getName())
- .timestamp(TimeBucket.getTimestamp(metrics.getTimeBucket(), model.getDownsampling()));
- return new BanyanDBStreamInsertRequest(builder.build());
- }
-
- @Override
- public UpdateRequest prepareBatchUpdate(Model model, Metrics metrics) throws IOException {
- return new UpdateRequest() {
- };
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBNetworkAddressAliasDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBNetworkAddressAliasDAO.java
deleted file mode 100644
index d5006ea689..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBNetworkAddressAliasDAO.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.skywalking.banyandb.v1.client.RowEntity;
-import org.apache.skywalking.banyandb.v1.client.StreamQuery;
-import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.analysis.manual.networkalias.NetworkAddressAlias;
-import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
-import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
-
-import java.util.List;
-import java.util.stream.Collectors;
-
-/**
- * {@link NetworkAddressAlias} is a stream
- */
-public class BanyanDBNetworkAddressAliasDAO extends AbstractBanyanDBDAO implements INetworkAddressAliasDAO {
- public BanyanDBNetworkAddressAliasDAO(BanyanDBStorageClient client) {
- super(client);
- }
-
- @Override
- public List<NetworkAddressAlias> loadLastUpdate(long timeBucket) {
- StreamQueryResponse resp = query(NetworkAddressAlias.INDEX_NAME,
- ImmutableList.of(NetworkAddressAlias.LAST_UPDATE_TIME_BUCKET),
- new QueryBuilder() {
- @Override
- public void apply(StreamQuery query) {
- query.setDataProjections(ImmutableList.of(Metrics.TIME_BUCKET, "address", "represent_service_id", "represent_service_instance_id"));
- query.appendCondition(gte(NetworkAddressAlias.LAST_UPDATE_TIME_BUCKET, timeBucket));
- }
- });
-
- return resp.getElements().stream().map(new NetworkAddressAliasDeserializer()).collect(Collectors.toList());
- }
-
- public static class NetworkAddressAliasDeserializer implements RowEntityDeserializer<NetworkAddressAlias> {
- @Override
- public NetworkAddressAlias apply(RowEntity row) {
- NetworkAddressAlias model = new NetworkAddressAlias();
- final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
- // searchable - last_update_time_bucket
- model.setLastUpdateTimeBucket(((Number) searchable.get(0).getValue()).longValue());
- final List<TagAndValue<?>> data = row.getTagFamilies().get(1);
- // data 0 - time_bucket
- model.setTimeBucket(((Number) data.get(0).getValue()).longValue());
- // data 1 - address
- model.setAddress((String) data.get(1).getValue());
- // data 2 - represent_service_id
- model.setRepresentServiceId((String) data.get(2).getValue());
- // data 3 - represent_service_instance_id
- model.setRepresentServiceInstanceId((String) data.get(3).getValue());
- return model;
- }
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java
index 1da585bb4b..4411957f69 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java
@@ -22,22 +22,25 @@ import com.google.common.collect.ImmutableList;
import org.apache.skywalking.banyandb.v1.client.RowEntity;
import org.apache.skywalking.banyandb.v1.client.StreamQuery;
import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
import org.apache.skywalking.oap.server.core.profile.ProfileTaskLogRecord;
import org.apache.skywalking.oap.server.core.query.type.ProfileTaskLog;
import org.apache.skywalking.oap.server.core.query.type.ProfileTaskLogOperationType;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.StreamMetadata;
import java.io.IOException;
-import java.util.Comparator;
+import java.util.LinkedList;
import java.util.List;
-import java.util.stream.Collectors;
/**
* {@link org.apache.skywalking.oap.server.core.profile.ProfileTaskLogRecord} is a stream
*/
public class BanyanDBProfileTaskLogQueryDAO extends AbstractBanyanDBDAO implements IProfileTaskLogQueryDAO {
+ private final StreamMetadata profileTaskLogRecord =
+ MetadataRegistry.INSTANCE.findStreamMetadata(ProfileTaskLogRecord.INDEX_NAME);
+
private final int queryMaxSize;
public BanyanDBProfileTaskLogQueryDAO(BanyanDBStorageClient client, int queryMaxSize) {
@@ -47,7 +50,7 @@ public class BanyanDBProfileTaskLogQueryDAO extends AbstractBanyanDBDAO implemen
@Override
public List<ProfileTaskLog> getTaskLogList() throws IOException {
- StreamQueryResponse resp = query(ProfileTaskLogRecord.INDEX_NAME,
+ StreamQueryResponse resp = query(profileTaskLogRecord,
ImmutableList.of(ProfileTaskLogRecord.OPERATION_TIME, ProfileTaskLogRecord.INSTANCE_ID),
new QueryBuilder() {
@Override
@@ -58,26 +61,26 @@ public class BanyanDBProfileTaskLogQueryDAO extends AbstractBanyanDBDAO implemen
}
});
- return resp.getElements().stream().map(new ProfileTaskLogDeserializer())
- .sorted(Comparator.comparingLong(ProfileTaskLog::getOperationTime))
- .collect(Collectors.toList());
+ final LinkedList<ProfileTaskLog> tasks = new LinkedList<>();
+ for (final RowEntity rowEntity : resp.getElements()) {
+ tasks.add(parseTaskLog(rowEntity));
+ }
+
+ return tasks;
}
- public static class ProfileTaskLogDeserializer implements RowEntityDeserializer<ProfileTaskLog> {
- @Override
- public ProfileTaskLog apply(RowEntity row) {
- ProfileTaskLog profileTaskLog = new ProfileTaskLog();
- final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
- // searchable - operation_time
- profileTaskLog.setOperationTime(((Number) searchable.get(0).getValue()).longValue());
- // searchable - instance_id
- profileTaskLog.setInstanceId((String) searchable.get(1).getValue());
- final List<TagAndValue<?>> data = row.getTagFamilies().get(1);
- // data - task_id
- profileTaskLog.setTaskId((String) data.get(0).getValue());
- // data - operation_type
- profileTaskLog.setOperationType(ProfileTaskLogOperationType.parse(((Number) data.get(1).getValue()).intValue()));
- return profileTaskLog;
- }
+ private ProfileTaskLog parseTaskLog(RowEntity data) {
+ return ProfileTaskLog.builder()
+ .id(data.getId())
+ .taskId(data.getValue(StreamMetadata.TAG_FAMILY_DATA, ProfileTaskLogRecord.TASK_ID))
+ .instanceId(
+ data.getValue(StreamMetadata.TAG_FAMILY_DATA, ProfileTaskLogRecord.INSTANCE_ID))
+ .operationType(ProfileTaskLogOperationType.parse(
+ ((Number) data.getValue(StreamMetadata.TAG_FAMILY_DATA,
+ ProfileTaskLogRecord.OPERATION_TYPE)).intValue()))
+ .operationTime(
+ ((Number) data.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE,
+ ProfileTaskLogRecord.OPERATION_TIME)).longValue())
+ .build();
}
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java
deleted file mode 100644
index 4bf0996eb8..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.skywalking.banyandb.v1.client.RowEntity;
-import org.apache.skywalking.banyandb.v1.client.StreamQuery;
-import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
-import org.apache.skywalking.oap.server.core.profile.ProfileTaskRecord;
-import org.apache.skywalking.oap.server.core.query.type.ProfileTask;
-import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO;
-import org.apache.skywalking.oap.server.library.util.StringUtil;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.StreamMetaInfo;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Objects;
-import java.util.stream.Collectors;
-
-/**
- * {@link org.apache.skywalking.oap.server.core.profile.ProfileTaskRecord} is a stream
- */
-public class BanyanDBProfileTaskQueryDAO extends AbstractBanyanDBDAO implements IProfileTaskQueryDAO {
- public BanyanDBProfileTaskQueryDAO(BanyanDBStorageClient client) {
- super(client);
- }
-
- @Override
- public List<ProfileTask> getTaskList(String serviceId, String endpointName, Long startTimeBucket, Long endTimeBucket, Integer limit) throws IOException {
- StreamQueryResponse resp = query(ProfileTaskRecord.INDEX_NAME,
- ImmutableList.of(StreamMetaInfo.ID, ProfileTaskRecord.SERVICE_ID, ProfileTaskRecord.ENDPOINT_NAME,
- ProfileTaskRecord.START_TIME, ProfileTaskRecord.DURATION, ProfileTaskRecord.MIN_DURATION_THRESHOLD,
- ProfileTaskRecord.DUMP_PERIOD, ProfileTaskRecord.CREATE_TIME, ProfileTaskRecord.MAX_SAMPLING_COUNT), new QueryBuilder() {
- @Override
- public void apply(StreamQuery query) {
- if (StringUtil.isNotEmpty(serviceId)) {
- query.appendCondition(eq(ProfileTaskRecord.SERVICE_ID, serviceId));
- }
-
- if (StringUtil.isNotEmpty(endpointName)) {
- query.appendCondition(eq(ProfileTaskRecord.ENDPOINT_NAME, endpointName));
- }
-
- if (Objects.nonNull(startTimeBucket)) {
- query.appendCondition(gte(ProfileTaskRecord.START_TIME, TimeBucket.getTimestamp(startTimeBucket)));
- }
-
- if (Objects.nonNull(endTimeBucket)) {
- query.appendCondition(lte(ProfileTaskRecord.START_TIME, TimeBucket.getTimestamp(endTimeBucket)));
- }
-
- if (Objects.nonNull(limit)) {
- query.setLimit(limit);
- }
-
-// query.setOrderBy(new StreamQuery.OrderBy(ProfileTaskRecord.START_TIME, StreamQuery.OrderBy.Type.DESC));
- }
- });
-
- return resp.getElements().stream().map(new ProfileTaskDeserializer()).collect(Collectors.toList());
- }
-
- @Override
- public ProfileTask getById(String id) throws IOException {
- if (StringUtil.isEmpty(id)) {
- return null;
- }
-
- StreamQueryResponse resp = query(ProfileTaskRecord.INDEX_NAME,
- ImmutableList.of(StreamMetaInfo.ID, ProfileTaskRecord.SERVICE_ID, ProfileTaskRecord.ENDPOINT_NAME,
- ProfileTaskRecord.START_TIME, ProfileTaskRecord.DURATION, ProfileTaskRecord.MIN_DURATION_THRESHOLD,
- ProfileTaskRecord.DUMP_PERIOD, ProfileTaskRecord.CREATE_TIME, ProfileTaskRecord.MAX_SAMPLING_COUNT),
- new QueryBuilder() {
- @Override
- public void apply(StreamQuery query) {
- query.appendCondition(eq(StreamMetaInfo.ID, id));
- query.setLimit(1);
- }
- });
-
- return resp.getElements().stream().map(new ProfileTaskDeserializer()).findAny().orElse(null);
- }
-
- public static class ProfileTaskDeserializer implements RowEntityDeserializer<ProfileTask> {
- @Override
- public ProfileTask apply(RowEntity row) {
- ProfileTask profileTask = new ProfileTask();
- final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
- profileTask.setId((String) searchable.get(0).getValue());
- profileTask.setServiceId((String) searchable.get(1).getValue());
- profileTask.setEndpointName((String) searchable.get(2).getValue());
- profileTask.setStartTime(((Number) searchable.get(3).getValue()).longValue());
- profileTask.setDuration(((Number) searchable.get(4).getValue()).intValue());
- profileTask.setMinDurationThreshold(((Number) searchable.get(5).getValue()).intValue());
- profileTask.setDumpPeriod(((Number) searchable.get(6).getValue()).intValue());
- profileTask.setCreateTime(((Number) searchable.get(7).getValue()).intValue());
- profileTask.setMaxSamplingCount(((Number) searchable.get(8).getValue()).intValue());
- return profileTask;
- }
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java
index 6df30bb465..4c6ac7ddcd 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java
@@ -19,22 +19,25 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
import com.google.common.collect.ImmutableList;
-import com.google.protobuf.ByteString;
import org.apache.skywalking.banyandb.v1.client.RowEntity;
import org.apache.skywalking.banyandb.v1.client.StreamQuery;
import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
import org.apache.skywalking.oap.server.core.analysis.IDManager;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
import org.apache.skywalking.oap.server.core.profile.ProfileThreadSnapshotRecord;
import org.apache.skywalking.oap.server.core.query.type.BasicTrace;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileThreadSnapshotQueryDAO;
+import org.apache.skywalking.oap.server.library.util.BooleanUtils;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBConverter;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.StreamMetadata;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
+import java.util.LinkedList;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -43,13 +46,22 @@ import java.util.stream.Collectors;
* {@link ProfileThreadSnapshotRecord} is a stream
*/
public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractBanyanDBDAO implements IProfileThreadSnapshotQueryDAO {
+ protected final ProfileThreadSnapshotRecord.Builder builder =
+ new ProfileThreadSnapshotRecord.Builder();
+
+ private final StreamMetadata profileThreadSnapshotMetadata =
+ MetadataRegistry.INSTANCE.findStreamMetadata(ProfileThreadSnapshotRecord.INDEX_NAME);
+
+ private final StreamMetadata segmentRecordMetadata =
+ MetadataRegistry.INSTANCE.findStreamMetadata(SegmentRecord.INDEX_NAME);
+
public BanyanDBProfileThreadSnapshotQueryDAO(BanyanDBStorageClient client) {
super(client);
}
@Override
public List<BasicTrace> queryProfiledSegments(String taskId) throws IOException {
- StreamQueryResponse resp = query(ProfileThreadSnapshotRecord.INDEX_NAME,
+ StreamQueryResponse resp = query(profileThreadSnapshotMetadata,
ImmutableList.of(ProfileThreadSnapshotRecord.TASK_ID, ProfileThreadSnapshotRecord.SEGMENT_ID,
ProfileThreadSnapshotRecord.DUMP_TIME, ProfileThreadSnapshotRecord.SEQUENCE),
new QueryBuilder() {
@@ -64,22 +76,39 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractBanyanDBDAO i
return Collections.emptyList();
}
- List<String> segmentIDs = resp.getElements().stream()
- .map(new ProfileThreadSnapshotRecordDeserializer())
- .map(ProfileThreadSnapshotRecord::getSegmentId)
- .collect(Collectors.toList());
+ final List<String> segmentIds = new LinkedList<>();
+ for (final RowEntity rowEntity : resp.getElements()) {
+ segmentIds.add(rowEntity.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, ProfileThreadSnapshotRecord.SEGMENT_ID));
+ }
// TODO: support `IN` or `OR` logic operation in BanyanDB
List<BasicTrace> basicTraces = new ArrayList<>();
- for (String segmentID : segmentIDs) {
- final StreamQueryResponse segmentRecordResp = query(SegmentRecord.INDEX_NAME, ImmutableList.of("trace_id", "state", "endpoint_id", "duration", "start_time"),
+ for (String segmentID : segmentIds) {
+ final StreamQueryResponse segmentRecordResp = query(segmentRecordMetadata,
+ ImmutableList.of(SegmentRecord.TRACE_ID, SegmentRecord.IS_ERROR, SegmentRecord.ENDPOINT_ID, SegmentRecord.LATENCY, SegmentRecord.START_TIME),
new QueryBuilder() {
@Override
public void apply(StreamQuery traceQuery) {
traceQuery.appendCondition(eq(SegmentRecord.SEGMENT_ID, segmentID));
}
});
- basicTraces.addAll(segmentRecordResp.getElements().stream().map(new BasicTraceDeserializer()).collect(Collectors.toList()));
+
+ for (final RowEntity row : segmentRecordResp.getElements()) {
+ BasicTrace basicTrace = new BasicTrace();
+
+ basicTrace.setSegmentId(row.getId());
+ basicTrace.setStart(String.valueOf(row.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.START_TIME)));
+ basicTrace.getEndpointNames().add(IDManager.EndpointID.analysisId(
+ row.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.ENDPOINT_ID)
+ ).getEndpointName());
+ basicTrace.setDuration(((Number) row.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.LATENCY)).intValue());
+ basicTrace.setError(BooleanUtils.valueToBoolean(
+ ((Number) row.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.IS_ERROR)).intValue()
+ ));
+ basicTrace.getTraceIds().add(row.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.TRACE_ID));
+
+ basicTraces.add(basicTrace);
+ }
}
// TODO: Sort in DB with DESC
@@ -104,7 +133,7 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractBanyanDBDAO i
@Override
public List<ProfileThreadSnapshotRecord> queryRecords(String segmentId, int minSequence, int maxSequence) throws IOException {
- StreamQueryResponse resp = query(ProfileThreadSnapshotRecord.INDEX_NAME,
+ StreamQueryResponse resp = query(profileThreadSnapshotMetadata,
ImmutableList.of(ProfileThreadSnapshotRecord.TASK_ID, ProfileThreadSnapshotRecord.SEGMENT_ID,
ProfileThreadSnapshotRecord.DUMP_TIME, ProfileThreadSnapshotRecord.SEQUENCE),
new QueryBuilder() {
@@ -118,26 +147,52 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractBanyanDBDAO i
}
});
- return resp.getElements().stream().map(new ProfileThreadSnapshotRecordDeserializer()).collect(Collectors.toList());
+ List<ProfileThreadSnapshotRecord> result = new ArrayList<>(maxSequence - minSequence);
+ for (final RowEntity rowEntity : resp.getElements()) {
+ ProfileThreadSnapshotRecord record = this.builder.storage2Entity(
+ new BanyanDBConverter.StreamToEntity(profileThreadSnapshotMetadata, rowEntity));
+
+ result.add(record);
+ }
+ return result;
}
+ /**
+ * Loads the profiled segment identified by {@code segmentId}, including its binary payload.
+ *
+ * @param segmentId id of the segment to look up
+ * @return the first element returned by the query mapped to a {@link SegmentRecord},
+ * or {@code null} when the query returns no element
+ * @throws IOException as declared by the DAO interface
+ */
@Override
public SegmentRecord getProfiledSegment(String segmentId) throws IOException {
- StreamQueryResponse resp = query(SegmentRecord.INDEX_NAME,
+ StreamQueryResponse resp = query(segmentRecordMetadata,
ImmutableList.of(SegmentRecord.TRACE_ID, SegmentRecord.IS_ERROR, SegmentRecord.SERVICE_ID, SegmentRecord.SERVICE_INSTANCE_ID, SegmentRecord.ENDPOINT_ID, SegmentRecord.LATENCY, SegmentRecord.START_TIME),
new QueryBuilder() {
@Override
public void apply(StreamQuery query) {
- query.setDataProjections(Collections.singletonList("data_binary"));
+ query.setDataProjections(Collections.singletonList(SegmentRecord.DATA_BINARY));
- query.appendCondition(eq(SegmentRecord.INDEX_NAME, segmentId));
+ // Filter on the segment id tag; INDEX_NAME is the stream name, not a tag.
+ query.appendCondition(eq(SegmentRecord.SEGMENT_ID, segmentId));
}
});
- return resp.getElements().stream().map(new SegmentRecordDeserializer()).findFirst().orElse(null);
+ if (resp.size() == 0) {
+ return null;
+ }
+
+ final RowEntity rowEntity = resp.getElements().iterator().next();
+ final SegmentRecord segmentRecord = new SegmentRecord();
+ // segment_id is not among the projected tags; use the element id, as the BasicTrace mapping does.
+ segmentRecord.setSegmentId(rowEntity.getId());
+ segmentRecord.setTraceId(rowEntity.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.TRACE_ID));
+ segmentRecord.setServiceId(rowEntity.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.SERVICE_ID));
+ // Both tags are projected above and were populated by the removed SegmentRecordDeserializer.
+ segmentRecord.setServiceInstanceId(rowEntity.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.SERVICE_INSTANCE_ID));
+ segmentRecord.setEndpointId(rowEntity.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.ENDPOINT_ID));
+ segmentRecord.setStartTime(
+ ((Number) rowEntity.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.START_TIME)).longValue());
+ segmentRecord.setLatency(
+ ((Number) rowEntity.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.LATENCY)).intValue());
+ segmentRecord.setIsError(
+ ((Number) rowEntity.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.IS_ERROR)).intValue());
+ // The binary payload comes from the "data" tag family and is only set when non-empty.
+ byte[] dataBinary = rowEntity.getValue(StreamMetadata.TAG_FAMILY_DATA, SegmentRecord.DATA_BINARY);
+ if (dataBinary != null && dataBinary.length > 0) {
+ segmentRecord.setDataBinary(dataBinary);
+ }
+ return segmentRecord;
}
private int querySequenceWithAgg(AggType aggType, String segmentId, long start, long end) {
- StreamQueryResponse resp = query(ProfileThreadSnapshotRecord.INDEX_NAME,
+ StreamQueryResponse resp = query(profileThreadSnapshotMetadata,
ImmutableList.of(ProfileThreadSnapshotRecord.TASK_ID, ProfileThreadSnapshotRecord.SEGMENT_ID,
ProfileThreadSnapshotRecord.DUMP_TIME, ProfileThreadSnapshotRecord.SEQUENCE),
new QueryBuilder() {
@@ -151,7 +206,13 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractBanyanDBDAO i
}
});
- List<ProfileThreadSnapshotRecord> records = resp.getElements().stream().map(new ProfileThreadSnapshotRecordDeserializer()).collect(Collectors.toList());
+ List<ProfileThreadSnapshotRecord> records = new ArrayList<>();
+ for (final RowEntity rowEntity : resp.getElements()) {
+ ProfileThreadSnapshotRecord record = this.builder.storage2Entity(
+ new BanyanDBConverter.StreamToEntity(profileThreadSnapshotMetadata, rowEntity));
+
+ records.add(record);
+ }
switch (aggType) {
case MIN:
@@ -176,55 +237,4 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractBanyanDBDAO i
+ /** Selects which aggregation {@code querySequenceWithAgg} applies; that method switches on this value. */
enum AggType {
MIN, MAX
}
-
- public static class ProfileThreadSnapshotRecordDeserializer implements RowEntityDeserializer<ProfileThreadSnapshotRecord> {
- @Override
- public ProfileThreadSnapshotRecord apply(RowEntity row) {
- ProfileThreadSnapshotRecord record = new ProfileThreadSnapshotRecord();
- final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
- record.setTaskId((String) searchable.get(0).getValue());
- record.setSegmentId((String) searchable.get(1).getValue());
- record.setDumpTime(((Number) searchable.get(2).getValue()).longValue());
- record.setSequence(((Number) searchable.get(3).getValue()).intValue());
- final List<TagAndValue<?>> data = row.getTagFamilies().get(1);
- record.setStackBinary(((ByteString) data.get(0).getValue()).toByteArray());
- return record;
- }
- }
-
- public static class SegmentRecordDeserializer implements RowEntityDeserializer<SegmentRecord> {
- @Override
- public SegmentRecord apply(RowEntity row) {
- SegmentRecord record = new SegmentRecord();
- final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
- record.setSegmentId(row.getId());
- record.setTraceId((String) searchable.get(0).getValue());
- record.setIsError(((Number) searchable.get(1).getValue()).intValue());
- record.setServiceId((String) searchable.get(2).getValue());
- record.setServiceInstanceId((String) searchable.get(3).getValue());
- record.setEndpointId((String) searchable.get(4).getValue());
- record.setLatency(((Number) searchable.get(5).getValue()).intValue());
- record.setStartTime(((Number) searchable.get(6).getValue()).longValue());
- final List<TagAndValue<?>> data = row.getTagFamilies().get(1);
- record.setDataBinary(((ByteString) data.get(0).getValue()).toByteArray());
- return record;
- }
- }
-
- public static class BasicTraceDeserializer implements RowEntityDeserializer<BasicTrace> {
- @Override
- public BasicTrace apply(RowEntity row) {
- BasicTrace trace = new BasicTrace();
- trace.setSegmentId(row.getId());
- final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
- trace.getTraceIds().add((String) searchable.get(0).getValue());
- trace.setError(((Long) searchable.get(1).getValue()).intValue() == 1);
- trace.getEndpointNames().add(IDManager.EndpointID.analysisId(
- (String) searchable.get(2).getValue()
- ).getEndpointName());
- trace.setDuration(((Long) searchable.get(3).getValue()).intValue());
- trace.setStart(String.valueOf(searchable.get(4).getValue()));
- return trace;
- }
- }
}
\ No newline at end of file
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java
index d028fa91c3..a1cf73edcd 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java
@@ -24,21 +24,34 @@ import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
+import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.BanyanDBStorageDataBuilder;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBConverter;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.StreamMetadata;
import java.io.IOException;
+/**
+ * {@link IRecordDAO} implementation that converts a {@link Record} into a BanyanDB {@code StreamWrite},
+ * using the {@code StreamMetadata} registered in {@code MetadataRegistry} under the model name.
+ */
@RequiredArgsConstructor
-public class BanyanDBRecordDAO<T extends Record> implements IRecordDAO {
- private final BanyanDBStorageDataBuilder<T> storageBuilder;
+public class BanyanDBRecordDAO implements IRecordDAO {
+ private final StorageBuilder<Record> storageBuilder;
+ /**
+ * Builds the insert request for a single record.
+ *
+ * @param model model whose name selects the stream metadata and whose downsampling is used for the timestamp
+ * @param record record to write; its {@code id()} becomes the identity of the stream element
+ * @return a {@code BanyanDBStreamInsertRequest} wrapping the populated {@code StreamWrite}
+ * @throws IOException if no stream metadata is registered under {@code model.getName()}
+ */
@Override
public InsertRequest prepareBatchInsert(Model model, Record record) throws IOException {
- StreamWrite.StreamWriteBuilder builder = storageBuilder.entity2Storage((T) record)
- .name(model.getName())
- .timestamp(TimeBucket.getTimestamp(record.getTimeBucket(), model.getDownsampling()));
+ StreamMetadata metadata = MetadataRegistry.INSTANCE.findStreamMetadata(model.getName());
+ if (metadata == null) {
+ throw new IOException(model.getName() + " is not registered");
+ }
+ StreamWrite streamWrite = new StreamWrite(metadata.getGroup(), // group name
+ model.getName(), // index-name
+ record.id(), // identity
+ TimeBucket.getTimestamp(record.getTimeBucket(), model.getDownsampling()), // timestamp
+ metadata.getDataFamilySize(), // length of the "data" tag family
+ metadata.getSearchableFamilySize()); // length of the "searchable" tag family
+ // The converter wraps streamWrite; the storage builder fills it through the Convert2Storage interface.
+ Convert2Storage<StreamWrite> convert2Storage = new BanyanDBConverter.StreamToStorage(metadata, streamWrite);
+ storageBuilder.entity2Storage(record, convert2Storage);
- return new BanyanDBStreamInsertRequest(builder.build());
+ return new BanyanDBStreamInsertRequest(convert2Storage.obtain());
}
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStorageDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStorageDAO.java
index 3c3b4b4a6c..c3f75c8cef 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStorageDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStorageDAO.java
@@ -23,24 +23,17 @@ import org.apache.skywalking.oap.server.core.analysis.config.NoneStream;
import org.apache.skywalking.oap.server.core.analysis.management.ManagementData;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
-import org.apache.skywalking.oap.server.core.analysis.topn.TopN;
import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
import org.apache.skywalking.oap.server.core.storage.IManagementDAO;
import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
import org.apache.skywalking.oap.server.core.storage.INoneStreamDAO;
import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
import org.apache.skywalking.oap.server.core.storage.StorageDAO;
-import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
-import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
-import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBManagementDAO;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBMetricsDAO;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBNoneStreamDAO;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.BanyanDBStorageDataBuilder;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
@Slf4j
public class BanyanDBStorageDAO extends AbstractDAO<BanyanDBStorageClient> implements StorageDAO {
@@ -50,69 +43,21 @@ public class BanyanDBStorageDAO extends AbstractDAO<BanyanDBStorageClient> imple
+ /**
+ * Creates the metrics DAO for the given storage builder.
+ * This change removes the former no-op DAO that was returned for builders whose class name starts with
+ * {@code org.apache.skywalking.oap.server.core.}; every builder now goes to {@code BanyanDBMetricsDAO}.
+ * NOTE(review): the parameter is a raw {@code StorageBuilder}, so the cast below is unchecked.
+ */
@Override
public IMetricsDAO newMetricsDao(StorageBuilder storageBuilder) {
- // SKIP:
- // 1. OAL runtime metrics builder
- // 2. Analysis Function builder
- if (storageBuilder.getClass().getName().startsWith("org.apache.skywalking.oap.server.core.")) {
- log.warn("metrics builder {} is not supported yet", storageBuilder.getClass());
- return new IMetricsDAO() {
- @Override
- public List<Metrics> multiGet(Model model, List<Metrics> metrics) throws IOException {
- return Collections.emptyList();
- }
-
- @Override
- public InsertRequest prepareBatchInsert(Model model, Metrics metrics) throws IOException {
- return new InsertRequest() {
- };
- }
-
- @Override
- public UpdateRequest prepareBatchUpdate(Model model, Metrics metrics) throws IOException {
- return new UpdateRequest() {
- };
- }
- };
- }
- return new BanyanDBMetricsDAO<>((BanyanDBStorageDataBuilder<Metrics>) storageBuilder);
+ return new BanyanDBMetricsDAO((StorageBuilder<Metrics>) storageBuilder);
}
+ /**
+ * Creates the record DAO for the given storage builder.
+ * This change removes the reflection-based special cases that returned a no-op DAO for TopN and
+ * ZipkinSpanRecord builders; every builder now goes to {@code BanyanDBRecordDAO}.
+ * NOTE(review): the parameter is a raw {@code StorageBuilder}, so the cast below is unchecked.
+ */
@Override
public IRecordDAO newRecordDao(StorageBuilder storageBuilder) {
- try {
- final Class<?> returnType = storageBuilder.getClass().getDeclaredMethod("storage2Entity", Map.class).getReturnType();
- // FIXME: this is currently a hack to avoid TopN insertion since we will impl TopN later in BanyanDB side
- if (TopN.class.isAssignableFrom(returnType)) {
- return new IRecordDAO() {
- @Override
- public InsertRequest prepareBatchInsert(Model model, Record record) throws IOException {
- return new InsertRequest() {
- };
- }
- };
- } else if (returnType.getName().equals("org.apache.skywalking.oap.server.storage.plugin.zipkin.ZipkinSpanRecord")) {
- // SKIP ZipkinSpanRecord
- return new IRecordDAO() {
- @Override
- public InsertRequest prepareBatchInsert(Model model, Record record) throws IOException {
- return new InsertRequest() {
- };
- }
- };
- }
- } catch (NoSuchMethodException ex) {
- log.error("fail to get declared method");
- }
- return new BanyanDBRecordDAO<>((BanyanDBStorageDataBuilder<Record>) storageBuilder);
+ return new BanyanDBRecordDAO((StorageBuilder<Record>) storageBuilder);
}
+ /**
+ * Creates the none-stream DAO, passing it this DAO's client and the given storage builder.
+ * NOTE(review): the parameter is a raw {@code StorageBuilder}, so the cast below is unchecked.
+ */
@Override
public INoneStreamDAO newNoneStreamDao(StorageBuilder storageBuilder) {
- return new BanyanDBNoneStreamDAO<>(getClient(), (BanyanDBStorageDataBuilder<NoneStream>) storageBuilder);
+ return new BanyanDBNoneStreamDAO(getClient(), (StorageBuilder<NoneStream>) storageBuilder);
}
+ /**
+ * Creates the management DAO, passing it this DAO's client and the given storage builder.
+ * NOTE(review): the parameter is a raw {@code StorageBuilder}, so the cast below is unchecked.
+ */
@Override
public IManagementDAO newManagementDao(StorageBuilder storageBuilder) {
- return new BanyanDBManagementDAO<>(getClient(), (BanyanDBStorageDataBuilder<ManagementData>) storageBuilder);
+ return new BanyanDBManagementDAO(getClient(), (StorageBuilder<ManagementData>) storageBuilder);
}
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStreamInsertRequest.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStreamInsertRequest.java
index f8287973bc..b04e4422d5 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStreamInsertRequest.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStreamInsertRequest.java
@@ -24,7 +24,7 @@ import org.apache.skywalking.banyandb.v1.client.StreamWrite;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
+/**
+ * {@link InsertRequest} that carries a single BanyanDB {@code StreamWrite}.
+ * Lombok generates the one-argument constructor and, with {@code @Getter} now at class level, the accessor.
+ */
@RequiredArgsConstructor
+@Getter
public class BanyanDBStreamInsertRequest implements InsertRequest {
- @Getter
private final StreamWrite streamWrite;
}
\ No newline at end of file
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTopologyQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTopologyQueryDAO.java
deleted file mode 100644
index 660190dd10..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTopologyQueryDAO.java
+++ /dev/null
@@ -1,276 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import io.vavr.Tuple2;
-import io.vavr.Tuple4;
-import lombok.RequiredArgsConstructor;
-import org.apache.skywalking.banyandb.v1.client.RowEntity;
-import org.apache.skywalking.banyandb.v1.client.StreamQuery;
-import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
-import org.apache.skywalking.banyandb.v1.client.TimestampRange;
-import org.apache.skywalking.oap.server.core.Const;
-import org.apache.skywalking.oap.server.core.UnexpectedException;
-import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
-import org.apache.skywalking.oap.server.core.analysis.manual.relation.endpoint.EndpointRelationServerSideMetrics;
-import org.apache.skywalking.oap.server.core.analysis.manual.relation.instance.ServiceInstanceRelationClientSideMetrics;
-import org.apache.skywalking.oap.server.core.analysis.manual.relation.instance.ServiceInstanceRelationServerSideMetrics;
-import org.apache.skywalking.oap.server.core.analysis.manual.relation.service.ServiceRelationClientSideMetrics;
-import org.apache.skywalking.oap.server.core.analysis.manual.relation.service.ServiceRelationServerSideMetrics;
-import org.apache.skywalking.oap.server.core.query.type.Call;
-import org.apache.skywalking.oap.server.core.source.DetectPoint;
-import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
-import org.apache.skywalking.oap.server.library.util.CollectionUtils;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-public class BanyanDBTopologyQueryDAO extends AbstractBanyanDBDAO implements ITopologyQueryDAO {
- public BanyanDBTopologyQueryDAO(BanyanDBStorageClient client) {
- super(client);
- }
-
- @Override
- public List<Call.CallDetail> loadServiceRelationsDetectedAtServerSide(long startTB, long endTB, List<String> serviceIds) throws IOException {
- if (CollectionUtils.isEmpty(serviceIds)) {
- throw new UnexpectedException("Service id is empty");
- }
-
- return loadServiceCalls(
- ServiceRelationServerSideMetrics.INDEX_NAME, startTB, endTB,
- ServiceRelationServerSideMetrics.SOURCE_SERVICE_ID,
- ServiceRelationServerSideMetrics.DEST_SERVICE_ID, serviceIds, DetectPoint.SERVER
- );
- }
-
- @Override
- public List<Call.CallDetail> loadServiceRelationDetectedAtClientSide(long startTB, long endTB, List<String> serviceIds) throws IOException {
- if (CollectionUtils.isEmpty(serviceIds)) {
- throw new UnexpectedException("Service id is empty");
- }
-
- return loadServiceCalls(
- ServiceRelationClientSideMetrics.INDEX_NAME, startTB, endTB,
- ServiceRelationClientSideMetrics.SOURCE_SERVICE_ID,
- ServiceRelationClientSideMetrics.DEST_SERVICE_ID, serviceIds, DetectPoint.CLIENT
- );
- }
-
- @Override
- public List<Call.CallDetail> loadServiceRelationsDetectedAtServerSide(long startTB, long endTB) throws IOException {
- return loadServiceCalls(
- ServiceRelationServerSideMetrics.INDEX_NAME, startTB, endTB,
- ServiceRelationServerSideMetrics.SOURCE_SERVICE_ID,
- ServiceRelationServerSideMetrics.DEST_SERVICE_ID, Collections.emptyList(), DetectPoint.SERVER
- );
- }
-
- @Override
- public List<Call.CallDetail> loadServiceRelationDetectedAtClientSide(long startTB, long endTB) throws IOException {
- return loadServiceCalls(
- ServiceRelationClientSideMetrics.INDEX_NAME, startTB, endTB,
- ServiceRelationClientSideMetrics.SOURCE_SERVICE_ID,
- ServiceRelationClientSideMetrics.DEST_SERVICE_ID, Collections.emptyList(), DetectPoint.CLIENT
- );
- }
-
- @Override
- public List<Call.CallDetail> loadInstanceRelationDetectedAtServerSide(String clientServiceId, String serverServiceId, long startTB, long endTB) throws IOException {
- return loadServiceInstanceCalls(
- ServiceInstanceRelationServerSideMetrics.INDEX_NAME, startTB, endTB,
- ServiceInstanceRelationServerSideMetrics.SOURCE_SERVICE_ID,
- ServiceInstanceRelationServerSideMetrics.DEST_SERVICE_ID, clientServiceId, serverServiceId,
- DetectPoint.SERVER
- );
- }
-
- @Override
- public List<Call.CallDetail> loadInstanceRelationDetectedAtClientSide(String clientServiceId, String serverServiceId, long startTB, long endTB) throws IOException {
- return loadServiceInstanceCalls(
- ServiceInstanceRelationClientSideMetrics.INDEX_NAME, startTB, endTB,
- ServiceInstanceRelationClientSideMetrics.SOURCE_SERVICE_ID,
- ServiceInstanceRelationClientSideMetrics.DEST_SERVICE_ID, clientServiceId, serverServiceId,
- DetectPoint.CLIENT
- );
- }
-
- @Override
- public List<Call.CallDetail> loadEndpointRelation(long startTB, long endTB, String destEndpointId) throws IOException {
- return loadEndpointCalls(
- EndpointRelationServerSideMetrics.INDEX_NAME, startTB, endTB,
- EndpointRelationServerSideMetrics.SOURCE_ENDPOINT,
- EndpointRelationServerSideMetrics.DEST_ENDPOINT, destEndpointId
- );
- }
-
- private List<Call.CallDetail> loadServiceCalls(String tableName,
- long startTB,
- long endTB,
- String sourceCName,
- String destCName,
- List<String> serviceIds,
- DetectPoint detectPoint) throws IOException {
- TimestampRange timeRange = new TimestampRange(TimeBucket.getTimestamp(startTB), TimeBucket.getTimestamp(endTB));
- List<Call.CallDetail> calls = new ArrayList<>();
- if (serviceIds.isEmpty()) {
- StreamQueryResponse resp = query(tableName, Collections.emptyList(), timeRange, new QueryBuilder() {
- @Override
- void apply(StreamQuery query) {
- // query component_id
- query.setDataProjections(Collections.singletonList(ServiceRelationServerSideMetrics.COMPONENT_ID));
- }
- });
-
- calls.addAll(resp.getElements().stream().map(new ServiceCallDetailDeserializer(detectPoint)).collect(Collectors.toList()));
- } else {
- for (String fieldOfInterest : ImmutableList.of(sourceCName, destCName)) {
- for (String serviceID : serviceIds) {
- StreamQueryResponse resp = query(tableName, ImmutableList.of(fieldOfInterest), timeRange, new QueryBuilder() {
- @Override
- void apply(StreamQuery query) {
- // query component_id
- query.setDataProjections(Collections.singletonList(ServiceRelationServerSideMetrics.COMPONENT_ID));
-
- query.appendCondition(eq(fieldOfInterest, serviceID));
- }
- });
-
- calls.addAll(resp.getElements().stream().map(new ServiceCallDetailDeserializer(detectPoint)).collect(Collectors.toList()));
- }
- }
- }
-
- return calls;
- }
-
- private List<Call.CallDetail> loadServiceInstanceCalls(String tableName,
- long startTB,
- long endTB,
- String sourceCName,
- String descCName,
- String sourceServiceId,
- String destServiceId,
- DetectPoint detectPoint) throws IOException {
- TimestampRange timeRange = new TimestampRange(TimeBucket.getTimestamp(startTB), TimeBucket.getTimestamp(endTB));
-
- Set<Tuple4<String, String, String, String>> productQuerySet = ImmutableSet.of(
- new Tuple4<>(sourceCName, sourceServiceId, descCName, destServiceId),
- new Tuple4<>(sourceCName, destServiceId, descCName, sourceServiceId)
- );
-
- List<Call.CallDetail> calls = new ArrayList<>();
-
- for (Tuple4<String, String, String, String> querySet : productQuerySet) {
- StreamQueryResponse resp = query(tableName, ImmutableList.of(querySet._1(), querySet._3()), timeRange, new QueryBuilder() {
- @Override
- void apply(StreamQuery query) {
- // query component_id
- query.setDataProjections(Collections.singletonList(ServiceRelationServerSideMetrics.COMPONENT_ID));
-
- query.appendCondition(eq(querySet._1(), querySet._2()));
- query.appendCondition(eq(querySet._3(), querySet._4()));
- }
- });
-
- calls.addAll(resp.getElements().stream().map(new InstanceCallDetailDeserializer(detectPoint)).collect(Collectors.toList()));
- }
-
- return calls;
- }
-
- private List<Call.CallDetail> loadEndpointCalls(String tableName,
- long startTB,
- long endTB,
- String sourceCName,
- String destCName,
- String id) throws IOException {
- TimestampRange timeRange = new TimestampRange(TimeBucket.getTimestamp(startTB), TimeBucket.getTimestamp(endTB));
-
- Set<Tuple2<String, String>> allPossibleQuerySet = ImmutableSet.of(
- new Tuple2<>(sourceCName, id),
- new Tuple2<>(destCName, id)
- );
-
- List<Call.CallDetail> calls = new ArrayList<>();
-
- for (Tuple2<String, String> querySet : allPossibleQuerySet) {
- StreamQueryResponse resp = query(tableName, ImmutableList.of(querySet._1()), timeRange, new QueryBuilder() {
- @Override
- void apply(StreamQuery query) {
- query.appendCondition(eq(querySet._1(), querySet._2()));
- }
- });
-
- calls.addAll(resp.getElements().stream().map(new EndpointCallDetailDeserializer()).collect(Collectors.toList()));
- }
-
- return calls;
- }
-
- @RequiredArgsConstructor
- public static class ServiceCallDetailDeserializer implements RowEntityDeserializer<Call.CallDetail> {
- private final DetectPoint detectPoint;
-
- @Override
- public Call.CallDetail apply(RowEntity rowEntity) {
- Call.CallDetail call = new Call.CallDetail();
- String[] idsSlice = rowEntity.getId().split(Const.ID_CONNECTOR);
- String entityId = idsSlice[1];
- int componentId = ((Number) rowEntity.getTagFamilies().get(1) // Tag Family: "data"
- .get(0).getValue()).intValue();
- call.buildFromServiceRelation(entityId, componentId, this.detectPoint);
- return call;
- }
- }
-
- @RequiredArgsConstructor
- public static class InstanceCallDetailDeserializer implements RowEntityDeserializer<Call.CallDetail> {
- private final DetectPoint detectPoint;
-
- @Override
- public Call.CallDetail apply(RowEntity rowEntity) {
- Call.CallDetail call = new Call.CallDetail();
- String[] idsSlice = rowEntity.getId().split(Const.ID_CONNECTOR);
- String entityId = idsSlice[1];
- int componentId = ((Number) rowEntity.getTagFamilies().get(1) // Tag Family: "data"
- .get(0).getValue()).intValue();
- call.buildFromInstanceRelation(entityId, componentId, this.detectPoint);
- return call;
- }
- }
-
- @RequiredArgsConstructor
- public static class EndpointCallDetailDeserializer implements RowEntityDeserializer<Call.CallDetail> {
- @Override
- public Call.CallDetail apply(RowEntity rowEntity) {
- Call.CallDetail call = new Call.CallDetail();
- String[] idsSlice = rowEntity.getId().split(Const.ID_CONNECTOR);
- String entityId = idsSlice[1];
- call.buildFromEndpointRelation(entityId, DetectPoint.SERVER);
- return call;
- }
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java
index 642ed519fc..fc0d458c29 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java
@@ -20,11 +20,9 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
-import com.google.protobuf.ByteString;
import org.apache.skywalking.banyandb.v1.client.RowEntity;
import org.apache.skywalking.banyandb.v1.client.StreamQuery;
import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
import org.apache.skywalking.banyandb.v1.client.TimestampRange;
import org.apache.skywalking.oap.server.core.analysis.IDManager;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
@@ -38,15 +36,20 @@ import org.apache.skywalking.oap.server.core.query.type.TraceState;
import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBConverter;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.SegmentRecordBuilder;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.StreamMetadata;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.stream.Collectors;
public class BanyanDBTraceQueryDAO extends AbstractBanyanDBDAO implements ITraceQueryDAO {
+ private final StreamMetadata segmentRecordMetadata =
+ MetadataRegistry.INSTANCE.findStreamMetadata(SegmentRecord.INDEX_NAME);
+
+ /** Creates the DAO on top of the given storage client, which is handed to the parent class. */
public BanyanDBTraceQueryDAO(BanyanDBStorageClient client) {
super(client);
}
@@ -97,9 +100,8 @@ public class BanyanDBTraceQueryDAO extends AbstractBanyanDBDAO implements ITrace
if (CollectionUtils.isNotEmpty(tags)) {
for (final Tag tag : tags) {
- if (SegmentRecordBuilder.INDEXED_TAGS.contains(tag.getKey())) {
- query.appendCondition(eq(tag.getKey(), tag.getValue()));
- }
+ // TODO: check if we have this tag indexed?
+ query.appendCondition(eq(tag.getKey(), tag.getValue()));
}
}
@@ -114,7 +116,7 @@ public class BanyanDBTraceQueryDAO extends AbstractBanyanDBDAO implements ITrace
tsRange = new TimestampRange(TimeBucket.getTimestamp(startSecondTB), TimeBucket.getTimestamp(endSecondTB));
}
- StreamQueryResponse resp = query(SegmentRecord.INDEX_NAME,
+ StreamQueryResponse resp = query(segmentRecordMetadata,
ImmutableList.of(SegmentRecord.TRACE_ID, // 0 - trace_id
SegmentRecord.IS_ERROR, // 1 - is_error
SegmentRecord.SERVICE_ID, // 2 - service_id
@@ -124,24 +126,39 @@ public class BanyanDBTraceQueryDAO extends AbstractBanyanDBDAO implements ITrace
SegmentRecord.START_TIME), // 6 - start_time
tsRange, q);
- List<BasicTrace> basicTraces = resp.getElements().stream().map(new BasicTraceDeserializer()).collect(Collectors.toList());
+ TraceBrief traceBrief = new TraceBrief();
+ traceBrief.setTotal(resp.getElements().size());
+
+ for (final RowEntity row : resp.getElements()) {
+ BasicTrace basicTrace = new BasicTrace();
+
+ basicTrace.setSegmentId(row.getId());
+ basicTrace.setStart(String.valueOf(row.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.START_TIME)));
+ basicTrace.getEndpointNames().add(IDManager.EndpointID.analysisId(
+ row.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.ENDPOINT_ID)
+ ).getEndpointName());
+ basicTrace.setDuration(((Number) row.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.LATENCY)).intValue());
+ basicTrace.setError(BooleanUtils.valueToBoolean(
+ ((Number) row.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.IS_ERROR)).intValue()
+ ));
+ basicTrace.getTraceIds().add(row.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.TRACE_ID));
+
+ traceBrief.getTraces().add(basicTrace);
+ }
- TraceBrief brief = new TraceBrief();
- brief.setTotal(basicTraces.size());
- brief.getTraces().addAll(basicTraces);
- return brief;
+ return traceBrief;
}
@Override
public List<SegmentRecord> queryByTraceId(String traceId) throws IOException {
- StreamQueryResponse resp = query(SegmentRecord.INDEX_NAME,
- ImmutableList.of(SegmentRecord.TRACE_ID, // 0 - trace_id
- SegmentRecord.IS_ERROR, // 1 - is_error
- SegmentRecord.SERVICE_ID, // 2 - service_id
- SegmentRecord.SERVICE_INSTANCE_ID, // 3 - service_instance_id
- SegmentRecord.ENDPOINT_ID, // 4 - endpoint_id
- SegmentRecord.LATENCY, // 5 - latency
- SegmentRecord.START_TIME), // 6 - start_time
+ StreamQueryResponse resp = query(segmentRecordMetadata,
+ ImmutableList.of(SegmentRecord.TRACE_ID,
+ SegmentRecord.IS_ERROR,
+ SegmentRecord.SERVICE_ID,
+ SegmentRecord.SERVICE_INSTANCE_ID,
+ SegmentRecord.ENDPOINT_ID,
+ SegmentRecord.LATENCY,
+ SegmentRecord.START_TIME),
new QueryBuilder() {
@Override
public void apply(StreamQuery query) {
@@ -150,47 +167,19 @@ public class BanyanDBTraceQueryDAO extends AbstractBanyanDBDAO implements ITrace
}
});
- return resp.getElements().stream().map(new SegmentRecordDeserializer()).collect(Collectors.toList());
+ List<SegmentRecord> segmentRecords = new ArrayList<>(resp.getElements().size());
+
+ for (final RowEntity rowEntity : resp.getElements()) {
+ SegmentRecord segmentRecord = new SegmentRecord.Builder().storage2Entity(
+ new BanyanDBConverter.StreamToEntity(segmentRecordMetadata, rowEntity));
+ segmentRecords.add(segmentRecord);
+ }
+
+ return segmentRecords;
}
@Override
public List<Span> doFlexibleTraceQuery(String traceId) throws IOException {
return Collections.emptyList();
}
-
- public static class BasicTraceDeserializer implements RowEntityDeserializer<BasicTrace> {
- @Override
- public BasicTrace apply(RowEntity row) {
- BasicTrace trace = new BasicTrace();
- trace.setSegmentId(row.getId());
- final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
- trace.getTraceIds().add((String) searchable.get(0).getValue());
- trace.setError(((Long) searchable.get(1).getValue()).intValue() == 1);
- trace.getEndpointNames().add(IDManager.EndpointID.analysisId(
- (String) searchable.get(4).getValue()
- ).getEndpointName());
- trace.setDuration(((Long) searchable.get(5).getValue()).intValue());
- trace.setStart(String.valueOf(searchable.get(6).getValue()));
- return trace;
- }
- }
-
- public static class SegmentRecordDeserializer implements RowEntityDeserializer<SegmentRecord> {
- @Override
- public SegmentRecord apply(RowEntity row) {
- SegmentRecord record = new SegmentRecord();
- final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
- record.setSegmentId(row.getId());
- record.setTraceId((String) searchable.get(0).getValue());
- record.setIsError(((Number) searchable.get(1).getValue()).intValue());
- record.setServiceId((String) searchable.get(2).getValue());
- record.setServiceInstanceId((String) searchable.get(3).getValue());
- record.setEndpointId((String) searchable.get(4).getValue());
- record.setLatency(((Number) searchable.get(5).getValue()).intValue());
- record.setStartTime(((Number) searchable.get(6).getValue()).longValue());
- final List<TagAndValue<?>> data = row.getTagFamilies().get(1);
- record.setDataBinary(((ByteString) data.get(0).getValue()).toByteArray());
- return record;
- }
- }
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java
deleted file mode 100644
index 7888b0f56e..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.skywalking.banyandb.v1.client.RowEntity;
-import org.apache.skywalking.banyandb.v1.client.StreamQuery;
-import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
-import org.apache.skywalking.banyandb.v1.client.StreamWrite;
-import org.apache.skywalking.banyandb.v1.client.Tag;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.management.ui.template.UITemplate;
-import org.apache.skywalking.oap.server.core.query.enumeration.TemplateType;
-import org.apache.skywalking.oap.server.core.query.input.DashboardSetting;
-import org.apache.skywalking.oap.server.core.query.type.DashboardConfiguration;
-import org.apache.skywalking.oap.server.core.query.type.TemplateChangeStatus;
-import org.apache.skywalking.oap.server.core.storage.management.UITemplateManagementDAO;
-import org.apache.skywalking.oap.server.library.util.BooleanUtils;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
-
-import java.io.IOException;
-import java.time.Instant;
-import java.util.List;
-import java.util.stream.Collectors;
-
-/**
- * {@link org.apache.skywalking.oap.server.core.management.ui.template.UITemplate} is a stream
- */
-public class BanyanDBUITemplateManagementDAO extends AbstractBanyanDBDAO implements UITemplateManagementDAO {
- public BanyanDBUITemplateManagementDAO(BanyanDBStorageClient client) {
- super(client);
- }
-
- @Override
- public List<DashboardConfiguration> getAllTemplates(Boolean includingDisabled) throws IOException {
- StreamQueryResponse resp = query(UITemplate.INDEX_NAME, ImmutableList.of(UITemplate.NAME, UITemplate.DISABLED),
- new QueryBuilder() {
- @Override
- public void apply(StreamQuery query) {
- query.setDataProjections(ImmutableList.of(UITemplate.ACTIVATED, UITemplate.CONFIGURATION, UITemplate.TYPE));
- query.setLimit(10000);
- if (!includingDisabled) {
- query.appendCondition(eq(UITemplate.DISABLED, BooleanUtils.FALSE));
- }
- }
- });
-
- return resp.getElements().stream().map(new DashboardConfigurationDeserializer()).collect(Collectors.toList());
- }
-
- @Override
- public TemplateChangeStatus addTemplate(DashboardSetting setting) throws IOException {
- final UITemplate uiTemplate = setting.toEntity();
-
- StreamWrite request = StreamWrite.builder()
- .name(UITemplate.INDEX_NAME)
- // searchable - name
- .searchableTag(Tag.stringField(uiTemplate.getName()))
- // searchable - disabled
- .searchableTag(Tag.longField(uiTemplate.getDisabled()))
- // data - type
- .dataTag(Tag.stringField(uiTemplate.getType()))
- // data - configuration
- .dataTag(Tag.stringField(uiTemplate.getConfiguration()))
- // data - activated
- .dataTag(Tag.longField(uiTemplate.getActivated()))
- .timestamp(Instant.now().toEpochMilli())
- .elementId(uiTemplate.id())
- .build();
- getClient().write(request);
- return TemplateChangeStatus.builder().status(true).build();
- }
-
- @Override
- public TemplateChangeStatus changeTemplate(DashboardSetting setting) throws IOException {
- return TemplateChangeStatus.builder().status(false).message("Can't update the template").build();
- }
-
- @Override
- public TemplateChangeStatus disableTemplate(String name) throws IOException {
- return TemplateChangeStatus.builder().status(false).message("Can't disable the template").build();
- }
-
- public static class DashboardConfigurationDeserializer implements RowEntityDeserializer<DashboardConfiguration> {
- @Override
- public DashboardConfiguration apply(RowEntity row) {
- DashboardConfiguration dashboardConfiguration = new DashboardConfiguration();
- final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
- // name
- dashboardConfiguration.setName((String) searchable.get(0).getValue());
- // disabled
- dashboardConfiguration.setDisabled(BooleanUtils.valueToBoolean(((Number) searchable.get(1).getValue()).intValue()));
- final List<TagAndValue<?>> data = row.getTagFamilies().get(1);
- // activated
- dashboardConfiguration.setActivated(BooleanUtils.valueToBoolean(((Number) data.get(0).getValue()).intValue()));
- // configuration
- dashboardConfiguration.setConfiguration((String) data.get(1).getValue());
- // type
- dashboardConfiguration.setType(TemplateType.forName((String) data.get(2).getValue()));
- return dashboardConfiguration;
- }
- }
-}