You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by lu...@apache.org on 2022/04/30 09:03:24 UTC
[skywalking] 05/24: refactor deser
This is an automated email from the ASF dual-hosted git repository.
lujiajing pushed a commit to branch banyandb-integration-stream
in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit 7fb9a6ad37e96be4fc4104ae6e022514683a0492
Author: Megrez Lu <lu...@gmail.com>
AuthorDate: Thu Dec 2 22:13:57 2021 +0800
refactor deser
---
.../deserializer/AbstractBanyanDBDeserializer.java | 35 +++++
.../banyandb/deserializer/AlarmMessageMapper.java | 36 ++---
.../deserializer/BanyanDBDeserializerFactory.java | 54 +++++++
.../banyandb/deserializer/BasicTraceMapper.java | 18 +--
.../deserializer/BrowserErrorLogMapper.java | 24 ++-
.../deserializer/DashboardConfigurationMapper.java | 18 +--
.../banyandb/deserializer/DatabaseMapper.java | 25 ++++
.../banyandb/deserializer/EndpointMapper.java | 25 ++++
.../plugin/banyandb/deserializer/EventMapper.java | 37 +++--
.../plugin/banyandb/deserializer/LogMapper.java | 26 +---
.../deserializer/NetworkAddressAliasMapper.java | 15 +-
.../deserializer/ProfileTaskLogMapper.java | 16 +-
.../banyandb/deserializer/ProfileTaskMapper.java | 18 +--
.../ProfileThreadSnapshotRecordMapper.java | 16 +-
.../banyandb/deserializer/RowEntityMapper.java | 7 +-
.../banyandb/deserializer/SegmentRecordMapper.java | 18 +--
.../deserializer/ServiceInstanceMapper.java | 70 +++++++++
.../banyandb/deserializer/ServiceMapper.java | 26 ++++
.../banyandb/stream/AbstractBanyanDBDAO.java | 42 ++++++
.../banyandb/stream/BanyanDBAlarmQueryDAO.java | 49 +++----
.../stream/BanyanDBBrowserLogQueryDAO.java | 59 ++++----
.../banyandb/stream/BanyanDBEventQueryDAO.java | 86 ++++++++++-
.../banyandb/stream/BanyanDBLogQueryDAO.java | 71 ++++-----
.../banyandb/stream/BanyanDBMetadataQueryDAO.java | 77 ++++++++--
.../stream/BanyanDBNetworkAddressAliasDAO.java | 20 +--
.../stream/BanyanDBProfileTaskLogQueryDAO.java | 19 +--
.../stream/BanyanDBProfileTaskQueryDAO.java | 69 +++++----
.../BanyanDBProfileThreadSnapshotQueryDAO.java | 77 +++++-----
.../plugin/banyandb/stream/BanyanDBStorageDAO.java | 6 +-
.../banyandb/stream/BanyanDBTraceQueryDAO.java | 161 +++++++++------------
.../stream/BanyanDBUITemplateManagementDAO.java | 26 ++--
31 files changed, 782 insertions(+), 464 deletions(-)
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/AbstractBanyanDBDeserializer.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/AbstractBanyanDBDeserializer.java
new file mode 100644
index 0000000000..44adcb8869
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/AbstractBanyanDBDeserializer.java
@@ -0,0 +1,35 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
+
+import org.apache.skywalking.banyandb.v1.client.StreamQuery;
+import org.apache.skywalking.banyandb.v1.client.TimestampRange;
+
+import java.util.Collections;
+import java.util.List;
+
+public abstract class AbstractBanyanDBDeserializer<T> implements RowEntityMapper<T> {
+ private final String indexName;
+ private final List<String> searchableProjection;
+ private final List<String> dataProjection;
+
+ protected AbstractBanyanDBDeserializer(String indexName, List<String> searchableProjection) {
+ this(indexName, searchableProjection, Collections.emptyList());
+ }
+
+ protected AbstractBanyanDBDeserializer(String indexName, List<String> searchableProjection, List<String> dataProjection) {
+ this.indexName = indexName;
+ this.searchableProjection = searchableProjection;
+ this.dataProjection = dataProjection;
+ }
+
+ public StreamQuery buildStreamQuery() {
+ final StreamQuery query = new StreamQuery(this.indexName, this.searchableProjection);
+ query.setDataProjections(this.dataProjection);
+ return query;
+ }
+
+ public StreamQuery buildStreamQuery(long startTimestamp, long endTimestamp) {
+ final StreamQuery query = new StreamQuery(this.indexName, new TimestampRange(startTimestamp, endTimestamp), this.searchableProjection);
+ query.setDataProjections(this.dataProjection);
+ return query;
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/AlarmMessageMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/AlarmMessageMapper.java
index d906dbb78f..939496e061 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/AlarmMessageMapper.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/AlarmMessageMapper.java
@@ -1,33 +1,27 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
+import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableList;
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
import com.google.protobuf.ByteString;
-import lombok.RequiredArgsConstructor;
import org.apache.skywalking.banyandb.v1.client.RowEntity;
import org.apache.skywalking.banyandb.v1.client.TagAndValue;
import org.apache.skywalking.oap.server.core.alarm.AlarmRecord;
+import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
import org.apache.skywalking.oap.server.core.query.enumeration.Scope;
import org.apache.skywalking.oap.server.core.query.type.AlarmMessage;
-import org.apache.skywalking.oap.server.core.storage.query.IAlarmQueryDAO;
+import org.apache.skywalking.oap.server.core.query.type.KeyValue;
import java.util.List;
-@RequiredArgsConstructor
-public class AlarmMessageMapper implements RowEntityMapper<AlarmMessage> {
- private final IAlarmQueryDAO alarmQueryDAO;
+public class AlarmMessageMapper extends AbstractBanyanDBDeserializer<AlarmMessage> {
+ private final Gson GSON = new Gson();
- @Override
- public List<String> searchableProjection() {
- return ImmutableList.of(AlarmRecord.SCOPE, // 0
- AlarmRecord.START_TIME); // 1
- }
-
- @Override
- public List<String> dataProjection() {
- return ImmutableList.of(AlarmRecord.ID0, // 0
- AlarmRecord.ID1, // 1
- AlarmRecord.ALARM_MESSAGE, // 2
- AlarmRecord.TAGS_RAW_DATA); // 3
+ public AlarmMessageMapper() {
+ super(AlarmRecord.INDEX_NAME,
+ ImmutableList.of(AlarmRecord.SCOPE, AlarmRecord.START_TIME),
+ ImmutableList.of(AlarmRecord.ID0, AlarmRecord.ID1, AlarmRecord.ALARM_MESSAGE, AlarmRecord.TAGS_RAW_DATA));
}
@Override
@@ -44,8 +38,14 @@ public class AlarmMessageMapper implements RowEntityMapper<AlarmMessage> {
alarmMessage.setMessage((String) data.get(2).getValue());
Object o = data.get(3).getValue();
if (o instanceof ByteString && !((ByteString) o).isEmpty()) {
- this.alarmQueryDAO.parserDataBinary(((ByteString) o).toByteArray(), alarmMessage.getTags());
+ this.parseDataBinary(((ByteString) o).toByteArray(), alarmMessage.getTags());
}
return alarmMessage;
}
+
+ void parseDataBinary(byte[] dataBinary, List<KeyValue> tags) {
+ List<Tag> tagList = GSON.fromJson(new String(dataBinary, Charsets.UTF_8), new TypeToken<List<Tag>>() {
+ }.getType());
+ tagList.forEach(pair -> tags.add(new KeyValue(pair.getKey(), pair.getValue())));
+ }
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BanyanDBDeserializerFactory.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BanyanDBDeserializerFactory.java
new file mode 100644
index 0000000000..dd4af41736
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BanyanDBDeserializerFactory.java
@@ -0,0 +1,54 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
+
+import org.apache.skywalking.oap.server.core.analysis.manual.networkalias.NetworkAddressAlias;
+import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
+import org.apache.skywalking.oap.server.core.profile.ProfileThreadSnapshotRecord;
+import org.apache.skywalking.oap.server.core.query.type.AlarmMessage;
+import org.apache.skywalking.oap.server.core.query.type.BasicTrace;
+import org.apache.skywalking.oap.server.core.query.type.BrowserErrorLog;
+import org.apache.skywalking.oap.server.core.query.type.DashboardConfiguration;
+import org.apache.skywalking.oap.server.core.query.type.Database;
+import org.apache.skywalking.oap.server.core.query.type.Endpoint;
+import org.apache.skywalking.oap.server.core.query.type.Log;
+import org.apache.skywalking.oap.server.core.query.type.ProfileTask;
+import org.apache.skywalking.oap.server.core.query.type.ProfileTaskLog;
+import org.apache.skywalking.oap.server.core.query.type.Service;
+import org.apache.skywalking.oap.server.core.query.type.ServiceInstance;
+import org.apache.skywalking.oap.server.core.query.type.event.Event;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public enum BanyanDBDeserializerFactory {
+ INSTANCE;
+
+ private final Map<Class<?>, AbstractBanyanDBDeserializer<?>> registry;
+
+ BanyanDBDeserializerFactory() {
+ registry = new HashMap<>(10);
+ register(AlarmMessage.class, new AlarmMessageMapper());
+ register(BasicTrace.class, new BasicTraceMapper());
+ register(BrowserErrorLog.class, new BrowserErrorLogMapper());
+ register(DashboardConfiguration.class, new DashboardConfigurationMapper());
+ register(Database.class, new DatabaseMapper());
+ register(Endpoint.class, new EndpointMapper());
+ register(Event.class, new EventMapper());
+ register(Log.class, new LogMapper());
+ register(NetworkAddressAlias.class, new NetworkAddressAliasMapper());
+ register(ProfileTaskLog.class, new ProfileTaskLogMapper());
+ register(ProfileTask.class, new ProfileTaskMapper());
+ register(ProfileThreadSnapshotRecord.class, new ProfileThreadSnapshotRecordMapper());
+ register(SegmentRecord.class, new SegmentRecordMapper());
+ register(ServiceInstance.class, new ServiceInstanceMapper());
+ register(Service.class, new ServiceMapper());
+ }
+
+ private <T> void register(Class<T> clazz, AbstractBanyanDBDeserializer<T> mapper) {
+ this.registry.put(clazz, mapper);
+ }
+
+ @SuppressWarnings({"unchecked"})
+ public <T> AbstractBanyanDBDeserializer<T> findDeserializer(Class<T> clazz) {
+ return (AbstractBanyanDBDeserializer<T>) registry.get(clazz);
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BasicTraceMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BasicTraceMapper.java
index bbcf7091d0..5d7ae36590 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BasicTraceMapper.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BasicTraceMapper.java
@@ -4,12 +4,16 @@ import com.google.common.collect.ImmutableList;
import org.apache.skywalking.banyandb.v1.client.RowEntity;
import org.apache.skywalking.banyandb.v1.client.TagAndValue;
import org.apache.skywalking.oap.server.core.analysis.IDManager;
+import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
import org.apache.skywalking.oap.server.core.query.type.BasicTrace;
-import java.util.Collections;
import java.util.List;
-public class BasicTraceMapper implements RowEntityMapper<BasicTrace> {
+public class BasicTraceMapper extends AbstractBanyanDBDeserializer<BasicTrace> {
+ public BasicTraceMapper() {
+ super(SegmentRecord.INDEX_NAME, ImmutableList.of("trace_id", "state", "endpoint_id", "duration", "start_time"));
+ }
+
@Override
public BasicTrace map(RowEntity row) {
BasicTrace trace = new BasicTrace();
@@ -24,14 +28,4 @@ public class BasicTraceMapper implements RowEntityMapper<BasicTrace> {
trace.setStart(String.valueOf(searchable.get(4).getValue()));
return trace;
}
-
- @Override
- public List<String> searchableProjection() {
- return ImmutableList.of("trace_id", "state", "endpoint_id", "duration", "start_time");
- }
-
- @Override
- public List<String> dataProjection() {
- return Collections.emptyList();
- }
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BrowserErrorLogMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BrowserErrorLogMapper.java
index 0807955376..0189a77049 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BrowserErrorLogMapper.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BrowserErrorLogMapper.java
@@ -9,22 +9,18 @@ import org.apache.skywalking.oap.server.core.browser.manual.errorlog.BrowserErro
import org.apache.skywalking.oap.server.core.query.type.BrowserErrorLog;
import org.apache.skywalking.oap.server.core.query.type.ErrorCategory;
+import java.util.Collections;
import java.util.List;
-public class BrowserErrorLogMapper implements RowEntityMapper<BrowserErrorLog> {
- @Override
- public List<String> searchableProjection() {
- return ImmutableList.of(BrowserErrorLogRecord.SERVICE_ID,
- BrowserErrorLogRecord.SERVICE_VERSION_ID,
- BrowserErrorLogRecord.PAGE_PATH_ID,
- BrowserErrorLogRecord.ERROR_CATEGORY,
- BrowserErrorLogRecord.TIMESTAMP
- );
- }
-
- @Override
- public List<String> dataProjection() {
- return ImmutableList.of(BrowserErrorLogRecord.DATA_BINARY);
+public class BrowserErrorLogMapper extends AbstractBanyanDBDeserializer<BrowserErrorLog> {
+ public BrowserErrorLogMapper() {
+ super(BrowserErrorLogRecord.INDEX_NAME,
+ ImmutableList.of(BrowserErrorLogRecord.SERVICE_ID,
+ BrowserErrorLogRecord.SERVICE_VERSION_ID,
+ BrowserErrorLogRecord.PAGE_PATH_ID,
+ BrowserErrorLogRecord.ERROR_CATEGORY,
+ BrowserErrorLogRecord.TIMESTAMP),
+ Collections.singletonList(BrowserErrorLogRecord.DATA_BINARY));
}
@Override
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/DashboardConfigurationMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/DashboardConfigurationMapper.java
index 6b9e0962e6..cb6b4f3c2f 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/DashboardConfigurationMapper.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/DashboardConfigurationMapper.java
@@ -10,7 +10,13 @@ import org.apache.skywalking.oap.server.library.util.BooleanUtils;
import java.util.List;
-public class DashboardConfigurationMapper implements RowEntityMapper<DashboardConfiguration> {
+public class DashboardConfigurationMapper extends AbstractBanyanDBDeserializer<DashboardConfiguration> {
+ public DashboardConfigurationMapper() {
+ super(UITemplate.INDEX_NAME,
+ ImmutableList.of(UITemplate.NAME, UITemplate.DISABLED),
+ ImmutableList.of(UITemplate.ACTIVATED, UITemplate.CONFIGURATION, UITemplate.TYPE));
+ }
+
@Override
public DashboardConfiguration map(RowEntity row) {
DashboardConfiguration dashboardConfiguration = new DashboardConfiguration();
@@ -28,14 +34,4 @@ public class DashboardConfigurationMapper implements RowEntityMapper<DashboardCo
dashboardConfiguration.setType(TemplateType.forName((String) data.get(2).getValue()));
return dashboardConfiguration;
}
-
- @Override
- public List<String> searchableProjection() {
- return ImmutableList.of(UITemplate.NAME, UITemplate.DISABLED);
- }
-
- @Override
- public List<String> dataProjection() {
- return ImmutableList.of(UITemplate.ACTIVATED, UITemplate.CONFIGURATION, UITemplate.TYPE);
- }
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/DatabaseMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/DatabaseMapper.java
new file mode 100644
index 0000000000..1c5a15f774
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/DatabaseMapper.java
@@ -0,0 +1,25 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.skywalking.banyandb.v1.client.RowEntity;
+import org.apache.skywalking.banyandb.v1.client.TagAndValue;
+import org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraffic;
+import org.apache.skywalking.oap.server.core.query.type.Database;
+
+import java.util.List;
+
+public class DatabaseMapper extends AbstractBanyanDBDeserializer<Database> {
+ public DatabaseMapper() {
+ super(ServiceTraffic.INDEX_NAME,
+ ImmutableList.of(ServiceTraffic.NAME, ServiceTraffic.NODE_TYPE));
+ }
+
+ @Override
+ public Database map(RowEntity row) {
+ Database database = new Database();
+ final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
+ database.setId(row.getId());
+ database.setName((String) searchable.get(0).getValue());
+ return database;
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/EndpointMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/EndpointMapper.java
new file mode 100644
index 0000000000..8f81df039c
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/EndpointMapper.java
@@ -0,0 +1,25 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.skywalking.banyandb.v1.client.RowEntity;
+import org.apache.skywalking.banyandb.v1.client.TagAndValue;
+import org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic;
+import org.apache.skywalking.oap.server.core.query.type.Endpoint;
+
+import java.util.List;
+
+public class EndpointMapper extends AbstractBanyanDBDeserializer<Endpoint> {
+ public EndpointMapper() {
+ super(EndpointTraffic.INDEX_NAME,
+ ImmutableList.of(EndpointTraffic.NAME, EndpointTraffic.SERVICE_ID));
+ }
+
+ @Override
+ public Endpoint map(RowEntity row) {
+ Endpoint endpoint = new Endpoint();
+ final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
+ endpoint.setName((String) searchable.get(0).getValue());
+ endpoint.setId((String) searchable.get(1).getValue());
+ return endpoint;
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/EventMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/EventMapper.java
index 1c94903cbd..63c9dc802c 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/EventMapper.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/EventMapper.java
@@ -2,23 +2,36 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
import com.google.common.collect.ImmutableList;
import org.apache.skywalking.banyandb.v1.client.RowEntity;
-import org.apache.skywalking.oap.server.core.query.type.event.Event;
+import org.apache.skywalking.banyandb.v1.client.TagAndValue;
+import org.apache.skywalking.oap.server.core.query.type.event.EventType;
+import org.apache.skywalking.oap.server.core.query.type.event.Source;
+import org.apache.skywalking.oap.server.core.source.Event;
import java.util.List;
-public class EventMapper implements RowEntityMapper<Event> {
- @Override
- public List<String> searchableProjection() {
- return ImmutableList.of();
- }
-
- @Override
- public List<String> dataProjection() {
- return null;
+public class EventMapper extends AbstractBanyanDBDeserializer<org.apache.skywalking.oap.server.core.query.type.event.Event> {
+ public EventMapper() {
+ super(Event.INDEX_NAME,
+ ImmutableList.of(Event.UUID, Event.SERVICE, Event.SERVICE_INSTANCE, Event.ENDPOINT, Event.NAME,
+ Event.TYPE, Event.START_TIME, Event.END_TIME),
+ ImmutableList.of(Event.MESSAGE, Event.PARAMETERS));
}
@Override
- public Event map(RowEntity row) {
- return null;
+ public org.apache.skywalking.oap.server.core.query.type.event.Event map(RowEntity row) {
+ final org.apache.skywalking.oap.server.core.query.type.event.Event resultEvent = new org.apache.skywalking.oap.server.core.query.type.event.Event();
+ // searchable
+ final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
+ resultEvent.setUuid((String) searchable.get(0).getValue());
+ resultEvent.setSource(new Source((String) searchable.get(1).getValue(), (String) searchable.get(2).getValue(), (String) searchable.get(3).getValue()));
+ resultEvent.setName((String) searchable.get(4).getValue());
+ resultEvent.setType(EventType.parse((String) searchable.get(5).getValue()));
+ resultEvent.setStartTime(((Number) searchable.get(6).getValue()).longValue());
+ resultEvent.setEndTime(((Number) searchable.get(7).getValue()).longValue());
+ // data
+ final List<TagAndValue<?>> data = row.getTagFamilies().get(1);
+ resultEvent.setMessage((String) data.get(0).getValue());
+ resultEvent.setParameters((String) data.get(1).getValue());
+ return resultEvent;
}
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/LogMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/LogMapper.java
index f1c5aaf442..3f65a82cea 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/LogMapper.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/LogMapper.java
@@ -8,29 +8,19 @@ import org.apache.skywalking.apm.network.logging.v3.LogTags;
import org.apache.skywalking.banyandb.v1.client.RowEntity;
import org.apache.skywalking.banyandb.v1.client.TagAndValue;
import org.apache.skywalking.oap.server.core.analysis.manual.log.AbstractLogRecord;
+import org.apache.skywalking.oap.server.core.analysis.manual.log.LogRecord;
import org.apache.skywalking.oap.server.core.query.type.KeyValue;
import org.apache.skywalking.oap.server.core.query.type.Log;
import java.util.List;
-public class LogMapper implements RowEntityMapper<Log> {
- @Override
- public List<String> searchableProjection() {
- return ImmutableList.of(
- AbstractLogRecord.SERVICE_ID, // 0
- AbstractLogRecord.SERVICE_INSTANCE_ID, // 1
- AbstractLogRecord.ENDPOINT_ID, // 2
- AbstractLogRecord.TRACE_ID, // 3
- AbstractLogRecord.TRACE_SEGMENT_ID,
- AbstractLogRecord.SPAN_ID,
- AbstractLogRecord.TIMESTAMP); // 6
- }
-
- @Override
- public List<String> dataProjection() {
- return ImmutableList.of(AbstractLogRecord.CONTENT_TYPE,
- AbstractLogRecord.CONTENT,
- AbstractLogRecord.TAGS_RAW_DATA); // 2
+public class LogMapper extends AbstractBanyanDBDeserializer<Log> {
+ public LogMapper() {
+ super(LogRecord.INDEX_NAME, ImmutableList.of(
+ AbstractLogRecord.SERVICE_ID, AbstractLogRecord.SERVICE_INSTANCE_ID,
+ AbstractLogRecord.ENDPOINT_ID, AbstractLogRecord.TRACE_ID, AbstractLogRecord.TRACE_SEGMENT_ID,
+ AbstractLogRecord.SPAN_ID, AbstractLogRecord.TIMESTAMP),
+ ImmutableList.of(AbstractLogRecord.CONTENT_TYPE, AbstractLogRecord.CONTENT, AbstractLogRecord.TAGS_RAW_DATA));
}
@Override
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/NetworkAddressAliasMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/NetworkAddressAliasMapper.java
index aeb823c752..7c231df86a 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/NetworkAddressAliasMapper.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/NetworkAddressAliasMapper.java
@@ -8,16 +8,11 @@ import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import java.util.List;
-public class NetworkAddressAliasMapper implements RowEntityMapper<NetworkAddressAlias> {
- @Override
- public List<String> searchableProjection() {
- return ImmutableList.of(NetworkAddressAlias.LAST_UPDATE_TIME_BUCKET);
- }
-
- @Override
- public List<String> dataProjection() {
- // TODO: make these static fields public
- return ImmutableList.of(Metrics.TIME_BUCKET, "address", "represent_service_id", "represent_service_instance_id");
+public class NetworkAddressAliasMapper extends AbstractBanyanDBDeserializer<NetworkAddressAlias> {
+ public NetworkAddressAliasMapper() {
+ super(NetworkAddressAlias.INDEX_NAME,
+ ImmutableList.of(NetworkAddressAlias.LAST_UPDATE_TIME_BUCKET),
+ ImmutableList.of(Metrics.TIME_BUCKET, "address", "represent_service_id", "represent_service_instance_id"));
}
@Override
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileTaskLogMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileTaskLogMapper.java
index bc157253f0..de51d2f95c 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileTaskLogMapper.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileTaskLogMapper.java
@@ -9,16 +9,12 @@ import org.apache.skywalking.oap.server.core.query.type.ProfileTaskLogOperationT
import java.util.List;
-public class ProfileTaskLogMapper implements RowEntityMapper<ProfileTaskLog> {
- @Override
- public List<String> searchableProjection() {
- return ImmutableList.of(ProfileTaskLogRecord.OPERATION_TIME);
- }
-
- @Override
- public List<String> dataProjection() {
- return ImmutableList.of(ProfileTaskLogRecord.TASK_ID, ProfileTaskLogRecord.INSTANCE_ID,
- ProfileTaskLogRecord.OPERATION_TYPE);
+public class ProfileTaskLogMapper extends AbstractBanyanDBDeserializer<ProfileTaskLog> {
+ public ProfileTaskLogMapper() {
+ super(ProfileTaskLogRecord.INDEX_NAME,
+ ImmutableList.of(ProfileTaskLogRecord.OPERATION_TIME),
+ ImmutableList.of(ProfileTaskLogRecord.TASK_ID, ProfileTaskLogRecord.INSTANCE_ID,
+ ProfileTaskLogRecord.OPERATION_TYPE));
}
@Override
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileTaskMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileTaskMapper.java
index 8a49404d23..2626a91c7d 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileTaskMapper.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileTaskMapper.java
@@ -6,22 +6,16 @@ import org.apache.skywalking.banyandb.v1.client.TagAndValue;
import org.apache.skywalking.oap.server.core.profile.ProfileTaskRecord;
import org.apache.skywalking.oap.server.core.query.type.ProfileTask;
-import java.util.Collections;
import java.util.List;
-public class ProfileTaskMapper implements RowEntityMapper<ProfileTask> {
+public class ProfileTaskMapper extends AbstractBanyanDBDeserializer<ProfileTask> {
public static final String ID = "profile_task_query_id";
- @Override
- public List<String> searchableProjection() {
- return ImmutableList.of(ID, ProfileTaskRecord.SERVICE_ID, ProfileTaskRecord.ENDPOINT_NAME,
- ProfileTaskRecord.START_TIME, ProfileTaskRecord.DURATION, ProfileTaskRecord.MIN_DURATION_THRESHOLD,
- ProfileTaskRecord.DUMP_PERIOD, ProfileTaskRecord.CREATE_TIME, ProfileTaskRecord.MAX_SAMPLING_COUNT);
- }
-
- @Override
- public List<String> dataProjection() {
- return Collections.emptyList();
+ public ProfileTaskMapper() {
+ super(ProfileTaskRecord.INDEX_NAME,
+ ImmutableList.of(ID, ProfileTaskRecord.SERVICE_ID, ProfileTaskRecord.ENDPOINT_NAME,
+ ProfileTaskRecord.START_TIME, ProfileTaskRecord.DURATION, ProfileTaskRecord.MIN_DURATION_THRESHOLD,
+ ProfileTaskRecord.DUMP_PERIOD, ProfileTaskRecord.CREATE_TIME, ProfileTaskRecord.MAX_SAMPLING_COUNT));
}
@Override
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileThreadSnapshotRecordMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileThreadSnapshotRecordMapper.java
index 99f23117f2..2d966a5955 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileThreadSnapshotRecordMapper.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileThreadSnapshotRecordMapper.java
@@ -9,16 +9,12 @@ import org.apache.skywalking.oap.server.core.profile.ProfileThreadSnapshotRecord
import java.util.Collections;
import java.util.List;
-public class ProfileThreadSnapshotRecordMapper implements RowEntityMapper<ProfileThreadSnapshotRecord> {
- @Override
- public List<String> searchableProjection() {
- return ImmutableList.of(ProfileThreadSnapshotRecord.TASK_ID, ProfileThreadSnapshotRecord.SEGMENT_ID,
- ProfileThreadSnapshotRecord.DUMP_TIME, ProfileThreadSnapshotRecord.SEQUENCE);
- }
-
- @Override
- public List<String> dataProjection() {
- return Collections.singletonList(ProfileThreadSnapshotRecord.STACK_BINARY);
+public class ProfileThreadSnapshotRecordMapper extends AbstractBanyanDBDeserializer<ProfileThreadSnapshotRecord> {
+ public ProfileThreadSnapshotRecordMapper() {
+ super(ProfileThreadSnapshotRecord.INDEX_NAME,
+ ImmutableList.of(ProfileThreadSnapshotRecord.TASK_ID, ProfileThreadSnapshotRecord.SEGMENT_ID,
+ ProfileThreadSnapshotRecord.DUMP_TIME, ProfileThreadSnapshotRecord.SEQUENCE),
+ Collections.singletonList(ProfileThreadSnapshotRecord.STACK_BINARY));
}
@Override
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/RowEntityMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/RowEntityMapper.java
index cc1d48d94f..51f9a5d687 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/RowEntityMapper.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/RowEntityMapper.java
@@ -2,12 +2,7 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
import org.apache.skywalking.banyandb.v1.client.RowEntity;
-import java.util.List;
-
+@FunctionalInterface
public interface RowEntityMapper<T> {
- List<String> searchableProjection();
-
- List<String> dataProjection();
-
T map(RowEntity row);
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/SegmentRecordMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/SegmentRecordMapper.java
index 70318136f6..da7be9de35 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/SegmentRecordMapper.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/SegmentRecordMapper.java
@@ -9,7 +9,13 @@ import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentReco
import java.util.Collections;
import java.util.List;
-public class SegmentRecordMapper implements RowEntityMapper<SegmentRecord> {
+public class SegmentRecordMapper extends AbstractBanyanDBDeserializer<SegmentRecord> {
+ public SegmentRecordMapper() {
+ super(SegmentRecord.INDEX_NAME,
+ ImmutableList.of("trace_id", "state", "service_id", "service_instance_id", "endpoint_id", "duration", "start_time"),
+ Collections.singletonList("data_binary"));
+ }
+
@Override
public SegmentRecord map(RowEntity row) {
SegmentRecord record = new SegmentRecord();
@@ -26,14 +32,4 @@ public class SegmentRecordMapper implements RowEntityMapper<SegmentRecord> {
record.setDataBinary(((ByteString) data.get(0).getValue()).toByteArray());
return record;
}
-
- @Override
- public List<String> searchableProjection() {
- return ImmutableList.of("trace_id", "state", "service_id", "service_instance_id", "endpoint_id", "duration", "start_time");
- }
-
- @Override
- public List<String> dataProjection() {
- return Collections.singletonList("data_binary");
- }
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ServiceInstanceMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ServiceInstanceMapper.java
new file mode 100644
index 0000000000..f6925d3fd3
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ServiceInstanceMapper.java
@@ -0,0 +1,70 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
+
+import com.google.common.collect.ImmutableList;
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.skywalking.banyandb.v1.client.RowEntity;
+import org.apache.skywalking.banyandb.v1.client.TagAndValue;
+import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
+import org.apache.skywalking.oap.server.core.query.enumeration.Language;
+import org.apache.skywalking.oap.server.core.query.type.Attribute;
+import org.apache.skywalking.oap.server.core.query.type.ServiceInstance;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
+import org.apache.skywalking.oap.server.library.util.StringUtil;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class ServiceInstanceMapper extends AbstractBanyanDBDeserializer<ServiceInstance> {
+ private static final Gson GSON = new Gson();
+
+ public ServiceInstanceMapper() {
+ super(InstanceTraffic.INDEX_NAME,
+ ImmutableList.of(InstanceTraffic.SERVICE_ID, InstanceTraffic.LAST_PING_TIME_BUCKET),
+ Collections.singletonList("data_binary"));
+ }
+ // Builds a ServiceInstance from the element ID and the RemoteData payload stored in "data_binary".
+ @Override
+ public ServiceInstance map(RowEntity row) {
+ ServiceInstance serviceInstance = new ServiceInstance();
+ // Searchable tag 0 is InstanceTraffic.SERVICE_ID, so the instance identity is taken from the element ID instead.
+ serviceInstance.setId(row.getId());
+ serviceInstance.setInstanceUUID(row.getId());
+ final List<TagAndValue<?>> data = row.getTagFamilies().get(1);
+ Object o = data.get(0).getValue();
+ if (o instanceof ByteString && !((ByteString) o).isEmpty()) {
+ try {
+ RemoteData remoteData = RemoteData.parseFrom((ByteString) o);
+ serviceInstance.setName(remoteData.getDataStrings(1)); // string 1 is read as the instance name
+ final String propString = remoteData.getDataStrings(2); // string 2 is read as the properties JSON
+ if (StringUtil.isNotEmpty(propString)) {
+ JsonObject properties = GSON.fromJson(propString, JsonObject.class);
+ if (properties != null) {
+ for (Map.Entry<String, JsonElement> property : properties.entrySet()) {
+ String key = property.getKey();
+ String value = property.getValue().getAsString();
+ if (key.equals(InstanceTraffic.PropertyUtil.LANGUAGE)) {
+ serviceInstance.setLanguage(Language.value(value));
+ } else {
+ serviceInstance.getAttributes().add(new Attribute(key, value));
+ }
+ }
+ } else {
+ serviceInstance.setLanguage(Language.UNKNOWN);
+ }
+ } else {
+ serviceInstance.setLanguage(Language.UNKNOWN);
+ }
+ } catch (InvalidProtocolBufferException ex) {
+ throw new RuntimeException("fail to parse remote data", ex);
+ }
+ } else {
+ serviceInstance.setLanguage(Language.UNKNOWN);
+ }
+ return serviceInstance;
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ServiceMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ServiceMapper.java
new file mode 100644
index 0000000000..e006a1107d
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ServiceMapper.java
@@ -0,0 +1,26 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.skywalking.banyandb.v1.client.RowEntity;
+import org.apache.skywalking.banyandb.v1.client.TagAndValue;
+import org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraffic;
+import org.apache.skywalking.oap.server.core.query.type.Service;
+
+import java.util.List;
+
+public class ServiceMapper extends AbstractBanyanDBDeserializer<Service> {
+ public ServiceMapper() {
+ super(ServiceTraffic.INDEX_NAME,
+ ImmutableList.of(ServiceTraffic.NAME, ServiceTraffic.NODE_TYPE, ServiceTraffic.GROUP)); // map() reads these tags by index
+ }
+ // Builds a Service from the row ID plus the NAME and GROUP tags; NODE_TYPE is projected but not mapped.
+ @Override
+ public Service map(RowEntity row) {
+ Service service = new Service();
+ final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
+ service.setId(row.getId());
+ service.setName((String) searchable.get(0).getValue()); // index 0 = NAME, assuming tags come back in projection order
+ service.setGroup((String) searchable.get(2).getValue()); // index 2 = GROUP, same assumption - TODO confirm
+ return service;
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java
new file mode 100644
index 0000000000..b11cca6ed4
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java
@@ -0,0 +1,42 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
+
+import org.apache.skywalking.banyandb.v1.client.StreamQuery;
+import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
+import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.AbstractBanyanDBDeserializer;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.BanyanDBDeserializerFactory;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+public abstract class AbstractBanyanDBDAO extends AbstractDAO<BanyanDBStorageClient> {
+ protected AbstractBanyanDBDAO(BanyanDBStorageClient client) {
+ super(client);
+ }
+
+ protected <T> List<T> query(Class<T> clazz, QueryBuilder builder) {
+ return this.query(clazz, builder, 0, 0); // 0/0 means "no time range"
+ }
+
+ protected <T> List<T> query(Class<T> clazz, QueryBuilder builder, long startTimestamp, long endTimestamp) {
+ AbstractBanyanDBDeserializer<T> deserializer = BanyanDBDeserializerFactory.INSTANCE.findDeserializer(clazz);
+ // Use the time-ranged overload only when both bounds are set; a 0 bound means "not set".
+ final StreamQuery query;
+ if (startTimestamp != 0 && endTimestamp != 0) {
+ query = deserializer.buildStreamQuery(startTimestamp, endTimestamp);
+ } else {
+ query = deserializer.buildStreamQuery();
+ }
+
+ builder.apply(query);
+
+ final StreamQueryResponse resp = getClient().query(query);
+ return resp.getElements().stream().map(deserializer::map).collect(Collectors.toList());
+ }
+
+ // Lets each DAO add its own conditions, paging and ordering to the prepared query.
+ interface QueryBuilder {
+ void apply(final StreamQuery query);
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java
index 7e122611bf..57f658a380 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java
@@ -2,57 +2,48 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
import org.apache.skywalking.banyandb.v1.client.StreamQuery;
-import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
import org.apache.skywalking.oap.server.core.alarm.AlarmRecord;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
import org.apache.skywalking.oap.server.core.query.type.AlarmMessage;
import org.apache.skywalking.oap.server.core.query.type.Alarms;
-import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
import org.apache.skywalking.oap.server.core.storage.query.IAlarmQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.AlarmMessageMapper;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.RowEntityMapper;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
-import java.util.stream.Collectors;
/**
* {@link org.apache.skywalking.oap.server.core.alarm.AlarmRecord} is a stream,
* which can be used to build a {@link org.apache.skywalking.oap.server.core.query.type.AlarmMessage}
*/
-public class BanyanDBAlarmQueryDAO extends AbstractDAO<BanyanDBStorageClient> implements IAlarmQueryDAO {
- private final RowEntityMapper<AlarmMessage> mapper;
-
+public class BanyanDBAlarmQueryDAO extends AbstractBanyanDBDAO implements IAlarmQueryDAO {
public BanyanDBAlarmQueryDAO(BanyanDBStorageClient client) {
super(client);
- mapper = new AlarmMessageMapper(this);
}
@Override
public Alarms getAlarm(Integer scopeId, String keyword, int limit, int from, long startTB, long endTB, List<Tag> tags) throws IOException {
- final StreamQuery query = new StreamQuery(AlarmRecord.INDEX_NAME, mapper.searchableProjection());
-
- if (Objects.nonNull(scopeId)) {
- query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", AlarmRecord.SCOPE, (long) scopeId));
- }
- if (startTB != 0 && endTB != 0) {
- query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", AlarmRecord.START_TIME, TimeBucket.getTimestamp(startTB)));
- query.appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", AlarmRecord.START_TIME, TimeBucket.getTimestamp(endTB)));
- }
-
- // TODO: support keyword search
-
- // TODO: support tag search
-
- query.setLimit(limit);
- query.setOffset(from);
-
- StreamQueryResponse resp = getClient().query(query);
-
- List<AlarmMessage> messages = resp.getElements().stream().map(mapper::map).collect(Collectors.toList());
+ List<AlarmMessage> messages = query(AlarmMessage.class, new QueryBuilder() {
+ @Override
+ public void apply(StreamQuery query) {
+ if (Objects.nonNull(scopeId)) {
+ query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", AlarmRecord.SCOPE, (long) scopeId));
+ }
+ if (startTB != 0 && endTB != 0) {
+ query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", AlarmRecord.START_TIME, TimeBucket.getTimestamp(startTB)));
+ query.appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", AlarmRecord.START_TIME, TimeBucket.getTimestamp(endTB)));
+ }
+
+ // TODO: support keyword search
+
+ // TODO: support tag search
+
+ query.setLimit(limit);
+ query.setOffset(from);
+ }
+ });
Alarms alarms = new Alarms();
alarms.setTotal(messages.size());
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java
index 25f2c27f4b..3bd3af8ad5 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java
@@ -2,61 +2,54 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
import org.apache.skywalking.banyandb.v1.client.StreamQuery;
-import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.browser.manual.errorlog.BrowserErrorLogRecord;
import org.apache.skywalking.oap.server.core.browser.source.BrowserErrorCategory;
import org.apache.skywalking.oap.server.core.query.type.BrowserErrorLog;
import org.apache.skywalking.oap.server.core.query.type.BrowserErrorLogs;
-import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
import org.apache.skywalking.oap.server.core.storage.query.IBrowserLogQueryDAO;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.BrowserErrorLogMapper;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.RowEntityMapper;
import java.io.IOException;
+import java.util.List;
import java.util.Objects;
-import java.util.stream.Collectors;
/**
* {@link org.apache.skywalking.oap.server.core.browser.manual.errorlog.BrowserErrorLogRecord} is a stream
*/
-public class BanyanDBBrowserLogQueryDAO extends AbstractDAO<BanyanDBStorageClient> implements IBrowserLogQueryDAO {
- private static final RowEntityMapper<BrowserErrorLog> MAPPER = new BrowserErrorLogMapper();
-
+public class BanyanDBBrowserLogQueryDAO extends AbstractBanyanDBDAO implements IBrowserLogQueryDAO {
public BanyanDBBrowserLogQueryDAO(BanyanDBStorageClient client) {
super(client);
}
@Override
public BrowserErrorLogs queryBrowserErrorLogs(String serviceId, String serviceVersionId, String pagePathId, BrowserErrorCategory category, long startSecondTB, long endSecondTB, int limit, int from) throws IOException {
- final StreamQuery query = new StreamQuery(BrowserErrorLogRecord.INDEX_NAME, MAPPER.searchableProjection());
- query.setDataProjections(MAPPER.dataProjection());
-
- query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", BrowserErrorLogRecord.SERVICE_ID, serviceId));
-
- if (startSecondTB != 0 && endSecondTB != 0) {
- query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", BrowserErrorLogRecord.TIMESTAMP, TimeBucket.getTimestamp(startSecondTB)));
- query.appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", BrowserErrorLogRecord.TIMESTAMP, TimeBucket.getTimestamp(endSecondTB)));
- }
- if (StringUtil.isNotEmpty(serviceVersionId)) {
- query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", BrowserErrorLogRecord.SERVICE_VERSION_ID, serviceVersionId));
- }
- if (StringUtil.isNotEmpty(pagePathId)) {
- query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", BrowserErrorLogRecord.PAGE_PATH_ID, pagePathId));
- }
- if (Objects.nonNull(category)) {
- query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", BrowserErrorLogRecord.ERROR_CATEGORY, (long) category.getValue()));
- }
-
- query.setOffset(from);
- query.setLimit(limit);
-
- final StreamQueryResponse resp = getClient().query(query);
-
final BrowserErrorLogs logs = new BrowserErrorLogs();
- logs.getLogs().addAll(resp.getElements().stream().map(MAPPER::map).collect(Collectors.toList()));
+ List<BrowserErrorLog> browserErrorLogs = query(BrowserErrorLog.class, new QueryBuilder() {
+ @Override
+ public void apply(StreamQuery query) {
+ query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", BrowserErrorLogRecord.SERVICE_ID, serviceId));
+
+ if (startSecondTB != 0 && endSecondTB != 0) {
+ query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", BrowserErrorLogRecord.TIMESTAMP, TimeBucket.getTimestamp(startSecondTB)));
+ query.appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", BrowserErrorLogRecord.TIMESTAMP, TimeBucket.getTimestamp(endSecondTB)));
+ }
+ if (StringUtil.isNotEmpty(serviceVersionId)) {
+ query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", BrowserErrorLogRecord.SERVICE_VERSION_ID, serviceVersionId));
+ }
+ if (StringUtil.isNotEmpty(pagePathId)) {
+ query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", BrowserErrorLogRecord.PAGE_PATH_ID, pagePathId));
+ }
+ if (Objects.nonNull(category)) {
+ query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", BrowserErrorLogRecord.ERROR_CATEGORY, (long) category.getValue()));
+ }
+
+ query.setOffset(from);
+ query.setLimit(limit);
+ }
+ });
+ logs.getLogs().addAll(browserErrorLogs);
logs.setTotal(logs.getLogs().size());
return logs;
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBEventQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBEventQueryDAO.java
index 5ab67d95c6..9b2f0d2be5 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBEventQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBEventQueryDAO.java
@@ -1,23 +1,101 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
+import com.google.common.base.Strings;
+import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
+import org.apache.skywalking.banyandb.v1.client.StreamQuery;
+import org.apache.skywalking.oap.server.core.query.PaginationUtils;
+import org.apache.skywalking.oap.server.core.query.input.Duration;
import org.apache.skywalking.oap.server.core.query.type.event.EventQueryCondition;
import org.apache.skywalking.oap.server.core.query.type.event.Events;
+import org.apache.skywalking.oap.server.core.query.type.event.Source;
+import org.apache.skywalking.oap.server.core.source.Event;
import org.apache.skywalking.oap.server.core.storage.query.IEventQueryDAO;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
import java.util.List;
/**
- * ???
* {@link org.apache.skywalking.oap.server.core.source.Event} is a stream
*/
-public class BanyanDBEventQueryDAO implements IEventQueryDAO {
+public class BanyanDBEventQueryDAO extends AbstractBanyanDBDAO implements IEventQueryDAO {
+ public BanyanDBEventQueryDAO(BanyanDBStorageClient client) {
+ super(client);
+ }
+
@Override
public Events queryEvents(EventQueryCondition condition) throws Exception {
- return new Events();
+ List<org.apache.skywalking.oap.server.core.query.type.event.Event> eventList = query(org.apache.skywalking.oap.server.core.query.type.event.Event.class,
+ new QueryBuilder() {
+ @Override
+ public void apply(StreamQuery query) {
+ buildConditions(condition, query);
+
+ PaginationUtils.Page page = PaginationUtils.INSTANCE.exchange(condition.getPaging());
+ query.setLimit(page.getLimit());
+ query.setOffset(page.getFrom());
+ switch (condition.getOrder()) {
+ case ASC:
+ query.setOrderBy(new StreamQuery.OrderBy("start_time", StreamQuery.OrderBy.Type.ASC));
+ break;
+ case DES:
+ query.setOrderBy(new StreamQuery.OrderBy("start_time", StreamQuery.OrderBy.Type.DESC));
+ }
+ }
+ });
+
+ Events events = new Events();
+ events.setEvents(eventList);
+ // TODO: this is only the size of the returned page; set the real total once it can be queried.
+ events.setTotal(eventList.size());
+ return events;
}
@Override
public Events queryEvents(List<EventQueryCondition> conditionList) throws Exception {
- return new Events();
+ Events events = new Events();
+ for (final EventQueryCondition condition : conditionList) {
+ Events subEvents = this.queryEvents(condition);
+ if (subEvents.getEvents().size() == 0) {
+ continue;
+ }
+
+ events.getEvents().addAll(subEvents.getEvents());
+ events.setTotal(events.getTotal() + subEvents.getTotal());
+ }
+
+ return events;
+ }
+
+ private void buildConditions(EventQueryCondition condition, final StreamQuery query) { // appends one condition per filter that is set
+ if (!Strings.isNullOrEmpty(condition.getUuid())) {
+ query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", Event.UUID, condition.getUuid()));
+ }
+ final Source source = condition.getSource();
+ if (source != null) {
+ if (!Strings.isNullOrEmpty(source.getService())) {
+ query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", Event.SERVICE, source.getService()));
+ }
+ if (!Strings.isNullOrEmpty(source.getServiceInstance())) {
+ query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", Event.SERVICE_INSTANCE, source.getServiceInstance()));
+ }
+ if (!Strings.isNullOrEmpty(source.getEndpoint())) {
+ query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", Event.ENDPOINT, source.getEndpoint()));
+ }
+ }
+ if (!Strings.isNullOrEmpty(condition.getName())) {
+ query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", Event.NAME, condition.getName()));
+ }
+ if (condition.getType() != null) {
+ query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", Event.TYPE, condition.getType().name()));
+ }
+ final Duration time = condition.getTime();
+ if (time != null) {
+ if (time.getStartTimestamp() > 0) { // lower bound: event starts after the window opens
+ query.appendCondition(PairQueryCondition.LongQueryCondition.gt("searchable", Event.START_TIME, time.getStartTimestamp()));
+ }
+ if (time.getEndTimestamp() > 0) { // upper bound: event ends before the window closes, hence lt rather than gt
+ query.appendCondition(PairQueryCondition.LongQueryCondition.lt("searchable", Event.END_TIME, time.getEndTimestamp()));
+ }
+ }
}
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java
index 661f65b930..404d60017c 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java
@@ -2,33 +2,25 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
import org.apache.skywalking.banyandb.v1.client.StreamQuery;
-import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.manual.log.AbstractLogRecord;
-import org.apache.skywalking.oap.server.core.analysis.manual.log.LogRecord;
import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
import org.apache.skywalking.oap.server.core.query.enumeration.Order;
import org.apache.skywalking.oap.server.core.query.input.TraceScopeCondition;
import org.apache.skywalking.oap.server.core.query.type.Log;
import org.apache.skywalking.oap.server.core.query.type.Logs;
-import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
import org.apache.skywalking.oap.server.core.storage.query.ILogQueryDAO;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.LogMapper;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.RowEntityMapper;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
-import java.util.stream.Collectors;
/**
* {@link org.apache.skywalking.oap.server.core.analysis.manual.log.LogRecord} is a stream
*/
-public class BanyanDBLogQueryDAO extends AbstractDAO<BanyanDBStorageClient> implements ILogQueryDAO {
- private static final RowEntityMapper<Log> MAPPER = new LogMapper();
-
+public class BanyanDBLogQueryDAO extends AbstractBanyanDBDAO implements ILogQueryDAO {
public BanyanDBLogQueryDAO(BanyanDBStorageClient client) {
super(client);
}
@@ -38,45 +30,44 @@ public class BanyanDBLogQueryDAO extends AbstractDAO<BanyanDBStorageClient> impl
TraceScopeCondition relatedTrace, Order queryOrder, int from, int limit,
long startTB, long endTB, List<Tag> tags, List<String> keywordsOfContent,
List<String> excludingKeywordsOfContent) throws IOException {
- final StreamQuery query = new StreamQuery(LogRecord.INDEX_NAME, MAPPER.searchableProjection());
- if (StringUtil.isNotEmpty(serviceId)) {
- query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", AbstractLogRecord.SERVICE_ID, serviceId));
- }
+ List<Log> entities = query(Log.class, new QueryBuilder() {
+ @Override
+ public void apply(StreamQuery query) {
+ if (StringUtil.isNotEmpty(serviceId)) {
+ query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", AbstractLogRecord.SERVICE_ID, serviceId));
+ }
- if (startTB != 0 && endTB != 0) {
- query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", AbstractLogRecord.TIMESTAMP, TimeBucket.getTimestamp(startTB)));
- query.appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", AbstractLogRecord.TIMESTAMP, TimeBucket.getTimestamp(endTB)));
- }
+ if (startTB != 0 && endTB != 0) {
+ query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", AbstractLogRecord.TIMESTAMP, TimeBucket.getTimestamp(startTB)));
+ query.appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", AbstractLogRecord.TIMESTAMP, TimeBucket.getTimestamp(endTB)));
+ }
- if (StringUtil.isNotEmpty(serviceInstanceId)) {
- query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", AbstractLogRecord.SERVICE_INSTANCE_ID, serviceInstanceId));
- }
- if (StringUtil.isNotEmpty(endpointId)) {
- query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", AbstractLogRecord.ENDPOINT_ID, endpointId));
- }
- if (Objects.nonNull(relatedTrace)) {
- if (StringUtil.isNotEmpty(relatedTrace.getTraceId())) {
- query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", AbstractLogRecord.TRACE_ID, relatedTrace.getTraceId()));
- }
- if (StringUtil.isNotEmpty(relatedTrace.getSegmentId())) {
- query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", AbstractLogRecord.TRACE_SEGMENT_ID, relatedTrace.getSegmentId()));
- }
- if (Objects.nonNull(relatedTrace.getSpanId())) {
- query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", AbstractLogRecord.SPAN_ID, (long) relatedTrace.getSpanId()));
- }
- }
+ if (StringUtil.isNotEmpty(serviceInstanceId)) {
+ query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", AbstractLogRecord.SERVICE_INSTANCE_ID, serviceInstanceId));
+ }
+ if (StringUtil.isNotEmpty(endpointId)) {
+ query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", AbstractLogRecord.ENDPOINT_ID, endpointId));
+ }
+ if (Objects.nonNull(relatedTrace)) {
+ if (StringUtil.isNotEmpty(relatedTrace.getTraceId())) {
+ query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", AbstractLogRecord.TRACE_ID, relatedTrace.getTraceId()));
+ }
+ if (StringUtil.isNotEmpty(relatedTrace.getSegmentId())) {
+ query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", AbstractLogRecord.TRACE_SEGMENT_ID, relatedTrace.getSegmentId()));
+ }
+ if (Objects.nonNull(relatedTrace.getSpanId())) {
+ query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", AbstractLogRecord.SPAN_ID, (long) relatedTrace.getSpanId()));
+ }
+ }
- // TODO: if we allow to index tags?
+ // TODO: decide whether tags can be indexed; tag filters are not applied until then.
// if (CollectionUtils.isNotEmpty(tags)) {
// for (final Tag tag : tags) {
// query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", tag.getKey(), tag.getValue()));
// }
// }
-
- StreamQueryResponse resp = getClient().query(query);
-
- List<Log> entities = resp.getElements().stream().map(MAPPER::map).collect(Collectors.toList());
-
+ }
+ });
Logs logs = new Logs();
logs.getLogs().addAll(entities);
logs.setTotal(entities.size());
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBMetadataQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBMetadataQueryDAO.java
index 678b981f16..dd3d9623a6 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBMetadataQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBMetadataQueryDAO.java
@@ -1,15 +1,23 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
+import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
+import org.apache.skywalking.banyandb.v1.client.StreamQuery;
import org.apache.skywalking.oap.server.core.analysis.NodeType;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
+import org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic;
+import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
+import org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraffic;
import org.apache.skywalking.oap.server.core.query.type.Database;
import org.apache.skywalking.oap.server.core.query.type.Endpoint;
import org.apache.skywalking.oap.server.core.query.type.Service;
import org.apache.skywalking.oap.server.core.query.type.ServiceInstance;
import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
+import org.apache.skywalking.oap.server.library.util.StringUtil;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
import java.io.IOException;
-import java.util.Collections;
import java.util.List;
+import java.util.stream.Collectors;
/**
* {@link org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraffic},
@@ -17,39 +25,90 @@ import java.util.List;
* {@link org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic}
* are all streams.
*/
-public class BanyanDBMetadataQueryDAO implements IMetadataQueryDAO {
+public class BanyanDBMetadataQueryDAO extends AbstractBanyanDBDAO implements IMetadataQueryDAO {
+ /**
+ * Creates the DAO and passes the storage client on to the {@code AbstractBanyanDBDAO} constructor.
+ *
+ * @param client BanyanDB storage client handed to the superclass
+ */
+ public BanyanDBMetadataQueryDAO(BanyanDBStorageClient client) {
+ super(client);
+ }
+
+ /**
+ * Lists services, optionally narrowed to a single group.
+ * <p>
+ * An equality condition on {@link ServiceTraffic#GROUP} is appended only when
+ * {@code StringUtil.isNotEmpty(group)} holds; otherwise the query carries no condition.
+ * The actual execution is delegated to the inherited {@code query(...)} helper
+ * (declared outside this class, so its projection/mapping details are not visible here).
+ *
+ * @param group group name to match, or an empty value for "no group filter"
+ * @return the entities produced by {@code query(Service.class, ...)}
+ * @throws IOException declared by {@link IMetadataQueryDAO}
+ */
@Override
public List<Service> getAllServices(String group) throws IOException {
- return Collections.emptyList();
+ return query(Service.class, new QueryBuilder() {
+ @Override
+ public void apply(StreamQuery query) {
+ if (StringUtil.isNotEmpty(group)) {
+ query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", ServiceTraffic.GROUP, group));
+ }
+ }
+ });
}
+ /**
+ * Lists the services whose {@link ServiceTraffic#NODE_TYPE} equals {@code NodeType.Browser.value()}.
+ * <p>
+ * The node type is widened to {@code long} because the condition is built with
+ * {@code LongQueryCondition}.
+ *
+ * @return the entities produced by {@code query(Service.class, ...)}
+ * @throws IOException declared by {@link IMetadataQueryDAO}
+ */
@Override
public List<Service> getAllBrowserServices() throws IOException {
- return Collections.emptyList();
+ return query(Service.class, new QueryBuilder() {
+ @Override
+ public void apply(StreamQuery query) {
+ query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", ServiceTraffic.NODE_TYPE, (long) NodeType.Browser.value()));
+ }
+ });
}
+ /**
+ * Lists the entries whose {@link ServiceTraffic#NODE_TYPE} equals {@code NodeType.Database.value()},
+ * requested from the inherited {@code query(...)} helper as {@link Database} objects.
+ *
+ * @return the entities produced by {@code query(Database.class, ...)}
+ * @throws IOException declared by {@link IMetadataQueryDAO}
+ */
@Override
public List<Database> getAllDatabases() throws IOException {
- return Collections.emptyList();
+ return query(Database.class, new QueryBuilder() {
+ @Override
+ public void apply(StreamQuery query) {
+ query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", ServiceTraffic.NODE_TYPE, (long) NodeType.Database.value()));
+ }
+ });
}
+ /**
+ * Finds the services of one node type whose name contains {@code keyword}.
+ * <p>
+ * Only the node-type condition is sent to the database; the keyword match is applied
+ * in memory on the returned list (see the TODO below). A null or empty keyword means
+ * "no name filter": without that guard {@link String#contains} would throw a
+ * {@link NullPointerException} for a null keyword.
+ *
+ * @param nodeType node type matched against {@link ServiceTraffic#NODE_TYPE}
+ * @param keyword substring the service name must contain; null/empty disables the filter
+ * @return matching services, never {@code null} as long as {@code query(...)} returns a list
+ * @throws IOException declared by {@link IMetadataQueryDAO}
+ */
@Override
public List<Service> searchServices(NodeType nodeType, String keyword) throws IOException {
- return null;
+ return query(Service.class, new QueryBuilder() {
+ @Override
+ public void apply(StreamQuery query) {
+ query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", ServiceTraffic.NODE_TYPE, (long) nodeType.value()));
+ }
+ }).stream().filter((s) -> StringUtil.isEmpty(keyword) || s.getName().contains(keyword)) // TODO: support analyzer in database
+ .collect(Collectors.toList());
}
+ /**
+ * Looks up a single service by exact name and node type.
+ * <p>
+ * Both conditions are appended to the query and the limit is set to 1, so at most one
+ * element is expected back; {@code findAny()} then unwraps it.
+ *
+ * @param nodeType node type matched against {@link ServiceTraffic#NODE_TYPE}
+ * @param serviceCode value matched for equality against {@link ServiceTraffic#NAME}
+ * @return the matching service, or {@code null} when the result list is empty
+ * @throws IOException declared by {@link IMetadataQueryDAO}
+ */
@Override
public Service searchService(NodeType nodeType, String serviceCode) throws IOException {
- return null;
+ return query(Service.class, new QueryBuilder() {
+ @Override
+ public void apply(StreamQuery query) {
+ query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", ServiceTraffic.NAME, serviceCode));
+ query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", ServiceTraffic.NODE_TYPE, (long) nodeType.value()));
+ // only get one
+ query.setLimit(1);
+ }
+ }).stream().findAny().orElse(null);
}
+ /**
+ * Finds up to {@code limit} endpoints of a service whose name contains {@code keyword}.
+ * <p>
+ * Only the service-id condition is sent to the database; the keyword match and the
+ * limit are applied in memory on the returned list. A null or empty keyword means
+ * "no name filter": without that guard {@link String#contains} would throw a
+ * {@link NullPointerException} for a null keyword.
+ *
+ * @param keyword substring the endpoint name must contain; null/empty disables the filter
+ * @param serviceId value matched for equality against {@link EndpointTraffic#SERVICE_ID}
+ * @param limit maximum number of endpoints returned (passed to {@code Stream.limit})
+ * @return matching endpoints, at most {@code limit} of them
+ * @throws IOException declared by {@link IMetadataQueryDAO}
+ */
@Override
public List<Endpoint> searchEndpoint(String keyword, String serviceId, int limit) throws IOException {
- return Collections.emptyList();
+ return query(Endpoint.class, new QueryBuilder() {
+ @Override
+ public void apply(StreamQuery query) {
+ query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", EndpointTraffic.SERVICE_ID, serviceId));
+ }
+ }).stream().filter((e) -> StringUtil.isEmpty(keyword) || e.getName().contains(keyword))
+ .limit(limit).collect(Collectors.toList());
}
+ /**
+ * Lists the instances of a service whose last ping falls inside a time window.
+ * <p>
+ * Both timestamps are converted with {@code TimeBucket.getMinuteTimeBucket} and used as an
+ * inclusive range ({@code ge}/{@code le}) on {@link InstanceTraffic#LAST_PING_TIME_BUCKET};
+ * the service id is matched for equality against {@link InstanceTraffic#SERVICE_ID}.
+ *
+ * @param startTimestamp window start, converted to a minute time bucket
+ * @param endTimestamp window end, converted to a minute time bucket
+ * @param serviceId id of the owning service
+ * @return the entities produced by {@code query(ServiceInstance.class, ...)}
+ * @throws IOException declared by {@link IMetadataQueryDAO}
+ */
@Override
public List<ServiceInstance> getServiceInstances(long startTimestamp, long endTimestamp, String serviceId) throws IOException {
- return Collections.emptyList();
+ return query(ServiceInstance.class, new QueryBuilder() {
+ @Override
+ public void apply(StreamQuery query) {
+ final long startMinuteTimeBucket = TimeBucket.getMinuteTimeBucket(startTimestamp);
+ final long endMinuteTimeBucket = TimeBucket.getMinuteTimeBucket(endTimestamp);
+
+ query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", InstanceTraffic.LAST_PING_TIME_BUCKET, startMinuteTimeBucket));
+ query.appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", InstanceTraffic.LAST_PING_TIME_BUCKET, endMinuteTimeBucket));
+ query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", InstanceTraffic.SERVICE_ID, serviceId));
+ }
+ });
}
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBNetworkAddressAliasDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBNetworkAddressAliasDAO.java
index 337c288e96..6112593a55 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBNetworkAddressAliasDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBNetworkAddressAliasDAO.java
@@ -2,33 +2,27 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
import org.apache.skywalking.banyandb.v1.client.StreamQuery;
-import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
import org.apache.skywalking.oap.server.core.analysis.manual.networkalias.NetworkAddressAlias;
-import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.NetworkAddressAliasMapper;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.RowEntityMapper;
import java.util.List;
-import java.util.stream.Collectors;
/**
* {@link NetworkAddressAlias} is a stream
*/
-public class BanyanDBNetworkAddressAliasDAO extends AbstractDAO<BanyanDBStorageClient> implements INetworkAddressAliasDAO {
- private static final RowEntityMapper<NetworkAddressAlias> MAPPER = new NetworkAddressAliasMapper();
-
+public class BanyanDBNetworkAddressAliasDAO extends AbstractBanyanDBDAO implements INetworkAddressAliasDAO {
+ /**
+ * Creates the DAO and passes the storage client on to the superclass constructor.
+ *
+ * @param client BanyanDB storage client handed to the superclass
+ */
public BanyanDBNetworkAddressAliasDAO(BanyanDBStorageClient client) {
super(client);
}
+ /**
+ * Loads the aliases whose {@link NetworkAddressAlias#LAST_UPDATE_TIME_BUCKET} is greater
+ * than or equal to {@code timeBucket}.
+ * <p>
+ * The explicit mapper and {@code StreamQuery} construction of the previous version are
+ * replaced by the inherited {@code query(...)} helper; only the condition is supplied here.
+ *
+ * @param timeBucket inclusive lower bound for the last-update time bucket
+ * @return the entities produced by {@code query(NetworkAddressAlias.class, ...)}
+ */
@Override
public List<NetworkAddressAlias> loadLastUpdate(long timeBucket) {
- StreamQuery query = new StreamQuery(NetworkAddressAlias.INDEX_NAME, MAPPER.searchableProjection());
- query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", NetworkAddressAlias.LAST_UPDATE_TIME_BUCKET, timeBucket));
-
- StreamQueryResponse resp = getClient().query(query);
- return resp.getElements().stream().map(MAPPER::map).collect(Collectors.toList());
+ return query(NetworkAddressAlias.class, new QueryBuilder() {
+ @Override
+ public void apply(StreamQuery query) {
+ query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", NetworkAddressAlias.LAST_UPDATE_TIME_BUCKET, timeBucket));
+ }
+ });
}
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java
index 6c19e2f701..0095965109 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java
@@ -1,10 +1,7 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
import org.apache.skywalking.banyandb.v1.client.StreamQuery;
-import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
-import org.apache.skywalking.oap.server.core.profile.ProfileTaskLogRecord;
import org.apache.skywalking.oap.server.core.query.type.ProfileTaskLog;
-import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.ProfileTaskLogMapper;
@@ -18,7 +15,7 @@ import java.util.stream.Collectors;
/**
* {@link org.apache.skywalking.oap.server.core.profile.ProfileTaskLogRecord} is a stream
*/
-public class BanyanDBProfileTaskLogQueryDAO extends AbstractDAO<BanyanDBStorageClient> implements IProfileTaskLogQueryDAO {
+public class BanyanDBProfileTaskLogQueryDAO extends AbstractBanyanDBDAO implements IProfileTaskLogQueryDAO {
private static final RowEntityMapper<ProfileTaskLog> MAPPER = new ProfileTaskLogMapper();
private final int queryMaxSize;
@@ -30,14 +27,12 @@ public class BanyanDBProfileTaskLogQueryDAO extends AbstractDAO<BanyanDBStorageC
+ /**
+ * Returns profile task logs ordered by ascending operation time.
+ * <p>
+ * The query itself only carries a limit ({@code queryMaxSize}); the ordering is done
+ * in memory on whatever the limited query returned.
+ * NOTE(review): the removed code set data projections explicitly; this version relies on
+ * the inherited {@code query(...)} helper for that — confirm against AbstractBanyanDBDAO.
+ *
+ * @return task logs sorted by {@code ProfileTaskLog::getOperationTime}
+ * @throws IOException declared by {@link IProfileTaskLogQueryDAO}
+ */
@Override
public List<ProfileTaskLog> getTaskLogList() throws IOException {
- final StreamQuery query = new StreamQuery(ProfileTaskLogRecord.INDEX_NAME, MAPPER.searchableProjection());
- query.setDataProjections(MAPPER.dataProjection());
- query.setLimit(this.queryMaxSize);
-
- StreamQueryResponse resp = getClient().query(query);
- return resp.getElements().stream()
- .map(MAPPER::map)
- .sorted(Comparator.comparingLong(ProfileTaskLog::getOperationTime))
+ return query(ProfileTaskLog.class, new QueryBuilder() {
+ @Override
+ public void apply(StreamQuery query) {
+ query.setLimit(BanyanDBProfileTaskLogQueryDAO.this.queryMaxSize);
+ }
+ }).stream().sorted(Comparator.comparingLong(ProfileTaskLog::getOperationTime))
.collect(Collectors.toList());
}
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java
index 67f28ce62f..23fb8c7d1b 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java
@@ -2,11 +2,9 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
import org.apache.skywalking.banyandb.v1.client.StreamQuery;
-import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.profile.ProfileTaskRecord;
import org.apache.skywalking.oap.server.core.query.type.ProfileTask;
-import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
@@ -16,12 +14,11 @@ import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.Row
import java.io.IOException;
import java.util.List;
import java.util.Objects;
-import java.util.stream.Collectors;
/**
* {@link org.apache.skywalking.oap.server.core.profile.ProfileTaskRecord} is a stream
*/
-public class BanyanDBProfileTaskQueryDAO extends AbstractDAO<BanyanDBStorageClient> implements IProfileTaskQueryDAO {
+public class BanyanDBProfileTaskQueryDAO extends AbstractBanyanDBDAO implements IProfileTaskQueryDAO {
private static final RowEntityMapper<ProfileTask> MAPPER = new ProfileTaskMapper();
public BanyanDBProfileTaskQueryDAO(BanyanDBStorageClient client) {
@@ -30,37 +27,36 @@ public class BanyanDBProfileTaskQueryDAO extends AbstractDAO<BanyanDBStorageClie
+ /**
+ * Lists profile tasks, newest start time first, applying only the filters that were supplied.
+ * <ul>
+ * <li>{@code serviceId} / {@code endpointName}: equality conditions, appended when non-empty;</li>
+ * <li>{@code startTimeBucket} / {@code endTimeBucket}: inclusive bounds on
+ * {@link ProfileTaskRecord#START_TIME}, appended when non-null after conversion with
+ * {@code TimeBucket.getTimestamp};</li>
+ * <li>{@code limit}: applied when non-null;</li>
+ * <li>ordering by {@link ProfileTaskRecord#START_TIME} descending is always set.</li>
+ * </ul>
+ *
+ * @param serviceId optional service id filter
+ * @param endpointName optional endpoint name filter
+ * @param startTimeBucket optional lower bound (time bucket) for the task start time
+ * @param endTimeBucket optional upper bound (time bucket) for the task start time
+ * @param limit optional maximum number of tasks
+ * @return the entities produced by {@code query(ProfileTask.class, ...)}
+ * @throws IOException declared by {@link IProfileTaskQueryDAO}
+ */
@Override
public List<ProfileTask> getTaskList(String serviceId, String endpointName, Long startTimeBucket, Long endTimeBucket, Integer limit) throws IOException {
- final StreamQuery query = new StreamQuery(ProfileTaskRecord.INDEX_NAME, MAPPER.searchableProjection());
- query.setDataProjections(MAPPER.dataProjection());
+ return query(ProfileTask.class, new QueryBuilder() {
+ @Override
+ public void apply(StreamQuery query) {
+ if (StringUtil.isNotEmpty(serviceId)) {
+ query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable",
+ ProfileTaskRecord.SERVICE_ID, serviceId));
+ }
- if (StringUtil.isNotEmpty(serviceId)) {
- query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable",
- ProfileTaskRecord.SERVICE_ID, serviceId));
- }
-
- if (StringUtil.isNotEmpty(endpointName)) {
- query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable",
- ProfileTaskRecord.ENDPOINT_NAME, endpointName));
- }
-
- if (Objects.nonNull(startTimeBucket)) {
- query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable",
- ProfileTaskRecord.START_TIME, TimeBucket.getTimestamp(startTimeBucket)));
- }
+ if (StringUtil.isNotEmpty(endpointName)) {
+ query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable",
+ ProfileTaskRecord.ENDPOINT_NAME, endpointName));
+ }
- if (Objects.nonNull(endTimeBucket)) {
- query.appendCondition(PairQueryCondition.LongQueryCondition.le("searchable",
- ProfileTaskRecord.START_TIME, TimeBucket.getTimestamp(endTimeBucket)));
- }
+ if (Objects.nonNull(startTimeBucket)) {
+ query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable",
+ ProfileTaskRecord.START_TIME, TimeBucket.getTimestamp(startTimeBucket)));
+ }
- if (Objects.nonNull(limit)) {
- query.setLimit(limit);
- }
+ if (Objects.nonNull(endTimeBucket)) {
+ query.appendCondition(PairQueryCondition.LongQueryCondition.le("searchable",
+ ProfileTaskRecord.START_TIME, TimeBucket.getTimestamp(endTimeBucket)));
+ }
- query.setOrderBy(new StreamQuery.OrderBy(ProfileTaskRecord.START_TIME, StreamQuery.OrderBy.Type.DESC));
+ if (Objects.nonNull(limit)) {
+ query.setLimit(limit);
+ }
- StreamQueryResponse resp = getClient().query(query);
- return resp.getElements().stream().map(MAPPER::map).collect(Collectors.toList());
+ query.setOrderBy(new StreamQuery.OrderBy(ProfileTaskRecord.START_TIME, StreamQuery.OrderBy.Type.DESC));
+ }
+ });
}
@Override
@@ -69,11 +65,12 @@ public class BanyanDBProfileTaskQueryDAO extends AbstractDAO<BanyanDBStorageClie
return null;
}
- final StreamQuery query = new StreamQuery(ProfileTaskRecord.INDEX_NAME, MAPPER.searchableProjection());
- query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", ProfileTaskMapper.ID, id));
- query.setLimit(1);
-
- StreamQueryResponse resp = getClient().query(query);
- return resp.getElements().stream().map(MAPPER::map).findAny().orElse(null);
+ return query(ProfileTask.class, new QueryBuilder() {
+ @Override
+ public void apply(StreamQuery query) {
+ query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", ProfileTaskMapper.ID, id));
+ query.setLimit(1);
+ }
+ }).stream().findAny().orElse(null);
}
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java
index cb5067e890..e92a0b24ff 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java
@@ -2,11 +2,9 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
import org.apache.skywalking.banyandb.v1.client.StreamQuery;
-import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
import org.apache.skywalking.oap.server.core.profile.ProfileThreadSnapshotRecord;
import org.apache.skywalking.oap.server.core.query.type.BasicTrace;
-import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileThreadSnapshotQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.BasicTraceMapper;
@@ -15,7 +13,6 @@ import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.Row
import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.SegmentRecordMapper;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedList;
@@ -26,7 +23,7 @@ import java.util.stream.Collectors;
/**
* {@link ProfileThreadSnapshotRecord} is a stream
*/
-public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractDAO<BanyanDBStorageClient> implements IProfileThreadSnapshotQueryDAO {
+public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractBanyanDBDAO implements IProfileThreadSnapshotQueryDAO {
private static final RowEntityMapper<ProfileThreadSnapshotRecord> MAPPER = new ProfileThreadSnapshotRecordMapper();
private static final RowEntityMapper<BasicTrace> BASIC_TRACE_MAPPER = new BasicTraceMapper();
private static final RowEntityMapper<SegmentRecord> SEGMENT_RECORD_MAPPER = new SegmentRecordMapper();
@@ -37,24 +34,30 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractDAO<BanyanDBS
@Override
public List<BasicTrace> queryProfiledSegments(String taskId) throws IOException {
- final StreamQuery query = new StreamQuery(ProfileThreadSnapshotRecord.INDEX_NAME, MAPPER.searchableProjection());
- query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", ProfileThreadSnapshotRecord.TASK_ID, taskId))
- .appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", ProfileThreadSnapshotRecord.SEQUENCE, 0L));
- StreamQueryResponse resp = getClient().query(query);
-
- final List<String> segmentIDs = new ArrayList<>(resp.size());
- resp.getElements().forEach(elem -> segmentIDs.add(MAPPER.map(elem).getSegmentId()));
- if (segmentIDs.isEmpty()) {
+ List<ProfileThreadSnapshotRecord> resp = query(ProfileThreadSnapshotRecord.class, new QueryBuilder() {
+ @Override
+ public void apply(StreamQuery query) {
+ query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", ProfileThreadSnapshotRecord.TASK_ID, taskId))
+ .appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", ProfileThreadSnapshotRecord.SEQUENCE, 0L));
+ }
+ });
+
+ if (resp.isEmpty()) {
return Collections.emptyList();
}
+ final List<String> segmentIDs = resp.stream().map(ProfileThreadSnapshotRecord::getSegmentId).collect(Collectors.toList());
+
// TODO: support `IN` or `OR` logic operation in BanyanDB
List<BasicTrace> basicTraces = new LinkedList<>();
for (String segmentID : segmentIDs) {
- final StreamQuery traceQuery = new StreamQuery(SegmentRecord.INDEX_NAME, BASIC_TRACE_MAPPER.searchableProjection());
- traceQuery.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", SegmentRecord.SEGMENT_ID, segmentID));
- StreamQueryResponse traceResponse = getClient().query(traceQuery);
- basicTraces.addAll(traceResponse.getElements().stream().map(BASIC_TRACE_MAPPER::map).collect(Collectors.toList()));
+ List<BasicTrace> subSet = query(BasicTrace.class, new QueryBuilder() {
+ @Override
+ public void apply(StreamQuery traceQuery) {
+ traceQuery.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", SegmentRecord.SEGMENT_ID, segmentID));
+ }
+ });
+ basicTraces.addAll(subSet);
}
// TODO: Sort in DB with DESC
@@ -79,33 +82,35 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractDAO<BanyanDBS
+ /**
+ * Loads the thread snapshots of one segment whose sequence lies in
+ * {@code [minSequence, maxSequence]} (both bounds inclusive: {@code ge}/{@code le}).
+ * <p>
+ * The {@code int} bounds are widened to {@code long} because the conditions are built
+ * with {@code LongQueryCondition}.
+ *
+ * @param segmentId value matched for equality against {@link ProfileThreadSnapshotRecord#SEGMENT_ID}
+ * @param minSequence inclusive lower bound on {@link ProfileThreadSnapshotRecord#SEQUENCE}
+ * @param maxSequence inclusive upper bound on {@link ProfileThreadSnapshotRecord#SEQUENCE}
+ * @return the entities produced by {@code query(ProfileThreadSnapshotRecord.class, ...)}
+ * @throws IOException declared by {@link IProfileThreadSnapshotQueryDAO}
+ */
@Override
public List<ProfileThreadSnapshotRecord> queryRecords(String segmentId, int minSequence, int maxSequence) throws IOException {
- final StreamQuery query = new StreamQuery(ProfileThreadSnapshotRecord.INDEX_NAME, MAPPER.searchableProjection());
- query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", ProfileThreadSnapshotRecord.SEGMENT_ID, segmentId))
- .appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", ProfileThreadSnapshotRecord.SEQUENCE, (long) maxSequence))
- .appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", ProfileThreadSnapshotRecord.SEQUENCE, (long) minSequence));
- query.setDataProjections(MAPPER.dataProjection());
- StreamQueryResponse resp = getClient().query(query);
- return resp.getElements().stream().map(MAPPER::map).collect(Collectors.toList());
+ return query(ProfileThreadSnapshotRecord.class, new QueryBuilder() {
+ @Override
+ public void apply(StreamQuery query) {
+ query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", ProfileThreadSnapshotRecord.SEGMENT_ID, segmentId))
+ .appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", ProfileThreadSnapshotRecord.SEQUENCE, (long) maxSequence))
+ .appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", ProfileThreadSnapshotRecord.SEQUENCE, (long) minSequence));
+ }
+ });
}
+ /**
+ * Loads the segment record with the given segment id.
+ * <p>
+ * The equality condition is placed on {@link SegmentRecord#SEGMENT_ID}, the same tag the
+ * per-segment lookup in {@code queryProfiledSegments} filters on.
+ * {@link SegmentRecord#INDEX_NAME} is the name of the stream, not a tag of it, so using it
+ * as the tag name (as the removed code did) cannot match the segment id.
+ *
+ * @param segmentId id of the segment to load
+ * @return the first matching record, or {@code null} when nothing matches
+ * @throws IOException declared by {@link IProfileThreadSnapshotQueryDAO}
+ */
@Override
public SegmentRecord getProfiledSegment(String segmentId) throws IOException {
- final StreamQuery query = new StreamQuery(SegmentRecord.INDEX_NAME, SEGMENT_RECORD_MAPPER.searchableProjection());
- query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", SegmentRecord.INDEX_NAME, segmentId));
- query.setDataProjections(SEGMENT_RECORD_MAPPER.dataProjection());
- StreamQueryResponse resp = getClient().query(query);
- return resp.getElements().stream().map(SEGMENT_RECORD_MAPPER::map).findFirst().orElse(null);
+ return query(SegmentRecord.class, new QueryBuilder() {
+ @Override
+ public void apply(StreamQuery query) {
+ query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", SegmentRecord.SEGMENT_ID, segmentId));
+ }
+ }).stream().findFirst().orElse(null);
}
private int querySequenceWithAgg(AggType aggType, String segmentId, long start, long end) {
- final StreamQuery query = new StreamQuery(ProfileThreadSnapshotRecord.INDEX_NAME, MAPPER.searchableProjection());
- query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", ProfileThreadSnapshotRecord.SEGMENT_ID, segmentId))
- .appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", ProfileThreadSnapshotRecord.DUMP_TIME, end))
- .appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", ProfileThreadSnapshotRecord.DUMP_TIME, start));
- query.setDataProjections(MAPPER.dataProjection());
-
- StreamQueryResponse resp = getClient().query(query);
- List<ProfileThreadSnapshotRecord> records = resp.getElements().stream().map(MAPPER::map).collect(Collectors.toList());
+ List<ProfileThreadSnapshotRecord> records = query(ProfileThreadSnapshotRecord.class, new QueryBuilder() {
+ @Override
+ public void apply(StreamQuery query) {
+ query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", ProfileThreadSnapshotRecord.SEGMENT_ID, segmentId))
+ .appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", ProfileThreadSnapshotRecord.DUMP_TIME, end))
+ .appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", ProfileThreadSnapshotRecord.DUMP_TIME, start));
+ }
+ });
switch (aggType) {
case MIN:
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStorageDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStorageDAO.java
index f5ee5fb255..be81608a37 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStorageDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStorageDAO.java
@@ -21,7 +21,11 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
-import org.apache.skywalking.oap.server.core.storage.*;
+import org.apache.skywalking.oap.server.core.storage.IManagementDAO;
+import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
+import org.apache.skywalking.oap.server.core.storage.INoneStreamDAO;
+import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
+import org.apache.skywalking.oap.server.core.storage.StorageDAO;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java
index 838e8a1eb3..bbfd4e3150 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java
@@ -21,12 +21,14 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
import com.google.common.base.Strings;
import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
import org.apache.skywalking.banyandb.v1.client.StreamQuery;
-import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
-import org.apache.skywalking.banyandb.v1.client.TimestampRange;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
-import org.apache.skywalking.oap.server.core.query.type.*;
-import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
+import org.apache.skywalking.oap.server.core.query.type.BasicTrace;
+import org.apache.skywalking.oap.server.core.query.type.QueryOrder;
+import org.apache.skywalking.oap.server.core.query.type.Span;
+import org.apache.skywalking.oap.server.core.query.type.TraceBrief;
+import org.apache.skywalking.oap.server.core.query.type.TraceState;
import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBSchema;
@@ -34,16 +36,14 @@ import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageC
import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.BasicTraceMapper;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.RowEntityMapper;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.SegmentRecordMapper;
-import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
-import java.util.stream.Collectors;
-public class BanyanDBTraceQueryDAO extends AbstractDAO<BanyanDBStorageClient> implements ITraceQueryDAO {
+public class BanyanDBTraceQueryDAO extends AbstractBanyanDBDAO implements ITraceQueryDAO {
private static final RowEntityMapper<SegmentRecord> SEGMENT_RECORD_MAPPER = new SegmentRecordMapper();
private static final RowEntityMapper<BasicTrace> BASIC_TRACE_MAPPER = new BasicTraceMapper();
@@ -55,110 +55,89 @@ public class BanyanDBTraceQueryDAO extends AbstractDAO<BanyanDBStorageClient> im
@Override
public TraceBrief queryBasicTraces(long startSecondTB, long endSecondTB, long minDuration, long maxDuration, String serviceId, String serviceInstanceId, String endpointId, String traceId, int limit, int from, TraceState traceState, QueryOrder queryOrder, List<Tag> tags) throws IOException {
- StreamQuery query;
- if (startSecondTB != 0 && endSecondTB != 0) {
- query = new StreamQuery(BanyanDBSchema.NAME, new TimestampRange(parseMillisFromStartSecondTB(startSecondTB),
- parseMillisFromEndSecondTB(endSecondTB)), BASIC_TRACE_MAPPER.searchableProjection());
- } else {
- query = new StreamQuery(BanyanDBSchema.NAME, BASIC_TRACE_MAPPER.searchableProjection());
- }
- if (minDuration != 0) {
- // duration >= minDuration
- query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", "duration", minDuration));
- }
- if (maxDuration != 0) {
- // duration <= maxDuration
- query.appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", "duration", maxDuration));
- }
+ final QueryBuilder builder = new QueryBuilder() {
+ @Override
+ public void apply(StreamQuery query) {
+ if (minDuration != 0) {
+ // duration >= minDuration
+ query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", "duration", minDuration));
+ }
+ if (maxDuration != 0) {
+ // duration <= maxDuration
+ query.appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", "duration", maxDuration));
+ }
- if (!Strings.isNullOrEmpty(serviceId)) {
- query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", "service_id", serviceId));
- }
+ if (!Strings.isNullOrEmpty(serviceId)) {
+ query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", "service_id", serviceId));
+ }
- if (!Strings.isNullOrEmpty(serviceInstanceId)) {
- query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", "service_instance_id", serviceInstanceId));
- }
+ if (!Strings.isNullOrEmpty(serviceInstanceId)) {
+ query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", "service_instance_id", serviceInstanceId));
+ }
- if (!Strings.isNullOrEmpty(endpointId)) {
- query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", "endpoint_id", endpointId));
- }
+ if (!Strings.isNullOrEmpty(endpointId)) {
+ query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", "endpoint_id", endpointId));
+ }
- switch (traceState) {
- case ERROR:
- query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", "state", (long) BanyanDBSchema.TraceState.ERROR.getState()));
- break;
- case SUCCESS:
- query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", "state", (long) BanyanDBSchema.TraceState.SUCCESS.getState()));
- break;
- default:
- query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", "state", (long) BanyanDBSchema.TraceState.ALL.getState()));
- break;
- }
+ switch (traceState) {
+ case ERROR:
+ query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", "state", (long) BanyanDBSchema.TraceState.ERROR.getState()));
+ break;
+ case SUCCESS:
+ query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", "state", (long) BanyanDBSchema.TraceState.SUCCESS.getState()));
+ break;
+ default:
+ query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", "state", (long) BanyanDBSchema.TraceState.ALL.getState()));
+ break;
+ }
- switch (queryOrder) {
- case BY_START_TIME:
- query.setOrderBy(new StreamQuery.OrderBy("start_time", StreamQuery.OrderBy.Type.DESC));
- break;
- case BY_DURATION:
- query.setOrderBy(new StreamQuery.OrderBy("duration", StreamQuery.OrderBy.Type.DESC));
- break;
- }
+ switch (queryOrder) {
+ case BY_START_TIME:
+ query.setOrderBy(new StreamQuery.OrderBy("start_time", StreamQuery.OrderBy.Type.DESC));
+ break;
+ case BY_DURATION:
+ query.setOrderBy(new StreamQuery.OrderBy("duration", StreamQuery.OrderBy.Type.DESC));
+ break;
+ }
- if (CollectionUtils.isNotEmpty(tags)) {
- for (final Tag tag : tags) {
- if (BanyanDBSchema.INDEX_FIELDS.contains(tag.getKey())) {
- query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", tag.getKey(), tag.getValue()));
+ if (CollectionUtils.isNotEmpty(tags)) {
+ for (final Tag tag : tags) {
+ if (BanyanDBSchema.INDEX_FIELDS.contains(tag.getKey())) {
+ query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", tag.getKey(), tag.getValue()));
+ }
+ }
}
+
+ query.setLimit(limit);
+ query.setOffset(from);
}
- }
+ };
- query.setLimit(limit);
- query.setOffset(from);
+ final List<BasicTrace> basicTraces;
+ if (startSecondTB != 0 && endSecondTB != 0) {
+ basicTraces = query(BasicTrace.class, builder, TimeBucket.getTimestamp(startSecondTB), TimeBucket.getTimestamp(endSecondTB));
+ } else {
+ basicTraces = query(BasicTrace.class, builder);
+ }
- // build request
- StreamQueryResponse response = this.getClient().query(query);
TraceBrief brief = new TraceBrief();
- brief.setTotal(response.size());
- brief.getTraces().addAll(response.getElements().stream().map(BASIC_TRACE_MAPPER::map).collect(Collectors.toList()));
+ brief.setTotal(basicTraces.size());
+ brief.getTraces().addAll(basicTraces);
return brief;
}
@Override
public List<SegmentRecord> queryByTraceId(String traceId) throws IOException {
- StreamQuery query = new StreamQuery(BanyanDBSchema.NAME, SEGMENT_RECORD_MAPPER.searchableProjection());
- query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", "trace_id", traceId));
- query.setDataProjections(SEGMENT_RECORD_MAPPER.dataProjection());
- StreamQueryResponse response = this.getClient().query(query);
- return response.getElements().stream().map(SEGMENT_RECORD_MAPPER::map).collect(Collectors.toList());
+ return query(SegmentRecord.class, new QueryBuilder() {
+ @Override
+ public void apply(StreamQuery query) {
+ query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", "trace_id", traceId));
+ }
+ });
}
@Override
public List<Span> doFlexibleTraceQuery(String traceId) throws IOException {
return Collections.emptyList();
}
-
- static long parseMillisFromStartSecondTB(long startSecondTB) {
- return YYYYMMDDHHMMSS.withZone(DateTimeZone.UTC).parseMillis(String.valueOf(startSecondTB));
- }
-
- static long parseMillisFromEndSecondTB(long endSecondTB) {
- long t = endSecondTB;
- long second = t % 100;
- if (second > 59) {
- second = 0;
- }
- t = t / 100;
- long minute = t % 100;
- if (minute > 59) {
- minute = 0;
- }
- t = t / 100;
- long hour = t % 100;
- if (hour > 23) {
- hour = 0;
- }
- t = t / 100;
- return YYYYMMDDHHMMSS.withZone(DateTimeZone.UTC)
- .parseMillis(String.valueOf(((t * 100 + hour) * 100 + minute) * 100 + second));
- }
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java
index 33a69a969c..95731220fd 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java
@@ -2,30 +2,23 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
import org.apache.skywalking.banyandb.v1.client.StreamQuery;
-import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
import org.apache.skywalking.banyandb.v1.client.StreamWrite;
import org.apache.skywalking.banyandb.v1.client.Tag;
import org.apache.skywalking.oap.server.core.management.ui.template.UITemplate;
import org.apache.skywalking.oap.server.core.query.input.DashboardSetting;
import org.apache.skywalking.oap.server.core.query.type.DashboardConfiguration;
import org.apache.skywalking.oap.server.core.query.type.TemplateChangeStatus;
-import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
import org.apache.skywalking.oap.server.core.storage.management.UITemplateManagementDAO;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.DashboardConfigurationMapper;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.RowEntityMapper;
import java.io.IOException;
import java.util.List;
-import java.util.stream.Collectors;
/**
* {@link org.apache.skywalking.oap.server.core.management.ui.template.UITemplate} is a stream
*/
-public class BanyanDBUITemplateManagementDAO extends AbstractDAO<BanyanDBStorageClient> implements UITemplateManagementDAO {
- private static final RowEntityMapper<DashboardConfiguration> MAPPER = new DashboardConfigurationMapper();
-
+public class BanyanDBUITemplateManagementDAO extends AbstractBanyanDBDAO implements UITemplateManagementDAO {
private static final long UI_TEMPLATE_TIMESTAMP = 1L;
public BanyanDBUITemplateManagementDAO(BanyanDBStorageClient client) {
@@ -34,14 +27,15 @@ public class BanyanDBUITemplateManagementDAO extends AbstractDAO<BanyanDBStorage
@Override
public List<DashboardConfiguration> getAllTemplates(Boolean includingDisabled) throws IOException {
- StreamQuery query = new StreamQuery(UITemplate.INDEX_NAME, MAPPER.dataProjection());
- query.setLimit(10000);
- if (!includingDisabled) {
- query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", UITemplate.DISABLED, (long) BooleanUtils.FALSE));
- }
- query.setDataProjections(MAPPER.dataProjection());
- StreamQueryResponse resp = this.getClient().query(query);
- return resp.getElements().stream().map(MAPPER::map).collect(Collectors.toList());
+ return query(DashboardConfiguration.class, new QueryBuilder() {
+ @Override
+ public void apply(StreamQuery query) {
+ query.setLimit(10000);
+ if (!includingDisabled) {
+ query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", UITemplate.DISABLED, (long) BooleanUtils.FALSE));
+ }
+ }
+ });
}
@Override