You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by lu...@apache.org on 2022/04/30 09:03:23 UTC
[skywalking] 04/24: add alarm, networkAddressAlias and logs
This is an automated email from the ASF dual-hosted git repository.
lujiajing pushed a commit to branch banyandb-integration-stream
in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit cb705ed0cc8d014839f2f13078442307b6e2b9df
Author: Megrez Lu <lu...@gmail.com>
AuthorDate: Wed Dec 1 23:29:21 2021 +0800
add alarm, networkAddressAlias and logs
---
.../banyandb/converter/ProfileTaskMapper.java | 28 ---------
.../ProfileThreadSnapshotRecordMapper.java | 27 --------
.../banyandb/deserializer/AlarmMessageMapper.java | 51 +++++++++++++++
.../BasicTraceMapper.java | 2 +-
.../deserializer/BrowserErrorLogMapper.java | 59 ++++++++++++++++++
.../DashboardConfigurationMapper.java | 2 +-
.../plugin/banyandb/deserializer/EventMapper.java | 24 ++++++++
.../plugin/banyandb/deserializer/LogMapper.java | 61 ++++++++++++++++++
.../deserializer/NetworkAddressAliasMapper.java | 40 ++++++++++++
.../deserializer/ProfileTaskLogMapper.java | 39 ++++++++++++
.../banyandb/deserializer/ProfileTaskMapper.java | 42 +++++++++++++
.../ProfileThreadSnapshotRecordMapper.java | 36 +++++++++++
.../RowEntityMapper.java | 2 +-
.../SegmentRecordMapper.java | 2 +-
.../banyandb/stream/BanyanDBAlarmQueryDAO.java | 47 +++++++++++++-
.../stream/BanyanDBBrowserLogQueryDAO.java | 50 ++++++++++++++-
.../banyandb/stream/BanyanDBLogQueryDAO.java | 72 +++++++++++++++++++++-
.../stream/BanyanDBNetworkAddressAliasDAO.java | 23 ++++++-
.../stream/BanyanDBProfileTaskLogQueryDAO.java | 31 +++++++++-
.../stream/BanyanDBProfileTaskQueryDAO.java | 32 +++++++---
.../BanyanDBProfileThreadSnapshotQueryDAO.java | 8 +--
.../banyandb/stream/BanyanDBTraceQueryDAO.java | 6 +-
.../stream/BanyanDBUITemplateManagementDAO.java | 4 +-
23 files changed, 597 insertions(+), 91 deletions(-)
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/ProfileTaskMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/ProfileTaskMapper.java
deleted file mode 100644
index ea1ca09cb0..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/ProfileTaskMapper.java
+++ /dev/null
@@ -1,28 +0,0 @@
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.converter;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.skywalking.banyandb.v1.client.RowEntity;
-import org.apache.skywalking.oap.server.core.profile.ProfileTaskRecord;
-import org.apache.skywalking.oap.server.core.query.type.ProfileTask;
-
-import java.util.Collections;
-import java.util.List;
-
-public class ProfileTaskMapper implements RowEntityMapper<ProfileTask> {
- @Override
- public List<String> searchableProjection() {
- return ImmutableList.of(ProfileTaskRecord.SERVICE_ID, ProfileTaskRecord.ENDPOINT_NAME, ProfileTaskRecord.START_TIME,
- ProfileTaskRecord.DURATION, ProfileTaskRecord.MIN_DURATION_THRESHOLD, ProfileTaskRecord.DUMP_PERIOD,
- ProfileTaskRecord.CREATE_TIME, ProfileTaskRecord.MAX_SAMPLING_COUNT);
- }
-
- @Override
- public List<String> dataProjection() {
- return Collections.emptyList();
- }
-
- @Override
- public ProfileTask map(RowEntity row) {
- return null;
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/ProfileThreadSnapshotRecordMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/ProfileThreadSnapshotRecordMapper.java
deleted file mode 100644
index 9a25688132..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/ProfileThreadSnapshotRecordMapper.java
+++ /dev/null
@@ -1,27 +0,0 @@
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.converter;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.skywalking.banyandb.v1.client.RowEntity;
-import org.apache.skywalking.oap.server.core.profile.ProfileThreadSnapshotRecord;
-
-import java.util.Collections;
-import java.util.List;
-
-public class ProfileThreadSnapshotRecordMapper implements RowEntityMapper<ProfileThreadSnapshotRecord> {
- @Override
- public List<String> searchableProjection() {
- return ImmutableList.of(ProfileThreadSnapshotRecord.TASK_ID, ProfileThreadSnapshotRecord.SEGMENT_ID,
- ProfileThreadSnapshotRecord.DUMP_TIME, ProfileThreadSnapshotRecord.SEQUENCE,
- ProfileThreadSnapshotRecord.STACK_BINARY);
- }
-
- @Override
- public List<String> dataProjection() {
- return Collections.emptyList();
- }
-
- @Override
- public ProfileThreadSnapshotRecord map(RowEntity row) {
- return null;
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/AlarmMessageMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/AlarmMessageMapper.java
new file mode 100644
index 0000000000..d906dbb78f
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/AlarmMessageMapper.java
@@ -0,0 +1,51 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
+
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.ByteString;
+import lombok.RequiredArgsConstructor;
+import org.apache.skywalking.banyandb.v1.client.RowEntity;
+import org.apache.skywalking.banyandb.v1.client.TagAndValue;
+import org.apache.skywalking.oap.server.core.alarm.AlarmRecord;
+import org.apache.skywalking.oap.server.core.query.enumeration.Scope;
+import org.apache.skywalking.oap.server.core.query.type.AlarmMessage;
+import org.apache.skywalking.oap.server.core.storage.query.IAlarmQueryDAO;
+
+import java.util.List;
+
+@RequiredArgsConstructor
+public class AlarmMessageMapper implements RowEntityMapper<AlarmMessage> {
+ private final IAlarmQueryDAO alarmQueryDAO;
+
+ @Override
+ public List<String> searchableProjection() {
+ return ImmutableList.of(AlarmRecord.SCOPE, // 0
+ AlarmRecord.START_TIME); // 1
+ }
+
+ @Override
+ public List<String> dataProjection() {
+ return ImmutableList.of(AlarmRecord.ID0, // 0
+ AlarmRecord.ID1, // 1
+ AlarmRecord.ALARM_MESSAGE, // 2
+ AlarmRecord.TAGS_RAW_DATA); // 3
+ }
+
+ @Override
+ public AlarmMessage map(RowEntity row) {
+ AlarmMessage alarmMessage = new AlarmMessage();
+ final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
+ int scopeID = ((Number) searchable.get(0).getValue()).intValue();
+ alarmMessage.setScopeId(scopeID);
+ alarmMessage.setScope(Scope.Finder.valueOf(scopeID));
+ alarmMessage.setStartTime(((Number) searchable.get(1).getValue()).longValue());
+ final List<TagAndValue<?>> data = row.getTagFamilies().get(1);
+ alarmMessage.setId((String) data.get(0).getValue());
+ alarmMessage.setId1((String) data.get(1).getValue());
+ alarmMessage.setMessage((String) data.get(2).getValue());
+ Object o = data.get(3).getValue();
+ if (o instanceof ByteString && !((ByteString) o).isEmpty()) {
+ this.alarmQueryDAO.parserDataBinary(((ByteString) o).toByteArray(), alarmMessage.getTags());
+ }
+ return alarmMessage;
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/BasicTraceMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BasicTraceMapper.java
similarity index 98%
rename from oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/BasicTraceMapper.java
rename to oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BasicTraceMapper.java
index b3512158f2..bbcf7091d0 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/BasicTraceMapper.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BasicTraceMapper.java
@@ -1,4 +1,4 @@
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.converter;
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
import com.google.common.collect.ImmutableList;
import org.apache.skywalking.banyandb.v1.client.RowEntity;
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BrowserErrorLogMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BrowserErrorLogMapper.java
new file mode 100644
index 0000000000..0807955376
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/BrowserErrorLogMapper.java
@@ -0,0 +1,59 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
+
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.skywalking.banyandb.v1.client.RowEntity;
+import org.apache.skywalking.banyandb.v1.client.TagAndValue;
+import org.apache.skywalking.oap.server.core.browser.manual.errorlog.BrowserErrorLogRecord;
+import org.apache.skywalking.oap.server.core.query.type.BrowserErrorLog;
+import org.apache.skywalking.oap.server.core.query.type.ErrorCategory;
+
+import java.util.List;
+
+public class BrowserErrorLogMapper implements RowEntityMapper<BrowserErrorLog> {
+ @Override
+ public List<String> searchableProjection() {
+ return ImmutableList.of(BrowserErrorLogRecord.SERVICE_ID,
+ BrowserErrorLogRecord.SERVICE_VERSION_ID,
+ BrowserErrorLogRecord.PAGE_PATH_ID,
+ BrowserErrorLogRecord.ERROR_CATEGORY,
+ BrowserErrorLogRecord.TIMESTAMP
+ );
+ }
+
+ @Override
+ public List<String> dataProjection() {
+ return ImmutableList.of(BrowserErrorLogRecord.DATA_BINARY);
+ }
+
+ @Override
+ public BrowserErrorLog map(RowEntity row) {
+ // FIXME: use protobuf directly
+ BrowserErrorLog log = new BrowserErrorLog();
+ final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
+ log.setService((String) searchable.get(0).getValue());
+ log.setServiceVersion((String) searchable.get(1).getValue());
+ log.setPagePath((String) searchable.get(2).getValue());
+ log.setCategory(ErrorCategory.valueOf((String) searchable.get(3).getValue()));
+ log.setTime(((Number) searchable.get(4).getValue()).longValue());
+ final List<TagAndValue<?>> data = row.getTagFamilies().get(1);
+ Object o = data.get(0).getValue();
+ if (o instanceof ByteString && !((ByteString) o).isEmpty()) {
+ try {
+ org.apache.skywalking.apm.network.language.agent.v3.BrowserErrorLog browserErrorLog = org.apache.skywalking.apm.network.language.agent.v3.BrowserErrorLog
+ .parseFrom((ByteString) o);
+ log.setGrade(browserErrorLog.getGrade());
+ log.setCol(browserErrorLog.getCol());
+ log.setLine(browserErrorLog.getLine());
+ log.setMessage(browserErrorLog.getMessage());
+ log.setErrorUrl(browserErrorLog.getErrorUrl());
+ log.setStack(browserErrorLog.getStack());
+ log.setFirstReportedError(browserErrorLog.getFirstReportedError());
+ } catch (InvalidProtocolBufferException ex) {
+ throw new RuntimeException("fail to parse proto buffer", ex);
+ }
+ }
+ return log;
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/DashboardConfigurationMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/DashboardConfigurationMapper.java
similarity index 99%
rename from oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/DashboardConfigurationMapper.java
rename to oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/DashboardConfigurationMapper.java
index 3f7df9cb72..6b9e0962e6 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/DashboardConfigurationMapper.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/DashboardConfigurationMapper.java
@@ -1,4 +1,4 @@
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.converter;
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
import com.google.common.collect.ImmutableList;
import org.apache.skywalking.banyandb.v1.client.RowEntity;
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/EventMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/EventMapper.java
new file mode 100644
index 0000000000..1c94903cbd
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/EventMapper.java
@@ -0,0 +1,24 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.skywalking.banyandb.v1.client.RowEntity;
+import org.apache.skywalking.oap.server.core.query.type.event.Event;
+
+import java.util.List;
+
+public class EventMapper implements RowEntityMapper<Event> {
+ @Override
+ public List<String> searchableProjection() {
+ return ImmutableList.of();
+ }
+
+ @Override
+ public List<String> dataProjection() { // no data-family tags are projected yet (stub mapper)
+ return ImmutableList.of(); // empty list instead of null, consistent with searchableProjection()
+ }
+
+ @Override
+ public Event map(RowEntity row) {
+ return null;
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/LogMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/LogMapper.java
new file mode 100644
index 0000000000..f1c5aaf442
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/LogMapper.java
@@ -0,0 +1,61 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
+
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.skywalking.apm.network.common.v3.KeyStringValuePair;
+import org.apache.skywalking.apm.network.logging.v3.LogTags;
+import org.apache.skywalking.banyandb.v1.client.RowEntity;
+import org.apache.skywalking.banyandb.v1.client.TagAndValue;
+import org.apache.skywalking.oap.server.core.analysis.manual.log.AbstractLogRecord;
+import org.apache.skywalking.oap.server.core.query.type.KeyValue;
+import org.apache.skywalking.oap.server.core.query.type.Log;
+
+import java.util.List;
+
+public class LogMapper implements RowEntityMapper<Log> {
+ @Override
+ public List<String> searchableProjection() {
+ return ImmutableList.of(
+ AbstractLogRecord.SERVICE_ID, // 0
+ AbstractLogRecord.SERVICE_INSTANCE_ID, // 1
+ AbstractLogRecord.ENDPOINT_ID, // 2
+ AbstractLogRecord.TRACE_ID, // 3
+ AbstractLogRecord.TRACE_SEGMENT_ID, // 4 (projected, not read by map())
+ AbstractLogRecord.SPAN_ID, // 5 (projected, not read by map())
+ AbstractLogRecord.TIMESTAMP); // 6
+ }
+
+ @Override
+ public List<String> dataProjection() {
+ return ImmutableList.of(AbstractLogRecord.CONTENT_TYPE, // 0
+ AbstractLogRecord.CONTENT, // 1
+ AbstractLogRecord.TAGS_RAW_DATA); // 2
+ }
+
+ @Override
+ public Log map(RowEntity row) {
+ Log log = new Log();
+ final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
+ log.setServiceId((String) searchable.get(0).getValue());
+ log.setServiceInstanceId((String) searchable.get(1).getValue());
+ log.setEndpointId((String) searchable.get(2).getValue());
+ log.setTraceId((String) searchable.get(3).getValue());
+ log.setTimestamp(((Number) searchable.get(6).getValue()).longValue());
+ final List<TagAndValue<?>> data = row.getTagFamilies().get(1);
+ if (data.get(2).getValue() == null || ((ByteString) data.get(2).getValue()).isEmpty()) {
+ log.setContent(""); // NOTE(review): data tags 0/1 (CONTENT_TYPE, CONTENT) are never read in this method - confirm intended
+ } else {
+ try {
+ // Decode the serialized LogTags payload and copy every key/value pair into the Log's tag list.
+ LogTags logTags = LogTags.parseFrom((ByteString) data.get(2).getValue());
+ for (final KeyStringValuePair pair : logTags.getDataList()) {
+ log.getTags().add(new KeyValue(pair.getKey(), pair.getValue()));
+ }
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return log;
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/NetworkAddressAliasMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/NetworkAddressAliasMapper.java
new file mode 100644
index 0000000000..aeb823c752
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/NetworkAddressAliasMapper.java
@@ -0,0 +1,40 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.skywalking.banyandb.v1.client.RowEntity;
+import org.apache.skywalking.banyandb.v1.client.TagAndValue;
+import org.apache.skywalking.oap.server.core.analysis.manual.networkalias.NetworkAddressAlias;
+import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
+
+import java.util.List;
+
+public class NetworkAddressAliasMapper implements RowEntityMapper<NetworkAddressAlias> {
+ @Override
+ public List<String> searchableProjection() {
+ return ImmutableList.of(NetworkAddressAlias.LAST_UPDATE_TIME_BUCKET);
+ }
+
+ @Override
+ public List<String> dataProjection() {
+ // TODO: make these static fields public
+ return ImmutableList.of(Metrics.TIME_BUCKET, "address", "represent_service_id", "represent_service_instance_id");
+ }
+
+ @Override
+ public NetworkAddressAlias map(RowEntity row) {
+ NetworkAddressAlias model = new NetworkAddressAlias();
+ final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
+ // searchable - last_update_time_bucket
+ model.setLastUpdateTimeBucket(((Number) searchable.get(0).getValue()).longValue());
+ final List<TagAndValue<?>> data = row.getTagFamilies().get(1);
+ // data - time_bucket
+ model.setTimeBucket(((Number) data.get(0).getValue()).longValue());
+ // data - address
+ model.setAddress((String) data.get(1).getValue());
+ // data - represent_service_id
+ model.setRepresentServiceId((String) data.get(2).getValue());
+ // data - represent_service_instance_id
+ model.setRepresentServiceInstanceId((String) data.get(3).getValue());
+ return model;
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileTaskLogMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileTaskLogMapper.java
new file mode 100644
index 0000000000..bc157253f0
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileTaskLogMapper.java
@@ -0,0 +1,39 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.skywalking.banyandb.v1.client.RowEntity;
+import org.apache.skywalking.banyandb.v1.client.TagAndValue;
+import org.apache.skywalking.oap.server.core.profile.ProfileTaskLogRecord;
+import org.apache.skywalking.oap.server.core.query.type.ProfileTaskLog;
+import org.apache.skywalking.oap.server.core.query.type.ProfileTaskLogOperationType;
+
+import java.util.List;
+
+public class ProfileTaskLogMapper implements RowEntityMapper<ProfileTaskLog> {
+ @Override
+ public List<String> searchableProjection() {
+ return ImmutableList.of(ProfileTaskLogRecord.OPERATION_TIME);
+ }
+
+ @Override
+ public List<String> dataProjection() {
+ return ImmutableList.of(ProfileTaskLogRecord.TASK_ID, ProfileTaskLogRecord.INSTANCE_ID,
+ ProfileTaskLogRecord.OPERATION_TYPE);
+ }
+
+ @Override
+ public ProfileTaskLog map(RowEntity row) {
+ ProfileTaskLog profileTaskLog = new ProfileTaskLog();
+ final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
+ // searchable[0] - operation_time
+ profileTaskLog.setOperationTime(((Number) searchable.get(0).getValue()).longValue());
+ final List<TagAndValue<?>> data = row.getTagFamilies().get(1);
+ // data[0] - task_id
+ profileTaskLog.setTaskId((String) data.get(0).getValue());
+ // data[1] - instance_id
+ profileTaskLog.setInstanceId((String) data.get(1).getValue());
+ // data[2] - operation_type, stored as a number and converted via ProfileTaskLogOperationType.parse
+ profileTaskLog.setOperationType(ProfileTaskLogOperationType.parse(((Number) data.get(2).getValue()).intValue()));
+ return profileTaskLog;
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileTaskMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileTaskMapper.java
new file mode 100644
index 0000000000..8a49404d23
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileTaskMapper.java
@@ -0,0 +1,42 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.skywalking.banyandb.v1.client.RowEntity;
+import org.apache.skywalking.banyandb.v1.client.TagAndValue;
+import org.apache.skywalking.oap.server.core.profile.ProfileTaskRecord;
+import org.apache.skywalking.oap.server.core.query.type.ProfileTask;
+
+import java.util.Collections;
+import java.util.List;
+
+public class ProfileTaskMapper implements RowEntityMapper<ProfileTask> {
+ public static final String ID = "profile_task_query_id";
+
+ @Override
+ public List<String> searchableProjection() {
+ return ImmutableList.of(ID, ProfileTaskRecord.SERVICE_ID, ProfileTaskRecord.ENDPOINT_NAME,
+ ProfileTaskRecord.START_TIME, ProfileTaskRecord.DURATION, ProfileTaskRecord.MIN_DURATION_THRESHOLD,
+ ProfileTaskRecord.DUMP_PERIOD, ProfileTaskRecord.CREATE_TIME, ProfileTaskRecord.MAX_SAMPLING_COUNT);
+ }
+
+ @Override
+ public List<String> dataProjection() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public ProfileTask map(RowEntity row) { // builds a ProfileTask from the searchable tag family, in searchableProjection() order
+ ProfileTask profileTask = new ProfileTask();
+ final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
+ profileTask.setId((String) searchable.get(0).getValue()); // 0 - ID
+ profileTask.setServiceId((String) searchable.get(1).getValue()); // 1 - SERVICE_ID
+ profileTask.setEndpointName((String) searchable.get(2).getValue()); // 2 - ENDPOINT_NAME
+ profileTask.setStartTime(((Number) searchable.get(3).getValue()).longValue()); // 3 - START_TIME
+ profileTask.setDuration(((Number) searchable.get(4).getValue()).intValue()); // 4 - DURATION
+ profileTask.setMinDurationThreshold(((Number) searchable.get(5).getValue()).intValue()); // 5 - MIN_DURATION_THRESHOLD
+ profileTask.setDumpPeriod(((Number) searchable.get(6).getValue()).intValue()); // 6 - DUMP_PERIOD
+ profileTask.setCreateTime(((Number) searchable.get(7).getValue()).longValue()); // 7 - CREATE_TIME, read as long like START_TIME so it is not truncated to 32 bits
+ profileTask.setMaxSamplingCount(((Number) searchable.get(8).getValue()).intValue()); // 8 - MAX_SAMPLING_COUNT
+ return profileTask; // return the populated task; returning null discarded every mapped row
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileThreadSnapshotRecordMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileThreadSnapshotRecordMapper.java
new file mode 100644
index 0000000000..99f23117f2
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/ProfileThreadSnapshotRecordMapper.java
@@ -0,0 +1,36 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
+
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.ByteString;
+import org.apache.skywalking.banyandb.v1.client.RowEntity;
+import org.apache.skywalking.banyandb.v1.client.TagAndValue;
+import org.apache.skywalking.oap.server.core.profile.ProfileThreadSnapshotRecord;
+
+import java.util.Collections;
+import java.util.List;
+
+public class ProfileThreadSnapshotRecordMapper implements RowEntityMapper<ProfileThreadSnapshotRecord> {
+ @Override
+ public List<String> searchableProjection() {
+ return ImmutableList.of(ProfileThreadSnapshotRecord.TASK_ID, ProfileThreadSnapshotRecord.SEGMENT_ID,
+ ProfileThreadSnapshotRecord.DUMP_TIME, ProfileThreadSnapshotRecord.SEQUENCE);
+ }
+
+ @Override
+ public List<String> dataProjection() {
+ return Collections.singletonList(ProfileThreadSnapshotRecord.STACK_BINARY);
+ }
+
+ @Override
+ public ProfileThreadSnapshotRecord map(RowEntity row) {
+ ProfileThreadSnapshotRecord record = new ProfileThreadSnapshotRecord();
+ final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
+ record.setTaskId((String) searchable.get(0).getValue());
+ record.setSegmentId((String) searchable.get(1).getValue());
+ record.setDumpTime(((Number) searchable.get(2).getValue()).longValue());
+ record.setSequence(((Number) searchable.get(3).getValue()).intValue());
+ final List<TagAndValue<?>> data = row.getTagFamilies().get(1);
+ record.setStackBinary(((ByteString) data.get(0).getValue()).toByteArray());
+ return record;
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/RowEntityMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/RowEntityMapper.java
similarity index 95%
rename from oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/RowEntityMapper.java
rename to oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/RowEntityMapper.java
index b2facb3373..cc1d48d94f 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/RowEntityMapper.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/RowEntityMapper.java
@@ -1,4 +1,4 @@
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.converter;
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
import org.apache.skywalking.banyandb.v1.client.RowEntity;
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/SegmentRecordMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/SegmentRecordMapper.java
similarity index 99%
rename from oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/SegmentRecordMapper.java
rename to oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/SegmentRecordMapper.java
index 6ea579ff55..70318136f6 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/SegmentRecordMapper.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/deserializer/SegmentRecordMapper.java
@@ -1,4 +1,4 @@
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.converter;
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.ByteString;
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java
index c7bc9cc13d..7e122611bf 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java
@@ -1,19 +1,62 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
+import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
+import org.apache.skywalking.banyandb.v1.client.StreamQuery;
+import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
+import org.apache.skywalking.oap.server.core.alarm.AlarmRecord;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
+import org.apache.skywalking.oap.server.core.query.type.AlarmMessage;
import org.apache.skywalking.oap.server.core.query.type.Alarms;
+import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
import org.apache.skywalking.oap.server.core.storage.query.IAlarmQueryDAO;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.AlarmMessageMapper;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.RowEntityMapper;
import java.io.IOException;
import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
/**
* {@link org.apache.skywalking.oap.server.core.alarm.AlarmRecord} is a stream,
* which can be used to build a {@link org.apache.skywalking.oap.server.core.query.type.AlarmMessage}
*
* <p>This DAO builds a {@code StreamQuery} against {@code AlarmRecord.INDEX_NAME}, runs it through the
* {@code BanyanDBStorageClient} and converts every returned element with an {@code AlarmMessageMapper}.
* The {@code keyword} and {@code tags} arguments of {@code getAlarm} are accepted but not applied yet.
*/
-public class BanyanDBAlarmQueryDAO implements IAlarmQueryDAO {
+public class BanyanDBAlarmQueryDAO extends AbstractDAO<BanyanDBStorageClient> implements IAlarmQueryDAO {
+ private final RowEntityMapper<AlarmMessage> mapper; // converts stream elements into AlarmMessage objects
+
+ public BanyanDBAlarmQueryDAO(BanyanDBStorageClient client) {
+ super(client);
+ mapper = new AlarmMessageMapper(this); // instance-level mapper: it is constructed with a reference to this DAO
+ }
+
@Override
public Alarms getAlarm(Integer scopeId, String keyword, int limit, int from, long startTB, long endTB, List<Tag> tags) throws IOException {
- return new Alarms();
+ final StreamQuery query = new StreamQuery(AlarmRecord.INDEX_NAME, mapper.searchableProjection()); // projection is supplied by the mapper
+
+ if (Objects.nonNull(scopeId)) { // scope filter is optional
+ query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", AlarmRecord.SCOPE, (long) scopeId));
+ }
+ if (startTB != 0 && endTB != 0) { // the time range is applied only when both bounds are non-zero
+ query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", AlarmRecord.START_TIME, TimeBucket.getTimestamp(startTB)));
+ query.appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", AlarmRecord.START_TIME, TimeBucket.getTimestamp(endTB)));
+ }
+
+ // TODO: support keyword search
+
+ // TODO: support tag search
+
+ query.setLimit(limit); // page size requested by the caller
+ query.setOffset(from); // page start requested by the caller
+
+ StreamQueryResponse resp = getClient().query(query);
+
+ List<AlarmMessage> messages = resp.getElements().stream().map(mapper::map).collect(Collectors.toList());
+
+ Alarms alarms = new Alarms();
+ alarms.setTotal(messages.size()); // NOTE(review): this is the size of the returned page, not a count of all matches - confirm intended
+ alarms.getMsgs().addAll(messages);
+ return alarms;
}
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java
index 7607668eba..25f2c27f4b 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java
@@ -1,17 +1,63 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
+import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
+import org.apache.skywalking.banyandb.v1.client.StreamQuery;
+import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
+import org.apache.skywalking.oap.server.core.browser.manual.errorlog.BrowserErrorLogRecord;
import org.apache.skywalking.oap.server.core.browser.source.BrowserErrorCategory;
+import org.apache.skywalking.oap.server.core.query.type.BrowserErrorLog;
import org.apache.skywalking.oap.server.core.query.type.BrowserErrorLogs;
+import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
import org.apache.skywalking.oap.server.core.storage.query.IBrowserLogQueryDAO;
+import org.apache.skywalking.oap.server.library.util.StringUtil;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.BrowserErrorLogMapper;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.RowEntityMapper;
import java.io.IOException;
+import java.util.Objects;
+import java.util.stream.Collectors;
/**
* {@link org.apache.skywalking.oap.server.core.browser.manual.errorlog.BrowserErrorLogRecord} is a stream
*
* <p>This DAO builds a {@code StreamQuery} against {@code BrowserErrorLogRecord.INDEX_NAME}, requesting both the
* searchable and the data projection of {@code BrowserErrorLogMapper}, and maps the returned elements to
* {@code BrowserErrorLog} objects.
*/
-public class BanyanDBBrowserLogQueryDAO implements IBrowserLogQueryDAO {
+public class BanyanDBBrowserLogQueryDAO extends AbstractDAO<BanyanDBStorageClient> implements IBrowserLogQueryDAO {
+ private static final RowEntityMapper<BrowserErrorLog> MAPPER = new BrowserErrorLogMapper();
+
+ public BanyanDBBrowserLogQueryDAO(BanyanDBStorageClient client) {
+ super(client);
+ }
+
+ /**
+ * Queries one page of browser error logs.
+ *
+ * @param serviceId        optional service filter; skipped when null or empty
+ * @param serviceVersionId optional service-version filter; skipped when null or empty
+ * @param pagePathId       optional page-path filter; skipped when null or empty
+ * @param category         optional error-category filter; skipped when null
+ * @param startSecondTB    start time bucket; the range filter is applied only when both bounds are non-zero
+ * @param endSecondTB      end time bucket; see {@code startSecondTB}
+ * @param limit            maximum number of elements to request
+ * @param from             offset of the first element to request
+ * @return the mapped logs; {@code total} is set to the number of logs in this page
+ * @throws IOException declared by the DAO interface
+ */
@Override
public BrowserErrorLogs queryBrowserErrorLogs(String serviceId, String serviceVersionId, String pagePathId, BrowserErrorCategory category, long startSecondTB, long endSecondTB, int limit, int from) throws IOException {
- return new BrowserErrorLogs();
+ final StreamQuery query = new StreamQuery(BrowserErrorLogRecord.INDEX_NAME, MAPPER.searchableProjection());
+ query.setDataProjections(MAPPER.dataProjection());
+
+ // Guard serviceId like every other optional string filter, so a null/empty id is not sent as a filter value.
+ if (StringUtil.isNotEmpty(serviceId)) {
+ query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", BrowserErrorLogRecord.SERVICE_ID, serviceId));
+ }
+
+ if (startSecondTB != 0 && endSecondTB != 0) {
+ query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", BrowserErrorLogRecord.TIMESTAMP, TimeBucket.getTimestamp(startSecondTB)));
+ query.appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", BrowserErrorLogRecord.TIMESTAMP, TimeBucket.getTimestamp(endSecondTB)));
+ }
+ if (StringUtil.isNotEmpty(serviceVersionId)) {
+ query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", BrowserErrorLogRecord.SERVICE_VERSION_ID, serviceVersionId));
+ }
+ if (StringUtil.isNotEmpty(pagePathId)) {
+ query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", BrowserErrorLogRecord.PAGE_PATH_ID, pagePathId));
+ }
+ if (Objects.nonNull(category)) {
+ query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", BrowserErrorLogRecord.ERROR_CATEGORY, (long) category.getValue()));
+ }
+
+ query.setOffset(from);
+ query.setLimit(limit);
+
+ final StreamQueryResponse resp = getClient().query(query);
+
+ final BrowserErrorLogs logs = new BrowserErrorLogs();
+ logs.getLogs().addAll(resp.getElements().stream().map(MAPPER::map).collect(Collectors.toList()));
+ // NOTE(review): total is the size of the returned page, not a count of all matches - confirm intended
+ logs.setTotal(logs.getLogs().size());
+ return logs;
}
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java
index baba93b886..661f65b930 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java
@@ -1,20 +1,86 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
+import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
+import org.apache.skywalking.banyandb.v1.client.StreamQuery;
+import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
+import org.apache.skywalking.oap.server.core.analysis.manual.log.AbstractLogRecord;
+import org.apache.skywalking.oap.server.core.analysis.manual.log.LogRecord;
import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
import org.apache.skywalking.oap.server.core.query.enumeration.Order;
import org.apache.skywalking.oap.server.core.query.input.TraceScopeCondition;
+import org.apache.skywalking.oap.server.core.query.type.Log;
import org.apache.skywalking.oap.server.core.query.type.Logs;
+import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
import org.apache.skywalking.oap.server.core.storage.query.ILogQueryDAO;
+import org.apache.skywalking.oap.server.library.util.StringUtil;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.LogMapper;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.RowEntityMapper;
import java.io.IOException;
import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
/**
* {@link org.apache.skywalking.oap.server.core.analysis.manual.log.LogRecord} is a stream
*
* <p>This DAO builds a {@code StreamQuery} against {@code LogRecord.INDEX_NAME} with the searchable projection of
* {@code LogMapper} and maps the returned elements to {@code Log} objects.
*/
-public class BanyanDBLogQueryDAO implements ILogQueryDAO {
+public class BanyanDBLogQueryDAO extends AbstractDAO<BanyanDBStorageClient> implements ILogQueryDAO {
+ private static final RowEntityMapper<Log> MAPPER = new LogMapper();
+
+ public BanyanDBLogQueryDAO(BanyanDBStorageClient client) {
+ super(client);
+ }
+
+ /**
+ * Queries one page of logs.
+ *
+ * <p>Not applied yet: {@code queryOrder}, {@code tags}, {@code keywordsOfContent} and
+ * {@code excludingKeywordsOfContent} are accepted but do not influence the query.
+ *
+ * @param serviceId         optional service filter; skipped when null or empty
+ * @param serviceInstanceId optional instance filter; skipped when null or empty
+ * @param endpointId        optional endpoint filter; skipped when null or empty
+ * @param relatedTrace      optional trace scope; its trace id, segment id and span id are each applied when present
+ * @param from              offset of the first element to request
+ * @param limit             maximum number of elements to request
+ * @param startTB           start time bucket; the range filter is applied only when both bounds are non-zero
+ * @param endTB             end time bucket; see {@code startTB}
+ * @return the mapped logs; {@code total} is set to the number of logs in this page
+ * @throws IOException declared by the DAO interface
+ */
@Override
- public Logs queryLogs(String serviceId, String serviceInstanceId, String endpointId, TraceScopeCondition relatedTrace, Order queryOrder, int from, int limit, long startTB, long endTB, List<Tag> tags, List<String> keywordsOfContent, List<String> excludingKeywordsOfContent) throws IOException {
- return new Logs();
+ public Logs queryLogs(String serviceId, String serviceInstanceId, String endpointId,
+ TraceScopeCondition relatedTrace, Order queryOrder, int from, int limit,
+ long startTB, long endTB, List<Tag> tags, List<String> keywordsOfContent,
+ List<String> excludingKeywordsOfContent) throws IOException {
+ final StreamQuery query = new StreamQuery(LogRecord.INDEX_NAME, MAPPER.searchableProjection());
+ if (StringUtil.isNotEmpty(serviceId)) {
+ query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", AbstractLogRecord.SERVICE_ID, serviceId));
+ }
+
+ if (startTB != 0 && endTB != 0) {
+ query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", AbstractLogRecord.TIMESTAMP, TimeBucket.getTimestamp(startTB)));
+ query.appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", AbstractLogRecord.TIMESTAMP, TimeBucket.getTimestamp(endTB)));
+ }
+
+ if (StringUtil.isNotEmpty(serviceInstanceId)) {
+ query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", AbstractLogRecord.SERVICE_INSTANCE_ID, serviceInstanceId));
+ }
+ if (StringUtil.isNotEmpty(endpointId)) {
+ query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", AbstractLogRecord.ENDPOINT_ID, endpointId));
+ }
+ if (Objects.nonNull(relatedTrace)) {
+ if (StringUtil.isNotEmpty(relatedTrace.getTraceId())) {
+ query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", AbstractLogRecord.TRACE_ID, relatedTrace.getTraceId()));
+ }
+ if (StringUtil.isNotEmpty(relatedTrace.getSegmentId())) {
+ query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", AbstractLogRecord.TRACE_SEGMENT_ID, relatedTrace.getSegmentId()));
+ }
+ if (Objects.nonNull(relatedTrace.getSpanId())) {
+ query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", AbstractLogRecord.SPAN_ID, (long) relatedTrace.getSpanId()));
+ }
+ }
+
+ // TODO: if we allow to index tags?
+// if (CollectionUtils.isNotEmpty(tags)) {
+// for (final Tag tag : tags) {
+// query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", tag.getKey(), tag.getValue()));
+// }
+// }
+
+ // TODO: apply queryOrder, keywordsOfContent and excludingKeywordsOfContent
+
+ // Honour the caller's pagination, the same way the alarm and browser-log DAOs do.
+ query.setLimit(limit);
+ query.setOffset(from);
+
+ StreamQueryResponse resp = getClient().query(query);
+
+ List<Log> entities = resp.getElements().stream().map(MAPPER::map).collect(Collectors.toList());
+
+ Logs logs = new Logs();
+ logs.getLogs().addAll(entities);
+ // NOTE(review): total is the size of the returned page, not a count of all matches - confirm intended
+ logs.setTotal(entities.size());
+
+ return logs;
}
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBNetworkAddressAliasDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBNetworkAddressAliasDAO.java
index 10675a49a1..337c288e96 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBNetworkAddressAliasDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBNetworkAddressAliasDAO.java
@@ -1,17 +1,34 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
+import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
+import org.apache.skywalking.banyandb.v1.client.StreamQuery;
+import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
import org.apache.skywalking.oap.server.core.analysis.manual.networkalias.NetworkAddressAlias;
+import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.NetworkAddressAliasMapper;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.RowEntityMapper;
-import java.util.Collections;
import java.util.List;
+import java.util.stream.Collectors;
/**
* {@link NetworkAddressAlias} is a stream
*
* <p>This DAO queries the {@code NetworkAddressAlias} stream through the {@code BanyanDBStorageClient} and maps the
* returned elements with a {@code NetworkAddressAliasMapper}.
*/
-public class BanyanDBNetworkAddressAliasDAO implements INetworkAddressAliasDAO {
+public class BanyanDBNetworkAddressAliasDAO extends AbstractDAO<BanyanDBStorageClient> implements INetworkAddressAliasDAO {
+ private static final RowEntityMapper<NetworkAddressAlias> MAPPER = new NetworkAddressAliasMapper(); // shared by all instances
+
+ public BanyanDBNetworkAddressAliasDAO(BanyanDBStorageClient client) {
+ super(client);
+ }
+
@Override
public List<NetworkAddressAlias> loadLastUpdate(long timeBucket) {
- return Collections.emptyList();
+ StreamQuery query = new StreamQuery(NetworkAddressAlias.INDEX_NAME, MAPPER.searchableProjection());
+ query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", NetworkAddressAlias.LAST_UPDATE_TIME_BUCKET, timeBucket)); // keep aliases whose last-update bucket is >= timeBucket
+
+ StreamQueryResponse resp = getClient().query(query); // no limit or offset is set on this query
+ return resp.getElements().stream().map(MAPPER::map).collect(Collectors.toList());
}
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java
index 2b6290e2d2..6c19e2f701 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java
@@ -1,18 +1,43 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
+import org.apache.skywalking.banyandb.v1.client.StreamQuery;
+import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
+import org.apache.skywalking.oap.server.core.profile.ProfileTaskLogRecord;
import org.apache.skywalking.oap.server.core.query.type.ProfileTaskLog;
+import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.ProfileTaskLogMapper;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.RowEntityMapper;
import java.io.IOException;
-import java.util.Collections;
+import java.util.Comparator;
import java.util.List;
+import java.util.stream.Collectors;
/**
* {@link org.apache.skywalking.oap.server.core.profile.ProfileTaskLogRecord} is a stream
*
* <p>This DAO reads up to {@code queryMaxSize} elements of the {@code ProfileTaskLogRecord} stream, maps them with a
* {@code ProfileTaskLogMapper} and returns them sorted by ascending operation time.
*/
-public class BanyanDBProfileTaskLogQueryDAO implements IProfileTaskLogQueryDAO {
+public class BanyanDBProfileTaskLogQueryDAO extends AbstractDAO<BanyanDBStorageClient> implements IProfileTaskLogQueryDAO {
+ private static final RowEntityMapper<ProfileTaskLog> MAPPER = new ProfileTaskLogMapper(); // shared by all instances
+
+ private final int queryMaxSize; // used as the query limit in getTaskLogList
+
+ public BanyanDBProfileTaskLogQueryDAO(BanyanDBStorageClient client, int queryMaxSize) {
+ super(client);
+ this.queryMaxSize = queryMaxSize;
+ }
+
@Override
public List<ProfileTaskLog> getTaskLogList() throws IOException {
- return Collections.emptyList();
+ final StreamQuery query = new StreamQuery(ProfileTaskLogRecord.INDEX_NAME, MAPPER.searchableProjection());
+ query.setDataProjections(MAPPER.dataProjection()); // also request the mapper's data projection
+ query.setLimit(this.queryMaxSize); // no filter conditions are added; only the result size is capped
+
+ StreamQueryResponse resp = getClient().query(query);
+ return resp.getElements().stream()
+ .map(MAPPER::map)
+ .sorted(Comparator.comparingLong(ProfileTaskLog::getOperationTime)) // sorted in memory, ascending operation time
+ .collect(Collectors.toList());
}
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java
index 34af74eb93..67f28ce62f 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java
@@ -10,8 +10,8 @@ import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.converter.ProfileTaskMapper;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.converter.RowEntityMapper;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.ProfileTaskMapper;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.RowEntityMapper;
import java.io.IOException;
import java.util.List;
@@ -34,34 +34,46 @@ public class BanyanDBProfileTaskQueryDAO extends AbstractDAO<BanyanDBStorageClie
query.setDataProjections(MAPPER.dataProjection());
if (StringUtil.isNotEmpty(serviceId)) {
- query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", ProfileTaskRecord.SERVICE_ID, serviceId));
+ query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable",
+ ProfileTaskRecord.SERVICE_ID, serviceId));
}
if (StringUtil.isNotEmpty(endpointName)) {
- query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", ProfileTaskRecord.ENDPOINT_NAME, endpointName));
+ query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable",
+ ProfileTaskRecord.ENDPOINT_NAME, endpointName));
}
if (Objects.nonNull(startTimeBucket)) {
- query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", ProfileTaskRecord.START_TIME, TimeBucket.getTimestamp(startTimeBucket)));
+ query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable",
+ ProfileTaskRecord.START_TIME, TimeBucket.getTimestamp(startTimeBucket)));
}
if (Objects.nonNull(endTimeBucket)) {
- query.appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", ProfileTaskRecord.START_TIME, TimeBucket.getTimestamp(endTimeBucket)));
+ query.appendCondition(PairQueryCondition.LongQueryCondition.le("searchable",
+ ProfileTaskRecord.START_TIME, TimeBucket.getTimestamp(endTimeBucket)));
}
- // TODO: why delete?
-
if (Objects.nonNull(limit)) {
query.setLimit(limit);
}
+ query.setOrderBy(new StreamQuery.OrderBy(ProfileTaskRecord.START_TIME, StreamQuery.OrderBy.Type.DESC));
+
StreamQueryResponse resp = getClient().query(query);
return resp.getElements().stream().map(MAPPER::map).collect(Collectors.toList());
}
+ /**
+ * Looks up a single profile task by its id.
+ *
+ * @param id the task id; a null or empty id yields {@code null} without issuing a query
+ * @return the mapped task, or {@code null} when no element matches
+ * @throws IOException declared by the DAO interface
+ */
@Override
public ProfileTask getById(String id) throws IOException {
- // TODO: support id query
- throw new UnsupportedOperationException("element id get is not supported");
+ if (StringUtil.isEmpty(id)) {
+ return null;
+ }
+
+ final StreamQuery query = new StreamQuery(ProfileTaskRecord.INDEX_NAME, MAPPER.searchableProjection());
+ // Request the data projection as getTaskList does, so MAPPER receives the same element shape on both paths.
+ query.setDataProjections(MAPPER.dataProjection());
+ query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", ProfileTaskMapper.ID, id));
+ query.setLimit(1); // at most one element is needed
+
+ StreamQueryResponse resp = getClient().query(query);
+ return resp.getElements().stream().map(MAPPER::map).findAny().orElse(null);
}
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java
index 456636f287..cb5067e890 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java
@@ -9,10 +9,10 @@ import org.apache.skywalking.oap.server.core.query.type.BasicTrace;
import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileThreadSnapshotQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.converter.BasicTraceMapper;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.converter.ProfileThreadSnapshotRecordMapper;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.converter.RowEntityMapper;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.converter.SegmentRecordMapper;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.BasicTraceMapper;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.ProfileThreadSnapshotRecordMapper;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.RowEntityMapper;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.SegmentRecordMapper;
import java.io.IOException;
import java.util.ArrayList;
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java
index a37cfbb54c..838e8a1eb3 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java
@@ -31,9 +31,9 @@ import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBSchema;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.converter.BasicTraceMapper;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.converter.RowEntityMapper;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.converter.SegmentRecordMapper;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.BasicTraceMapper;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.RowEntityMapper;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.SegmentRecordMapper;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java
index a4875e65f1..33a69a969c 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java
@@ -13,8 +13,8 @@ import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
import org.apache.skywalking.oap.server.core.storage.management.UITemplateManagementDAO;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.converter.DashboardConfigurationMapper;
-import org.apache.skywalking.oap.server.storage.plugin.banyandb.converter.RowEntityMapper;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.DashboardConfigurationMapper;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.deserializer.RowEntityMapper;
import java.io.IOException;
import java.util.List;