You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by lu...@apache.org on 2022/04/30 12:56:28 UTC
[skywalking] 02/25: abstract row entity mapper
This is an automated email from the ASF dual-hosted git repository.
lujiajing pushed a commit to branch banyandb-integration-stream
in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit 44ad192398e1bbdefe68db4e503cf126e2d96d8b
Author: Megrez Lu <lu...@gmail.com>
AuthorDate: Mon Nov 29 19:32:46 2021 +0800
abstract row entity mapper
---
.../plugin/banyandb/BanyanDBStorageClient.java | 4 +++
.../banyandb/converter/BasicTraceMapper.java | 25 +++++++++++++++
.../converter/DashboardConfigurationMapper.java | 20 ++++++++++++
.../plugin/banyandb/converter/RowEntityMapper.java | 7 ++++
.../banyandb/converter/SegmentRecordMapper.java | 27 ++++++++++++++++
.../banyandb/stream/BanyanDBTraceQueryDAO.java | 37 +++++-----------------
.../stream/BanyanDBUITemplateManagementDAO.java | 35 +++++++++++++++++---
7 files changed, 121 insertions(+), 34 deletions(-)
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
index eed4386e57..6ae741f971 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
@@ -44,6 +44,10 @@ public class BanyanDBStorageClient implements Client, HealthCheckable {
}
}
+ /**
+ * Placeholder for a single write (not implemented yet).
+ *
+ * @throws UnsupportedOperationException always, until single writes are supported
+ */
+ public void write() {
+ // TODO: implement on top of this.client; fail loudly instead of leaving a dangling expression.
+ throw new UnsupportedOperationException("single write is not supported yet");
+ }
+
public StreamBulkWriteProcessor createBulkProcessor(int maxBulkSize, int flushInterval, int concurrency) {
return this.client.buildStreamWriteProcessor(maxBulkSize, flushInterval, concurrency);
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/BasicTraceMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/BasicTraceMapper.java
new file mode 100644
index 0000000000..bb43a8080a
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/BasicTraceMapper.java
@@ -0,0 +1,25 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.converter;
+
+import org.apache.skywalking.banyandb.v1.client.RowEntity;
+import org.apache.skywalking.banyandb.v1.client.TagAndValue;
+import org.apache.skywalking.oap.server.core.analysis.IDManager;
+import org.apache.skywalking.oap.server.core.query.type.BasicTrace;
+
+import java.util.List;
+
+/**
+ * Maps a {@link RowEntity} onto a {@link BasicTrace}.
+ *
+ * <p>Tags are read by position from the first tag family of the row: trace id, error state,
+ * endpoint id, duration and start time. NOTE(review): this presumably has to match the
+ * projection order of the query that produced the row - confirm against the caller.
+ */
+public class BasicTraceMapper implements RowEntityMapper<BasicTrace> {
+    /**
+     * Builds a {@link BasicTrace} from a single row.
+     *
+     * @param row a row whose first tag family holds the five tags in the order described above
+     * @return a new {@link BasicTrace} populated from {@code row}
+     */
+    @Override
+    public BasicTrace map(RowEntity row) {
+        final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
+        final BasicTrace trace = new BasicTrace();
+        trace.setSegmentId(row.getId());
+        trace.getTraceIds().add((String) searchable.get(0).getValue());
+        // Cast through Number rather than Long so that any boxed integral type is accepted,
+        // the same way SegmentRecordMapper and DashboardConfigurationMapper read numeric tags.
+        trace.setError(((Number) searchable.get(1).getValue()).intValue() == 1);
+        // The stored tag is an endpoint id; it is resolved to the endpoint name here.
+        trace.getEndpointNames().add(IDManager.EndpointID.analysisId(
+            (String) searchable.get(2).getValue()
+        ).getEndpointName());
+        trace.setDuration(((Number) searchable.get(3).getValue()).intValue());
+        trace.setStart(String.valueOf(searchable.get(4).getValue()));
+        return trace;
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/DashboardConfigurationMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/DashboardConfigurationMapper.java
new file mode 100644
index 0000000000..f4c4c19beb
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/DashboardConfigurationMapper.java
@@ -0,0 +1,20 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.converter;
+
+import org.apache.skywalking.banyandb.v1.client.RowEntity;
+import org.apache.skywalking.banyandb.v1.client.TagAndValue;
+import org.apache.skywalking.oap.server.core.query.type.DashboardConfiguration;
+import org.apache.skywalking.oap.server.library.util.BooleanUtils;
+
+import java.util.List;
+
+/**
+ * Converts a {@link RowEntity} into a {@link DashboardConfiguration}, filling in the name and
+ * the disabled flag from the first two tags of the row's first tag family.
+ */
+public class DashboardConfigurationMapper implements RowEntityMapper<DashboardConfiguration> {
+    @Override
+    public DashboardConfiguration map(RowEntity row) {
+        final List<TagAndValue<?>> tags = row.getTagFamilies().get(0);
+        final String name = (String) tags.get(0).getValue();
+        final int disabledFlag = ((Number) tags.get(1).getValue()).intValue();
+
+        final DashboardConfiguration configuration = new DashboardConfiguration();
+        configuration.setName(name);
+        configuration.setDisabled(BooleanUtils.valueToBoolean(disabledFlag));
+        // TODO: convert back from data?
+        return configuration;
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/RowEntityMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/RowEntityMapper.java
new file mode 100644
index 0000000000..eb9b5fadba
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/RowEntityMapper.java
@@ -0,0 +1,7 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.converter;
+
+import org.apache.skywalking.banyandb.v1.client.RowEntity;
+
+/**
+ * Converts a single {@link RowEntity} into a domain object.
+ *
+ * @param <T> the type produced from a row
+ */
+@FunctionalInterface
+public interface RowEntityMapper<T> {
+    /**
+     * Maps one row to an instance of {@code T}.
+     *
+     * @param row the row to convert
+     * @return the object built from {@code row}
+     */
+    T map(RowEntity row);
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/SegmentRecordMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/SegmentRecordMapper.java
new file mode 100644
index 0000000000..2b20227931
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/SegmentRecordMapper.java
@@ -0,0 +1,27 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.converter;
+
+import org.apache.skywalking.banyandb.v1.client.RowEntity;
+import org.apache.skywalking.banyandb.v1.client.TagAndValue;
+import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
+
+import java.util.List;
+
+/**
+ * Builds a {@link SegmentRecord} out of a {@link RowEntity}: scalar fields come from the first
+ * tag family (read by position), the binary payload from the first tag of the second family.
+ */
+public class SegmentRecordMapper implements RowEntityMapper<SegmentRecord> {
+    @Override
+    public SegmentRecord map(RowEntity row) {
+        final SegmentRecord segment = new SegmentRecord();
+        final List<TagAndValue<?>> tags = row.getTagFamilies().get(0);
+        segment.setSegmentId(row.getId());
+        segment.setTraceId(stringAt(tags, 0));
+        segment.setIsError(numberAt(tags, 1).intValue());
+        segment.setServiceId(stringAt(tags, 2));
+        segment.setServiceInstanceId(stringAt(tags, 3));
+        segment.setEndpointId(stringAt(tags, 4));
+        segment.setLatency(numberAt(tags, 5).intValue());
+        segment.setStartTime(numberAt(tags, 6).longValue());
+        final List<TagAndValue<?>> binaryFamily = row.getTagFamilies().get(1);
+        // TODO: support binary data in the client SDK
+        segment.setDataBinary((byte[]) binaryFamily.get(0).getValue());
+        return segment;
+    }
+
+    /** Reads the tag at {@code index} as a string. */
+    private static String stringAt(List<TagAndValue<?>> tags, int index) {
+        return (String) tags.get(index).getValue();
+    }
+
+    /** Reads the tag at {@code index} as a number. */
+    private static Number numberAt(List<TagAndValue<?>> tags, int index) {
+        return (Number) tags.get(index).getValue();
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java
index 9fc403afc6..0a3f0e53b9 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java
@@ -30,6 +30,9 @@ import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBSchema;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.converter.BasicTraceMapper;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.converter.RowEntityMapper;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.converter.SegmentRecordMapper;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
@@ -40,6 +43,9 @@ import java.util.List;
import java.util.stream.Collectors;
public class BanyanDBTraceQueryDAO extends AbstractDAO<BanyanDBStorageClient> implements ITraceQueryDAO {
+ private static final RowEntityMapper<SegmentRecord> SEGMENT_RECORD_MAPPER = new SegmentRecordMapper();
+ private static final RowEntityMapper<BasicTrace> BASIC_TRACE_MAPPER = new BasicTraceMapper();
+
private static final DateTimeFormatter YYYYMMDDHHMMSS = DateTimeFormat.forPattern("yyyyMMddHHmmss");
private static final List<String> BASIC_QUERY_PROJ = ImmutableList.of("trace_id", "state", "endpoint_id", "duration", "start_time");
@@ -114,19 +120,7 @@ public class BanyanDBTraceQueryDAO extends AbstractDAO<BanyanDBStorageClient> im
StreamQueryResponse response = this.getClient().query(query);
TraceBrief brief = new TraceBrief();
brief.setTotal(response.size());
- brief.getTraces().addAll(response.getElements().stream().map(elem -> {
- BasicTrace trace = new BasicTrace();
- trace.setSegmentId(elem.getId());
- final List<TagAndValue<?>> searchable = elem.getTagFamilies().get(0);
- trace.getTraceIds().add((String) searchable.get(0).getValue());
- trace.setError(((Long) searchable.get(1).getValue()).intValue() == 1);
- trace.getEndpointNames().add(IDManager.EndpointID.analysisId(
- (String) searchable.get(2).getValue()
- ).getEndpointName());
- trace.setDuration(((Long) searchable.get(3).getValue()).intValue());
- trace.setStart(String.valueOf(searchable.get(4).getValue()));
- return trace;
- }).collect(Collectors.toList()));
+ brief.getTraces().addAll(response.getElements().stream().map(BASIC_TRACE_MAPPER::map).collect(Collectors.toList()));
return brief;
}
@@ -136,22 +130,7 @@ public class BanyanDBTraceQueryDAO extends AbstractDAO<BanyanDBStorageClient> im
query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", "trace_id", traceId));
query.setDataBinary(true);
StreamQueryResponse response = this.getClient().query(query);
- return response.getElements().stream().map(elem -> {
- SegmentRecord record = new SegmentRecord();
- final List<TagAndValue<?>> searchable = elem.getTagFamilies().get(0);
- record.setSegmentId(elem.getId());
- record.setTraceId((String) searchable.get(0).getValue());
- record.setIsError(((Number) searchable.get(1).getValue()).intValue());
- record.setServiceId((String) searchable.get(2).getValue());
- record.setServiceInstanceId((String) searchable.get(3).getValue());
- record.setEndpointId((String) searchable.get(4).getValue());
- record.setLatency(((Number) searchable.get(5).getValue()).intValue());
- record.setStartTime(((Number) searchable.get(6).getValue()).longValue());
- final List<TagAndValue<?>> data = elem.getTagFamilies().get(1);
- // TODO: support binary data in the client SDK
- record.setDataBinary((byte[]) data.get(0).getValue());
- return record;
- }).collect(Collectors.toList());
+ return response.getElements().stream().map(SEGMENT_RECORD_MAPPER::map).collect(Collectors.toList());
}
@Override
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java
index ad8fed926e..102540c7b5 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java
@@ -1,31 +1,56 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
+import com.google.common.collect.ImmutableList;
+import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
+import org.apache.skywalking.banyandb.v1.client.StreamQuery;
+import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
+import org.apache.skywalking.oap.server.core.management.ui.template.UITemplate;
import org.apache.skywalking.oap.server.core.query.input.DashboardSetting;
import org.apache.skywalking.oap.server.core.query.type.DashboardConfiguration;
import org.apache.skywalking.oap.server.core.query.type.TemplateChangeStatus;
+import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
import org.apache.skywalking.oap.server.core.storage.management.UITemplateManagementDAO;
+import org.apache.skywalking.oap.server.library.util.BooleanUtils;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.converter.DashboardConfigurationMapper;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.converter.RowEntityMapper;
import java.io.IOException;
-import java.util.Collections;
import java.util.List;
+import java.util.stream.Collectors;
/**
* {@link org.apache.skywalking.oap.server.core.management.ui.template.UITemplate} is a stream
*/
-public class BanyanDBUITemplateManagementDAO implements UITemplateManagementDAO {
+public class BanyanDBUITemplateManagementDAO extends AbstractDAO<BanyanDBStorageClient> implements UITemplateManagementDAO {
+ private static final RowEntityMapper<DashboardConfiguration> MAPPER = new DashboardConfigurationMapper();
+
+ public BanyanDBUITemplateManagementDAO(BanyanDBStorageClient client) {
+ super(client); // hand the client to the AbstractDAO base class
+ }
+
@Override
public List<DashboardConfiguration> getAllTemplates(Boolean includingDisabled) throws IOException {
- return Collections.emptyList();
+ // Only the name and the disabled flag are projected; they are what MAPPER reads back.
+ StreamQuery query = new StreamQuery(UITemplate.INDEX_NAME, ImmutableList.of(
+ UITemplate.NAME,
+ UITemplate.DISABLED
+ ));
+ // Upper bound on the number of templates fetched in one query.
+ query.setLimit(10000);
+ // includingDisabled is a boxed Boolean: compare against TRUE instead of unboxing, so that
+ // a null argument is treated as "exclude disabled templates" rather than throwing an NPE.
+ if (!Boolean.TRUE.equals(includingDisabled)) {
+ query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", UITemplate.DISABLED, (long) BooleanUtils.FALSE));
+ }
+ StreamQueryResponse resp = this.getClient().query(query);
+ return resp.getElements().stream().map(MAPPER::map).collect(Collectors.toList());
}
@Override
public TemplateChangeStatus addTemplate(DashboardSetting setting) throws IOException {
+ // TODO: support single write. Until then keep reporting failure, so that this non-void
+ // method still returns a value on every path.
return TemplateChangeStatus.builder().status(false).message("Can't add a new template").build();
}
@Override
public TemplateChangeStatus changeTemplate(DashboardSetting setting) throws IOException {
- return TemplateChangeStatus.builder().status(false).message("Can't add/update the template").build();
+ return TemplateChangeStatus.builder().status(false).message("Can't update the template").build(); // always reports failure; no update is performed here
}
@Override