You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by lu...@apache.org on 2022/04/22 03:24:30 UTC
[skywalking] 09/22: refactor deserializer
This is an automated email from the ASF dual-hosted git repository.
lujiajing pushed a commit to branch banyandb-integration-stream
in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit f8b7633800bfe0b400f52c64e801498d4f9cb7c0
Author: Megrez Lu <lu...@gmail.com>
AuthorDate: Wed Dec 8 16:45:12 2021 +0800
refactor deserializer
---
.../banyandb/BanyanDBStorageBuilderFactory.java | 70 +++++++
.../plugin/banyandb/BanyanDBStorageProvider.java | 2 +-
.../deserializer/AbstractBanyanDBDeserializer.java | 53 -----
.../deserializer/AlarmMessageDeserializer.java | 69 -------
.../deserializer/BanyanDBDeserializerFactory.java | 72 -------
.../deserializer/BasicTraceDeserializer.java | 49 -----
.../deserializer/BrowserErrorLogDeserializer.java | 72 -------
.../DashboardConfigurationDeserializer.java | 55 ------
.../deserializer/DatabaseDeserializer.java | 43 -----
.../deserializer/EndpointDeserializer.java | 43 -----
.../banyandb/deserializer/EventDeserializer.java | 55 ------
.../banyandb/deserializer/LogDeserializer.java | 69 -------
.../NetworkAddressAliasDeserializer.java | 53 -----
.../deserializer/ProfileTaskDeserializer.java | 54 ------
.../deserializer/ProfileTaskLogDeserializer.java | 53 -----
.../ProfileThreadSnapshotRecordDeserializer.java | 50 -----
.../deserializer/RowEntityDeserializer.java | 26 ---
.../deserializer/SegmentRecordDeserializer.java | 53 -----
.../banyandb/deserializer/ServiceDeserializer.java | 44 -----
.../deserializer/ServiceInstanceDeserializer.java | 79 --------
.../plugin/banyandb/schema/AlarmRecordBuilder.java | 2 +-
.../banyandb/schema/BanyanDBMetricsBuilder.java | 30 ---
.../banyandb/schema/BanyanDBRecordBuilder.java | 61 ------
.../schema/BanyanDBStorageDataBuilder.java | 56 +++++-
.../schema/BrowserErrorLogRecordBuilder.java | 2 +-
.../plugin/banyandb/schema/EventBuilder.java | 9 +-
.../plugin/banyandb/schema/LogRecordBuilder.java | 2 +-
.../storage/plugin/banyandb/schema/Metadata.java | 6 +-
.../schema/NetworkAddressAliasBuilder.java | 2 +-
.../schema/ProfileTaskLogRecordBuilder.java | 2 +-
.../banyandb/schema/ProfileTaskRecordBuilder.java | 2 +-
.../schema/ProfileThreadSnapshotRecordBuilder.java | 2 +-
.../banyandb/schema/SegmentRecordBuilder.java | 2 +-
.../plugin/banyandb/schema/UITemplateBuilder.java | 8 -
.../banyandb/stream/AbstractBanyanDBDAO.java | 50 +++--
.../banyandb/stream/BanyanDBAlarmQueryDAO.java | 88 +++++++--
.../stream/BanyanDBBrowserLogQueryDAO.java | 76 ++++++--
.../banyandb/stream/BanyanDBEventQueryDAO.java | 94 ++++++---
.../banyandb/stream/BanyanDBLogQueryDAO.java | 77 ++++++--
.../banyandb/stream/BanyanDBManagementDAO.java | 42 +++-
.../banyandb/stream/BanyanDBMetadataQueryDAO.java | 215 ++++++++++++++++-----
.../plugin/banyandb/stream/BanyanDBMetricsDAO.java | 14 +-
.../stream/BanyanDBNetworkAddressAliasDAO.java | 44 ++++-
.../banyandb/stream/BanyanDBNoneStreamDAO.java | 18 +-
.../stream/BanyanDBProfileTaskLogQueryDAO.java | 44 ++++-
.../stream/BanyanDBProfileTaskQueryDAO.java | 108 +++++++----
.../BanyanDBProfileThreadSnapshotQueryDAO.java | 166 ++++++++++++----
.../plugin/banyandb/stream/BanyanDBRecordDAO.java | 11 +-
.../plugin/banyandb/stream/BanyanDBStorageDAO.java | 97 +---------
.../banyandb/stream/BanyanDBTraceQueryDAO.java | 96 ++++++---
.../stream/BanyanDBUITemplateManagementDAO.java | 52 +++--
51 files changed, 1036 insertions(+), 1506 deletions(-)
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageBuilderFactory.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageBuilderFactory.java
new file mode 100644
index 0000000000..a07fe1f106
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageBuilderFactory.java
@@ -0,0 +1,70 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb;
+
+import org.apache.skywalking.oap.server.core.alarm.AlarmRecord;
+import org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic;
+import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
+import org.apache.skywalking.oap.server.core.analysis.manual.log.LogRecord;
+import org.apache.skywalking.oap.server.core.analysis.manual.networkalias.NetworkAddressAlias;
+import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
+import org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraffic;
+import org.apache.skywalking.oap.server.core.browser.manual.errorlog.BrowserErrorLogRecord;
+import org.apache.skywalking.oap.server.core.management.ui.template.UITemplate;
+import org.apache.skywalking.oap.server.core.profile.ProfileTaskLogRecord;
+import org.apache.skywalking.oap.server.core.profile.ProfileTaskRecord;
+import org.apache.skywalking.oap.server.core.profile.ProfileThreadSnapshotRecord;
+import org.apache.skywalking.oap.server.core.source.Event;
+import org.apache.skywalking.oap.server.core.storage.StorageBuilderFactory;
+import org.apache.skywalking.oap.server.core.storage.StorageData;
+import org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder;
+import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.AlarmRecordBuilder;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.BrowserErrorLogRecordBuilder;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.EventBuilder;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.LogRecordBuilder;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.Metadata;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.NetworkAddressAliasBuilder;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.ProfileTaskLogRecordBuilder;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.ProfileTaskRecordBuilder;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.ProfileThreadSnapshotRecordBuilder;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.SegmentRecordBuilder;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.UITemplateBuilder;
+
+public class BanyanDBStorageBuilderFactory implements StorageBuilderFactory {
+    @Override
+    public BuilderTemplateDefinition builderTemplate() {
+        return new BuilderTemplateDefinition(StorageHashMapBuilder.class.getName(), "metrics-builder");
+    }
+    // Resolves the builder for a storage model: only the types listed below are mapped, defaultBuilder is never consulted.
+    @Override
+    public Class<? extends StorageBuilder> builderOf(Class<? extends StorageData> dataType, Class<? extends StorageBuilder> defaultBuilder) {
+        if (SegmentRecord.class.equals(dataType)) {
+            return SegmentRecordBuilder.class;
+        } else if (AlarmRecord.class.equals(dataType)) {
+            return AlarmRecordBuilder.class;
+        } else if (BrowserErrorLogRecord.class.equals(dataType)) {
+            return BrowserErrorLogRecordBuilder.class;
+        } else if (LogRecord.class.equals(dataType)) {
+            return LogRecordBuilder.class;
+        } else if (ProfileTaskLogRecord.class.equals(dataType)) {
+            return ProfileTaskLogRecordBuilder.class;
+        } else if (ProfileThreadSnapshotRecord.class.equals(dataType)) {
+            return ProfileThreadSnapshotRecordBuilder.class;
+        } else if (ProfileTaskRecord.class.equals(dataType)) {
+            return ProfileTaskRecordBuilder.class;
+        } else if (UITemplate.class.equals(dataType)) {
+            return UITemplateBuilder.class;
+        } else if (Event.class.equals(dataType)) {
+            return EventBuilder.class;
+        } else if (ServiceTraffic.class.equals(dataType)) {
+            return Metadata.ServiceTrafficBuilder.class;
+        } else if (InstanceTraffic.class.equals(dataType)) {
+            return Metadata.InstanceTrafficBuilder.class;
+        } else if (EndpointTraffic.class.equals(dataType)) {
+            return Metadata.EndpointTrafficBuilder.class;
+        } else if (NetworkAddressAlias.class.equals(dataType)) {
+            return NetworkAddressAliasBuilder.class;
+        }
+        // No mapping exists: name the rejected type so the gap is diagnosable. NOTE(review): confirm failing (vs. returning defaultBuilder) is intended.
+        throw new UnsupportedOperationException("unsupported storage type: " + dataType);
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java
index 1df391d302..aade57b62b 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java
@@ -83,7 +83,7 @@ public class BanyanDBStorageProvider extends ModuleProvider {
@Override
public void prepare() throws ServiceNotProvidedException, ModuleStartException {
- this.registerServiceImplementation(StorageBuilderFactory.class, new StorageBuilderFactory.Default());
+ this.registerServiceImplementation(StorageBuilderFactory.class, new BanyanDBStorageBuilderFactory());
this.client = new BanyanDBStorageClient(config.getHost(), config.getPort(), config.getGroup());
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/AbstractBanyanDBDeserializer.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/AbstractBanyanDBDeserializer.java
deleted file mode 100644
index 0b2f0644ad..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/AbstractBanyanDBDeserializer.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
-
-import org.apache.skywalking.banyandb.v1.client.StreamQuery;
-import org.apache.skywalking.banyandb.v1.client.TimestampRange;
-
-import java.util.Collections;
-import java.util.List;
-
-public abstract class AbstractBanyanDBDeserializer<T> implements RowEntityDeserializer<T> {
- private final String indexName;
- private final List<String> searchableProjection;
- private final List<String> dataProjection;
-
- protected AbstractBanyanDBDeserializer(String indexName, List<String> searchableProjection) {
- this(indexName, searchableProjection, Collections.emptyList());
- }
-
- protected AbstractBanyanDBDeserializer(String indexName, List<String> searchableProjection, List<String> dataProjection) {
- this.indexName = indexName;
- this.searchableProjection = searchableProjection;
- this.dataProjection = dataProjection;
- }
-
- public StreamQuery buildStreamQuery() {
- final StreamQuery query = new StreamQuery(this.indexName, this.searchableProjection);
- query.setDataProjections(this.dataProjection);
- return query;
- }
-
- public StreamQuery buildStreamQuery(long startTimestamp, long endTimestamp) {
- final StreamQuery query = new StreamQuery(this.indexName, new TimestampRange(startTimestamp, endTimestamp), this.searchableProjection);
- query.setDataProjections(this.dataProjection);
- return query;
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/AlarmMessageDeserializer.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/AlarmMessageDeserializer.java
deleted file mode 100644
index 2df0ef739b..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/AlarmMessageDeserializer.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
-
-import com.google.common.base.Charsets;
-import com.google.common.collect.ImmutableList;
-import com.google.gson.Gson;
-import com.google.gson.reflect.TypeToken;
-import com.google.protobuf.ByteString;
-import org.apache.skywalking.banyandb.v1.client.RowEntity;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.alarm.AlarmRecord;
-import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
-import org.apache.skywalking.oap.server.core.query.enumeration.Scope;
-import org.apache.skywalking.oap.server.core.query.type.AlarmMessage;
-import org.apache.skywalking.oap.server.core.query.type.KeyValue;
-
-import java.util.List;
-
-public class AlarmMessageDeserializer extends AbstractBanyanDBDeserializer<AlarmMessage> {
- private static final Gson GSON = new Gson();
-
- public AlarmMessageDeserializer() {
- super(AlarmRecord.INDEX_NAME,
- ImmutableList.of(AlarmRecord.SCOPE, AlarmRecord.START_TIME),
- ImmutableList.of(AlarmRecord.ID0, AlarmRecord.ID1, AlarmRecord.ALARM_MESSAGE, AlarmRecord.TAGS_RAW_DATA));
- }
-
- @Override
- public AlarmMessage map(RowEntity row) {
- AlarmMessage alarmMessage = new AlarmMessage();
- final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
- int scopeID = ((Number) searchable.get(0).getValue()).intValue();
- alarmMessage.setScopeId(scopeID);
- alarmMessage.setScope(Scope.Finder.valueOf(scopeID));
- alarmMessage.setStartTime(((Number) searchable.get(1).getValue()).longValue());
- final List<TagAndValue<?>> data = row.getTagFamilies().get(1);
- alarmMessage.setId((String) data.get(0).getValue());
- alarmMessage.setId1((String) data.get(1).getValue());
- alarmMessage.setMessage((String) data.get(2).getValue());
- Object o = data.get(3).getValue();
- if (o instanceof ByteString && !((ByteString) o).isEmpty()) {
- this.parseDataBinary(((ByteString) o).toByteArray(), alarmMessage.getTags());
- }
- return alarmMessage;
- }
-
- void parseDataBinary(byte[] dataBinary, List<KeyValue> tags) {
- List<Tag> tagList = GSON.fromJson(new String(dataBinary, Charsets.UTF_8), new TypeToken<List<Tag>>() {
- }.getType());
- tagList.forEach(pair -> tags.add(new KeyValue(pair.getKey(), pair.getValue())));
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BanyanDBDeserializerFactory.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BanyanDBDeserializerFactory.java
deleted file mode 100644
index 859812530e..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BanyanDBDeserializerFactory.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
-
-import org.apache.skywalking.oap.server.core.analysis.manual.networkalias.NetworkAddressAlias;
-import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
-import org.apache.skywalking.oap.server.core.profile.ProfileThreadSnapshotRecord;
-import org.apache.skywalking.oap.server.core.query.type.AlarmMessage;
-import org.apache.skywalking.oap.server.core.query.type.BasicTrace;
-import org.apache.skywalking.oap.server.core.query.type.BrowserErrorLog;
-import org.apache.skywalking.oap.server.core.query.type.DashboardConfiguration;
-import org.apache.skywalking.oap.server.core.query.type.Database;
-import org.apache.skywalking.oap.server.core.query.type.Endpoint;
-import org.apache.skywalking.oap.server.core.query.type.Log;
-import org.apache.skywalking.oap.server.core.query.type.ProfileTask;
-import org.apache.skywalking.oap.server.core.query.type.ProfileTaskLog;
-import org.apache.skywalking.oap.server.core.query.type.Service;
-import org.apache.skywalking.oap.server.core.query.type.ServiceInstance;
-import org.apache.skywalking.oap.server.core.query.type.event.Event;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public enum BanyanDBDeserializerFactory {
- INSTANCE;
-
- private final Map<Class<?>, AbstractBanyanDBDeserializer<?>> registry;
-
- BanyanDBDeserializerFactory() {
- registry = new HashMap<>(10);
- register(AlarmMessage.class, new AlarmMessageDeserializer());
- register(BasicTrace.class, new BasicTraceDeserializer());
- register(BrowserErrorLog.class, new BrowserErrorLogDeserializer());
- register(DashboardConfiguration.class, new DashboardConfigurationDeserializer());
- register(Database.class, new DatabaseDeserializer());
- register(Endpoint.class, new EndpointDeserializer());
- register(Event.class, new EventDeserializer());
- register(Log.class, new LogDeserializer());
- register(NetworkAddressAlias.class, new NetworkAddressAliasDeserializer());
- register(ProfileTaskLog.class, new ProfileTaskLogDeserializer());
- register(ProfileTask.class, new ProfileTaskDeserializer());
- register(ProfileThreadSnapshotRecord.class, new ProfileThreadSnapshotRecordDeserializer());
- register(SegmentRecord.class, new SegmentRecordDeserializer());
- register(ServiceInstance.class, new ServiceInstanceDeserializer());
- register(Service.class, new ServiceDeserializer());
- }
-
- private <T> void register(Class<T> clazz, AbstractBanyanDBDeserializer<T> mapper) {
- this.registry.put(clazz, mapper);
- }
-
- @SuppressWarnings({"unchecked"})
- public <T> AbstractBanyanDBDeserializer<T> findDeserializer(Class<T> clazz) {
- return (AbstractBanyanDBDeserializer<T>) registry.get(clazz);
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BasicTraceDeserializer.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BasicTraceDeserializer.java
deleted file mode 100644
index 949ae43e16..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BasicTraceDeserializer.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.skywalking.banyandb.v1.client.RowEntity;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.analysis.IDManager;
-import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
-import org.apache.skywalking.oap.server.core.query.type.BasicTrace;
-
-import java.util.List;
-
-public class BasicTraceDeserializer extends AbstractBanyanDBDeserializer<BasicTrace> {
- public BasicTraceDeserializer() {
- super(SegmentRecord.INDEX_NAME, ImmutableList.of("trace_id", "state", "endpoint_id", "duration", "start_time"));
- }
-
- @Override
- public BasicTrace map(RowEntity row) {
- BasicTrace trace = new BasicTrace();
- trace.setSegmentId(row.getId());
- final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
- trace.getTraceIds().add((String) searchable.get(0).getValue());
- trace.setError(((Long) searchable.get(1).getValue()).intValue() == 1);
- trace.getEndpointNames().add(IDManager.EndpointID.analysisId(
- (String) searchable.get(2).getValue()
- ).getEndpointName());
- trace.setDuration(((Long) searchable.get(3).getValue()).intValue());
- trace.setStart(String.valueOf(searchable.get(4).getValue()));
- return trace;
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BrowserErrorLogDeserializer.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BrowserErrorLogDeserializer.java
deleted file mode 100644
index 92605afa67..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BrowserErrorLogDeserializer.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
-
-import com.google.common.collect.ImmutableList;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.skywalking.banyandb.v1.client.RowEntity;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.browser.manual.errorlog.BrowserErrorLogRecord;
-import org.apache.skywalking.oap.server.core.query.type.BrowserErrorLog;
-import org.apache.skywalking.oap.server.core.query.type.ErrorCategory;
-
-import java.util.Collections;
-import java.util.List;
-
-public class BrowserErrorLogDeserializer extends AbstractBanyanDBDeserializer<BrowserErrorLog> {
- public BrowserErrorLogDeserializer() {
- super(BrowserErrorLogRecord.INDEX_NAME,
- ImmutableList.of(BrowserErrorLogRecord.SERVICE_ID,
- BrowserErrorLogRecord.SERVICE_VERSION_ID,
- BrowserErrorLogRecord.PAGE_PATH_ID,
- BrowserErrorLogRecord.ERROR_CATEGORY),
- Collections.singletonList(BrowserErrorLogRecord.DATA_BINARY));
- }
-
- @Override
- public BrowserErrorLog map(RowEntity row) {
- // FIXME: use protobuf directly
- BrowserErrorLog log = new BrowserErrorLog();
- final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
- log.setService((String) searchable.get(0).getValue());
- log.setServiceVersion((String) searchable.get(1).getValue());
- log.setPagePath((String) searchable.get(2).getValue());
- log.setCategory(ErrorCategory.valueOf((String) searchable.get(3).getValue()));
- log.setTime(row.getTimestamp());
- final List<TagAndValue<?>> data = row.getTagFamilies().get(1);
- Object o = data.get(0).getValue();
- if (o instanceof ByteString && !((ByteString) o).isEmpty()) {
- try {
- org.apache.skywalking.apm.network.language.agent.v3.BrowserErrorLog browserErrorLog = org.apache.skywalking.apm.network.language.agent.v3.BrowserErrorLog
- .parseFrom((ByteString) o);
- log.setGrade(browserErrorLog.getGrade());
- log.setCol(browserErrorLog.getCol());
- log.setLine(browserErrorLog.getLine());
- log.setMessage(browserErrorLog.getMessage());
- log.setErrorUrl(browserErrorLog.getErrorUrl());
- log.setStack(browserErrorLog.getStack());
- log.setFirstReportedError(browserErrorLog.getFirstReportedError());
- } catch (InvalidProtocolBufferException ex) {
- throw new RuntimeException("fail to parse proto buffer", ex);
- }
- }
- return log;
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/DashboardConfigurationDeserializer.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/DashboardConfigurationDeserializer.java
deleted file mode 100644
index c46db9df8d..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/DashboardConfigurationDeserializer.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.skywalking.banyandb.v1.client.RowEntity;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.management.ui.template.UITemplate;
-import org.apache.skywalking.oap.server.core.query.enumeration.TemplateType;
-import org.apache.skywalking.oap.server.core.query.type.DashboardConfiguration;
-import org.apache.skywalking.oap.server.library.util.BooleanUtils;
-
-import java.util.List;
-
-public class DashboardConfigurationDeserializer extends AbstractBanyanDBDeserializer<DashboardConfiguration> {
- public DashboardConfigurationDeserializer() {
- super(UITemplate.INDEX_NAME,
- ImmutableList.of(UITemplate.NAME, UITemplate.DISABLED),
- ImmutableList.of(UITemplate.ACTIVATED, UITemplate.CONFIGURATION, UITemplate.TYPE));
- }
-
- @Override
- public DashboardConfiguration map(RowEntity row) {
- DashboardConfiguration dashboardConfiguration = new DashboardConfiguration();
- final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
- // name
- dashboardConfiguration.setName((String) searchable.get(0).getValue());
- // disabled
- dashboardConfiguration.setDisabled(BooleanUtils.valueToBoolean(((Number) searchable.get(1).getValue()).intValue()));
- final List<TagAndValue<?>> data = row.getTagFamilies().get(1);
- // activated
- dashboardConfiguration.setActivated(BooleanUtils.valueToBoolean(((Number) data.get(0).getValue()).intValue()));
- // configuration
- dashboardConfiguration.setConfiguration((String) data.get(1).getValue());
- // type
- dashboardConfiguration.setType(TemplateType.forName((String) data.get(2).getValue()));
- return dashboardConfiguration;
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/DatabaseDeserializer.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/DatabaseDeserializer.java
deleted file mode 100644
index db6d875358..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/DatabaseDeserializer.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.skywalking.banyandb.v1.client.RowEntity;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraffic;
-import org.apache.skywalking.oap.server.core.query.type.Database;
-
-import java.util.List;
-
-public class DatabaseDeserializer extends AbstractBanyanDBDeserializer<Database> {
- public DatabaseDeserializer() {
- super(ServiceTraffic.INDEX_NAME,
- ImmutableList.of(ServiceTraffic.NAME, ServiceTraffic.NODE_TYPE));
- }
-
- @Override
- public Database map(RowEntity row) {
- Database database = new Database();
- final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
- database.setId(row.getId());
- database.setName((String) searchable.get(0).getValue());
- return database;
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/EndpointDeserializer.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/EndpointDeserializer.java
deleted file mode 100644
index e86867c771..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/EndpointDeserializer.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.skywalking.banyandb.v1.client.RowEntity;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic;
-import org.apache.skywalking.oap.server.core.query.type.Endpoint;
-
-import java.util.List;
-
-public class EndpointDeserializer extends AbstractBanyanDBDeserializer<Endpoint> { // Maps one BanyanDB RowEntity onto a query-layer Endpoint.
- public EndpointDeserializer() {
- super(EndpointTraffic.INDEX_NAME, // EndpointTraffic's index name is handed to the base class
- ImmutableList.of(EndpointTraffic.NAME, EndpointTraffic.SERVICE_ID)); // tag names; map() reads them by position
- }
-
- @Override
- public Endpoint map(RowEntity row) { // returns a new Endpoint populated from the row
- Endpoint endpoint = new Endpoint();
- final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0); // first tag family of the row
- endpoint.setName((String) searchable.get(0).getValue()); // position 0, listed as EndpointTraffic.NAME above
- endpoint.setId((String) searchable.get(1).getValue()); // NOTE(review): position 1 is listed as SERVICE_ID, so the endpoint id receives the service id; confirm this is intended
- return endpoint;
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/EventDeserializer.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/EventDeserializer.java
deleted file mode 100644
index 39783228d2..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/EventDeserializer.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.skywalking.banyandb.v1.client.RowEntity;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.query.type.event.EventType;
-import org.apache.skywalking.oap.server.core.query.type.event.Source;
-import org.apache.skywalking.oap.server.core.source.Event;
-
-import java.util.List;
-
-public class EventDeserializer extends AbstractBanyanDBDeserializer<org.apache.skywalking.oap.server.core.query.type.event.Event> { // Maps a RowEntity onto the query-layer Event (fully qualified because the source-side Event is imported).
- public EventDeserializer() {
- super(Event.INDEX_NAME, // index name of the source-side Event
- ImmutableList.of(Event.UUID, Event.SERVICE, Event.SERVICE_INSTANCE, Event.ENDPOINT, Event.NAME, // first tag list: read by position from tag family 0 in map()
- Event.TYPE, Event.START_TIME, Event.END_TIME),
- ImmutableList.of(Event.MESSAGE, Event.PARAMETERS)); // second tag list: read by position from tag family 1 in map()
- }
-
- @Override
- public org.apache.skywalking.oap.server.core.query.type.event.Event map(RowEntity row) { // returns a new query-layer Event populated from the row
- final org.apache.skywalking.oap.server.core.query.type.event.Event resultEvent = new org.apache.skywalking.oap.server.core.query.type.event.Event();
- // searchable
- final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
- resultEvent.setUuid((String) searchable.get(0).getValue());
- resultEvent.setSource(new Source((String) searchable.get(1).getValue(), (String) searchable.get(2).getValue(), (String) searchable.get(3).getValue())); // positions 1-3 are listed as SERVICE, SERVICE_INSTANCE, ENDPOINT
- resultEvent.setName((String) searchable.get(4).getValue());
- resultEvent.setType(EventType.parse((String) searchable.get(5).getValue())); // the type tag is a string converted through EventType.parse
- resultEvent.setStartTime(((Number) searchable.get(6).getValue()).longValue()); // numeric tags are cast to Number and widened to long
- resultEvent.setEndTime(((Number) searchable.get(7).getValue()).longValue());
- // data
- final List<TagAndValue<?>> data = row.getTagFamilies().get(1);
- resultEvent.setMessage((String) data.get(0).getValue());
- resultEvent.setParameters((String) data.get(1).getValue());
- return resultEvent;
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/LogDeserializer.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/LogDeserializer.java
deleted file mode 100644
index f1bb3bec5d..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/LogDeserializer.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
-
-import com.google.common.collect.ImmutableList;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.skywalking.apm.network.common.v3.KeyStringValuePair;
-import org.apache.skywalking.apm.network.logging.v3.LogTags;
-import org.apache.skywalking.banyandb.v1.client.RowEntity;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.analysis.manual.log.AbstractLogRecord;
-import org.apache.skywalking.oap.server.core.analysis.manual.log.LogRecord;
-import org.apache.skywalking.oap.server.core.query.type.KeyValue;
-import org.apache.skywalking.oap.server.core.query.type.Log;
-
-import java.util.List;
-
-public class LogDeserializer extends AbstractBanyanDBDeserializer<Log> { // Maps one BanyanDB RowEntity onto a query-layer Log.
- public LogDeserializer() {
- super(LogRecord.INDEX_NAME, ImmutableList.of( // first tag list: read by position from tag family 0 in map()
- AbstractLogRecord.SERVICE_ID, AbstractLogRecord.SERVICE_INSTANCE_ID,
- AbstractLogRecord.ENDPOINT_ID, AbstractLogRecord.TRACE_ID, AbstractLogRecord.TRACE_SEGMENT_ID, // NOTE(review): TRACE_SEGMENT_ID and SPAN_ID are listed but map() never reads positions 4 and 5
- AbstractLogRecord.SPAN_ID),
- ImmutableList.of(AbstractLogRecord.CONTENT_TYPE, AbstractLogRecord.CONTENT, AbstractLogRecord.TAGS_RAW_DATA)); // second tag list: read by position from tag family 1 in map()
- }
-
- @Override
- public Log map(RowEntity row) { // returns a new Log populated from the row; throws RuntimeException if the raw tag bytes cannot be parsed
- Log log = new Log();
- final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
- log.setServiceId((String) searchable.get(0).getValue());
- log.setServiceInstanceId((String) searchable.get(1).getValue());
- log.setEndpointId((String) searchable.get(2).getValue());
- log.setTraceId((String) searchable.get(3).getValue());
- log.setTimestamp(row.getTimestamp()); // the timestamp comes from the row itself, not from a tag
- final List<TagAndValue<?>> data = row.getTagFamilies().get(1);
- if (data.get(2).getValue() == null || ((ByteString) data.get(2).getValue()).isEmpty()) { // position 2 is listed as TAGS_RAW_DATA
- log.setContent(""); // NOTE(review): the only place content is set; positions 0 and 1 (CONTENT_TYPE, CONTENT) are never read; confirm
- } else {
- try {
- // NOTE(review): this branch does parse the tags from the raw bytes, despite the earlier remark "Don't read the tags as they have been in the data binary already."
- LogTags logTags = LogTags.parseFrom((ByteString) data.get(2).getValue());
- for (final KeyStringValuePair pair : logTags.getDataList()) {
- log.getTags().add(new KeyValue(pair.getKey(), pair.getValue())); // each protobuf pair becomes a query-layer KeyValue
- }
- } catch (InvalidProtocolBufferException e) {
- throw new RuntimeException(e); // the checked parse failure is rethrown unchecked with the cause preserved
- }
- }
- return log;
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/NetworkAddressAliasDeserializer.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/NetworkAddressAliasDeserializer.java
deleted file mode 100644
index 6255a0ea40..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/NetworkAddressAliasDeserializer.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.skywalking.banyandb.v1.client.RowEntity;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.analysis.manual.networkalias.NetworkAddressAlias;
-import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
-
-import java.util.List;
-
-public class NetworkAddressAliasDeserializer extends AbstractBanyanDBDeserializer<NetworkAddressAlias> { // Maps one BanyanDB RowEntity onto a NetworkAddressAlias.
- public NetworkAddressAliasDeserializer() {
- super(NetworkAddressAlias.INDEX_NAME,
- ImmutableList.of(NetworkAddressAlias.LAST_UPDATE_TIME_BUCKET), // first tag list: read from tag family 0 in map()
- ImmutableList.of(Metrics.TIME_BUCKET, "address", "represent_service_id", "represent_service_instance_id")); // second tag list: read by position from tag family 1 in map()
- }
-
- @Override
- public NetworkAddressAlias map(RowEntity row) { // returns a new NetworkAddressAlias populated from the row
- NetworkAddressAlias model = new NetworkAddressAlias();
- final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
- // searchable - last_update_time_bucket
- model.setLastUpdateTimeBucket(((Number) searchable.get(0).getValue()).longValue()); // numeric tags are cast to Number and widened to long
- final List<TagAndValue<?>> data = row.getTagFamilies().get(1);
- // data - time_bucket
- model.setTimeBucket(((Number) data.get(0).getValue()).longValue());
- // data - address
- model.setAddress((String) data.get(1).getValue());
- // data - represent_service_id
- model.setRepresentServiceId((String) data.get(2).getValue());
- // data - represent_service_instance_id
- model.setRepresentServiceInstanceId((String) data.get(3).getValue());
- return model;
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileTaskDeserializer.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileTaskDeserializer.java
deleted file mode 100644
index d3eddedec6..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileTaskDeserializer.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.skywalking.banyandb.v1.client.RowEntity;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.profile.ProfileTaskRecord;
-import org.apache.skywalking.oap.server.core.query.type.ProfileTask;
-
-import java.util.List;
-
-public class ProfileTaskDeserializer extends AbstractBanyanDBDeserializer<ProfileTask> { // Maps one BanyanDB RowEntity onto a query-layer ProfileTask.
- public static final String ID = "profile_task_query_id"; // tag name for the task id; first entry of the tag list below
-
- public ProfileTaskDeserializer() {
- super(ProfileTaskRecord.INDEX_NAME,
- ImmutableList.of(ID, ProfileTaskRecord.SERVICE_ID, ProfileTaskRecord.ENDPOINT_NAME, // order matters: map() reads these tags by position
- ProfileTaskRecord.START_TIME, ProfileTaskRecord.DURATION, ProfileTaskRecord.MIN_DURATION_THRESHOLD,
- ProfileTaskRecord.DUMP_PERIOD, ProfileTaskRecord.CREATE_TIME, ProfileTaskRecord.MAX_SAMPLING_COUNT));
- }
-
- @Override
- public ProfileTask map(RowEntity row) { // returns a new ProfileTask populated from the row's first tag family
- ProfileTask profileTask = new ProfileTask();
- final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
- profileTask.setId((String) searchable.get(0).getValue());
- profileTask.setServiceId((String) searchable.get(1).getValue());
- profileTask.setEndpointName((String) searchable.get(2).getValue());
- profileTask.setStartTime(((Number) searchable.get(3).getValue()).longValue());
- profileTask.setDuration(((Number) searchable.get(4).getValue()).intValue());
- profileTask.setMinDurationThreshold(((Number) searchable.get(5).getValue()).intValue());
- profileTask.setDumpPeriod(((Number) searchable.get(6).getValue()).intValue());
- profileTask.setCreateTime(((Number) searchable.get(7).getValue()).longValue()); // read as long, like start_time, so a timestamp is not truncated to 32 bits
- profileTask.setMaxSamplingCount(((Number) searchable.get(8).getValue()).intValue());
- return profileTask; // hand back the populated task; returning null would discard every field mapped above
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileTaskLogDeserializer.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileTaskLogDeserializer.java
deleted file mode 100644
index 79731bf5fb..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileTaskLogDeserializer.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.skywalking.banyandb.v1.client.RowEntity;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.profile.ProfileTaskLogRecord;
-import org.apache.skywalking.oap.server.core.query.type.ProfileTaskLog;
-import org.apache.skywalking.oap.server.core.query.type.ProfileTaskLogOperationType;
-
-import java.util.List;
-
-public class ProfileTaskLogDeserializer extends AbstractBanyanDBDeserializer<ProfileTaskLog> { // Maps one BanyanDB RowEntity onto a query-layer ProfileTaskLog.
- public ProfileTaskLogDeserializer() {
- super(ProfileTaskLogRecord.INDEX_NAME,
- ImmutableList.of(ProfileTaskLogRecord.OPERATION_TIME), // first tag list: read from tag family 0 in map()
- ImmutableList.of(ProfileTaskLogRecord.TASK_ID, ProfileTaskLogRecord.INSTANCE_ID, // second tag list: read by position from tag family 1 in map()
- ProfileTaskLogRecord.OPERATION_TYPE));
- }
-
- @Override
- public ProfileTaskLog map(RowEntity row) { // returns a new ProfileTaskLog populated from the row
- ProfileTaskLog profileTaskLog = new ProfileTaskLog();
- final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
- // searchable - operation_time
- profileTaskLog.setOperationTime(((Number) searchable.get(0).getValue()).longValue());
- final List<TagAndValue<?>> data = row.getTagFamilies().get(1);
- // data - task_id
- profileTaskLog.setTaskId((String) data.get(0).getValue());
- // data - instance_id
- profileTaskLog.setInstanceId((String) data.get(1).getValue());
- // data - operation_type (numeric code converted through ProfileTaskLogOperationType.parse)
- profileTaskLog.setOperationType(ProfileTaskLogOperationType.parse(((Number) data.get(2).getValue()).intValue()));
- return profileTaskLog;
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileThreadSnapshotRecordDeserializer.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileThreadSnapshotRecordDeserializer.java
deleted file mode 100644
index be815f57c0..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileThreadSnapshotRecordDeserializer.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
-
-import com.google.common.collect.ImmutableList;
-import com.google.protobuf.ByteString;
-import org.apache.skywalking.banyandb.v1.client.RowEntity;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.profile.ProfileThreadSnapshotRecord;
-
-import java.util.Collections;
-import java.util.List;
-
-public class ProfileThreadSnapshotRecordDeserializer extends AbstractBanyanDBDeserializer<ProfileThreadSnapshotRecord> { // Maps one BanyanDB RowEntity onto a ProfileThreadSnapshotRecord.
- public ProfileThreadSnapshotRecordDeserializer() {
- super(ProfileThreadSnapshotRecord.INDEX_NAME,
- ImmutableList.of(ProfileThreadSnapshotRecord.TASK_ID, ProfileThreadSnapshotRecord.SEGMENT_ID, // first tag list: read by position from tag family 0 in map()
- ProfileThreadSnapshotRecord.DUMP_TIME, ProfileThreadSnapshotRecord.SEQUENCE),
- Collections.singletonList(ProfileThreadSnapshotRecord.STACK_BINARY)); // second tag list: the single binary tag read from tag family 1
- }
-
- @Override
- public ProfileThreadSnapshotRecord map(RowEntity row) { // returns a new record populated from the row
- ProfileThreadSnapshotRecord record = new ProfileThreadSnapshotRecord();
- final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
- record.setTaskId((String) searchable.get(0).getValue());
- record.setSegmentId((String) searchable.get(1).getValue());
- record.setDumpTime(((Number) searchable.get(2).getValue()).longValue()); // dump time is widened to long
- record.setSequence(((Number) searchable.get(3).getValue()).intValue()); // sequence is narrowed to int
- final List<TagAndValue<?>> data = row.getTagFamilies().get(1);
- record.setStackBinary(((ByteString) data.get(0).getValue()).toByteArray()); // the ByteString is copied into a byte[]; a null value here would throw
- return record;
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/RowEntityDeserializer.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/RowEntityDeserializer.java
deleted file mode 100644
index 2e33ed2373..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/RowEntityDeserializer.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
-
-import org.apache.skywalking.banyandb.v1.client.RowEntity;
-
-@FunctionalInterface
-public interface RowEntityDeserializer<T> { // Single-method contract for turning a BanyanDB RowEntity into a value of type T.
- T map(RowEntity row); // converts the given row into a T; null handling is left to implementations
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/SegmentRecordDeserializer.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/SegmentRecordDeserializer.java
deleted file mode 100644
index 0665d614af..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/SegmentRecordDeserializer.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
-
-import com.google.common.collect.ImmutableList;
-import com.google.protobuf.ByteString;
-import org.apache.skywalking.banyandb.v1.client.RowEntity;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
-
-import java.util.Collections;
-import java.util.List;
-
-public class SegmentRecordDeserializer extends AbstractBanyanDBDeserializer<SegmentRecord> { // Maps one BanyanDB RowEntity onto a SegmentRecord.
- public SegmentRecordDeserializer() {
- super(SegmentRecord.INDEX_NAME,
- ImmutableList.of("trace_id", "state", "service_id", "service_instance_id", "endpoint_id", "duration", "start_time"), // first tag list: read by position from tag family 0 in map()
- Collections.singletonList("data_binary")); // second tag list: the single binary tag read from tag family 1
- }
-
- @Override
- public SegmentRecord map(RowEntity row) { // returns a new SegmentRecord populated from the row
- SegmentRecord record = new SegmentRecord();
- final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
- record.setSegmentId(row.getId()); // the segment id is the row id, not a tag
- record.setTraceId((String) searchable.get(0).getValue());
- record.setIsError(((Number) searchable.get(1).getValue()).intValue()); // position 1 is the "state" tag, stored into isError
- record.setServiceId((String) searchable.get(2).getValue());
- record.setServiceInstanceId((String) searchable.get(3).getValue());
- record.setEndpointId((String) searchable.get(4).getValue());
- record.setLatency(((Number) searchable.get(5).getValue()).intValue()); // position 5 is the "duration" tag, stored into latency
- record.setStartTime(((Number) searchable.get(6).getValue()).longValue());
- final List<TagAndValue<?>> data = row.getTagFamilies().get(1);
- record.setDataBinary(((ByteString) data.get(0).getValue()).toByteArray()); // the ByteString is copied into a byte[]; a null value here would throw
- return record;
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ServiceDeserializer.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ServiceDeserializer.java
deleted file mode 100644
index cf36fd9239..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ServiceDeserializer.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.skywalking.banyandb.v1.client.RowEntity;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraffic;
-import org.apache.skywalking.oap.server.core.query.type.Service;
-
-import java.util.List;
-
-public class ServiceDeserializer extends AbstractBanyanDBDeserializer<Service> { // Maps one BanyanDB RowEntity onto a query-layer Service.
- public ServiceDeserializer() {
- super(ServiceTraffic.INDEX_NAME,
- ImmutableList.of(ServiceTraffic.NAME, ServiceTraffic.NODE_TYPE, ServiceTraffic.GROUP)); // tag names; map() reads positions 0 and 2
- }
-
- @Override
- public Service map(RowEntity row) { // returns a new Service populated from the row
- Service service = new Service();
- final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
- service.setId(row.getId()); // the id is taken from the row itself, not from a tag
- service.setName((String) searchable.get(0).getValue()); // position 0, listed as ServiceTraffic.NAME
- service.setGroup((String) searchable.get(2).getValue()); // position 2, listed as ServiceTraffic.GROUP; position 1 (NODE_TYPE) is skipped
- return service;
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ServiceInstanceDeserializer.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ServiceInstanceDeserializer.java
deleted file mode 100644
index a95867c42e..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ServiceInstanceDeserializer.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
-
-import com.google.common.collect.ImmutableList;
-import com.google.gson.JsonElement;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.skywalking.banyandb.v1.client.RowEntity;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
-import org.apache.skywalking.oap.server.core.query.enumeration.Language;
-import org.apache.skywalking.oap.server.core.query.type.Attribute;
-import org.apache.skywalking.oap.server.core.query.type.ServiceInstance;
-import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-public class ServiceInstanceDeserializer extends AbstractBanyanDBDeserializer<ServiceInstance> { // Rebuilds a query-layer ServiceInstance from the serialized InstanceTraffic stored in a row.
- public ServiceInstanceDeserializer() {
- super(InstanceTraffic.INDEX_NAME,
- ImmutableList.of(InstanceTraffic.SERVICE_ID, InstanceTraffic.LAST_PING_TIME_BUCKET), // first tag list; map() does not read tag family 0
- Collections.singletonList("data_binary")); // second tag list: the single binary tag read from tag family 1
- }
-
- @Override
- public ServiceInstance map(RowEntity row) { // returns a new ServiceInstance; throws RuntimeException when the binary tag is missing, empty, or unparsable
- InstanceTraffic instanceTraffic = new InstanceTraffic();
- final List<TagAndValue<?>> data = row.getTagFamilies().get(1);
- Object o = data.get(0).getValue();
- ServiceInstance serviceInstance = new ServiceInstance();
- if (o instanceof ByteString && !((ByteString) o).isEmpty()) { // instanceof also rejects null
- try {
- RemoteData remoteData = RemoteData.parseFrom((ByteString) o);
- instanceTraffic.deserialize(remoteData); // fills instanceTraffic from the parsed protobuf message
- serviceInstance.setName(instanceTraffic.getName());
- serviceInstance.setId(instanceTraffic.getServiceId()); // NOTE(review): the instance id receives the service id; confirm this is intended
-
- if (instanceTraffic.getProperties() != null) {
- for (Map.Entry<String, JsonElement> property : instanceTraffic.getProperties().entrySet()) {
- String key = property.getKey();
- String value = property.getValue().getAsString();
- if (key.equals(InstanceTraffic.PropertyUtil.LANGUAGE)) { // the language property is mapped to the Language enum
- serviceInstance.setLanguage(Language.value(value));
- } else { // every other property becomes a plain attribute
- serviceInstance.getAttributes().add(new Attribute(key, value));
- }
- }
- } else {
- serviceInstance.setLanguage(Language.UNKNOWN); // no properties at all: language is set to UNKNOWN
- }
- } catch (InvalidProtocolBufferException ex) {
- throw new RuntimeException("fail to parse remote data", ex); // the checked parse failure is rethrown unchecked with the cause preserved
- }
- } else {
- throw new RuntimeException("unable to parse binary data"); // value was null, not a ByteString, or empty
- }
-
- return serviceInstance;
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/AlarmRecordBuilder.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/AlarmRecordBuilder.java
index b154c9280d..a8a2863306 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/AlarmRecordBuilder.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/AlarmRecordBuilder.java
@@ -27,7 +27,7 @@ import org.apache.skywalking.oap.server.core.alarm.AlarmRecord;
import java.util.ArrayList;
import java.util.List;
-public class AlarmRecordBuilder extends BanyanDBRecordBuilder<AlarmRecord> {
+public class AlarmRecordBuilder extends BanyanDBStorageDataBuilder<AlarmRecord> {
public static final List<String> INDEXED_TAGS = ImmutableList.of(
"level"
);
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/BanyanDBMetricsBuilder.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/BanyanDBMetricsBuilder.java
deleted file mode 100644
index 28c3055f47..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/BanyanDBMetricsBuilder.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.schema;
-
-import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
-import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
-import org.apache.skywalking.oap.server.core.storage.model.Model;
-
-public abstract class BanyanDBMetricsBuilder<T extends Metrics> extends BanyanDBStorageDataBuilder<T> {
- @Override
- protected long timestamp(Model model, T entity) {
- return TimeBucket.getTimestamp(entity.getTimeBucket(), model.getDownsampling());
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/BanyanDBRecordBuilder.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/BanyanDBRecordBuilder.java
deleted file mode 100644
index 0310c7173b..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/BanyanDBRecordBuilder.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.schema;
-
-import org.apache.skywalking.banyandb.v1.Banyandb;
-import org.apache.skywalking.banyandb.v1.client.SerializableTag;
-import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
-import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
-import org.apache.skywalking.oap.server.core.analysis.record.Record;
-import org.apache.skywalking.oap.server.core.storage.model.Model;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public abstract class BanyanDBRecordBuilder<T extends Record> extends BanyanDBStorageDataBuilder<T> {
- @Override
- protected long timestamp(Model model, T entity) {
- return TimeBucket.getTimestamp(entity.getTimeBucket(), model.getDownsampling());
- }
-
- protected List<SerializableTag<Banyandb.TagValue>> filterSearchableTags(List<Tag> rawTags, List<String> indexTags) {
- if (rawTags == null) {
- return Collections.emptyList();
- }
- Map<String, SerializableTag<Banyandb.TagValue>> map = new HashMap<>();
- for (final Tag tag : rawTags) {
- map.put(tag.getKey().toLowerCase(), TagAndValue.stringField(tag.getValue()));
- }
- final List<SerializableTag<Banyandb.TagValue>> tags = new ArrayList<>();
- for (String indexedTag : indexTags) {
- SerializableTag<Banyandb.TagValue> tag = map.get(indexedTag);
- if (tag == null) {
- tags.add(TagAndValue.nullField());
- } else {
- tags.add(tag);
- }
- }
-
- return tags;
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/BanyanDBStorageDataBuilder.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/BanyanDBStorageDataBuilder.java
index 6d545f8a9e..03ee0328f3 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/BanyanDBStorageDataBuilder.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/BanyanDBStorageDataBuilder.java
@@ -21,29 +21,65 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.schema;
import org.apache.skywalking.banyandb.v1.Banyandb;
import org.apache.skywalking.banyandb.v1.client.SerializableTag;
import org.apache.skywalking.banyandb.v1.client.StreamWrite;
+import org.apache.skywalking.banyandb.v1.client.TagAndValue;
+import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
import org.apache.skywalking.oap.server.core.storage.StorageData;
-import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
-public abstract class BanyanDBStorageDataBuilder<T extends StorageData> {
- public StreamWrite entity2Storage(Model model, T entity) {
- return StreamWrite.builder()
+public abstract class BanyanDBStorageDataBuilder<T extends StorageData> implements StorageBuilder<T, StreamWrite.StreamWriteBuilder> {
+ @Override
+ public T storage2Entity(StreamWrite.StreamWriteBuilder storageData) {
+ return null;
+ }
+
+ @Override
+ public StreamWrite.StreamWriteBuilder entity2Storage(T entity) {
+ StreamWrite.StreamWriteBuilder b = StreamWrite.builder()
.elementId(this.elementID(entity))
- .name(model.getName())
- .timestamp(this.timestamp(model, entity))
.searchableTags(this.searchableTags(entity))
- .dataTags(this.dataTags(entity))
- .build();
+ .dataTags(this.dataTags(entity));
+ Long ts = this.extractTimestamp(entity);
+ if (ts != null) {
+ b.timestamp(ts);
+ }
+ return b;
+ }
+
+ protected Long extractTimestamp(T entity) {
+ return null;
+ }
+
+ protected List<SerializableTag<Banyandb.TagValue>> filterSearchableTags(List<Tag> rawTags, List<String> indexTags) {
+ if (rawTags == null) {
+ return Collections.emptyList();
+ }
+ Map<String, SerializableTag<Banyandb.TagValue>> map = new HashMap<>();
+ for (final Tag tag : rawTags) {
+ map.put(tag.getKey().toLowerCase(), TagAndValue.stringField(tag.getValue()));
+ }
+ final List<SerializableTag<Banyandb.TagValue>> tags = new ArrayList<>();
+ for (String indexedTag : indexTags) {
+ SerializableTag<Banyandb.TagValue> tag = map.get(indexedTag);
+ if (tag == null) {
+ tags.add(TagAndValue.nullField());
+ } else {
+ tags.add(tag);
+ }
+ }
+
+ return tags;
}
protected String elementID(T entity) {
return entity.id();
}
- abstract protected long timestamp(Model model, T entity);
-
abstract protected List<SerializableTag<Banyandb.TagValue>> searchableTags(T entity);
protected List<SerializableTag<Banyandb.TagValue>> dataTags(T entity) {
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/BrowserErrorLogRecordBuilder.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/BrowserErrorLogRecordBuilder.java
index 0d1923c6f0..98b9bd0e6b 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/BrowserErrorLogRecordBuilder.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/BrowserErrorLogRecordBuilder.java
@@ -27,7 +27,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-public class BrowserErrorLogRecordBuilder extends BanyanDBRecordBuilder<BrowserErrorLogRecord> {
+public class BrowserErrorLogRecordBuilder extends BanyanDBStorageDataBuilder<BrowserErrorLogRecord> {
@Override
protected List<SerializableTag<Banyandb.TagValue>> searchableTags(BrowserErrorLogRecord entity) {
List<SerializableTag<Banyandb.TagValue>> searchable = new ArrayList<>();
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/EventBuilder.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/EventBuilder.java
index cce48e8b71..9a99763c73 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/EventBuilder.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/EventBuilder.java
@@ -4,14 +4,12 @@ import com.google.common.collect.ImmutableList;
import org.apache.skywalking.banyandb.v1.Banyandb;
import org.apache.skywalking.banyandb.v1.client.SerializableTag;
import org.apache.skywalking.banyandb.v1.client.TagAndValue;
-import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.source.Event;
-import org.apache.skywalking.oap.server.core.storage.model.Model;
import java.util.ArrayList;
import java.util.List;
-public class EventBuilder extends BanyanDBMetricsBuilder<Event> {
+public class EventBuilder extends BanyanDBStorageDataBuilder<Event> {
@Override
protected List<SerializableTag<Banyandb.TagValue>> searchableTags(Event entity) {
List<SerializableTag<Banyandb.TagValue>> searchable = new ArrayList<>(8);
@@ -26,11 +24,6 @@ public class EventBuilder extends BanyanDBMetricsBuilder<Event> {
return searchable;
}
- @Override
- protected long timestamp(Model model, Event entity) {
- return TimeBucket.getTimestamp(entity.getTimeBucket(), model.getDownsampling());
- }
-
@Override
protected List<SerializableTag<Banyandb.TagValue>> dataTags(Event entity) {
return ImmutableList.of(
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/LogRecordBuilder.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/LogRecordBuilder.java
index 1d88799fd9..02809d68aa 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/LogRecordBuilder.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/LogRecordBuilder.java
@@ -27,7 +27,7 @@ import org.apache.skywalking.oap.server.core.analysis.manual.log.LogRecord;
import java.util.ArrayList;
import java.util.List;
-public class LogRecordBuilder extends BanyanDBRecordBuilder<LogRecord> {
+public class LogRecordBuilder extends BanyanDBStorageDataBuilder<LogRecord> {
public static final List<String> INDEXED_TAGS = ImmutableList.of(
"level"
);
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/Metadata.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/Metadata.java
index 1fcd5f1587..66f3218965 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/Metadata.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/Metadata.java
@@ -30,7 +30,7 @@ import java.util.Collections;
import java.util.List;
public class Metadata {
- public static class ServiceTrafficBuilder extends BanyanDBMetricsBuilder<ServiceTraffic> {
+ public static class ServiceTrafficBuilder extends BanyanDBStorageDataBuilder<ServiceTraffic> {
@Override
protected List<SerializableTag<Banyandb.TagValue>> searchableTags(ServiceTraffic entity) {
List<SerializableTag<Banyandb.TagValue>> searchable = new ArrayList<>(3);
@@ -41,7 +41,7 @@ public class Metadata {
}
}
- public static class EndpointTrafficBuilder extends BanyanDBMetricsBuilder<EndpointTraffic> {
+ public static class EndpointTrafficBuilder extends BanyanDBStorageDataBuilder<EndpointTraffic> {
@Override
protected List<SerializableTag<Banyandb.TagValue>> searchableTags(EndpointTraffic entity) {
List<SerializableTag<Banyandb.TagValue>> searchable = new ArrayList<>(2);
@@ -51,7 +51,7 @@ public class Metadata {
}
}
- public static class InstanceTrafficBuilder extends BanyanDBMetricsBuilder<InstanceTraffic> {
+ public static class InstanceTrafficBuilder extends BanyanDBStorageDataBuilder<InstanceTraffic> {
@Override
protected List<SerializableTag<Banyandb.TagValue>> searchableTags(InstanceTraffic entity) {
List<SerializableTag<Banyandb.TagValue>> searchable = new ArrayList<>(2);
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/NetworkAddressAliasBuilder.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/NetworkAddressAliasBuilder.java
index 1054bd922f..83bf0abfab 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/NetworkAddressAliasBuilder.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/NetworkAddressAliasBuilder.java
@@ -27,7 +27,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-public class NetworkAddressAliasBuilder extends BanyanDBMetricsBuilder<NetworkAddressAlias> {
+public class NetworkAddressAliasBuilder extends BanyanDBStorageDataBuilder<NetworkAddressAlias> {
@Override
protected List<SerializableTag<Banyandb.TagValue>> searchableTags(NetworkAddressAlias entity) {
return Collections.singletonList(TagAndValue.longField(entity.getLastUpdateTimeBucket()));
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/ProfileTaskLogRecordBuilder.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/ProfileTaskLogRecordBuilder.java
index ee0141dc6d..888cac6b10 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/ProfileTaskLogRecordBuilder.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/ProfileTaskLogRecordBuilder.java
@@ -27,7 +27,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-public class ProfileTaskLogRecordBuilder extends BanyanDBRecordBuilder<ProfileTaskLogRecord> {
+public class ProfileTaskLogRecordBuilder extends BanyanDBStorageDataBuilder<ProfileTaskLogRecord> {
@Override
protected List<SerializableTag<Banyandb.TagValue>> searchableTags(ProfileTaskLogRecord entity) {
return Collections.singletonList(TagAndValue.longField(entity.getOperationTime()));
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/ProfileTaskRecordBuilder.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/ProfileTaskRecordBuilder.java
index 0ce548ffc6..b107cadc31 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/ProfileTaskRecordBuilder.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/ProfileTaskRecordBuilder.java
@@ -26,7 +26,7 @@ import org.apache.skywalking.oap.server.core.profile.ProfileTaskRecord;
import java.util.ArrayList;
import java.util.List;
-public class ProfileTaskRecordBuilder extends BanyanDBRecordBuilder<ProfileTaskRecord> {
+public class ProfileTaskRecordBuilder extends BanyanDBStorageDataBuilder<ProfileTaskRecord> {
@Override
protected List<SerializableTag<Banyandb.TagValue>> searchableTags(ProfileTaskRecord entity) {
List<SerializableTag<Banyandb.TagValue>> searchable = new ArrayList<>(9);
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/ProfileThreadSnapshotRecordBuilder.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/ProfileThreadSnapshotRecordBuilder.java
index 29f6a8b708..10032246ec 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/ProfileThreadSnapshotRecordBuilder.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/ProfileThreadSnapshotRecordBuilder.java
@@ -27,7 +27,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-public class ProfileThreadSnapshotRecordBuilder extends BanyanDBRecordBuilder<ProfileThreadSnapshotRecord> {
+public class ProfileThreadSnapshotRecordBuilder extends BanyanDBStorageDataBuilder<ProfileThreadSnapshotRecord> {
@Override
protected List<SerializableTag<Banyandb.TagValue>> searchableTags(ProfileThreadSnapshotRecord entity) {
List<SerializableTag<Banyandb.TagValue>> searchable = new ArrayList<>(4);
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/SegmentRecordBuilder.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/SegmentRecordBuilder.java
index e93335643c..a87dc9aa86 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/SegmentRecordBuilder.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/SegmentRecordBuilder.java
@@ -28,7 +28,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-public class SegmentRecordBuilder extends BanyanDBRecordBuilder<SegmentRecord> {
+public class SegmentRecordBuilder extends BanyanDBStorageDataBuilder<SegmentRecord> {
public static final List<String> INDEXED_TAGS = ImmutableList.of(
"http.method",
"status_code",
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/UITemplateBuilder.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/UITemplateBuilder.java
index 1776e5af8d..41104ee588 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/UITemplateBuilder.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/schema/UITemplateBuilder.java
@@ -23,19 +23,11 @@ import org.apache.skywalking.banyandb.v1.Banyandb;
import org.apache.skywalking.banyandb.v1.client.SerializableTag;
import org.apache.skywalking.banyandb.v1.client.TagAndValue;
import org.apache.skywalking.oap.server.core.management.ui.template.UITemplate;
-import org.apache.skywalking.oap.server.core.storage.model.Model;
import java.util.ArrayList;
import java.util.List;
public class UITemplateBuilder extends BanyanDBStorageDataBuilder<UITemplate> {
- public static final long UI_TEMPLATE_TIMESTAMP = 1L;
-
- @Override
- protected long timestamp(Model model, UITemplate entity) {
- return UI_TEMPLATE_TIMESTAMP;
- }
-
@Override
protected List<SerializableTag<Banyandb.TagValue>> searchableTags(UITemplate entity) {
return ImmutableList.of(
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java
index 71d45dc7b6..0b89545ee4 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java
@@ -18,42 +18,62 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
+import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
+import org.apache.skywalking.banyandb.v1.client.RowEntity;
import org.apache.skywalking.banyandb.v1.client.StreamQuery;
import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
+import org.apache.skywalking.banyandb.v1.client.TimestampRange;
import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.AbstractBanyanDBDeserializer;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.BanyanDBDeserializerFactory;
import java.util.List;
-import java.util.stream.Collectors;
+import java.util.function.Function;
public abstract class AbstractBanyanDBDAO extends AbstractDAO<BanyanDBStorageClient> {
protected AbstractBanyanDBDAO(BanyanDBStorageClient client) {
super(client);
}
- protected <T> List<T> query(Class<T> clazz, QueryBuilder builder) {
- return this.query(clazz, builder, 0, 0);
+ protected StreamQueryResponse query(String indexName, List<String> searchableTags, QueryBuilder builder) {
+ return this.query(indexName, searchableTags, null, builder);
}
- protected <T> List<T> query(Class<T> clazz, QueryBuilder builder, long startTimestamp, long endTimestamp) {
- AbstractBanyanDBDeserializer<T> deserializer = BanyanDBDeserializerFactory.INSTANCE.findDeserializer(clazz);
-
+ protected StreamQueryResponse query(String indexName, List<String> searchableTags, TimestampRange timestampRange,
+ QueryBuilder builder) {
final StreamQuery query;
- if (startTimestamp != 0 && endTimestamp != 0) {
- query = deserializer.buildStreamQuery();
+ if (timestampRange == null) {
+ query = new StreamQuery(indexName, searchableTags);
} else {
- query = deserializer.buildStreamQuery(startTimestamp, endTimestamp);
+ query = new StreamQuery(indexName, timestampRange, searchableTags);
}
builder.apply(query);
- final StreamQueryResponse resp = getClient().query(query);
- return resp.getElements().stream().map(deserializer::map).collect(Collectors.toList());
+ return getClient().query(query);
+ }
+
+ protected abstract static class QueryBuilder {
+ protected static final String SEARCHABLE = "searchable";
+
+ abstract void apply(final StreamQuery query);
+
+ protected PairQueryCondition<Long> eq(String name, long value) {
+ return PairQueryCondition.LongQueryCondition.eq(SEARCHABLE, name, value);
+ }
+
+ protected PairQueryCondition<Long> lte(String name, long value) {
+ return PairQueryCondition.LongQueryCondition.le(SEARCHABLE, name, value);
+ }
+
+ protected PairQueryCondition<Long> gte(String name, long value) {
+ return PairQueryCondition.LongQueryCondition.ge(SEARCHABLE, name, value);
+ }
+
+ protected PairQueryCondition<String> eq(String name, String value) {
+ return PairQueryCondition.StringQueryCondition.eq(SEARCHABLE, name, value);
+ }
}
- interface QueryBuilder {
- void apply(final StreamQuery query);
+ interface RowEntityDeserializer<T> extends Function<RowEntity, T> {
}
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java
index 999d89db79..08225e51e1 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java
@@ -18,13 +18,22 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
-import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableList;
+import com.google.gson.reflect.TypeToken;
+import com.google.protobuf.ByteString;
+import org.apache.skywalking.banyandb.v1.client.RowEntity;
import org.apache.skywalking.banyandb.v1.client.StreamQuery;
+import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
+import org.apache.skywalking.banyandb.v1.client.TagAndValue;
+import org.apache.skywalking.banyandb.v1.client.TimestampRange;
import org.apache.skywalking.oap.server.core.alarm.AlarmRecord;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
+import org.apache.skywalking.oap.server.core.query.enumeration.Scope;
import org.apache.skywalking.oap.server.core.query.type.AlarmMessage;
import org.apache.skywalking.oap.server.core.query.type.Alarms;
+import org.apache.skywalking.oap.server.core.query.type.KeyValue;
import org.apache.skywalking.oap.server.core.storage.query.IAlarmQueryDAO;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
@@ -33,6 +42,7 @@ import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.AlarmReco
import java.io.IOException;
import java.util.List;
import java.util.Objects;
+import java.util.stream.Collectors;
/**
* {@link org.apache.skywalking.oap.server.core.alarm.AlarmRecord} is a stream,
@@ -45,34 +55,70 @@ public class BanyanDBAlarmQueryDAO extends AbstractBanyanDBDAO implements IAlarm
@Override
public Alarms getAlarm(Integer scopeId, String keyword, int limit, int from, long startTB, long endTB, List<Tag> tags) throws IOException {
- List<AlarmMessage> messages = query(AlarmMessage.class, new QueryBuilder() {
- @Override
- public void apply(StreamQuery query) {
- if (Objects.nonNull(scopeId)) {
- query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", AlarmRecord.SCOPE, (long) scopeId));
- }
- if (startTB != 0 && endTB != 0) {
- query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", AlarmRecord.START_TIME, TimeBucket.getTimestamp(startTB)));
- query.appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", AlarmRecord.START_TIME, TimeBucket.getTimestamp(endTB)));
- }
+ TimestampRange tsRange = null;
+ if (startTB > 0 && endTB > 0) {
+ tsRange = new TimestampRange(TimeBucket.getTimestamp(startTB), TimeBucket.getTimestamp(endTB));
+ }
- // TODO: support keyword search
+ StreamQueryResponse resp = query(AlarmRecord.INDEX_NAME,
+ ImmutableList.of(AlarmRecord.SCOPE, AlarmRecord.START_TIME),
+ tsRange,
+ new QueryBuilder() {
+ @Override
+ public void apply(StreamQuery query) {
+ query.setDataProjections(ImmutableList.of(AlarmRecord.ID0, AlarmRecord.ID1, AlarmRecord.ALARM_MESSAGE, AlarmRecord.TAGS_RAW_DATA));
- if (CollectionUtils.isNotEmpty(tags)) {
- for (final Tag tag : tags) {
- if (AlarmRecordBuilder.INDEXED_TAGS.contains(tag.getKey())) {
- query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", tag.getKey(), tag.getValue()));
+ if (Objects.nonNull(scopeId)) {
+ query.appendCondition(eq(AlarmRecord.SCOPE, (long) scopeId));
}
+
+ // TODO: support keyword search
+
+ if (CollectionUtils.isNotEmpty(tags)) {
+ for (final Tag tag : tags) {
+ if (AlarmRecordBuilder.INDEXED_TAGS.contains(tag.getKey())) {
+ query.appendCondition(eq(tag.getKey(), tag.getValue()));
+ }
+ }
+ }
+ query.setLimit(limit);
+ query.setOffset(from);
}
- }
- query.setLimit(limit);
- query.setOffset(from);
- }
- });
+ });
+
+ List<AlarmMessage> messages = resp.getElements().stream().map(new AlarmMessageDeserializer())
+ .collect(Collectors.toList());
Alarms alarms = new Alarms();
alarms.setTotal(messages.size());
alarms.getMsgs().addAll(messages);
return alarms;
}
+
+ public static class AlarmMessageDeserializer implements RowEntityDeserializer<AlarmMessage> {
+ @Override
+ public AlarmMessage apply(RowEntity row) {
+ AlarmMessage alarmMessage = new AlarmMessage();
+ final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
+ int scopeID = ((Number) searchable.get(0).getValue()).intValue();
+ alarmMessage.setScopeId(scopeID);
+ alarmMessage.setScope(Scope.Finder.valueOf(scopeID));
+ alarmMessage.setStartTime(((Number) searchable.get(1).getValue()).longValue());
+ final List<TagAndValue<?>> data = row.getTagFamilies().get(1);
+ alarmMessage.setId((String) data.get(0).getValue());
+ alarmMessage.setId1((String) data.get(1).getValue());
+ alarmMessage.setMessage((String) data.get(2).getValue());
+ Object o = data.get(3).getValue();
+ if (o instanceof ByteString && !((ByteString) o).isEmpty()) {
+ this.parseDataBinary(((ByteString) o).toByteArray(), alarmMessage.getTags());
+ }
+ return alarmMessage;
+ }
+
+ void parseDataBinary(byte[] dataBinary, List<KeyValue> tags) {
+ List<Tag> tagList = GSON.fromJson(new String(dataBinary, Charsets.UTF_8), new TypeToken<List<Tag>>() {
+ }.getType());
+ tagList.forEach(pair -> tags.add(new KeyValue(pair.getKey(), pair.getValue())));
+ }
+ }
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java
index 803b05e255..c1bbc16941 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java
@@ -18,20 +18,29 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
-import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.skywalking.banyandb.v1.client.RowEntity;
import org.apache.skywalking.banyandb.v1.client.StreamQuery;
+import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
+import org.apache.skywalking.banyandb.v1.client.TagAndValue;
+import org.apache.skywalking.banyandb.v1.client.TimestampRange;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.browser.manual.errorlog.BrowserErrorLogRecord;
import org.apache.skywalking.oap.server.core.browser.source.BrowserErrorCategory;
import org.apache.skywalking.oap.server.core.query.type.BrowserErrorLog;
import org.apache.skywalking.oap.server.core.query.type.BrowserErrorLogs;
+import org.apache.skywalking.oap.server.core.query.type.ErrorCategory;
import org.apache.skywalking.oap.server.core.storage.query.IBrowserLogQueryDAO;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
import java.io.IOException;
+import java.util.Collections;
import java.util.List;
import java.util.Objects;
+import java.util.stream.Collectors;
/**
* {@link org.apache.skywalking.oap.server.core.browser.manual.errorlog.BrowserErrorLogRecord} is a stream
@@ -43,36 +52,71 @@ public class BanyanDBBrowserLogQueryDAO extends AbstractBanyanDBDAO implements I
@Override
public BrowserErrorLogs queryBrowserErrorLogs(String serviceId, String serviceVersionId, String pagePathId, BrowserErrorCategory category, long startSecondTB, long endSecondTB, int limit, int from) throws IOException {
+ TimestampRange tsRange = null; // stays null unless both time buckets are positive
+ if (startSecondTB > 0 && endSecondTB > 0) {
+ tsRange = new TimestampRange(TimeBucket.getTimestamp(startSecondTB), TimeBucket.getTimestamp(endSecondTB));
+ }
- final QueryBuilder qb = new QueryBuilder() {
+ final BrowserErrorLogs logs = new BrowserErrorLogs();
+ StreamQueryResponse resp = query(BrowserErrorLogRecord.INDEX_NAME, ImmutableList.of(BrowserErrorLogRecord.SERVICE_ID, // projected tags; BrowserErrorLogDeserializer reads family 0 by position, presumably in this order - confirm
+ BrowserErrorLogRecord.SERVICE_VERSION_ID,
+ BrowserErrorLogRecord.PAGE_PATH_ID,
+ BrowserErrorLogRecord.ERROR_CATEGORY), tsRange, new QueryBuilder() {
@Override
public void apply(StreamQuery query) {
- query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", BrowserErrorLogRecord.SERVICE_ID, serviceId));
+ query.setDataProjections(Collections.singletonList(BrowserErrorLogRecord.DATA_BINARY)); // also request the binary payload that the deserializer parses
+ query.appendCondition(eq(BrowserErrorLogRecord.SERVICE_ID, serviceId)); // the only unconditional filter; the ones below are added only when supplied
if (StringUtil.isNotEmpty(serviceVersionId)) {
- query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", BrowserErrorLogRecord.SERVICE_VERSION_ID, serviceVersionId));
+ query.appendCondition(eq(BrowserErrorLogRecord.SERVICE_VERSION_ID, serviceVersionId));
}
+
if (StringUtil.isNotEmpty(pagePathId)) {
- query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", BrowserErrorLogRecord.PAGE_PATH_ID, pagePathId));
+ query.appendCondition(eq(BrowserErrorLogRecord.PAGE_PATH_ID, pagePathId));
}
+
if (Objects.nonNull(category)) {
- query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", BrowserErrorLogRecord.ERROR_CATEGORY, (long) category.getValue()));
+ query.appendCondition(eq(BrowserErrorLogRecord.ERROR_CATEGORY, category.getValue())); // numeric category value; the removed line cast it to long explicitly
}
query.setOffset(from);
query.setLimit(limit);
}
- };
-
- final BrowserErrorLogs logs = new BrowserErrorLogs();
- final List<BrowserErrorLog> browserErrorLogs;
- if (startSecondTB != 0 && endSecondTB != 0) {
- browserErrorLogs = query(BrowserErrorLog.class, qb, TimeBucket.getTimestamp(startSecondTB), TimeBucket.getTimestamp(endSecondTB));
- } else {
- browserErrorLogs = query(BrowserErrorLog.class, qb);
- }
- logs.getLogs().addAll(browserErrorLogs);
+ });
+ logs.getLogs().addAll(resp.getElements().stream().map(new BrowserErrorLogDeserializer()).collect(Collectors.toList())); // the total set on the next line is the number of rows returned by this query
logs.setTotal(logs.getLogs().size());
return logs;
}
+
+ public static class BrowserErrorLogDeserializer implements RowEntityDeserializer<BrowserErrorLog> {
+ @Override
+ public BrowserErrorLog apply(RowEntity row) {
+ // FIXME: use protobuf directly
+ BrowserErrorLog log = new BrowserErrorLog();
+ final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
+ log.setService((String) searchable.get(0).getValue());
+ log.setServiceVersion((String) searchable.get(1).getValue());
+ log.setPagePath((String) searchable.get(2).getValue());
+ log.setCategory(ErrorCategory.valueOf((String) searchable.get(3).getValue()));
+ log.setTime(row.getTimestamp());
+ final List<TagAndValue<?>> data = row.getTagFamilies().get(1);
+ Object o = data.get(0).getValue();
+ if (o instanceof ByteString && !((ByteString) o).isEmpty()) {
+ try {
+ org.apache.skywalking.apm.network.language.agent.v3.BrowserErrorLog browserErrorLog = org.apache.skywalking.apm.network.language.agent.v3.BrowserErrorLog
+ .parseFrom((ByteString) o);
+ log.setGrade(browserErrorLog.getGrade());
+ log.setCol(browserErrorLog.getCol());
+ log.setLine(browserErrorLog.getLine());
+ log.setMessage(browserErrorLog.getMessage());
+ log.setErrorUrl(browserErrorLog.getErrorUrl());
+ log.setStack(browserErrorLog.getStack());
+ log.setFirstReportedError(browserErrorLog.getFirstReportedError());
+ } catch (InvalidProtocolBufferException ex) {
+ throw new RuntimeException("fail to parse proto buffer", ex);
+ }
+ }
+ return log;
+ }
+ }
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBEventQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBEventQueryDAO.java
index b85e07d860..44299c38e9 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBEventQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBEventQueryDAO.java
@@ -19,11 +19,15 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
import com.google.common.base.Strings;
-import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
+import com.google.common.collect.ImmutableList;
+import org.apache.skywalking.banyandb.v1.client.RowEntity;
import org.apache.skywalking.banyandb.v1.client.StreamQuery;
+import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
+import org.apache.skywalking.banyandb.v1.client.TagAndValue;
import org.apache.skywalking.oap.server.core.query.PaginationUtils;
import org.apache.skywalking.oap.server.core.query.input.Duration;
import org.apache.skywalking.oap.server.core.query.type.event.EventQueryCondition;
+import org.apache.skywalking.oap.server.core.query.type.event.EventType;
import org.apache.skywalking.oap.server.core.query.type.event.Events;
import org.apache.skywalking.oap.server.core.query.type.event.Source;
import org.apache.skywalking.oap.server.core.source.Event;
@@ -31,6 +35,7 @@ import org.apache.skywalking.oap.server.core.storage.query.IEventQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
import java.util.List;
+import java.util.stream.Collectors;
/**
* {@link org.apache.skywalking.oap.server.core.source.Event} is a stream
@@ -42,10 +47,13 @@ public class BanyanDBEventQueryDAO extends AbstractBanyanDBDAO implements IEvent
@Override
public Events queryEvents(EventQueryCondition condition) throws Exception {
- List<org.apache.skywalking.oap.server.core.query.type.event.Event> eventList = query(org.apache.skywalking.oap.server.core.query.type.event.Event.class,
+ StreamQueryResponse resp = query(Event.INDEX_NAME,
+ ImmutableList.of(Event.UUID, Event.SERVICE, Event.SERVICE_INSTANCE, Event.ENDPOINT, Event.NAME, Event.TYPE, Event.START_TIME, Event.END_TIME),
new QueryBuilder() {
@Override
public void apply(StreamQuery query) {
+ query.setDataProjections(ImmutableList.of(Event.MESSAGE, Event.PARAMETERS));
+
buildConditions(condition, query);
PaginationUtils.Page page = PaginationUtils.INSTANCE.exchange(condition.getPaging());
@@ -59,8 +67,43 @@ public class BanyanDBEventQueryDAO extends AbstractBanyanDBDAO implements IEvent
query.setOrderBy(new StreamQuery.OrderBy("start_time", StreamQuery.OrderBy.Type.DESC));
}
}
+
+ /**
+ * Appends to {@code query} one condition for every filter that is present in {@code condition}:
+ * uuid, the source's service / service instance / endpoint, name, type, and the time bounds.
+ * Absent (null or empty) filters add nothing.
+ */
+ private void buildConditions(EventQueryCondition condition, final StreamQuery query) {
+ final String uuid = condition.getUuid();
+ if (!Strings.isNullOrEmpty(uuid)) {
+ query.appendCondition(eq(Event.UUID, uuid));
+ }
+
+ final Source origin = condition.getSource();
+ if (origin != null) {
+ final String service = origin.getService();
+ if (!Strings.isNullOrEmpty(service)) {
+ query.appendCondition(eq(Event.SERVICE, service));
+ }
+ final String instance = origin.getServiceInstance();
+ if (!Strings.isNullOrEmpty(instance)) {
+ query.appendCondition(eq(Event.SERVICE_INSTANCE, instance));
+ }
+ final String endpoint = origin.getEndpoint();
+ if (!Strings.isNullOrEmpty(endpoint)) {
+ query.appendCondition(eq(Event.ENDPOINT, endpoint));
+ }
+ }
+
+ final String name = condition.getName();
+ if (!Strings.isNullOrEmpty(name)) {
+ query.appendCondition(eq(Event.NAME, name));
+ }
+
+ if (condition.getType() != null) {
+ query.appendCondition(eq(Event.TYPE, condition.getType().name()));
+ }
+
+ // time bounds: start_time >= start and end_time <= end, each applied only when positive
+ final Duration window = condition.getTime();
+ if (window != null) {
+ if (window.getStartTimestamp() > 0) {
+ query.appendCondition(gte(Event.START_TIME, window.getStartTimestamp()));
+ }
+ if (window.getEndTimestamp() > 0) {
+ query.appendCondition(lte(Event.END_TIME, window.getEndTimestamp()));
+ }
+ }
+ }
});
+ List<org.apache.skywalking.oap.server.core.query.type.event.Event> eventList = resp.getElements().stream().map(new EventDeserializer()).collect(Collectors.toList());
+
Events events = new Events();
events.setEvents(eventList);
events.setTotal(eventList.size());
@@ -83,36 +126,23 @@ public class BanyanDBEventQueryDAO extends AbstractBanyanDBDAO implements IEvent
return events;
}
- private void buildConditions(EventQueryCondition condition, final StreamQuery query) {
- if (!Strings.isNullOrEmpty(condition.getUuid())) {
- query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", Event.UUID, condition.getUuid()));
- }
- final Source source = condition.getSource();
- if (source != null) {
- if (!Strings.isNullOrEmpty(source.getService())) {
- query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", Event.SERVICE, source.getService()));
- }
- if (!Strings.isNullOrEmpty(source.getServiceInstance())) {
- query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", Event.SERVICE_INSTANCE, source.getServiceInstance()));
- }
- if (!Strings.isNullOrEmpty(source.getEndpoint())) {
- query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", Event.ENDPOINT, source.getEndpoint()));
- }
- }
- if (!Strings.isNullOrEmpty(condition.getName())) {
- query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", Event.NAME, condition.getName()));
- }
- if (condition.getType() != null) {
- query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", Event.TYPE, condition.getType().name()));
- }
- final Duration time = condition.getTime();
- if (time != null) {
- if (time.getStartTimestamp() > 0) {
- query.appendCondition(PairQueryCondition.LongQueryCondition.gt("searchable", Event.START_TIME, time.getStartTimestamp()));
- }
- if (time.getEndTimestamp() > 0) {
- query.appendCondition(PairQueryCondition.LongQueryCondition.gt("searchable", Event.END_TIME, time.getEndTimestamp()));
- }
+ /**
+ * Turns one BanyanDB {@link RowEntity} into a query-side event.
+ * Values are picked by index: tag family 0 carries uuid, the three source fields, name, type, start time
+ * and end time; tag family 1 carries message and parameters.
+ */
+ public static class EventDeserializer implements RowEntityDeserializer<org.apache.skywalking.oap.server.core.query.type.event.Event> {
+ @Override
+ public org.apache.skywalking.oap.server.core.query.type.event.Event apply(RowEntity row) {
+ final org.apache.skywalking.oap.server.core.query.type.event.Event event = new org.apache.skywalking.oap.server.core.query.type.event.Event();
+
+ // first tag family: indexed tags
+ final List<TagAndValue<?>> indexed = row.getTagFamilies().get(0);
+ final String uuid = (String) indexed.get(0).getValue();
+ event.setUuid(uuid);
+ final String service = (String) indexed.get(1).getValue();
+ final String serviceInstance = (String) indexed.get(2).getValue();
+ final String endpoint = (String) indexed.get(3).getValue();
+ event.setSource(new Source(service, serviceInstance, endpoint));
+ final String name = (String) indexed.get(4).getValue();
+ event.setName(name);
+ final String type = (String) indexed.get(5).getValue();
+ event.setType(EventType.parse(type));
+ final Number startTime = (Number) indexed.get(6).getValue();
+ event.setStartTime(startTime.longValue());
+ final Number endTime = (Number) indexed.get(7).getValue();
+ event.setEndTime(endTime.longValue());
+
+ // second tag family: non-indexed payload
+ final List<TagAndValue<?>> payload = row.getTagFamilies().get(1);
+ final String message = (String) payload.get(0).getValue();
+ event.setMessage(message);
+ final String parameters = (String) payload.get(1).getValue();
+ event.setParameters(parameters);
+ return event;
}
}
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java
index 8325618f88..6bce5767e7 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java
@@ -18,13 +18,23 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
-import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.skywalking.apm.network.common.v3.KeyStringValuePair;
+import org.apache.skywalking.apm.network.logging.v3.LogTags;
+import org.apache.skywalking.banyandb.v1.client.RowEntity;
import org.apache.skywalking.banyandb.v1.client.StreamQuery;
+import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
+import org.apache.skywalking.banyandb.v1.client.TagAndValue;
+import org.apache.skywalking.banyandb.v1.client.TimestampRange;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.manual.log.AbstractLogRecord;
+import org.apache.skywalking.oap.server.core.analysis.manual.log.LogRecord;
import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
import org.apache.skywalking.oap.server.core.query.enumeration.Order;
import org.apache.skywalking.oap.server.core.query.input.TraceScopeCondition;
+import org.apache.skywalking.oap.server.core.query.type.KeyValue;
import org.apache.skywalking.oap.server.core.query.type.Log;
import org.apache.skywalking.oap.server.core.query.type.Logs;
import org.apache.skywalking.oap.server.core.storage.query.ILogQueryDAO;
@@ -36,6 +46,7 @@ import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.LogRecord
import java.io.IOException;
import java.util.List;
import java.util.Objects;
+import java.util.stream.Collectors;
/**
* {@link org.apache.skywalking.oap.server.core.analysis.manual.log.LogRecord} is a stream
@@ -53,48 +64,84 @@ public class BanyanDBLogQueryDAO extends AbstractBanyanDBDAO implements ILogQuer
final QueryBuilder query = new QueryBuilder() {
@Override
public void apply(StreamQuery query) {
+ query.setDataProjections(ImmutableList.of(AbstractLogRecord.CONTENT_TYPE, AbstractLogRecord.CONTENT, AbstractLogRecord.TAGS_RAW_DATA)); // non-indexed tags to return; LogDeserializer reads this family by position
+
if (StringUtil.isNotEmpty(serviceId)) {
- query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", AbstractLogRecord.SERVICE_ID, serviceId));
+ query.appendCondition(eq(AbstractLogRecord.SERVICE_ID, serviceId)); // every filter in this method is optional and added only when supplied
}
if (StringUtil.isNotEmpty(serviceInstanceId)) {
- query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", AbstractLogRecord.SERVICE_INSTANCE_ID, serviceInstanceId));
+ query.appendCondition(eq(AbstractLogRecord.SERVICE_INSTANCE_ID, serviceInstanceId));
}
if (StringUtil.isNotEmpty(endpointId)) {
- query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", AbstractLogRecord.ENDPOINT_ID, endpointId));
+ query.appendCondition(eq(AbstractLogRecord.ENDPOINT_ID, endpointId));
}
if (Objects.nonNull(relatedTrace)) {
if (StringUtil.isNotEmpty(relatedTrace.getTraceId())) {
- query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", AbstractLogRecord.TRACE_ID, relatedTrace.getTraceId()));
+ query.appendCondition(eq(AbstractLogRecord.TRACE_ID, relatedTrace.getTraceId()));
}
if (StringUtil.isNotEmpty(relatedTrace.getSegmentId())) {
- query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", AbstractLogRecord.TRACE_SEGMENT_ID, relatedTrace.getSegmentId()));
+ query.appendCondition(eq(AbstractLogRecord.TRACE_SEGMENT_ID, relatedTrace.getSegmentId()));
}
if (Objects.nonNull(relatedTrace.getSpanId())) {
- query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", AbstractLogRecord.SPAN_ID, (long) relatedTrace.getSpanId()));
+ query.appendCondition(eq(AbstractLogRecord.SPAN_ID, (long) relatedTrace.getSpanId())); // span id is matched as a long
}
}
if (CollectionUtils.isNotEmpty(tags)) {
for (final Tag tag : tags) {
if (LogRecordBuilder.INDEXED_TAGS.contains(tag.getKey())) {
- query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", tag.getKey(), tag.getValue()));
+ query.appendCondition(eq(tag.getKey(), tag.getValue())); // tags whose key is not in INDEXED_TAGS add no condition
}
}
}
}
};
- final List<Log> entities;
- if (startTB != 0 && endTB != 0) {
- entities = query(Log.class, query, TimeBucket.getTimestamp(startTB), TimeBucket.getTimestamp(endTB));
- } else {
- entities = query(Log.class, query);
+ TimestampRange tsRange = null;
+ if (startTB > 0 && endTB > 0) {
+ tsRange = new TimestampRange(TimeBucket.getTimestamp(startTB), TimeBucket.getTimestamp(endTB));
}
+
+ StreamQueryResponse resp = query(LogRecord.INDEX_NAME,
+ ImmutableList.of(AbstractLogRecord.SERVICE_ID, AbstractLogRecord.SERVICE_INSTANCE_ID,
+ AbstractLogRecord.ENDPOINT_ID, AbstractLogRecord.TRACE_ID, AbstractLogRecord.TRACE_SEGMENT_ID,
+ AbstractLogRecord.SPAN_ID), tsRange, query);
+
+ List<Log> logEntities = resp.getElements().stream().map(new LogDeserializer()).collect(Collectors.toList());
+
Logs logs = new Logs();
- logs.getLogs().addAll(entities);
- logs.setTotal(entities.size());
+ logs.getLogs().addAll(logEntities);
+ logs.setTotal(logEntities.size());
return logs;
}
+
+ /**
+ * Converts one BanyanDB {@link RowEntity} of the log stream into a {@link Log}.
+ * <p>
+ * Tag family 0 is read by position (service id, service instance id, endpoint id, trace id). Tag family 1
+ * is expected to follow the data projections requested by the query: content type, content, raw tags.
+ */
+ public static class LogDeserializer implements RowEntityDeserializer<Log> {
+ @Override
+ public Log apply(RowEntity row) {
+ Log log = new Log();
+ final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
+ log.setServiceId((String) searchable.get(0).getValue());
+ log.setServiceInstanceId((String) searchable.get(1).getValue());
+ log.setEndpointId((String) searchable.get(2).getValue());
+ log.setTraceId((String) searchable.get(3).getValue());
+ log.setTimestamp(row.getTimestamp());
+ final List<TagAndValue<?>> data = row.getTagFamilies().get(1);
+ // NOTE(review): data.get(0) (content type) is projected by the query but is still not mapped onto the Log
+ // The projected content was previously dropped; fall back to "" when it is absent or not textual
+ final Object content = data.get(1).getValue();
+ log.setContent(content instanceof String ? (String) content : "");
+ // Raw tags are optional; the instanceof check also covers null and avoids an unchecked cast
+ final Object rawTags = data.get(2).getValue();
+ if (rawTags instanceof ByteString && !((ByteString) rawTags).isEmpty()) {
+ try {
+ LogTags logTags = LogTags.parseFrom((ByteString) rawTags);
+ for (final KeyStringValuePair pair : logTags.getDataList()) {
+ log.getTags().add(new KeyValue(pair.getKey(), pair.getValue()));
+ }
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException("fail to parse log tags", e);
+ }
+ }
+ return log;
+ }
+ }
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBManagementDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBManagementDAO.java
index aecdd13dfd..7e37288428 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBManagementDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBManagementDAO.java
@@ -18,24 +18,54 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
-import lombok.RequiredArgsConstructor;
+import org.apache.skywalking.banyandb.v1.client.StreamQuery;
+import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
import org.apache.skywalking.banyandb.v1.client.StreamWrite;
+import org.apache.skywalking.banyandb.v1.client.TimestampRange;
import org.apache.skywalking.oap.server.core.analysis.management.ManagementData;
+import org.apache.skywalking.oap.server.core.management.ui.template.UITemplate;
import org.apache.skywalking.oap.server.core.storage.IManagementDAO;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.BanyanDBStorageDataBuilder;
import java.io.IOException;
+import java.util.Collections;
-@RequiredArgsConstructor
-public class BanyanDBManagementDAO<T extends ManagementData> implements IManagementDAO {
- private final BanyanDBStorageClient client;
+/**
+ * UITemplate insertion DAO.
+ * <p>
+ * {@link #insert} first queries the UITemplate stream for a row whose name equals the id of the data to be
+ * stored and returns without writing when one is found. Otherwise the entity is written with the constant
+ * timestamp {@code 1L}; the lookup uses the time range {@code (0L, 2L)} around that constant.
+ *
+ * @param <T> The only ManagementData we have now is {@link UITemplate}
+ */
+public class BanyanDBManagementDAO<T extends ManagementData> extends AbstractBanyanDBDAO implements IManagementDAO {
private final BanyanDBStorageDataBuilder<T> storageBuilder;
+ public BanyanDBManagementDAO(BanyanDBStorageClient client, BanyanDBStorageDataBuilder<T> storageBuilder) {
+ super(client);
+ this.storageBuilder = storageBuilder;
+ }
+
@Override
public void insert(Model model, ManagementData storageData) throws IOException {
- StreamWrite streamWrite = this.storageBuilder.entity2Storage(model, (T) storageData);
- client.write(streamWrite);
+ // ensure only insert once
+ StreamQueryResponse resp = query(UITemplate.INDEX_NAME,
+ Collections.singletonList(UITemplate.NAME),
+ new TimestampRange(0L, 2L), // brackets the constant write timestamp 1L used below
+ new QueryBuilder() {
+ @Override
+ public void apply(StreamQuery query) {
+ query.appendCondition(eq(UITemplate.NAME, storageData.id())); // match on the entity id; NOTE(review): assumes id() equals the stored template name - confirm
+ }
+ });
+
+ if (resp != null && resp.getElements().size() > 0) { // an existing row means this template was already inserted
+ return;
+ }
+
+ StreamWrite.StreamWriteBuilder streamWrite = this.storageBuilder
+ .entity2Storage((T) storageData) // unchecked cast: the caller is expected to pass the T this DAO was built for
+ .name(model.getName())
+ .timestamp(1L); // constant timestamp so the existence check above can find the row
+ getClient().write(streamWrite.build());
}
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBMetadataQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBMetadataQueryDAO.java
index b3b6fece70..fdc1ccb71f 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBMetadataQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBMetadataQueryDAO.java
@@ -18,23 +18,35 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
+import com.google.common.collect.ImmutableList;
+import com.google.gson.JsonElement;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
+import org.apache.skywalking.banyandb.v1.client.RowEntity;
import org.apache.skywalking.banyandb.v1.client.StreamQuery;
+import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
+import org.apache.skywalking.banyandb.v1.client.TagAndValue;
import org.apache.skywalking.oap.server.core.analysis.NodeType;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic;
import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
import org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraffic;
+import org.apache.skywalking.oap.server.core.query.enumeration.Language;
+import org.apache.skywalking.oap.server.core.query.type.Attribute;
import org.apache.skywalking.oap.server.core.query.type.Database;
import org.apache.skywalking.oap.server.core.query.type.Endpoint;
import org.apache.skywalking.oap.server.core.query.type.Service;
import org.apache.skywalking.oap.server.core.query.type.ServiceInstance;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
import java.io.IOException;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
/**
@@ -50,83 +62,184 @@ public class BanyanDBMetadataQueryDAO extends AbstractBanyanDBDAO implements IMe
@Override
public List<Service> getAllServices(String group) throws IOException {
- return query(Service.class, new QueryBuilder() {
- @Override
- public void apply(StreamQuery query) {
- if (StringUtil.isNotEmpty(group)) {
- query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", ServiceTraffic.GROUP, group));
- }
- }
- });
+ StreamQueryResponse resp = query(ServiceTraffic.INDEX_NAME, // lists services from the ServiceTraffic stream, optionally restricted to one group
+ ImmutableList.of(ServiceTraffic.NAME, ServiceTraffic.NODE_TYPE, ServiceTraffic.GROUP),
+ new QueryBuilder() {
+ @Override
+ public void apply(StreamQuery query) {
+ if (StringUtil.isNotEmpty(group)) { // an empty group adds no condition
+ query.appendCondition(eq(ServiceTraffic.GROUP, group));
+ }
+ }
+ });
+
+ return resp.getElements().stream().map(new ServiceDeserializer()).collect(Collectors.toList()); // NOTE(review): resp is dereferenced without a null check - confirm query(...) never returns null
}
@Override
public List<Service> getAllBrowserServices() throws IOException {
- return query(Service.class, new QueryBuilder() {
- @Override
- public void apply(StreamQuery query) {
- query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", ServiceTraffic.NODE_TYPE, (long) NodeType.Browser.value()));
- }
- });
+ StreamQueryResponse resp = query(ServiceTraffic.INDEX_NAME, // lists the services whose node type is Browser
+ ImmutableList.of(ServiceTraffic.NAME, ServiceTraffic.NODE_TYPE, ServiceTraffic.GROUP),
+ new QueryBuilder() {
+ @Override
+ public void apply(StreamQuery query) {
+ query.appendCondition(eq(ServiceTraffic.NODE_TYPE, NodeType.Browser.value())); // numeric node-type value; the removed line cast it to long explicitly
+ }
+ });
+
+ return resp.getElements().stream().map(new ServiceDeserializer()).collect(Collectors.toList());
}
@Override
public List<Database> getAllDatabases() throws IOException {
- return query(Database.class, new QueryBuilder() {
- @Override
- public void apply(StreamQuery query) {
- query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", ServiceTraffic.NODE_TYPE, (long) NodeType.Database.value()));
- }
- });
+ StreamQueryResponse resp = query(ServiceTraffic.INDEX_NAME, ImmutableList.of(ServiceTraffic.NAME, ServiceTraffic.NODE_TYPE), // lists ServiceTraffic rows whose node type is Database; only name and node type are projected
+ new QueryBuilder() {
+ @Override
+ public void apply(StreamQuery query) {
+ query.appendCondition(eq(ServiceTraffic.NODE_TYPE, (long) NodeType.Database.value()));
+ }
+ });
+
+ return resp.getElements().stream().map(new DatabaseDeserializer()).collect(Collectors.toList()); // DatabaseDeserializer takes the id from the row and the name from the first projected tag
}
@Override
public List<Service> searchServices(NodeType nodeType, String keyword) throws IOException {
- return query(Service.class, new QueryBuilder() {
- @Override
- public void apply(StreamQuery query) {
- query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", ServiceTraffic.NODE_TYPE, (long) nodeType.value()));
- }
- }).stream().filter(s -> s.getName().contains(keyword)) // TODO: support analyzer in database
+ StreamQueryResponse resp = query(ServiceTraffic.INDEX_NAME, // fetches all services of the given node type; the keyword is applied afterwards in memory
+ ImmutableList.of(ServiceTraffic.NAME, ServiceTraffic.NODE_TYPE, ServiceTraffic.GROUP),
+ new QueryBuilder() {
+ @Override
+ public void apply(StreamQuery query) {
+ query.appendCondition(eq(ServiceTraffic.NODE_TYPE, nodeType.value()));
+ }
+ });
+
+ return resp.getElements().stream().map(new ServiceDeserializer())
+ .filter(s -> s.getName().contains(keyword)) // TODO: support analyzer in database
.collect(Collectors.toList());
}
@Override
public Service searchService(NodeType nodeType, String serviceCode) throws IOException {
- return query(Service.class, new QueryBuilder() {
- @Override
- public void apply(StreamQuery query) {
- query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", ServiceTraffic.NAME, serviceCode));
- query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", ServiceTraffic.NODE_TYPE, (long) nodeType.value()));
- // only get one
- query.setLimit(1);
- }
- }).stream().findAny().orElse(null);
+ // Looks up a single service by exact name and node type; returns null when nothing matches
+ StreamQueryResponse resp = query(ServiceTraffic.INDEX_NAME,
+ ImmutableList.of(ServiceTraffic.NAME, ServiceTraffic.NODE_TYPE, ServiceTraffic.GROUP),
+ new QueryBuilder() {
+ @Override
+ public void apply(StreamQuery query) {
+ // use the shared eq(...) helper like the sibling queries instead of hard-coding the tag family name
+ query.appendCondition(eq(ServiceTraffic.NAME, serviceCode));
+ query.appendCondition(eq(ServiceTraffic.NODE_TYPE, nodeType.value()));
+ // only get one
+ query.setLimit(1);
+ }
+ });
+
+ return resp.getElements().stream().map(new ServiceDeserializer()).findAny().orElse(null);
}
@Override
public List<Endpoint> searchEndpoint(String keyword, String serviceId, int limit) throws IOException {
- return query(Endpoint.class, new QueryBuilder() {
- @Override
- public void apply(StreamQuery query) {
- query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", EndpointTraffic.SERVICE_ID, serviceId));
- }
- }).stream().filter(e -> e.getName().contains(keyword))
+ StreamQueryResponse resp = query(EndpointTraffic.INDEX_NAME, // fetches the endpoints of one service; keyword and limit are applied afterwards in memory
+ ImmutableList.of(EndpointTraffic.NAME, EndpointTraffic.SERVICE_ID), new QueryBuilder() {
+ @Override
+ public void apply(StreamQuery query) {
+ query.appendCondition(eq(EndpointTraffic.SERVICE_ID, serviceId));
+ }
+ });
+
+ return resp.getElements().stream().map(new EndpointDeserializer()).filter(e -> e.getName().contains(keyword)) // substring match on the endpoint name
.limit(limit).collect(Collectors.toList());
}
@Override
public List<ServiceInstance> getServiceInstances(long startTimestamp, long endTimestamp, String serviceId) throws IOException {
- return query(ServiceInstance.class, new QueryBuilder() {
- @Override
- public void apply(StreamQuery query) {
- final long startMinuteTimeBucket = TimeBucket.getMinuteTimeBucket(startTimestamp);
- final long endMinuteTimeBucket = TimeBucket.getMinuteTimeBucket(endTimestamp);
-
- query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", InstanceTraffic.LAST_PING_TIME_BUCKET, startMinuteTimeBucket));
- query.appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", InstanceTraffic.LAST_PING_TIME_BUCKET, endMinuteTimeBucket));
- query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", InstanceTraffic.SERVICE_ID, serviceId));
+ StreamQueryResponse resp = query(InstanceTraffic.INDEX_NAME, // lists instances of one service whose last ping falls between the two timestamps
+ ImmutableList.of(InstanceTraffic.SERVICE_ID, InstanceTraffic.LAST_PING_TIME_BUCKET),
+ new QueryBuilder() {
+ @Override
+ public void apply(StreamQuery query) {
+ query.setDataProjections(Collections.singletonList("data_binary")); // also request the serialized payload; the tag name is a literal here rather than a constant
+
+ final long startMinuteTimeBucket = TimeBucket.getMinuteTimeBucket(startTimestamp); // both timestamps are converted to minute time buckets before comparison
+ final long endMinuteTimeBucket = TimeBucket.getMinuteTimeBucket(endTimestamp);
+
+ query.appendCondition(gte(InstanceTraffic.LAST_PING_TIME_BUCKET, startMinuteTimeBucket));
+ query.appendCondition(lte(InstanceTraffic.LAST_PING_TIME_BUCKET, endMinuteTimeBucket));
+ query.appendCondition(eq(InstanceTraffic.SERVICE_ID, serviceId));
+ }
+ });
+
+ return resp.getElements().stream().map(new ServiceInstanceDeserializer()).collect(Collectors.toList());
+ }
+
+ /**
+ * Converts a {@link RowEntity} into a {@link Database}: the row id becomes the database id and
+ * the first tag of the first tag family becomes its name.
+ */
+ public static class DatabaseDeserializer implements RowEntityDeserializer<Database> {
+ @Override
+ public Database apply(RowEntity row) {
+ final List<TagAndValue<?>> tags = row.getTagFamilies().get(0);
+ final Database result = new Database();
+ result.setId(row.getId());
+ // position 0 of the first tag family holds the name
+ final String name = (String) tags.get(0).getValue();
+ result.setName(name);
+ return result;
+ }
+ }
+
+ /**
+ * Converts a {@link RowEntity} into an {@link Endpoint}.
+ *
+ * <p>The name is read from position 0 of the first tag family. The id is taken from the row
+ * itself, the same way the service and database deserializers in this class do it.
+ */
+ public static class EndpointDeserializer implements RowEntityDeserializer<Endpoint> {
+ @Override
+ public Endpoint apply(RowEntity row) {
+ Endpoint endpoint = new Endpoint();
+ // searchEndpoint projects (NAME, SERVICE_ID), so tag index 1 is the owning service's id.
+ // Using it as the endpoint id gave every endpoint of a service the same id.
+ // NOTE(review): assumes the row id is the endpoint's own id - confirm against the writer.
+ endpoint.setId(row.getId());
+ final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
+ endpoint.setName((String) searchable.get(0).getValue());
+ return endpoint;
+ }
+ }
+
+ /**
+ * Converts a {@link RowEntity} into a {@link Service}. The id comes from the row itself; name
+ * and group are read from positions 0 and 2 of the first tag family.
+ */
+ public static class ServiceDeserializer implements RowEntityDeserializer<Service> {
+ @Override
+ public Service apply(RowEntity row) {
+ final List<TagAndValue<?>> tags = row.getTagFamilies().get(0);
+ final Service result = new Service();
+ result.setId(row.getId());
+ // position 1 is not consumed here; only name (0) and group (2) are copied
+ final String name = (String) tags.get(0).getValue();
+ result.setName(name);
+ final String group = (String) tags.get(2).getValue();
+ result.setGroup(group);
+ return result;
+ }
+ }
+
+ /**
+ * Converts a {@link RowEntity} into a {@link ServiceInstance}.
+ *
+ * <p>The first tag of the second tag family must be a non-empty {@link ByteString} holding a
+ * serialized {@link RemoteData}; it is deserialized into an {@link InstanceTraffic}, from which
+ * the name, language and remaining attributes are copied.
+ *
+ * @throws RuntimeException if the binary tag is missing, empty, or cannot be parsed
+ */
+ public static class ServiceInstanceDeserializer implements RowEntityDeserializer<ServiceInstance> {
+ @Override
+ public ServiceInstance apply(RowEntity row) {
+ InstanceTraffic instanceTraffic = new InstanceTraffic();
+ final List<TagAndValue<?>> data = row.getTagFamilies().get(1);
+ Object o = data.get(0).getValue();
+ ServiceInstance serviceInstance = new ServiceInstance();
+ if (o instanceof ByteString && !((ByteString) o).isEmpty()) {
+ try {
+ RemoteData remoteData = RemoteData.parseFrom((ByteString) o);
+ instanceTraffic.deserialize(remoteData);
+ serviceInstance.setName(instanceTraffic.getName());
+ // The instance id must identify the instance, not its service: the service id is
+ // shared by every instance of that service. Use the row id, as the sibling
+ // deserializers do. NOTE(review): assumes the row id is the instance's id - confirm.
+ serviceInstance.setId(row.getId());
+
+ if (instanceTraffic.getProperties() != null) {
+ for (Map.Entry<String, JsonElement> property : instanceTraffic.getProperties().entrySet()) {
+ String key = property.getKey();
+ String value = property.getValue().getAsString();
+ // the language property is promoted to a dedicated field; everything else is a plain attribute
+ if (key.equals(InstanceTraffic.PropertyUtil.LANGUAGE)) {
+ serviceInstance.setLanguage(Language.value(value));
+ } else {
+ serviceInstance.getAttributes().add(new Attribute(key, value));
+ }
+ }
+ } else {
+ serviceInstance.setLanguage(Language.UNKNOWN);
+ }
+ } catch (InvalidProtocolBufferException ex) {
+ throw new RuntimeException("fail to parse remote data", ex);
+ }
+ } else {
+ throw new RuntimeException("unable to parse binary data");
}
- });
+
+ return serviceInstance;
+ }
}
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBMetricsDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBMetricsDAO.java
index 618ff86af9..5aa1b7657d 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBMetricsDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBMetricsDAO.java
@@ -20,12 +20,13 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
import lombok.RequiredArgsConstructor;
import org.apache.skywalking.banyandb.v1.client.StreamWrite;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.BanyanDBMetricsBuilder;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.BanyanDBStorageDataBuilder;
import java.io.IOException;
import java.util.Collections;
@@ -33,7 +34,7 @@ import java.util.List;
@RequiredArgsConstructor
public class BanyanDBMetricsDAO<T extends Metrics> implements IMetricsDAO {
- private final BanyanDBMetricsBuilder<T> storageBuilder;
+ private final BanyanDBStorageDataBuilder<T> storageBuilder;
@Override
public List<Metrics> multiGet(Model model, List<Metrics> metrics) throws IOException {
@@ -42,12 +43,15 @@ public class BanyanDBMetricsDAO<T extends Metrics> implements IMetricsDAO {
@Override
public InsertRequest prepareBatchInsert(Model model, Metrics metrics) throws IOException {
- StreamWrite streamWrite = this.storageBuilder.entity2Storage(model, (T) metrics);
- return new BanyanDBStreamInsertRequest(streamWrite);
+ // Builds the stream write for one metrics entity: the storage builder supplies the tags,
+ // the model supplies the stream name, and the entity's time bucket supplies the timestamp.
+ // metrics.getTimeBucket() is already a time bucket, so it has to be converted back to a
+ // timestamp here; getTimeBucket(...) goes the opposite way (timestamp -> bucket).
+ StreamWrite.StreamWriteBuilder builder = this.storageBuilder.entity2Storage((T) metrics)
+ .name(model.getName())
+ .timestamp(TimeBucket.getTimestamp(metrics.getTimeBucket(), model.getDownsampling()));
+ return new BanyanDBStreamInsertRequest(builder.build());
}
@Override
public UpdateRequest prepareBatchUpdate(Model model, Metrics metrics) throws IOException {
- return null;
+ return new UpdateRequest() { // empty anonymous request returned in place of the former null; neither argument is used
+ };
}
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBNetworkAddressAliasDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBNetworkAddressAliasDAO.java
index ce36573617..33aec37bac 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBNetworkAddressAliasDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBNetworkAddressAliasDAO.java
@@ -18,13 +18,18 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
-import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
+import com.google.common.collect.ImmutableList;
+import org.apache.skywalking.banyandb.v1.client.RowEntity;
import org.apache.skywalking.banyandb.v1.client.StreamQuery;
+import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
+import org.apache.skywalking.banyandb.v1.client.TagAndValue;
import org.apache.skywalking.oap.server.core.analysis.manual.networkalias.NetworkAddressAlias;
+import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
import java.util.List;
+import java.util.stream.Collectors;
/**
* {@link NetworkAddressAlias} is a stream
@@ -36,11 +41,36 @@ public class BanyanDBNetworkAddressAliasDAO extends AbstractBanyanDBDAO implemen
@Override
public List<NetworkAddressAlias> loadLastUpdate(long timeBucket) {
- return query(NetworkAddressAlias.class, new QueryBuilder() {
- @Override
- public void apply(StreamQuery query) {
- query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", NetworkAddressAlias.LAST_UPDATE_TIME_BUCKET, timeBucket));
- }
- });
+ StreamQueryResponse resp = query(NetworkAddressAlias.INDEX_NAME, // load aliases whose last-update bucket is at or after timeBucket
+ ImmutableList.of(NetworkAddressAlias.LAST_UPDATE_TIME_BUCKET),
+ new QueryBuilder() {
+ @Override
+ public void apply(StreamQuery query) {
+ query.setDataProjections(ImmutableList.of(Metrics.TIME_BUCKET, "address", "represent_service_id", "represent_service_instance_id")); // order matters: NetworkAddressAliasDeserializer reads these by position 0..3
+ query.appendCondition(gte(NetworkAddressAlias.LAST_UPDATE_TIME_BUCKET, timeBucket));
+ }
+ });
+
+ return resp.getElements().stream().map(new NetworkAddressAliasDeserializer()).collect(Collectors.toList()); // one alias per returned row
+ }
+
+ /**
+ * Converts a {@link RowEntity} into a {@link NetworkAddressAlias}. Values are read by position:
+ * the first tag family carries the last-update time bucket, the second carries the time bucket,
+ * address, represented service id and represented service instance id, in that order.
+ */
+ public static class NetworkAddressAliasDeserializer implements RowEntityDeserializer<NetworkAddressAlias> {
+ @Override
+ public NetworkAddressAlias apply(RowEntity row) {
+ final NetworkAddressAlias alias = new NetworkAddressAlias();
+
+ // first tag family: last_update_time_bucket
+ final List<TagAndValue<?>> indexedTags = row.getTagFamilies().get(0);
+ final Number lastUpdate = (Number) indexedTags.get(0).getValue();
+ alias.setLastUpdateTimeBucket(lastUpdate.longValue());
+
+ // second tag family: time_bucket, address, represent_service_id, represent_service_instance_id
+ final List<TagAndValue<?>> dataTags = row.getTagFamilies().get(1);
+ final Number timeBucket = (Number) dataTags.get(0).getValue();
+ alias.setTimeBucket(timeBucket.longValue());
+ alias.setAddress((String) dataTags.get(1).getValue());
+ alias.setRepresentServiceId((String) dataTags.get(2).getValue());
+ alias.setRepresentServiceInstanceId((String) dataTags.get(3).getValue());
+ return alias;
+ }
}
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBNoneStreamDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBNoneStreamDAO.java
index 30ed279432..e19ff9af98 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBNoneStreamDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBNoneStreamDAO.java
@@ -20,22 +20,32 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
import lombok.RequiredArgsConstructor;
import org.apache.skywalking.banyandb.v1.client.StreamWrite;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.config.NoneStream;
import org.apache.skywalking.oap.server.core.storage.INoneStreamDAO;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.BanyanDBRecordBuilder;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.BanyanDBStorageDataBuilder;
import java.io.IOException;
+/**
+ * DAO for NoneStream, specifically ProfileTaskRecord
+ *
+ * @param <T> For NoneStream, we only have {@link org.apache.skywalking.oap.server.core.profile.ProfileTaskRecord}
+ */
@RequiredArgsConstructor
public class BanyanDBNoneStreamDAO<T extends NoneStream> implements INoneStreamDAO {
private final BanyanDBStorageClient client;
- private final BanyanDBRecordBuilder<T> storageBuilder;
+ private final BanyanDBStorageDataBuilder<T> storageBuilder;
@Override
public void insert(Model model, NoneStream noneStream) throws IOException {
- StreamWrite streamWrite = this.storageBuilder.entity2Storage(model, (T) noneStream);
- this.client.write(streamWrite);
+ // noneStream.getTimeBucket() is already a time bucket; the write needs a timestamp, so the
+ // bucket is converted back with getTimestamp (getTimeBucket converts the other direction).
+ final long timestamp = TimeBucket.getTimestamp(noneStream.getTimeBucket(), model.getDownsampling());
+ // the storage builder supplies the tags; the stream name and timestamp come from the model and entity
+ StreamWrite.StreamWriteBuilder builder =
+ this.storageBuilder.entity2Storage((T) noneStream)
+ .name(model.getName())
+ .timestamp(timestamp);
+ this.client.write(builder.build());
}
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java
index 3a3c12e119..3a4910f3b5 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java
@@ -18,8 +18,14 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
+import com.google.common.collect.ImmutableList;
+import org.apache.skywalking.banyandb.v1.client.RowEntity;
import org.apache.skywalking.banyandb.v1.client.StreamQuery;
+import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
+import org.apache.skywalking.banyandb.v1.client.TagAndValue;
+import org.apache.skywalking.oap.server.core.profile.ProfileTaskLogRecord;
import org.apache.skywalking.oap.server.core.query.type.ProfileTaskLog;
+import org.apache.skywalking.oap.server.core.query.type.ProfileTaskLogOperationType;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
@@ -41,12 +47,38 @@ public class BanyanDBProfileTaskLogQueryDAO extends AbstractBanyanDBDAO implemen
@Override
public List<ProfileTaskLog> getTaskLogList() throws IOException {
- return query(ProfileTaskLog.class, new QueryBuilder() {
- @Override
- public void apply(StreamQuery query) {
- query.setLimit(BanyanDBProfileTaskLogQueryDAO.this.queryMaxSize);
- }
- }).stream().sorted(Comparator.comparingLong(ProfileTaskLog::getOperationTime))
+ StreamQueryResponse resp = query(ProfileTaskLogRecord.INDEX_NAME, // fetch up to queryMaxSize task-log rows, with no filter conditions
+ ImmutableList.of(ProfileTaskLogRecord.OPERATION_TIME),
+ new QueryBuilder() {
+ @Override
+ public void apply(StreamQuery query) {
+ query.setDataProjections(ImmutableList.of(ProfileTaskLogRecord.TASK_ID, // order matters: ProfileTaskLogDeserializer reads these by position 0..2
+ ProfileTaskLogRecord.INSTANCE_ID,
+ ProfileTaskLogRecord.OPERATION_TYPE));
+ query.setLimit(BanyanDBProfileTaskLogQueryDAO.this.queryMaxSize);
+ }
+ });
+
+ return resp.getElements().stream().map(new ProfileTaskLogDeserializer())
+ .sorted(Comparator.comparingLong(ProfileTaskLog::getOperationTime)) // ascending by operation time, sorted in-process
.collect(Collectors.toList());
}
+
+ /**
+ * Converts a {@link RowEntity} into a {@link ProfileTaskLog}. The operation time is read from
+ * the first tag family; task id, instance id and operation type are read, in that order, from
+ * the second tag family.
+ */
+ public static class ProfileTaskLogDeserializer implements RowEntityDeserializer<ProfileTaskLog> {
+ @Override
+ public ProfileTaskLog apply(RowEntity row) {
+ final ProfileTaskLog taskLog = new ProfileTaskLog();
+
+ // first tag family - operation_time
+ final List<TagAndValue<?>> indexedTags = row.getTagFamilies().get(0);
+ final Number operationTime = (Number) indexedTags.get(0).getValue();
+ taskLog.setOperationTime(operationTime.longValue());
+
+ // second tag family - task_id, instance_id, operation_type
+ final List<TagAndValue<?>> dataTags = row.getTagFamilies().get(1);
+ taskLog.setTaskId((String) dataTags.get(0).getValue());
+ taskLog.setInstanceId((String) dataTags.get(1).getValue());
+ final int operationTypeCode = ((Number) dataTags.get(2).getValue()).intValue();
+ taskLog.setOperationType(ProfileTaskLogOperationType.parse(operationTypeCode));
+ return taskLog;
+ }
+ }
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java
index 47c7c57a00..1cbaea95af 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java
@@ -18,60 +18,66 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
-import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
+import com.google.common.collect.ImmutableList;
+import org.apache.skywalking.banyandb.v1.client.RowEntity;
import org.apache.skywalking.banyandb.v1.client.StreamQuery;
+import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
+import org.apache.skywalking.banyandb.v1.client.TagAndValue;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.profile.ProfileTaskRecord;
import org.apache.skywalking.oap.server.core.query.type.ProfileTask;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.ProfileTaskDeserializer;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
+import java.util.stream.Collectors;
/**
* {@link org.apache.skywalking.oap.server.core.profile.ProfileTaskRecord} is a stream
*/
public class BanyanDBProfileTaskQueryDAO extends AbstractBanyanDBDAO implements IProfileTaskQueryDAO {
+ public static final String ID = "profile_task_query_id";
+
public BanyanDBProfileTaskQueryDAO(BanyanDBStorageClient client) {
super(client);
}
@Override
public List<ProfileTask> getTaskList(String serviceId, String endpointName, Long startTimeBucket, Long endTimeBucket, Integer limit) throws IOException {
- return query(ProfileTask.class, new QueryBuilder() {
- @Override
- public void apply(StreamQuery query) {
- if (StringUtil.isNotEmpty(serviceId)) {
- query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable",
- ProfileTaskRecord.SERVICE_ID, serviceId));
- }
-
- if (StringUtil.isNotEmpty(endpointName)) {
- query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable",
- ProfileTaskRecord.ENDPOINT_NAME, endpointName));
- }
-
- if (Objects.nonNull(startTimeBucket)) {
- query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable",
- ProfileTaskRecord.START_TIME, TimeBucket.getTimestamp(startTimeBucket)));
- }
-
- if (Objects.nonNull(endTimeBucket)) {
- query.appendCondition(PairQueryCondition.LongQueryCondition.le("searchable",
- ProfileTaskRecord.START_TIME, TimeBucket.getTimestamp(endTimeBucket)));
- }
-
- if (Objects.nonNull(limit)) {
- query.setLimit(limit);
- }
-
- query.setOrderBy(new StreamQuery.OrderBy(ProfileTaskRecord.START_TIME, StreamQuery.OrderBy.Type.DESC));
- }
- });
+ StreamQueryResponse resp = query(ProfileTaskRecord.INDEX_NAME, // list profile tasks; every filter argument is optional
+ ImmutableList.of(ID, ProfileTaskRecord.SERVICE_ID, ProfileTaskRecord.ENDPOINT_NAME, // order matters: ProfileTaskDeserializer reads these nine tags by position 0..8
+ ProfileTaskRecord.START_TIME, ProfileTaskRecord.DURATION, ProfileTaskRecord.MIN_DURATION_THRESHOLD,
+ ProfileTaskRecord.DUMP_PERIOD, ProfileTaskRecord.CREATE_TIME, ProfileTaskRecord.MAX_SAMPLING_COUNT), new QueryBuilder() {
+ @Override
+ public void apply(StreamQuery query) {
+ if (StringUtil.isNotEmpty(serviceId)) { // empty or null filters are skipped rather than matched literally
+ query.appendCondition(eq(ProfileTaskRecord.SERVICE_ID, serviceId));
+ }
+
+ if (StringUtil.isNotEmpty(endpointName)) {
+ query.appendCondition(eq(ProfileTaskRecord.ENDPOINT_NAME, endpointName));
+ }
+
+ if (Objects.nonNull(startTimeBucket)) {
+ query.appendCondition(gte(ProfileTaskRecord.START_TIME, TimeBucket.getTimestamp(startTimeBucket))); // bucket bounds are converted to timestamps before comparing with start_time
+ }
+
+ if (Objects.nonNull(endTimeBucket)) {
+ query.appendCondition(lte(ProfileTaskRecord.START_TIME, TimeBucket.getTimestamp(endTimeBucket)));
+ }
+
+ if (Objects.nonNull(limit)) {
+ query.setLimit(limit);
+ }
+
+ query.setOrderBy(new StreamQuery.OrderBy(ProfileTaskRecord.START_TIME, StreamQuery.OrderBy.Type.DESC)); // newest start time first
+ }
+ });
+
+ return resp.getElements().stream().map(new ProfileTaskDeserializer()).collect(Collectors.toList()); // one ProfileTask per returned row
}
@Override
@@ -80,12 +86,36 @@ public class BanyanDBProfileTaskQueryDAO extends AbstractBanyanDBDAO implements
return null;
}
- return query(ProfileTask.class, new QueryBuilder() {
- @Override
- public void apply(StreamQuery query) {
- query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", ProfileTaskDeserializer.ID, id));
- query.setLimit(1);
- }
- }).stream().findAny().orElse(null);
+ StreamQueryResponse resp = query(ProfileTaskRecord.INDEX_NAME,
+ ImmutableList.of(ID, ProfileTaskRecord.SERVICE_ID, ProfileTaskRecord.ENDPOINT_NAME,
+ ProfileTaskRecord.START_TIME, ProfileTaskRecord.DURATION, ProfileTaskRecord.MIN_DURATION_THRESHOLD,
+ ProfileTaskRecord.DUMP_PERIOD, ProfileTaskRecord.CREATE_TIME, ProfileTaskRecord.MAX_SAMPLING_COUNT),
+ new QueryBuilder() {
+ @Override
+ public void apply(StreamQuery query) {
+ query.appendCondition(eq(ID, id));
+ query.setLimit(1);
+ }
+ });
+
+ return resp.getElements().stream().map(new ProfileTaskDeserializer()).findAny().orElse(null);
+ }
+
+ /**
+ * Converts a {@link RowEntity} into a {@link ProfileTask}. All nine values are read by position
+ * from the first tag family, in the order the queries in this class project them: id, service
+ * id, endpoint name, start time, duration, minimum duration threshold, dump period, create time
+ * and maximum sampling count.
+ */
+ public static class ProfileTaskDeserializer implements RowEntityDeserializer<ProfileTask> {
+ @Override
+ public ProfileTask apply(RowEntity row) {
+ ProfileTask profileTask = new ProfileTask();
+ final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
+ profileTask.setId((String) searchable.get(0).getValue());
+ profileTask.setServiceId((String) searchable.get(1).getValue());
+ profileTask.setEndpointName((String) searchable.get(2).getValue());
+ profileTask.setStartTime(((Number) searchable.get(3).getValue()).longValue());
+ profileTask.setDuration(((Number) searchable.get(4).getValue()).intValue());
+ profileTask.setMinDurationThreshold(((Number) searchable.get(5).getValue()).intValue());
+ profileTask.setDumpPeriod(((Number) searchable.get(6).getValue()).intValue());
+ // create time is a timestamp like start time; narrowing it to int would truncate the value
+ profileTask.setCreateTime(((Number) searchable.get(7).getValue()).longValue());
+ profileTask.setMaxSamplingCount(((Number) searchable.get(8).getValue()).intValue());
+ return profileTask;
+ }
}
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java
index c6b9cdec9f..4da87ac9e5 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java
@@ -18,8 +18,13 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
-import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.ByteString;
+import org.apache.skywalking.banyandb.v1.client.RowEntity;
import org.apache.skywalking.banyandb.v1.client.StreamQuery;
+import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
+import org.apache.skywalking.banyandb.v1.client.TagAndValue;
+import org.apache.skywalking.oap.server.core.analysis.IDManager;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
import org.apache.skywalking.oap.server.core.profile.ProfileThreadSnapshotRecord;
import org.apache.skywalking.oap.server.core.query.type.BasicTrace;
@@ -27,9 +32,9 @@ import org.apache.skywalking.oap.server.core.storage.profile.IProfileThreadSnaps
import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
-import java.util.LinkedList;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -44,30 +49,37 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractBanyanDBDAO i
@Override
public List<BasicTrace> queryProfiledSegments(String taskId) throws IOException {
- List<ProfileThreadSnapshotRecord> resp = query(ProfileThreadSnapshotRecord.class, new QueryBuilder() {
- @Override
- public void apply(StreamQuery query) {
- query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", ProfileThreadSnapshotRecord.TASK_ID, taskId))
- .appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", ProfileThreadSnapshotRecord.SEQUENCE, 0L));
- }
- });
-
- if (resp.isEmpty()) {
+ StreamQueryResponse resp = query(ProfileThreadSnapshotRecord.INDEX_NAME,
+ ImmutableList.of(ProfileThreadSnapshotRecord.TASK_ID, ProfileThreadSnapshotRecord.SEGMENT_ID,
+ ProfileThreadSnapshotRecord.DUMP_TIME, ProfileThreadSnapshotRecord.SEQUENCE),
+ new QueryBuilder() {
+ @Override
+ public void apply(StreamQuery query) {
+ query.appendCondition(eq(ProfileThreadSnapshotRecord.TASK_ID, taskId))
+ .appendCondition(eq(ProfileThreadSnapshotRecord.SEQUENCE, 0L));
+ }
+ });
+
+ if (resp.getElements().isEmpty()) {
return Collections.emptyList();
}
- final List<String> segmentIDs = resp.stream().map(ProfileThreadSnapshotRecord::getSegmentId).collect(Collectors.toList());
+ List<String> segmentIDs = resp.getElements().stream()
+ .map(new ProfileThreadSnapshotRecordDeserializer())
+ .map(ProfileThreadSnapshotRecord::getSegmentId)
+ .collect(Collectors.toList());
// TODO: support `IN` or `OR` logic operation in BanyanDB
- List<BasicTrace> basicTraces = new LinkedList<>();
+ List<BasicTrace> basicTraces = new ArrayList<>();
for (String segmentID : segmentIDs) {
- List<BasicTrace> subSet = query(BasicTrace.class, new QueryBuilder() {
- @Override
- public void apply(StreamQuery traceQuery) {
- traceQuery.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", SegmentRecord.SEGMENT_ID, segmentID));
- }
- });
- basicTraces.addAll(subSet);
+ final StreamQueryResponse segmentRecordResp = query(SegmentRecord.INDEX_NAME, ImmutableList.of("trace_id", "state", "endpoint_id", "duration", "start_time"),
+ new QueryBuilder() {
+ @Override
+ public void apply(StreamQuery traceQuery) {
+ traceQuery.appendCondition(eq(SegmentRecord.SEGMENT_ID, segmentID));
+ }
+ });
+ basicTraces.addAll(segmentRecordResp.getElements().stream().map(new BasicTraceDeserializer()).collect(Collectors.toList()));
}
// TODO: Sort in DB with DESC
@@ -92,35 +104,54 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractBanyanDBDAO i
@Override
public List<ProfileThreadSnapshotRecord> queryRecords(String segmentId, int minSequence, int maxSequence) throws IOException {
- return query(ProfileThreadSnapshotRecord.class, new QueryBuilder() {
- @Override
- public void apply(StreamQuery query) {
- query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", ProfileThreadSnapshotRecord.SEGMENT_ID, segmentId))
- .appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", ProfileThreadSnapshotRecord.SEQUENCE, (long) maxSequence))
- .appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", ProfileThreadSnapshotRecord.SEQUENCE, (long) minSequence));
- }
- });
+ StreamQueryResponse resp = query(ProfileThreadSnapshotRecord.INDEX_NAME, // fetch the snapshots of one segment whose sequence lies in [minSequence, maxSequence]
+ ImmutableList.of(ProfileThreadSnapshotRecord.TASK_ID, ProfileThreadSnapshotRecord.SEGMENT_ID, // order matters: ProfileThreadSnapshotRecordDeserializer reads these by position 0..3
+ ProfileThreadSnapshotRecord.DUMP_TIME, ProfileThreadSnapshotRecord.SEQUENCE),
+ new QueryBuilder() {
+ @Override
+ public void apply(StreamQuery query) {
+ query.setDataProjections(Collections.singletonList(ProfileThreadSnapshotRecord.STACK_BINARY)); // the deserializer reads the stack bytes from the second tag family
+
+ query.appendCondition(eq(ProfileThreadSnapshotRecord.SEGMENT_ID, segmentId))
+ .appendCondition(lte(ProfileThreadSnapshotRecord.SEQUENCE, maxSequence)) // both sequence bounds are inclusive
+ .appendCondition(gte(ProfileThreadSnapshotRecord.SEQUENCE, minSequence));
+ }
+ });
+
+ return resp.getElements().stream().map(new ProfileThreadSnapshotRecordDeserializer()).collect(Collectors.toList());
}
@Override
public SegmentRecord getProfiledSegment(String segmentId) throws IOException {
- return query(SegmentRecord.class, new QueryBuilder() {
- @Override
- public void apply(StreamQuery query) {
- query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", SegmentRecord.INDEX_NAME, segmentId));
- }
- }).stream().findFirst().orElse(null);
+ // Loads the segment with the given id, including its binary payload; null when none matches.
+ // The projection order below is what SegmentRecordDeserializer indexes into (positions 0..6).
+ StreamQueryResponse resp = query(SegmentRecord.INDEX_NAME,
+ ImmutableList.of("trace_id", "state", "service_id", "service_instance_id", "endpoint_id", "duration", "start_time"),
+ new QueryBuilder() {
+ @Override
+ public void apply(StreamQuery query) {
+ query.setDataProjections(Collections.singletonList("data_binary"));
+ // filter on the segment-id tag; INDEX_NAME is the stream name passed to query(...) above,
+ // not a tag, and the per-segment lookup in queryProfiledSegments also uses SEGMENT_ID
+ query.appendCondition(eq(SegmentRecord.SEGMENT_ID, segmentId));
+ }
+ });
+
+ return resp.getElements().stream().map(new SegmentRecordDeserializer()).findFirst().orElse(null);
}
private int querySequenceWithAgg(AggType aggType, String segmentId, long start, long end) {
- List<ProfileThreadSnapshotRecord> records = query(ProfileThreadSnapshotRecord.class, new QueryBuilder() {
- @Override
- public void apply(StreamQuery query) {
- query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", ProfileThreadSnapshotRecord.SEGMENT_ID, segmentId))
- .appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", ProfileThreadSnapshotRecord.DUMP_TIME, end))
- .appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", ProfileThreadSnapshotRecord.DUMP_TIME, start));
- }
- });
+ StreamQueryResponse resp = query(ProfileThreadSnapshotRecord.INDEX_NAME,
+ ImmutableList.of(ProfileThreadSnapshotRecord.TASK_ID, ProfileThreadSnapshotRecord.SEGMENT_ID,
+ ProfileThreadSnapshotRecord.DUMP_TIME, ProfileThreadSnapshotRecord.SEQUENCE),
+ new QueryBuilder() {
+ @Override
+ public void apply(StreamQuery query) {
+ query.setDataProjections(Collections.singletonList(ProfileThreadSnapshotRecord.STACK_BINARY));
+
+ query.appendCondition(eq(ProfileThreadSnapshotRecord.SEGMENT_ID, segmentId))
+ .appendCondition(lte(ProfileThreadSnapshotRecord.DUMP_TIME, end))
+ .appendCondition(gte(ProfileThreadSnapshotRecord.DUMP_TIME, start));
+ }
+ });
+
+ List<ProfileThreadSnapshotRecord> records = resp.getElements().stream().map(new ProfileThreadSnapshotRecordDeserializer()).collect(Collectors.toList());
switch (aggType) {
case MIN:
@@ -145,4 +176,55 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractBanyanDBDAO i
enum AggType {
MIN, MAX
}
+
+ public static class ProfileThreadSnapshotRecordDeserializer implements RowEntityDeserializer<ProfileThreadSnapshotRecord> { // rebuilds a ProfileThreadSnapshotRecord from one query result row; tags are read by position, not by name
+ @Override
+ public ProfileThreadSnapshotRecord apply(RowEntity row) {
+ ProfileThreadSnapshotRecord record = new ProfileThreadSnapshotRecord();
+ final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0); // first tag family of the row; presumably the projected searchable tags - verify against the query projection
+ record.setTaskId((String) searchable.get(0).getValue()); // index 0 -> task id
+ record.setSegmentId((String) searchable.get(1).getValue()); // index 1 -> segment id
+ record.setDumpTime(((Number) searchable.get(2).getValue()).longValue()); // index 2 -> dump time; read through Number so any boxed numeric type is accepted
+ record.setSequence(((Number) searchable.get(3).getValue()).intValue()); // index 3 -> sequence, narrowed to int
+ final List<TagAndValue<?>> data = row.getTagFamilies().get(1); // second tag family; presumably the data projection - throws if the row carries only one family
+ record.setStackBinary(((ByteString) data.get(0).getValue()).toByteArray()); // first data tag holds the stack as protobuf bytes
+ return record;
+ }
+ }
+
+ public static class SegmentRecordDeserializer implements RowEntityDeserializer<SegmentRecord> { // rebuilds a SegmentRecord from one query result row; tags are read by position, not by name
+ @Override
+ public SegmentRecord apply(RowEntity row) {
+ SegmentRecord record = new SegmentRecord();
+ final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0); // first tag family of the row; presumably the projected searchable tags - verify against the query projection
+ record.setSegmentId(row.getId()); // the segment id is taken from the row id rather than from a tag
+ record.setTraceId((String) searchable.get(0).getValue()); // index 0 -> trace id
+ record.setIsError(((Number) searchable.get(1).getValue()).intValue()); // index 1 -> numeric state stored as the isError flag
+ record.setServiceId((String) searchable.get(2).getValue()); // index 2 -> service id
+ record.setServiceInstanceId((String) searchable.get(3).getValue()); // index 3 -> service instance id
+ record.setEndpointId((String) searchable.get(4).getValue()); // index 4 -> endpoint id
+ record.setLatency(((Number) searchable.get(5).getValue()).intValue()); // index 5 -> latency, narrowed to int
+ record.setStartTime(((Number) searchable.get(6).getValue()).longValue()); // index 6 -> start time
+ final List<TagAndValue<?>> data = row.getTagFamilies().get(1); // second tag family; presumably the data projection - throws if the row carries only one family
+ record.setDataBinary(((ByteString) data.get(0).getValue()).toByteArray()); // first data tag holds the serialized segment as protobuf bytes
+ return record;
+ }
+ }
+
+ public static class BasicTraceDeserializer implements RowEntityDeserializer<BasicTrace> {
+ @Override
+ public BasicTrace apply(RowEntity row) {
+ BasicTrace trace = new BasicTrace();
+ trace.setSegmentId(row.getId());
+ final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
+ trace.getTraceIds().add((String) searchable.get(0).getValue());
+ trace.setError(((Long) searchable.get(1).getValue()).intValue() == 1);
+ trace.getEndpointNames().add(IDManager.EndpointID.analysisId(
+ (String) searchable.get(2).getValue()
+ ).getEndpointName());
+ trace.setDuration(((Long) searchable.get(3).getValue()).intValue());
+ trace.setStart(String.valueOf(searchable.get(4).getValue()));
+ return trace;
+ }
+ }
}
\ No newline at end of file
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java
index 0bd0e30a82..400731d83e 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java
@@ -20,22 +20,25 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
import lombok.RequiredArgsConstructor;
import org.apache.skywalking.banyandb.v1.client.StreamWrite;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.BanyanDBRecordBuilder;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.BanyanDBStorageDataBuilder;
import java.io.IOException;
@RequiredArgsConstructor
public class BanyanDBRecordDAO<T extends Record> implements IRecordDAO {
- private final BanyanDBRecordBuilder<T> storageBuilder;
+ private final BanyanDBStorageDataBuilder<T> storageBuilder; // converts a record into a partially filled stream write builder
@Override
public InsertRequest prepareBatchInsert(Model model, Record record) throws IOException {
- StreamWrite streamWrite = storageBuilder.entity2Storage(model, (T) record);
+ StreamWrite.StreamWriteBuilder builder = storageBuilder.entity2Storage((T) record) // unchecked cast: the record is assumed to be of the type this DAO was created for
+ .name(model.getName()) // the stream name is the model name
+ .timestamp(TimeBucket.getTimestamp(record.getTimeBucket(), model.getDownsampling())); // convert the record's time bucket to a timestamp; getTimeBucket(...) expects a timestamp as input and would yield another bucket
- return new BanyanDBStreamInsertRequest(streamWrite);
+ return new BanyanDBStreamInsertRequest(builder.build());
}
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStorageDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStorageDAO.java
index 76c0359153..98f31ebd7e 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStorageDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStorageDAO.java
@@ -19,19 +19,10 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
import lombok.extern.slf4j.Slf4j;
-import org.apache.skywalking.oap.server.core.alarm.AlarmRecord;
-import org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic;
-import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
-import org.apache.skywalking.oap.server.core.analysis.manual.log.LogRecord;
-import org.apache.skywalking.oap.server.core.analysis.manual.networkalias.NetworkAddressAlias;
-import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
-import org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraffic;
-import org.apache.skywalking.oap.server.core.browser.manual.errorlog.BrowserErrorLogRecord;
-import org.apache.skywalking.oap.server.core.management.ui.template.UITemplate;
-import org.apache.skywalking.oap.server.core.profile.ProfileTaskLogRecord;
-import org.apache.skywalking.oap.server.core.profile.ProfileTaskRecord;
-import org.apache.skywalking.oap.server.core.profile.ProfileThreadSnapshotRecord;
-import org.apache.skywalking.oap.server.core.source.Event;
+import org.apache.skywalking.oap.server.core.analysis.config.NoneStream;
+import org.apache.skywalking.oap.server.core.analysis.management.ManagementData;
+import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
+import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
import org.apache.skywalking.oap.server.core.storage.IManagementDAO;
import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
@@ -40,19 +31,7 @@ import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
import org.apache.skywalking.oap.server.core.storage.StorageDAO;
import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.AlarmRecordBuilder;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.BrowserErrorLogRecordBuilder;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.EventBuilder;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.LogRecordBuilder;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.Metadata;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.NetworkAddressAliasBuilder;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.ProfileTaskLogRecordBuilder;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.ProfileTaskRecordBuilder;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.ProfileThreadSnapshotRecordBuilder;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.SegmentRecordBuilder;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.UITemplateBuilder;
-
-import java.util.Map;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.BanyanDBStorageDataBuilder;
@Slf4j
public class BanyanDBStorageDAO extends AbstractDAO<BanyanDBStorageClient> implements StorageDAO {
@@ -62,79 +41,21 @@ public class BanyanDBStorageDAO extends AbstractDAO<BanyanDBStorageClient> imple
@Override
public IMetricsDAO newMetricsDao(StorageBuilder storageBuilder) {
- try {
- Class<?> returnType = storageBuilder.getClass().getMethod("storage2Entity", Map.class).getReturnType();
- if (Event.class.equals(returnType)) {
- return new BanyanDBMetricsDAO<>(new EventBuilder());
- } else if (ServiceTraffic.class.equals(returnType)) {
- return new BanyanDBMetricsDAO<>(new Metadata.ServiceTrafficBuilder());
- } else if (InstanceTraffic.class.equals(returnType)) {
- return new BanyanDBMetricsDAO<>(new Metadata.InstanceTrafficBuilder());
- } else if (EndpointTraffic.class.equals(returnType)) {
- return new BanyanDBMetricsDAO<>(new Metadata.EndpointTrafficBuilder());
- } else if (NetworkAddressAlias.class.equals(returnType)) {
- return new BanyanDBMetricsDAO<>(new NetworkAddressAliasBuilder());
- } else {
- throw new IllegalStateException("record type is not supported");
- }
- } catch (NoSuchMethodException noSuchMethodException) {
- log.error("cannot find method storage2Entity", noSuchMethodException);
- throw new RuntimeException("cannot find method storage2Entity");
- }
+ return new BanyanDBMetricsDAO<>((BanyanDBStorageDataBuilder<Metrics>) storageBuilder); // unchecked cast: assumes the builder passed in is a BanyanDBStorageDataBuilder, otherwise a ClassCastException is thrown here
}
@Override
public IRecordDAO newRecordDao(StorageBuilder storageBuilder) {
- try {
- Class<?> returnType = storageBuilder.getClass().getMethod("storage2Entity", Map.class).getReturnType();
- if (SegmentRecord.class.equals(returnType)) {
- return new BanyanDBRecordDAO<>(new SegmentRecordBuilder());
- } else if (AlarmRecord.class.equals(returnType)) {
- return new BanyanDBRecordDAO<>(new AlarmRecordBuilder());
- } else if (BrowserErrorLogRecord.class.equals(returnType)) {
- return new BanyanDBRecordDAO<>(new BrowserErrorLogRecordBuilder());
- } else if (LogRecord.class.equals(returnType)) {
- return new BanyanDBRecordDAO<>(new LogRecordBuilder());
- } else if (ProfileTaskLogRecord.class.equals(returnType)) {
- return new BanyanDBRecordDAO<>(new ProfileTaskLogRecordBuilder());
- } else if (ProfileThreadSnapshotRecord.class.equals(returnType)) {
- return new BanyanDBRecordDAO<>(new ProfileThreadSnapshotRecordBuilder());
- } else {
- throw new IllegalStateException("record type is not supported");
- }
- } catch (NoSuchMethodException noSuchMethodException) {
- log.error("cannot find method storage2Entity", noSuchMethodException);
- throw new RuntimeException("cannot find method storage2Entity");
- }
+ return new BanyanDBRecordDAO<>((BanyanDBStorageDataBuilder<Record>) storageBuilder); // unchecked cast: assumes the builder passed in is a BanyanDBStorageDataBuilder, otherwise a ClassCastException is thrown here
}
@Override
public INoneStreamDAO newNoneStreamDao(StorageBuilder storageBuilder) {
- try {
- Class<?> returnType = storageBuilder.getClass().getMethod("storage2Entity", Map.class).getReturnType();
- if (ProfileTaskRecord.class.equals(returnType)) {
- return new BanyanDBNoneStreamDAO<>(getClient(), new ProfileTaskRecordBuilder());
- } else {
- throw new IllegalStateException("record type is not supported");
- }
- } catch (NoSuchMethodException noSuchMethodException) {
- log.error("cannot find method storage2Entity", noSuchMethodException);
- throw new RuntimeException("cannot find method storage2Entity");
- }
+ return new BanyanDBNoneStreamDAO<>(getClient(), (BanyanDBStorageDataBuilder<NoneStream>) storageBuilder); // unchecked cast: assumes the builder passed in is a BanyanDBStorageDataBuilder, otherwise a ClassCastException is thrown here
}
@Override
public IManagementDAO newManagementDao(StorageBuilder storageBuilder) {
- try {
- Class<?> returnType = storageBuilder.getClass().getMethod("storage2Entity", Map.class).getReturnType();
- if (UITemplate.class.equals(returnType)) {
- return new BanyanDBManagementDAO<>(getClient(), new UITemplateBuilder());
- } else {
- throw new IllegalStateException("record type is not supported");
- }
- } catch (NoSuchMethodException noSuchMethodException) {
- log.error("cannot find method storage2Entity", noSuchMethodException);
- throw new RuntimeException("cannot find method storage2Entity");
- }
+ return new BanyanDBManagementDAO<>(getClient(), (BanyanDBStorageDataBuilder<ManagementData>) storageBuilder); // unchecked cast: assumes the builder passed in is a BanyanDBStorageDataBuilder, otherwise a ClassCastException is thrown here
}
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java
index 73e80c8f94..c5bb950bc6 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java
@@ -19,9 +19,15 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.ByteString;
import lombok.Getter;
-import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
+import org.apache.skywalking.banyandb.v1.client.RowEntity;
import org.apache.skywalking.banyandb.v1.client.StreamQuery;
+import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
+import org.apache.skywalking.banyandb.v1.client.TagAndValue;
+import org.apache.skywalking.banyandb.v1.client.TimestampRange;
+import org.apache.skywalking.oap.server.core.analysis.IDManager;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
@@ -38,6 +44,7 @@ import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.SegmentRe
import java.io.IOException;
import java.util.Collections;
import java.util.List;
+import java.util.stream.Collectors;
public class BanyanDBTraceQueryDAO extends AbstractBanyanDBDAO implements ITraceQueryDAO {
public BanyanDBTraceQueryDAO(BanyanDBStorageClient client) {
@@ -46,39 +53,39 @@ public class BanyanDBTraceQueryDAO extends AbstractBanyanDBDAO implements ITrace
@Override
public TraceBrief queryBasicTraces(long startSecondTB, long endSecondTB, long minDuration, long maxDuration, String serviceId, String serviceInstanceId, String endpointId, String traceId, int limit, int from, TraceState traceState, QueryOrder queryOrder, List<Tag> tags) throws IOException {
- final QueryBuilder builder = new QueryBuilder() {
+ final QueryBuilder q = new QueryBuilder() {
@Override
public void apply(StreamQuery query) {
if (minDuration != 0) {
// duration >= minDuration
- query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", "duration", minDuration));
+ query.appendCondition(gte("duration", minDuration));
}
if (maxDuration != 0) {
// duration <= maxDuration
- query.appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", "duration", maxDuration));
+ query.appendCondition(lte("duration", maxDuration));
}
if (!Strings.isNullOrEmpty(serviceId)) {
- query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", "service_id", serviceId));
+ query.appendCondition(eq("service_id", serviceId));
}
if (!Strings.isNullOrEmpty(serviceInstanceId)) {
- query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", "service_instance_id", serviceInstanceId));
+ query.appendCondition(eq("service_instance_id", serviceInstanceId));
}
if (!Strings.isNullOrEmpty(endpointId)) {
- query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", "endpoint_id", endpointId));
+ query.appendCondition(eq("endpoint_id", endpointId));
}
switch (traceState) {
case ERROR:
- query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", "state", (long) TraceStateStorage.ERROR.getState()));
+ query.appendCondition(eq("state", TraceStateStorage.ERROR.getState()));
break;
case SUCCESS:
- query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", "state", (long) TraceStateStorage.SUCCESS.getState()));
+ query.appendCondition(eq("state", TraceStateStorage.SUCCESS.getState()));
break;
default:
- query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", "state", (long) TraceStateStorage.ALL.getState()));
+ query.appendCondition(eq("state", TraceStateStorage.ALL.getState()));
break;
}
@@ -94,7 +101,7 @@ public class BanyanDBTraceQueryDAO extends AbstractBanyanDBDAO implements ITrace
if (CollectionUtils.isNotEmpty(tags)) {
for (final Tag tag : tags) {
if (SegmentRecordBuilder.INDEXED_TAGS.contains(tag.getKey())) {
- query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", tag.getKey(), tag.getValue()));
+ query.appendCondition(eq(tag.getKey(), tag.getValue()));
}
}
}
@@ -104,13 +111,17 @@ public class BanyanDBTraceQueryDAO extends AbstractBanyanDBDAO implements ITrace
}
};
- final List<BasicTrace> basicTraces;
- if (startSecondTB != 0 && endSecondTB != 0) {
- basicTraces = query(BasicTrace.class, builder, TimeBucket.getTimestamp(startSecondTB), TimeBucket.getTimestamp(endSecondTB));
- } else {
- basicTraces = query(BasicTrace.class, builder);
+ TimestampRange tsRange = null;
+
+ if (startSecondTB > 0 && endSecondTB > 0) {
+ tsRange = new TimestampRange(TimeBucket.getTimestamp(startSecondTB), TimeBucket.getTimestamp(endSecondTB));
}
+ StreamQueryResponse resp = query(SegmentRecord.INDEX_NAME,
+ ImmutableList.of("trace_id", "state", "endpoint_id", "duration", "start_time"), tsRange, q);
+
+ List<BasicTrace> basicTraces = resp.getElements().stream().map(new BasicTraceDeserializer()).collect(Collectors.toList());
+
TraceBrief brief = new TraceBrief();
brief.setTotal(basicTraces.size());
brief.getTraces().addAll(basicTraces);
@@ -119,12 +130,17 @@ public class BanyanDBTraceQueryDAO extends AbstractBanyanDBDAO implements ITrace
@Override
public List<SegmentRecord> queryByTraceId(String traceId) throws IOException {
- return query(SegmentRecord.class, new QueryBuilder() {
- @Override
- public void apply(StreamQuery query) {
- query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", "trace_id", traceId));
- }
- });
+ StreamQueryResponse resp = query(SegmentRecord.INDEX_NAME, // query the stream named after the segment index
+ ImmutableList.of("trace_id", "state", "service_id", "service_instance_id", "endpoint_id", "duration", "start_time"), // projection order matters: SegmentRecordDeserializer reads these tags by index 0..6
+ new QueryBuilder() {
+ @Override
+ public void apply(StreamQuery query) {
+ query.setDataProjections(Collections.singletonList("data_binary")); // also fetch the serialized segment, read by the deserializer as the first data tag
+ query.appendCondition(eq(SegmentRecord.TRACE_ID, traceId)); // keep only segments belonging to the requested trace
+ }
+ });
+
+ return resp.getElements().stream().map(new SegmentRecordDeserializer()).collect(Collectors.toList()); // one SegmentRecord per returned row
}
@Override
@@ -142,4 +158,40 @@ public class BanyanDBTraceQueryDAO extends AbstractBanyanDBDAO implements ITrace
this.state = state;
}
}
+
+ public static class BasicTraceDeserializer implements RowEntityDeserializer<BasicTrace> {
+ @Override
+ public BasicTrace apply(RowEntity row) {
+ BasicTrace trace = new BasicTrace();
+ trace.setSegmentId(row.getId());
+ final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
+ trace.getTraceIds().add((String) searchable.get(0).getValue());
+ trace.setError(((Long) searchable.get(1).getValue()).intValue() == 1);
+ trace.getEndpointNames().add(IDManager.EndpointID.analysisId(
+ (String) searchable.get(2).getValue()
+ ).getEndpointName());
+ trace.setDuration(((Long) searchable.get(3).getValue()).intValue());
+ trace.setStart(String.valueOf(searchable.get(4).getValue()));
+ return trace;
+ }
+ }
+
+ public static class SegmentRecordDeserializer implements RowEntityDeserializer<SegmentRecord> { // rebuilds a SegmentRecord from one query result row; tags are read by position, not by name
+ @Override
+ public SegmentRecord apply(RowEntity row) {
+ SegmentRecord record = new SegmentRecord();
+ final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0); // first tag family of the row; presumably the projected searchable tags - verify against the query projection
+ record.setSegmentId(row.getId()); // the segment id is taken from the row id rather than from a tag
+ record.setTraceId((String) searchable.get(0).getValue()); // index 0 -> trace id
+ record.setIsError(((Number) searchable.get(1).getValue()).intValue()); // index 1 -> numeric state stored as the isError flag
+ record.setServiceId((String) searchable.get(2).getValue()); // index 2 -> service id
+ record.setServiceInstanceId((String) searchable.get(3).getValue()); // index 3 -> service instance id
+ record.setEndpointId((String) searchable.get(4).getValue()); // index 4 -> endpoint id
+ record.setLatency(((Number) searchable.get(5).getValue()).intValue()); // index 5 -> latency, narrowed to int
+ record.setStartTime(((Number) searchable.get(6).getValue()).longValue()); // index 6 -> start time
+ final List<TagAndValue<?>> data = row.getTagFamilies().get(1); // second tag family; presumably the data projection - throws if the row carries only one family
+ record.setDataBinary(((ByteString) data.get(0).getValue()).toByteArray()); // first data tag holds the serialized segment as protobuf bytes
+ return record;
+ }
+ }
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java
index 9ed0af8447..1d9f977b0a 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java
@@ -18,21 +18,25 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
-import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
+import com.google.common.collect.ImmutableList;
+import org.apache.skywalking.banyandb.v1.client.RowEntity;
import org.apache.skywalking.banyandb.v1.client.StreamQuery;
+import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
import org.apache.skywalking.banyandb.v1.client.StreamWrite;
import org.apache.skywalking.banyandb.v1.client.Tag;
+import org.apache.skywalking.banyandb.v1.client.TagAndValue;
import org.apache.skywalking.oap.server.core.management.ui.template.UITemplate;
+import org.apache.skywalking.oap.server.core.query.enumeration.TemplateType;
import org.apache.skywalking.oap.server.core.query.input.DashboardSetting;
import org.apache.skywalking.oap.server.core.query.type.DashboardConfiguration;
import org.apache.skywalking.oap.server.core.query.type.TemplateChangeStatus;
import org.apache.skywalking.oap.server.core.storage.management.UITemplateManagementDAO;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.schema.UITemplateBuilder;
import java.io.IOException;
import java.util.List;
+import java.util.stream.Collectors;
/**
* {@link org.apache.skywalking.oap.server.core.management.ui.template.UITemplate} is a stream
@@ -44,15 +48,19 @@ public class BanyanDBUITemplateManagementDAO extends AbstractBanyanDBDAO impleme
@Override
public List<DashboardConfiguration> getAllTemplates(Boolean includingDisabled) throws IOException {
- return query(DashboardConfiguration.class, new QueryBuilder() {
- @Override
- public void apply(StreamQuery query) {
- query.setLimit(10000);
- if (!includingDisabled) {
- query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", UITemplate.DISABLED, (long) BooleanUtils.FALSE));
- }
- }
- });
+ StreamQueryResponse resp = query(UITemplate.INDEX_NAME, ImmutableList.of(UITemplate.NAME, UITemplate.DISABLED), // projection order matters: DashboardConfigurationDeserializer reads NAME at index 0 and DISABLED at index 1
+ new QueryBuilder() {
+ @Override
+ public void apply(StreamQuery query) {
+ query.setDataProjections(ImmutableList.of(UITemplate.ACTIVATED, UITemplate.CONFIGURATION, UITemplate.TYPE)); // data tags are read by the deserializer in this order (indices 0, 1, 2)
+ query.setLimit(10000); // fixed upper bound on the number of templates returned
+ if (!includingDisabled) { // auto-unboxes the Boolean: a null argument fails with NullPointerException here
+ query.appendCondition(eq(UITemplate.DISABLED, BooleanUtils.FALSE)); // keep only templates whose disabled flag equals BooleanUtils.FALSE
+ }
+ }
+ });
+
+ return resp.getElements().stream().map(new DashboardConfigurationDeserializer()).collect(Collectors.toList()); // one DashboardConfiguration per returned row
}
@Override
@@ -71,7 +79,7 @@ public class BanyanDBUITemplateManagementDAO extends AbstractBanyanDBDAO impleme
.dataTag(Tag.stringField(uiTemplate.getConfiguration()))
// data - activated
.dataTag(Tag.longField(uiTemplate.getActivated()))
- .timestamp(UITemplateBuilder.UI_TEMPLATE_TIMESTAMP)
+ .timestamp(1L)
.elementId(uiTemplate.id())
.build();
getClient().write(request);
@@ -87,4 +95,24 @@ public class BanyanDBUITemplateManagementDAO extends AbstractBanyanDBDAO impleme
public TemplateChangeStatus disableTemplate(String name) throws IOException {
return TemplateChangeStatus.builder().status(false).message("Can't disable the template").build();
}
+
+ public static class DashboardConfigurationDeserializer implements RowEntityDeserializer<DashboardConfiguration> { // rebuilds a DashboardConfiguration from one query result row; tags are read by position, not by name
+ @Override
+ public DashboardConfiguration apply(RowEntity row) {
+ DashboardConfiguration dashboardConfiguration = new DashboardConfiguration();
+ final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0); // first tag family of the row; presumably the projected searchable tags - verify against the query projection
+ // name: first tag of the first family
+ dashboardConfiguration.setName((String) searchable.get(0).getValue());
+ // disabled: second tag of the first family, a number converted to boolean via BooleanUtils
+ dashboardConfiguration.setDisabled(BooleanUtils.valueToBoolean(((Number) searchable.get(1).getValue()).intValue()));
+ final List<TagAndValue<?>> data = row.getTagFamilies().get(1); // second tag family; presumably the data projection - throws if the row carries only one family
+ // activated: first data tag, a number converted to boolean via BooleanUtils
+ dashboardConfiguration.setActivated(BooleanUtils.valueToBoolean(((Number) data.get(0).getValue()).intValue()));
+ // configuration: second data tag, kept as a string
+ dashboardConfiguration.setConfiguration((String) data.get(1).getValue());
+ // type: third data tag, a type name resolved through TemplateType.forName
+ dashboardConfiguration.setType(TemplateType.forName((String) data.get(2).getValue()));
+ return dashboardConfiguration;
+ }
+ }
}