You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by lu...@apache.org on 2022/04/22 03:24:24 UTC
[skywalking] 03/22: complete UITemplate, ProfileTaskRecord and ProfileThreadSnapshot query
This is an automated email from the ASF dual-hosted git repository.
lujiajing pushed a commit to branch banyandb-integration-stream
in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit a363ba045b05072aa6272290566c177e68e00f50
Author: Megrez Lu <lu...@gmail.com>
AuthorDate: Tue Nov 30 19:47:54 2021 +0800
complete UITemplate, ProfileTaskRecord and ProfileThreadSnapshot query
---
.../plugin/banyandb/BanyanDBStorageClient.java | 5 +-
.../banyandb/converter/BasicTraceMapper.java | 12 +++
.../converter/DashboardConfigurationMapper.java | 23 ++++-
.../banyandb/converter/ProfileTaskMapper.java | 28 ++++++
.../ProfileThreadSnapshotRecordMapper.java | 27 ++++++
.../plugin/banyandb/converter/RowEntityMapper.java | 6 ++
.../banyandb/converter/SegmentRecordMapper.java | 16 +++-
.../stream/BanyanDBProfileTaskQueryDAO.java | 52 +++++++++-
.../BanyanDBProfileThreadSnapshotQueryDAO.java | 105 +++++++++++++++++++--
.../plugin/banyandb/stream/BanyanDBRecordDAO.java | 6 +-
.../banyandb/stream/BanyanDBTraceQueryDAO.java | 19 ++--
.../stream/BanyanDBUITemplateManagementDAO.java | 33 +++++--
12 files changed, 297 insertions(+), 35 deletions(-)
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
index 6ae741f971..3ad4daf918 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
@@ -4,6 +4,7 @@ import org.apache.skywalking.banyandb.v1.client.BanyanDBClient;
import org.apache.skywalking.banyandb.v1.client.StreamBulkWriteProcessor;
import org.apache.skywalking.banyandb.v1.client.StreamQuery;
import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
+import org.apache.skywalking.banyandb.v1.client.StreamWrite;
import org.apache.skywalking.oap.server.library.client.Client;
import org.apache.skywalking.oap.server.library.client.healthcheck.DelegatedHealthChecker;
import org.apache.skywalking.oap.server.library.client.healthcheck.HealthCheckable;
@@ -44,8 +45,8 @@ public class BanyanDBStorageClient implements Client, HealthCheckable {
}
}
- public void write() {
- this.client
+ public void write(StreamWrite streamWrite) {
+ this.client.write(streamWrite);
}
public StreamBulkWriteProcessor createBulkProcessor(int maxBulkSize, int flushInterval, int concurrency) {
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/BasicTraceMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/BasicTraceMapper.java
index bb43a8080a..b3512158f2 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/BasicTraceMapper.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/BasicTraceMapper.java
@@ -1,10 +1,12 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb.converter;
+import com.google.common.collect.ImmutableList;
import org.apache.skywalking.banyandb.v1.client.RowEntity;
import org.apache.skywalking.banyandb.v1.client.TagAndValue;
import org.apache.skywalking.oap.server.core.analysis.IDManager;
import org.apache.skywalking.oap.server.core.query.type.BasicTrace;
+import java.util.Collections;
import java.util.List;
public class BasicTraceMapper implements RowEntityMapper<BasicTrace> {
@@ -22,4 +24,14 @@ public class BasicTraceMapper implements RowEntityMapper<BasicTrace> {
trace.setStart(String.valueOf(searchable.get(4).getValue()));
return trace;
}
+
+ @Override
+ public List<String> searchableProjection() {
+ return ImmutableList.of("trace_id", "state", "endpoint_id", "duration", "start_time");
+ }
+
+ @Override
+ public List<String> dataProjection() {
+ return Collections.emptyList();
+ }
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/DashboardConfigurationMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/DashboardConfigurationMapper.java
index f4c4c19beb..3f7df9cb72 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/DashboardConfigurationMapper.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/DashboardConfigurationMapper.java
@@ -1,7 +1,10 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb.converter;
+import com.google.common.collect.ImmutableList;
import org.apache.skywalking.banyandb.v1.client.RowEntity;
import org.apache.skywalking.banyandb.v1.client.TagAndValue;
+import org.apache.skywalking.oap.server.core.management.ui.template.UITemplate;
+import org.apache.skywalking.oap.server.core.query.enumeration.TemplateType;
import org.apache.skywalking.oap.server.core.query.type.DashboardConfiguration;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
@@ -12,9 +15,27 @@ public class DashboardConfigurationMapper implements RowEntityMapper<DashboardCo
public DashboardConfiguration map(RowEntity row) {
DashboardConfiguration dashboardConfiguration = new DashboardConfiguration();
final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
+ // name
dashboardConfiguration.setName((String) searchable.get(0).getValue());
+ // disabled
dashboardConfiguration.setDisabled(BooleanUtils.valueToBoolean(((Number) searchable.get(1).getValue()).intValue()));
- // TODO: convert back from data?
+ final List<TagAndValue<?>> data = row.getTagFamilies().get(1);
+ // activated
+ dashboardConfiguration.setActivated(BooleanUtils.valueToBoolean(((Number) data.get(0).getValue()).intValue()));
+ // configuration
+ dashboardConfiguration.setConfiguration((String) data.get(1).getValue());
+ // type
+ dashboardConfiguration.setType(TemplateType.forName((String) data.get(2).getValue()));
return dashboardConfiguration;
}
+
+ @Override
+ public List<String> searchableProjection() {
+ return ImmutableList.of(UITemplate.NAME, UITemplate.DISABLED);
+ }
+
+ @Override
+ public List<String> dataProjection() {
+ return ImmutableList.of(UITemplate.ACTIVATED, UITemplate.CONFIGURATION, UITemplate.TYPE);
+ }
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/ProfileTaskMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/ProfileTaskMapper.java
new file mode 100644
index 0000000000..ea1ca09cb0
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/ProfileTaskMapper.java
@@ -0,0 +1,28 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.converter;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.skywalking.banyandb.v1.client.RowEntity;
+import org.apache.skywalking.oap.server.core.profile.ProfileTaskRecord;
+import org.apache.skywalking.oap.server.core.query.type.ProfileTask;
+
+import java.util.Collections;
+import java.util.List;
+
+public class ProfileTaskMapper implements RowEntityMapper<ProfileTask> {
+ @Override
+ public List<String> searchableProjection() {
+ return ImmutableList.of(ProfileTaskRecord.SERVICE_ID, ProfileTaskRecord.ENDPOINT_NAME, ProfileTaskRecord.START_TIME,
+ ProfileTaskRecord.DURATION, ProfileTaskRecord.MIN_DURATION_THRESHOLD, ProfileTaskRecord.DUMP_PERIOD,
+ ProfileTaskRecord.CREATE_TIME, ProfileTaskRecord.MAX_SAMPLING_COUNT);
+ }
+
+ @Override
+ public List<String> dataProjection() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public ProfileTask map(RowEntity row) {
+ return null;
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/ProfileThreadSnapshotRecordMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/ProfileThreadSnapshotRecordMapper.java
new file mode 100644
index 0000000000..9a25688132
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/ProfileThreadSnapshotRecordMapper.java
@@ -0,0 +1,27 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.converter;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.skywalking.banyandb.v1.client.RowEntity;
+import org.apache.skywalking.oap.server.core.profile.ProfileThreadSnapshotRecord;
+
+import java.util.Collections;
+import java.util.List;
+
+public class ProfileThreadSnapshotRecordMapper implements RowEntityMapper<ProfileThreadSnapshotRecord> {
+ @Override
+ public List<String> searchableProjection() {
+ return ImmutableList.of(ProfileThreadSnapshotRecord.TASK_ID, ProfileThreadSnapshotRecord.SEGMENT_ID,
+ ProfileThreadSnapshotRecord.DUMP_TIME, ProfileThreadSnapshotRecord.SEQUENCE,
+ ProfileThreadSnapshotRecord.STACK_BINARY);
+ }
+
+ @Override
+ public List<String> dataProjection() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public ProfileThreadSnapshotRecord map(RowEntity row) {
+ return null;
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/RowEntityMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/RowEntityMapper.java
index eb9b5fadba..b2facb3373 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/RowEntityMapper.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/RowEntityMapper.java
@@ -2,6 +2,12 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.converter;
import org.apache.skywalking.banyandb.v1.client.RowEntity;
+import java.util.List;
+
public interface RowEntityMapper<T> {
+ List<String> searchableProjection();
+
+ List<String> dataProjection();
+
T map(RowEntity row);
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/SegmentRecordMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/SegmentRecordMapper.java
index 2b20227931..6ea579ff55 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/SegmentRecordMapper.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/SegmentRecordMapper.java
@@ -1,9 +1,12 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb.converter;
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.ByteString;
import org.apache.skywalking.banyandb.v1.client.RowEntity;
import org.apache.skywalking.banyandb.v1.client.TagAndValue;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
+import java.util.Collections;
import java.util.List;
public class SegmentRecordMapper implements RowEntityMapper<SegmentRecord> {
@@ -20,8 +23,17 @@ public class SegmentRecordMapper implements RowEntityMapper<SegmentRecord> {
record.setLatency(((Number) searchable.get(5).getValue()).intValue());
record.setStartTime(((Number) searchable.get(6).getValue()).longValue());
final List<TagAndValue<?>> data = row.getTagFamilies().get(1);
- // TODO: support binary data in the client SDK
- record.setDataBinary((byte[]) data.get(0).getValue());
+ record.setDataBinary(((ByteString) data.get(0).getValue()).toByteArray());
return record;
}
+
+ @Override
+ public List<String> searchableProjection() {
+ return ImmutableList.of("trace_id", "state", "service_id", "service_instance_id", "endpoint_id", "duration", "start_time");
+ }
+
+ @Override
+ public List<String> dataProjection() {
+ return Collections.singletonList("data_binary");
+ }
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java
index c99d267cbe..34af74eb93 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java
@@ -1,23 +1,67 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
+import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
+import org.apache.skywalking.banyandb.v1.client.StreamQuery;
+import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
+import org.apache.skywalking.oap.server.core.profile.ProfileTaskRecord;
import org.apache.skywalking.oap.server.core.query.type.ProfileTask;
+import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO;
+import org.apache.skywalking.oap.server.library.util.StringUtil;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.converter.ProfileTaskMapper;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.converter.RowEntityMapper;
import java.io.IOException;
-import java.util.Collections;
import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
/**
* {@link org.apache.skywalking.oap.server.core.profile.ProfileTaskRecord} is a stream
*/
-public class BanyanDBProfileTaskQueryDAO implements IProfileTaskQueryDAO {
+public class BanyanDBProfileTaskQueryDAO extends AbstractDAO<BanyanDBStorageClient> implements IProfileTaskQueryDAO {
+ private static final RowEntityMapper<ProfileTask> MAPPER = new ProfileTaskMapper();
+
+ public BanyanDBProfileTaskQueryDAO(BanyanDBStorageClient client) {
+ super(client);
+ }
+
@Override
public List<ProfileTask> getTaskList(String serviceId, String endpointName, Long startTimeBucket, Long endTimeBucket, Integer limit) throws IOException {
- return Collections.emptyList();
+ final StreamQuery query = new StreamQuery(ProfileTaskRecord.INDEX_NAME, MAPPER.searchableProjection());
+ query.setDataProjections(MAPPER.dataProjection());
+
+ if (StringUtil.isNotEmpty(serviceId)) {
+ query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", ProfileTaskRecord.SERVICE_ID, serviceId));
+ }
+
+ if (StringUtil.isNotEmpty(endpointName)) {
+ query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", ProfileTaskRecord.ENDPOINT_NAME, endpointName));
+ }
+
+ if (Objects.nonNull(startTimeBucket)) {
+ query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", ProfileTaskRecord.START_TIME, TimeBucket.getTimestamp(startTimeBucket)));
+ }
+
+ if (Objects.nonNull(endTimeBucket)) {
+ query.appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", ProfileTaskRecord.START_TIME, TimeBucket.getTimestamp(endTimeBucket)));
+ }
+
+ // TODO: why delete?
+
+ if (Objects.nonNull(limit)) {
+ query.setLimit(limit);
+ }
+
+ StreamQueryResponse resp = getClient().query(query);
+ return resp.getElements().stream().map(MAPPER::map).collect(Collectors.toList());
}
@Override
public ProfileTask getById(String id) throws IOException {
- return null;
+ // TODO: support id query
+ throw new UnsupportedOperationException("element id get is not supported");
}
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java
index 8a27e885fe..456636f287 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java
@@ -1,40 +1,133 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
+import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
+import org.apache.skywalking.banyandb.v1.client.StreamQuery;
+import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
import org.apache.skywalking.oap.server.core.profile.ProfileThreadSnapshotRecord;
import org.apache.skywalking.oap.server.core.query.type.BasicTrace;
+import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileThreadSnapshotQueryDAO;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.converter.BasicTraceMapper;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.converter.ProfileThreadSnapshotRecordMapper;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.converter.RowEntityMapper;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.converter.SegmentRecordMapper;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedList;
import java.util.List;
+import java.util.function.Function;
+import java.util.stream.Collectors;
/**
* {@link ProfileThreadSnapshotRecord} is a stream
*/
-public class BanyanDBProfileThreadSnapshotQueryDAO implements IProfileThreadSnapshotQueryDAO {
+public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractDAO<BanyanDBStorageClient> implements IProfileThreadSnapshotQueryDAO {
+ private static final RowEntityMapper<ProfileThreadSnapshotRecord> MAPPER = new ProfileThreadSnapshotRecordMapper();
+ private static final RowEntityMapper<BasicTrace> BASIC_TRACE_MAPPER = new BasicTraceMapper();
+ private static final RowEntityMapper<SegmentRecord> SEGMENT_RECORD_MAPPER = new SegmentRecordMapper();
+
+ public BanyanDBProfileThreadSnapshotQueryDAO(BanyanDBStorageClient client) {
+ super(client);
+ }
+
@Override
public List<BasicTrace> queryProfiledSegments(String taskId) throws IOException {
- return Collections.emptyList();
+ final StreamQuery query = new StreamQuery(ProfileThreadSnapshotRecord.INDEX_NAME, MAPPER.searchableProjection());
+ query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", ProfileThreadSnapshotRecord.TASK_ID, taskId))
+ .appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", ProfileThreadSnapshotRecord.SEQUENCE, 0L));
+ StreamQueryResponse resp = getClient().query(query);
+
+ final List<String> segmentIDs = new ArrayList<>(resp.size());
+ resp.getElements().forEach(elem -> segmentIDs.add(MAPPER.map(elem).getSegmentId()));
+ if (segmentIDs.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ // TODO: support `IN` or `OR` logic operation in BanyanDB
+ List<BasicTrace> basicTraces = new LinkedList<>();
+ for (String segmentID : segmentIDs) {
+ final StreamQuery traceQuery = new StreamQuery(SegmentRecord.INDEX_NAME, BASIC_TRACE_MAPPER.searchableProjection());
+ traceQuery.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", SegmentRecord.SEGMENT_ID, segmentID));
+ StreamQueryResponse traceResponse = getClient().query(traceQuery);
+ basicTraces.addAll(traceResponse.getElements().stream().map(BASIC_TRACE_MAPPER::map).collect(Collectors.toList()));
+ }
+
+ // TODO: Sort in DB with DESC
+ basicTraces = basicTraces.stream()
+ // comparing start_time
+ .sorted(Comparator.comparing((Function<BasicTrace, Long>) basicTrace -> Long.parseLong(basicTrace.getStart()))
+ // and sort in reverse order
+ .reversed())
+ .collect(Collectors.toList());
+ return basicTraces;
}
@Override
public int queryMinSequence(String segmentId, long start, long end) throws IOException {
- return 0;
+ return querySequenceWithAgg(AggType.MIN, segmentId, start, end);
}
@Override
public int queryMaxSequence(String segmentId, long start, long end) throws IOException {
- return 0;
+ return querySequenceWithAgg(AggType.MAX, segmentId, start, end);
}
@Override
public List<ProfileThreadSnapshotRecord> queryRecords(String segmentId, int minSequence, int maxSequence) throws IOException {
- return Collections.emptyList();
+ final StreamQuery query = new StreamQuery(ProfileThreadSnapshotRecord.INDEX_NAME, MAPPER.searchableProjection());
+ query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", ProfileThreadSnapshotRecord.SEGMENT_ID, segmentId))
+ .appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", ProfileThreadSnapshotRecord.SEQUENCE, (long) maxSequence))
+ .appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", ProfileThreadSnapshotRecord.SEQUENCE, (long) minSequence));
+ query.setDataProjections(MAPPER.dataProjection());
+ StreamQueryResponse resp = getClient().query(query);
+ return resp.getElements().stream().map(MAPPER::map).collect(Collectors.toList());
}
@Override
public SegmentRecord getProfiledSegment(String segmentId) throws IOException {
- return null;
+ final StreamQuery query = new StreamQuery(SegmentRecord.INDEX_NAME, SEGMENT_RECORD_MAPPER.searchableProjection());
+ query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", SegmentRecord.INDEX_NAME, segmentId));
+ query.setDataProjections(SEGMENT_RECORD_MAPPER.dataProjection());
+ StreamQueryResponse resp = getClient().query(query);
+ return resp.getElements().stream().map(SEGMENT_RECORD_MAPPER::map).findFirst().orElse(null);
+ }
+
+ private int querySequenceWithAgg(AggType aggType, String segmentId, long start, long end) {
+ final StreamQuery query = new StreamQuery(ProfileThreadSnapshotRecord.INDEX_NAME, MAPPER.searchableProjection());
+ query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", ProfileThreadSnapshotRecord.SEGMENT_ID, segmentId))
+ .appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", ProfileThreadSnapshotRecord.DUMP_TIME, end))
+ .appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", ProfileThreadSnapshotRecord.DUMP_TIME, start));
+ query.setDataProjections(MAPPER.dataProjection());
+
+ StreamQueryResponse resp = getClient().query(query);
+ List<ProfileThreadSnapshotRecord> records = resp.getElements().stream().map(MAPPER::map).collect(Collectors.toList());
+
+ switch (aggType) {
+ case MIN:
+ int minValue = Integer.MAX_VALUE;
+ for (final ProfileThreadSnapshotRecord record : records) {
+ int sequence = record.getSequence();
+ minValue = Math.min(minValue, sequence);
+ }
+ return minValue;
+ case MAX:
+ int maxValue = Integer.MIN_VALUE;
+ for (ProfileThreadSnapshotRecord record : records) {
+ int sequence = record.getSequence();
+ maxValue = Math.max(maxValue, sequence);
+ }
+ return maxValue;
+ default:
+ throw new IllegalArgumentException("should not reach this line");
+ }
+ }
+
+ enum AggType {
+ MIN, MAX
}
}
\ No newline at end of file
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java
index f59bc25f9e..54870d98c6 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java
@@ -45,11 +45,11 @@ public class BanyanDBRecordDAO implements IRecordDAO {
if (SegmentRecord.INDEX_NAME.equals(model.getName())) {
SegmentRecord segmentRecord = (SegmentRecord) record;
StreamWrite streamWrite = StreamWrite.builder()
- .name(BanyanDBSchema.NAME)
- .binary(segmentRecord.getDataBinary())
+ .name(SegmentRecord.INDEX_NAME)
+ .dataTag(Tag.binaryField(segmentRecord.getDataBinary()))
.timestamp(segmentRecord.getStartTime())
.elementId(segmentRecord.getSegmentId())
- .tags(buildFieldObjects(this.storageBuilder.entity2Storage(segmentRecord)))
+ .searchableTags(buildFieldObjects(this.storageBuilder.entity2Storage(segmentRecord)))
.build();
return new BanyanDBStreamInsertRequest(streamWrite);
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java
index 0a3f0e53b9..a37cfbb54c 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java
@@ -19,9 +19,10 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableList;
-import org.apache.skywalking.banyandb.v1.client.*;
-import org.apache.skywalking.oap.server.core.analysis.IDManager;
+import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
+import org.apache.skywalking.banyandb.v1.client.StreamQuery;
+import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
+import org.apache.skywalking.banyandb.v1.client.TimestampRange;
import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
import org.apache.skywalking.oap.server.core.query.type.*;
@@ -48,9 +49,6 @@ public class BanyanDBTraceQueryDAO extends AbstractDAO<BanyanDBStorageClient> im
private static final DateTimeFormatter YYYYMMDDHHMMSS = DateTimeFormat.forPattern("yyyyMMddHHmmss");
- private static final List<String> BASIC_QUERY_PROJ = ImmutableList.of("trace_id", "state", "endpoint_id", "duration", "start_time");
- private static final List<String> TRACE_ID_QUERY_PROJ = ImmutableList.of("trace_id", "state", "service_id", "service_instance_id", "endpoint_id", "duration", "start_time");
-
public BanyanDBTraceQueryDAO(BanyanDBStorageClient client) {
super(client);
}
@@ -59,9 +57,10 @@ public class BanyanDBTraceQueryDAO extends AbstractDAO<BanyanDBStorageClient> im
public TraceBrief queryBasicTraces(long startSecondTB, long endSecondTB, long minDuration, long maxDuration, String serviceId, String serviceInstanceId, String endpointId, String traceId, int limit, int from, TraceState traceState, QueryOrder queryOrder, List<Tag> tags) throws IOException {
StreamQuery query;
if (startSecondTB != 0 && endSecondTB != 0) {
- query = new StreamQuery(BanyanDBSchema.NAME, new TimestampRange(parseMillisFromStartSecondTB(startSecondTB), parseMillisFromEndSecondTB(endSecondTB)), BASIC_QUERY_PROJ);
+ query = new StreamQuery(BanyanDBSchema.NAME, new TimestampRange(parseMillisFromStartSecondTB(startSecondTB),
+ parseMillisFromEndSecondTB(endSecondTB)), BASIC_TRACE_MAPPER.searchableProjection());
} else {
- query = new StreamQuery(BanyanDBSchema.NAME, BASIC_QUERY_PROJ);
+ query = new StreamQuery(BanyanDBSchema.NAME, BASIC_TRACE_MAPPER.searchableProjection());
}
if (minDuration != 0) {
// duration >= minDuration
@@ -126,9 +125,9 @@ public class BanyanDBTraceQueryDAO extends AbstractDAO<BanyanDBStorageClient> im
@Override
public List<SegmentRecord> queryByTraceId(String traceId) throws IOException {
- StreamQuery query = new StreamQuery(BanyanDBSchema.NAME, TRACE_ID_QUERY_PROJ);
+ StreamQuery query = new StreamQuery(BanyanDBSchema.NAME, SEGMENT_RECORD_MAPPER.searchableProjection());
query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", "trace_id", traceId));
- query.setDataBinary(true);
+ query.setDataProjections(SEGMENT_RECORD_MAPPER.dataProjection());
StreamQueryResponse response = this.getClient().query(query);
return response.getElements().stream().map(SEGMENT_RECORD_MAPPER::map).collect(Collectors.toList());
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java
index 102540c7b5..a4875e65f1 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java
@@ -1,9 +1,10 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
-import com.google.common.collect.ImmutableList;
import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
import org.apache.skywalking.banyandb.v1.client.StreamQuery;
import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
+import org.apache.skywalking.banyandb.v1.client.StreamWrite;
+import org.apache.skywalking.banyandb.v1.client.Tag;
import org.apache.skywalking.oap.server.core.management.ui.template.UITemplate;
import org.apache.skywalking.oap.server.core.query.input.DashboardSetting;
import org.apache.skywalking.oap.server.core.query.type.DashboardConfiguration;
@@ -25,27 +26,45 @@ import java.util.stream.Collectors;
public class BanyanDBUITemplateManagementDAO extends AbstractDAO<BanyanDBStorageClient> implements UITemplateManagementDAO {
private static final RowEntityMapper<DashboardConfiguration> MAPPER = new DashboardConfigurationMapper();
+ private static final long UI_TEMPLATE_TIMESTAMP = 1L;
+
public BanyanDBUITemplateManagementDAO(BanyanDBStorageClient client) {
super(client);
}
@Override
public List<DashboardConfiguration> getAllTemplates(Boolean includingDisabled) throws IOException {
- StreamQuery query = new StreamQuery(UITemplate.INDEX_NAME, ImmutableList.of(
- UITemplate.NAME,
- UITemplate.DISABLED
- ));
+ StreamQuery query = new StreamQuery(UITemplate.INDEX_NAME, MAPPER.dataProjection());
query.setLimit(10000);
if (!includingDisabled) {
query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", UITemplate.DISABLED, (long) BooleanUtils.FALSE));
}
+ query.setDataProjections(MAPPER.dataProjection());
StreamQueryResponse resp = this.getClient().query(query);
return resp.getElements().stream().map(MAPPER::map).collect(Collectors.toList());
}
@Override
public TemplateChangeStatus addTemplate(DashboardSetting setting) throws IOException {
- // TODO: support single write
+ final UITemplate uiTemplate = setting.toEntity();
+
+ StreamWrite request = StreamWrite.builder()
+ .name(UITemplate.INDEX_NAME)
+ // searchable - name
+ .searchableTag(Tag.stringField(uiTemplate.getName()))
+ // searchable - disabled
+ .searchableTag(Tag.longField(uiTemplate.getDisabled()))
+ // data - type
+ .dataTag(Tag.stringField(uiTemplate.getType()))
+ // data - configuration
+ .dataTag(Tag.stringField(uiTemplate.getConfiguration()))
+ // data - activated
+ .dataTag(Tag.longField(uiTemplate.getActivated()))
+ .timestamp(UI_TEMPLATE_TIMESTAMP)
+ .elementId(uiTemplate.id())
+ .build();
+ getClient().write(request);
+ return TemplateChangeStatus.builder().status(true).build();
}
@Override
@@ -55,6 +74,6 @@ public class BanyanDBUITemplateManagementDAO extends AbstractDAO<BanyanDBStorage
@Override
public TemplateChangeStatus disableTemplate(String name) throws IOException {
- return TemplateChangeStatus.builder().status(false).message("Can't add/update the template").build();
+ return TemplateChangeStatus.builder().status(false).message("Can't disable the template").build();
}
}