You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by lu...@apache.org on 2022/04/30 12:56:28 UTC

[skywalking] 02/25: abstract row entity mapper

This is an automated email from the ASF dual-hosted git repository.

lujiajing pushed a commit to branch banyandb-integration-stream
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit 44ad192398e1bbdefe68db4e503cf126e2d96d8b
Author: Megrez Lu <lu...@gmail.com>
AuthorDate: Mon Nov 29 19:32:46 2021 +0800

    abstract row entity mapper
---
 .../plugin/banyandb/BanyanDBStorageClient.java     |  4 +++
 .../banyandb/converter/BasicTraceMapper.java       | 25 +++++++++++++++
 .../converter/DashboardConfigurationMapper.java    | 20 ++++++++++++
 .../plugin/banyandb/converter/RowEntityMapper.java |  7 ++++
 .../banyandb/converter/SegmentRecordMapper.java    | 27 ++++++++++++++++
 .../banyandb/stream/BanyanDBTraceQueryDAO.java     | 37 +++++-----------------
 .../stream/BanyanDBUITemplateManagementDAO.java    | 35 +++++++++++++++++---
 7 files changed, 121 insertions(+), 34 deletions(-)

diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
index eed4386e57..6ae741f971 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
@@ -44,6 +44,10 @@ public class BanyanDBStorageClient implements Client, HealthCheckable {
         }
     }
 
+    public void write() {
+        this.client
+    }
+
     public StreamBulkWriteProcessor createBulkProcessor(int maxBulkSize, int flushInterval, int concurrency) {
         return this.client.buildStreamWriteProcessor(maxBulkSize, flushInterval, concurrency);
     }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/BasicTraceMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/BasicTraceMapper.java
new file mode 100644
index 0000000000..bb43a8080a
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/BasicTraceMapper.java
@@ -0,0 +1,25 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.converter;
+
+import org.apache.skywalking.banyandb.v1.client.RowEntity;
+import org.apache.skywalking.banyandb.v1.client.TagAndValue;
+import org.apache.skywalking.oap.server.core.analysis.IDManager;
+import org.apache.skywalking.oap.server.core.query.type.BasicTrace;
+
+import java.util.List;
+
+public class BasicTraceMapper implements RowEntityMapper<BasicTrace> {
+    @Override
+    public BasicTrace map(RowEntity row) {
+        BasicTrace trace = new BasicTrace();
+        trace.setSegmentId(row.getId());
+        final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
+        trace.getTraceIds().add((String) searchable.get(0).getValue());
+        trace.setError(((Long) searchable.get(1).getValue()).intValue() == 1);
+        trace.getEndpointNames().add(IDManager.EndpointID.analysisId(
+                (String) searchable.get(2).getValue()
+        ).getEndpointName());
+        trace.setDuration(((Long) searchable.get(3).getValue()).intValue());
+        trace.setStart(String.valueOf(searchable.get(4).getValue()));
+        return trace;
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/DashboardConfigurationMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/DashboardConfigurationMapper.java
new file mode 100644
index 0000000000..f4c4c19beb
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/DashboardConfigurationMapper.java
@@ -0,0 +1,20 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.converter;
+
+import org.apache.skywalking.banyandb.v1.client.RowEntity;
+import org.apache.skywalking.banyandb.v1.client.TagAndValue;
+import org.apache.skywalking.oap.server.core.query.type.DashboardConfiguration;
+import org.apache.skywalking.oap.server.library.util.BooleanUtils;
+
+import java.util.List;
+
+public class DashboardConfigurationMapper implements RowEntityMapper<DashboardConfiguration> {
+    @Override
+    public DashboardConfiguration map(RowEntity row) {
+        DashboardConfiguration dashboardConfiguration = new DashboardConfiguration();
+        final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
+        dashboardConfiguration.setName((String) searchable.get(0).getValue());
+        dashboardConfiguration.setDisabled(BooleanUtils.valueToBoolean(((Number) searchable.get(1).getValue()).intValue()));
+        // TODO: convert back from data?
+        return dashboardConfiguration;
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/RowEntityMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/RowEntityMapper.java
new file mode 100644
index 0000000000..eb9b5fadba
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/RowEntityMapper.java
@@ -0,0 +1,7 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.converter;
+
+import org.apache.skywalking.banyandb.v1.client.RowEntity;
+
+public interface RowEntityMapper<T> {
+    T map(RowEntity row);
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/SegmentRecordMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/SegmentRecordMapper.java
new file mode 100644
index 0000000000..2b20227931
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/SegmentRecordMapper.java
@@ -0,0 +1,27 @@
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.converter;
+
+import org.apache.skywalking.banyandb.v1.client.RowEntity;
+import org.apache.skywalking.banyandb.v1.client.TagAndValue;
+import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
+
+import java.util.List;
+
+public class SegmentRecordMapper implements RowEntityMapper<SegmentRecord> {
+    @Override
+    public SegmentRecord map(RowEntity row) {
+        SegmentRecord record = new SegmentRecord();
+        final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0);
+        record.setSegmentId(row.getId());
+        record.setTraceId((String) searchable.get(0).getValue());
+        record.setIsError(((Number) searchable.get(1).getValue()).intValue());
+        record.setServiceId((String) searchable.get(2).getValue());
+        record.setServiceInstanceId((String) searchable.get(3).getValue());
+        record.setEndpointId((String) searchable.get(4).getValue());
+        record.setLatency(((Number) searchable.get(5).getValue()).intValue());
+        record.setStartTime(((Number) searchable.get(6).getValue()).longValue());
+        final List<TagAndValue<?>> data = row.getTagFamilies().get(1);
+        // TODO: support binary data in the client SDK
+        record.setDataBinary((byte[]) data.get(0).getValue());
+        return record;
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java
index 9fc403afc6..0a3f0e53b9 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java
@@ -30,6 +30,9 @@ import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
 import org.apache.skywalking.oap.server.library.util.CollectionUtils;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBSchema;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.converter.BasicTraceMapper;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.converter.RowEntityMapper;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.converter.SegmentRecordMapper;
 import org.joda.time.DateTimeZone;
 import org.joda.time.format.DateTimeFormat;
 import org.joda.time.format.DateTimeFormatter;
@@ -40,6 +43,9 @@ import java.util.List;
 import java.util.stream.Collectors;
 
 public class BanyanDBTraceQueryDAO extends AbstractDAO<BanyanDBStorageClient> implements ITraceQueryDAO {
+    private static final RowEntityMapper<SegmentRecord> SEGMENT_RECORD_MAPPER = new SegmentRecordMapper();
+    private static final RowEntityMapper<BasicTrace> BASIC_TRACE_MAPPER = new BasicTraceMapper();
+
     private static final DateTimeFormatter YYYYMMDDHHMMSS = DateTimeFormat.forPattern("yyyyMMddHHmmss");
 
     private static final List<String> BASIC_QUERY_PROJ = ImmutableList.of("trace_id", "state", "endpoint_id", "duration", "start_time");
@@ -114,19 +120,7 @@ public class BanyanDBTraceQueryDAO extends AbstractDAO<BanyanDBStorageClient> im
         StreamQueryResponse response = this.getClient().query(query);
         TraceBrief brief = new TraceBrief();
         brief.setTotal(response.size());
-        brief.getTraces().addAll(response.getElements().stream().map(elem -> {
-            BasicTrace trace = new BasicTrace();
-            trace.setSegmentId(elem.getId());
-            final List<TagAndValue<?>> searchable = elem.getTagFamilies().get(0);
-            trace.getTraceIds().add((String) searchable.get(0).getValue());
-            trace.setError(((Long) searchable.get(1).getValue()).intValue() == 1);
-            trace.getEndpointNames().add(IDManager.EndpointID.analysisId(
-                    (String) searchable.get(2).getValue()
-            ).getEndpointName());
-            trace.setDuration(((Long) searchable.get(3).getValue()).intValue());
-            trace.setStart(String.valueOf(searchable.get(4).getValue()));
-            return trace;
-        }).collect(Collectors.toList()));
+        brief.getTraces().addAll(response.getElements().stream().map(BASIC_TRACE_MAPPER::map).collect(Collectors.toList()));
         return brief;
     }
 
@@ -136,22 +130,7 @@ public class BanyanDBTraceQueryDAO extends AbstractDAO<BanyanDBStorageClient> im
         query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", "trace_id", traceId));
         query.setDataBinary(true);
         StreamQueryResponse response = this.getClient().query(query);
-        return response.getElements().stream().map(elem -> {
-            SegmentRecord record = new SegmentRecord();
-            final List<TagAndValue<?>> searchable = elem.getTagFamilies().get(0);
-            record.setSegmentId(elem.getId());
-            record.setTraceId((String) searchable.get(0).getValue());
-            record.setIsError(((Number) searchable.get(1).getValue()).intValue());
-            record.setServiceId((String) searchable.get(2).getValue());
-            record.setServiceInstanceId((String) searchable.get(3).getValue());
-            record.setEndpointId((String) searchable.get(4).getValue());
-            record.setLatency(((Number) searchable.get(5).getValue()).intValue());
-            record.setStartTime(((Number) searchable.get(6).getValue()).longValue());
-            final List<TagAndValue<?>> data = elem.getTagFamilies().get(1);
-            // TODO: support binary data in the client SDK
-            record.setDataBinary((byte[]) data.get(0).getValue());
-            return record;
-        }).collect(Collectors.toList());
+        return response.getElements().stream().map(SEGMENT_RECORD_MAPPER::map).collect(Collectors.toList());
     }
 
     @Override
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java
index ad8fed926e..102540c7b5 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java
@@ -1,31 +1,56 @@
 package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
 
+import com.google.common.collect.ImmutableList;
+import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
+import org.apache.skywalking.banyandb.v1.client.StreamQuery;
+import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
+import org.apache.skywalking.oap.server.core.management.ui.template.UITemplate;
 import org.apache.skywalking.oap.server.core.query.input.DashboardSetting;
 import org.apache.skywalking.oap.server.core.query.type.DashboardConfiguration;
 import org.apache.skywalking.oap.server.core.query.type.TemplateChangeStatus;
+import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
 import org.apache.skywalking.oap.server.core.storage.management.UITemplateManagementDAO;
+import org.apache.skywalking.oap.server.library.util.BooleanUtils;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.converter.DashboardConfigurationMapper;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.converter.RowEntityMapper;
 
 import java.io.IOException;
-import java.util.Collections;
 import java.util.List;
+import java.util.stream.Collectors;
 
 /**
  * {@link org.apache.skywalking.oap.server.core.management.ui.template.UITemplate} is a stream
  */
-public class BanyanDBUITemplateManagementDAO implements UITemplateManagementDAO {
+public class BanyanDBUITemplateManagementDAO extends AbstractDAO<BanyanDBStorageClient> implements UITemplateManagementDAO {
+    private static final RowEntityMapper<DashboardConfiguration> MAPPER = new DashboardConfigurationMapper();
+
+    public BanyanDBUITemplateManagementDAO(BanyanDBStorageClient client) {
+        super(client);
+    }
+
     @Override
     public List<DashboardConfiguration> getAllTemplates(Boolean includingDisabled) throws IOException {
-        return Collections.emptyList();
+        StreamQuery query = new StreamQuery(UITemplate.INDEX_NAME, ImmutableList.of(
+                UITemplate.NAME,
+                UITemplate.DISABLED
+        ));
+        query.setLimit(10000);
+        if (!includingDisabled) {
+            query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", UITemplate.DISABLED, (long) BooleanUtils.FALSE));
+        }
+        StreamQueryResponse resp = this.getClient().query(query);
+        return resp.getElements().stream().map(MAPPER::map).collect(Collectors.toList());
     }
 
     @Override
     public TemplateChangeStatus addTemplate(DashboardSetting setting) throws IOException {
-        return TemplateChangeStatus.builder().status(false).message("Can't add a new template").build();
+        // TODO: support single write
     }
 
     @Override
     public TemplateChangeStatus changeTemplate(DashboardSetting setting) throws IOException {
-        return TemplateChangeStatus.builder().status(false).message("Can't add/update the template").build();
+        return TemplateChangeStatus.builder().status(false).message("Can't update the template").build();
     }
 
     @Override