You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by lu...@apache.org on 2022/10/24 15:07:59 UTC

[skywalking] 01/01: update to banyandb client 0.2

This is an automated email from the ASF dual-hosted git repository.

lujiajing pushed a commit to branch update-banyandb
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit 658d20e53e3c52708687e6b82e36ee56c57debd5
Author: Megrez Lu <lu...@gmail.com>
AuthorDate: Mon Oct 24 23:06:42 2022 +0800

    update to banyandb client 0.2
    
    Signed-off-by: Megrez Lu <lu...@gmail.com>
---
 oap-server-bom/pom.xml                             |   2 +-
 .../plugin/banyandb/BanyanDBStorageClient.java     |   2 +-
 .../storage/plugin/banyandb/MetadataRegistry.java  |   5 +-
 .../banyandb/measure/BanyanDBMetadataQueryDAO.java | 118 ++++++++++-----------
 .../banyandb/measure/BanyanDBMetricsDAO.java       |   2 +-
 .../banyandb/measure/BanyanDBMetricsQueryDAO.java  |  24 +++--
 .../banyandb/stream/AbstractBanyanDBDAO.java       |   5 +
 test/e2e-v2/script/env                             |   2 +-
 8 files changed, 84 insertions(+), 76 deletions(-)

diff --git a/oap-server-bom/pom.xml b/oap-server-bom/pom.xml
index bd1792d1ab..8b40a3f488 100644
--- a/oap-server-bom/pom.xml
+++ b/oap-server-bom/pom.xml
@@ -73,7 +73,7 @@
         <awaitility.version>3.0.0</awaitility.version>
         <httpcore.version>4.4.13</httpcore.version>
         <commons-compress.version>1.21</commons-compress.version>
-        <banyandb-java-client.version>0.1.0</banyandb-java-client.version>
+        <banyandb-java-client.version>0.2.0-SNAPSHOT</banyandb-java-client.version>
         <kafka-clients.version>2.4.1</kafka-clients.version>
         <spring-kafka-test.version>2.4.6.RELEASE</spring-kafka-test.version>
     </properties>
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
index 9ba378125c..f2408f8695 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
@@ -118,7 +118,7 @@ public class BanyanDBStorageClient implements Client, HealthCheckable {
 
     public void define(Property property) throws IOException {
         try {
-            this.client.save(property);
+            this.client.apply(property);
             this.healthChecker.health();
         } catch (BanyanDBException ex) {
             healthChecker.unHealth(ex);
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
index 9287d5c710..eddc974fa9 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
@@ -33,6 +33,7 @@ import org.apache.skywalking.banyandb.v1.client.metadata.Catalog;
 import org.apache.skywalking.banyandb.v1.client.metadata.Duration;
 import org.apache.skywalking.banyandb.v1.client.metadata.Group;
 import org.apache.skywalking.banyandb.v1.client.metadata.IndexRule;
+import org.apache.skywalking.banyandb.v1.client.metadata.IntervalRule;
 import org.apache.skywalking.banyandb.v1.client.metadata.Measure;
 import org.apache.skywalking.banyandb.v1.client.metadata.NamedSchema;
 import org.apache.skywalking.banyandb.v1.client.metadata.Stream;
@@ -322,9 +323,9 @@ public enum MetadataRegistry {
             }
             switch (kind) {
                 case STREAM:
-                    return client.define(Group.create(this.group, Catalog.STREAM, this.shard, 0, Duration.ofDays(7)));
+                    return client.define(Group.create(this.group, Catalog.STREAM, this.shard, IntervalRule.create(IntervalRule.Unit.HOUR, 4), IntervalRule.create(IntervalRule.Unit.DAY, 1), IntervalRule.create(IntervalRule.Unit.DAY, 7)));
                 case MEASURE:
-                    return client.define(Group.create(this.group, Catalog.MEASURE, this.shard, 12, Duration.ofDays(7)));
+                    return client.define(Group.create(this.group, Catalog.MEASURE, this.shard, IntervalRule.create(IntervalRule.Unit.HOUR, 4), IntervalRule.create(IntervalRule.Unit.HOUR, 24), IntervalRule.create(IntervalRule.Unit.DAY, 7)));
                 default:
                     throw new IllegalStateException("should not reach here");
             }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java
index e666929a76..1139604908 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java
@@ -167,7 +167,7 @@ public class BanyanDBMetadataQueryDAO extends AbstractBanyanDBDAO implements IMe
                     @Override
                     protected void apply(MeasureQuery query) {
                         if (StringUtil.isNotEmpty(instanceId)) {
-                            query.andWithID(instanceId);
+                            query.and(id(instanceId));
                         }
                     }
                 });
@@ -204,17 +204,17 @@ public class BanyanDBMetadataQueryDAO extends AbstractBanyanDBDAO implements IMe
     @Override
     public List<Process> listProcesses(String serviceId, ProfilingSupportStatus supportStatus, long lastPingStartTimeBucket, long lastPingEndTimeBucket) throws IOException {
         MeasureQueryResponse resp = query(ProcessTraffic.INDEX_NAME,
-            PROCESS_TRAFFIC_TAGS,
-            Collections.emptySet(),
-            new QueryBuilder<MeasureQuery>() {
-                @Override
-                protected void apply(MeasureQuery query) {
-                    query.and(eq(ProcessTraffic.SERVICE_ID, serviceId));
-                    query.and(gte(ProcessTraffic.LAST_PING_TIME_BUCKET, lastPingStartTimeBucket));
-                    query.and(eq(ProcessTraffic.PROFILING_SUPPORT_STATUS, supportStatus.value()));
-                    query.and(ne(ProcessTraffic.DETECT_TYPE, ProcessDetectType.VIRTUAL.value()));
-                }
-            });
+                PROCESS_TRAFFIC_TAGS,
+                Collections.emptySet(),
+                new QueryBuilder<MeasureQuery>() {
+                    @Override
+                    protected void apply(MeasureQuery query) {
+                        query.and(eq(ProcessTraffic.SERVICE_ID, serviceId));
+                        query.and(gte(ProcessTraffic.LAST_PING_TIME_BUCKET, lastPingStartTimeBucket));
+                        query.and(eq(ProcessTraffic.PROFILING_SUPPORT_STATUS, supportStatus.value()));
+                        query.and(ne(ProcessTraffic.DETECT_TYPE, ProcessDetectType.VIRTUAL.value()));
+                    }
+                });
 
         final List<Process> processes = new ArrayList<>();
 
@@ -230,18 +230,18 @@ public class BanyanDBMetadataQueryDAO extends AbstractBanyanDBDAO implements IMe
         long lastPingStartTimeBucket = duration.getStartTimeBucket();
         long lastPingEndTimeBucket = duration.getEndTimeBucket();
         MeasureQueryResponse resp = query(ProcessTraffic.INDEX_NAME,
-            PROCESS_TRAFFIC_TAGS,
-            Collections.emptySet(),
-            new QueryBuilder<MeasureQuery>() {
-                @Override
-                protected void apply(MeasureQuery query) {
-                    query.and(eq(ProcessTraffic.INSTANCE_ID, serviceInstanceId));
-                    query.and(gte(ProcessTraffic.LAST_PING_TIME_BUCKET, lastPingStartTimeBucket));
-                    if (!includeVirtual) {
-                        query.and(ne(ProcessTraffic.DETECT_TYPE, ProcessDetectType.VIRTUAL.value()));
+                PROCESS_TRAFFIC_TAGS,
+                Collections.emptySet(),
+                new QueryBuilder<MeasureQuery>() {
+                    @Override
+                    protected void apply(MeasureQuery query) {
+                        query.and(eq(ProcessTraffic.INSTANCE_ID, serviceInstanceId));
+                        query.and(gte(ProcessTraffic.LAST_PING_TIME_BUCKET, lastPingStartTimeBucket));
+                        if (!includeVirtual) {
+                            query.and(ne(ProcessTraffic.DETECT_TYPE, ProcessDetectType.VIRTUAL.value()));
+                        }
                     }
-                }
-            });
+                });
 
         final List<Process> processes = new ArrayList<>();
 
@@ -255,15 +255,15 @@ public class BanyanDBMetadataQueryDAO extends AbstractBanyanDBDAO implements IMe
     @Override
     public List<Process> listProcesses(String agentId) throws IOException {
         MeasureQueryResponse resp = query(ProcessTraffic.INDEX_NAME,
-            PROCESS_TRAFFIC_TAGS,
-            Collections.emptySet(),
-            new QueryBuilder<MeasureQuery>() {
-                @Override
-                protected void apply(MeasureQuery query) {
-                    query.and(eq(ProcessTraffic.AGENT_ID, agentId));
-                    query.and(ne(ProcessTraffic.DETECT_TYPE, ProcessDetectType.VIRTUAL.value()));
-                }
-            });
+                PROCESS_TRAFFIC_TAGS,
+                Collections.emptySet(),
+                new QueryBuilder<MeasureQuery>() {
+                    @Override
+                    protected void apply(MeasureQuery query) {
+                        query.and(eq(ProcessTraffic.AGENT_ID, agentId));
+                        query.and(ne(ProcessTraffic.DETECT_TYPE, ProcessDetectType.VIRTUAL.value()));
+                    }
+                });
 
         final List<Process> processes = new ArrayList<>();
 
@@ -277,41 +277,41 @@ public class BanyanDBMetadataQueryDAO extends AbstractBanyanDBDAO implements IMe
     @Override
     public long getProcessCount(String serviceId, ProfilingSupportStatus profilingSupportStatus, long lastPingStartTimeBucket, long lastPingEndTimeBucket) throws IOException {
         MeasureQueryResponse resp = query(ProcessTraffic.INDEX_NAME,
-            PROCESS_TRAFFIC_TAGS,
-            Collections.emptySet(),
-            new QueryBuilder<MeasureQuery>() {
-                @Override
-                protected void apply(MeasureQuery query) {
-                    query.and(eq(ProcessTraffic.SERVICE_ID, serviceId));
-                    query.and(gte(ProcessTraffic.LAST_PING_TIME_BUCKET, lastPingStartTimeBucket));
-                    query.and(eq(ProcessTraffic.PROFILING_SUPPORT_STATUS, profilingSupportStatus.value()));
-                    query.and(ne(ProcessTraffic.DETECT_TYPE, ProcessDetectType.VIRTUAL.value()));
-                }
-            });
+                PROCESS_TRAFFIC_TAGS,
+                Collections.emptySet(),
+                new QueryBuilder<MeasureQuery>() {
+                    @Override
+                    protected void apply(MeasureQuery query) {
+                        query.and(eq(ProcessTraffic.SERVICE_ID, serviceId));
+                        query.and(gte(ProcessTraffic.LAST_PING_TIME_BUCKET, lastPingStartTimeBucket));
+                        query.and(eq(ProcessTraffic.PROFILING_SUPPORT_STATUS, profilingSupportStatus.value()));
+                        query.and(ne(ProcessTraffic.DETECT_TYPE, ProcessDetectType.VIRTUAL.value()));
+                    }
+                });
 
         return resp.getDataPoints()
-            .stream()
-            .collect(Collectors.groupingBy((Function<DataPoint, String>) dataPoint -> dataPoint.getTagValue(ProcessTraffic.PROPERTIES)))
-            .size();
+                .stream()
+                .collect(Collectors.groupingBy((Function<DataPoint, String>) dataPoint -> dataPoint.getTagValue(ProcessTraffic.PROPERTIES)))
+                .size();
     }
 
     @Override
     public long getProcessCount(String instanceId) throws IOException {
         MeasureQueryResponse resp = query(ProcessTraffic.INDEX_NAME,
-            PROCESS_TRAFFIC_TAGS,
-            Collections.emptySet(),
-            new QueryBuilder<MeasureQuery>() {
-                @Override
-                protected void apply(MeasureQuery query) {
-                    query.and(eq(ProcessTraffic.INSTANCE_ID, instanceId));
-                    query.and(ne(ProcessTraffic.DETECT_TYPE, ProcessDetectType.VIRTUAL.value()));
-                }
-            });
+                PROCESS_TRAFFIC_TAGS,
+                Collections.emptySet(),
+                new QueryBuilder<MeasureQuery>() {
+                    @Override
+                    protected void apply(MeasureQuery query) {
+                        query.and(eq(ProcessTraffic.INSTANCE_ID, instanceId));
+                        query.and(ne(ProcessTraffic.DETECT_TYPE, ProcessDetectType.VIRTUAL.value()));
+                    }
+                });
 
         return resp.getDataPoints()
-            .stream()
-            .collect(Collectors.groupingBy((Function<DataPoint, String>) dataPoint -> dataPoint.getTagValue(ProcessTraffic.PROPERTIES)))
-            .size();
+                .stream()
+                .collect(Collectors.groupingBy((Function<DataPoint, String>) dataPoint -> dataPoint.getTagValue(ProcessTraffic.PROPERTIES)))
+                .size();
     }
 
     @Override
@@ -323,7 +323,7 @@ public class BanyanDBMetadataQueryDAO extends AbstractBanyanDBDAO implements IMe
                     @Override
                     protected void apply(MeasureQuery query) {
                         if (StringUtil.isNotEmpty(processId)) {
-                            query.andWithID(processId);
+                            query.and(id(processId));
                         }
                     }
                 });
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java
index f0343e5579..b33cc4a3a7 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java
@@ -61,7 +61,7 @@ public class BanyanDBMetricsDAO extends AbstractBanyanDBDAO implements IMetricsD
             MeasureQueryResponse resp = query(model.getName(), schema.getTags(), schema.getFields(), new QueryBuilder<MeasureQuery>() {
                 @Override
                 protected void apply(MeasureQuery query) {
-                    query.andWithID(missCachedMetric.id());
+                    query.and(id(missCachedMetric.id()));
                 }
             });
             if (resp.size() == 0) {
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsQueryDAO.java
index 74cc787625..f169403f77 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsQueryDAO.java
@@ -19,6 +19,7 @@
 package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure;
 
 import com.google.common.collect.ImmutableSet;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -26,6 +27,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
+
 import org.apache.skywalking.banyandb.v1.client.DataPoint;
 import org.apache.skywalking.banyandb.v1.client.MeasureQuery;
 import org.apache.skywalking.banyandb.v1.client.MeasureQueryResponse;
@@ -168,9 +170,9 @@ public class BanyanDBMetricsQueryDAO extends AbstractBanyanDBDAO implements IMet
         }
 
         return Util.sortValues(
-            Util.composeLabelValue(condition, labels, ids, dataTableMap),
-            ids,
-            ValueColumnMetadata.INSTANCE.getDefaultValue(condition.getName())
+                Util.composeLabelValue(condition, labels, ids, dataTableMap),
+                ids,
+                ValueColumnMetadata.INSTANCE.getDefaultValue(condition.getName())
         );
     }
 
@@ -207,16 +209,16 @@ public class BanyanDBMetricsQueryDAO extends AbstractBanyanDBDAO implements IMet
 
     private Map<String, DataPoint> queryIDs(String modelName, String valueColumnName, List<String> measureIDs) throws IOException {
         Map<String, DataPoint> map = new HashMap<>(measureIDs.size());
-        for (final String id : measureIDs) {
-            MeasureQueryResponse resp = query(modelName, Collections.emptySet(), ImmutableSet.of(valueColumnName), new QueryBuilder<MeasureQuery>() {
-                @Override
-                protected void apply(MeasureQuery query) {
-                    query.andWithID(id);
+        MeasureQueryResponse resp = query(modelName, Collections.emptySet(), ImmutableSet.of(valueColumnName), new QueryBuilder<MeasureQuery>() {
+            @Override
+            protected void apply(MeasureQuery query) {
+                for (final String measureID : measureIDs) {
+                    query.or(id(measureID));
                 }
-            });
-            if (resp.size() > 0) {
-                map.putIfAbsent(resp.getDataPoints().get(0).getId(), resp.getDataPoints().get(0));
             }
+        });
+        if (resp.size() > 0) {
+            map.putIfAbsent(resp.getDataPoints().get(0).getId(), resp.getDataPoints().get(0));
         }
         return map;
     }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java
index bee2069770..bbe3e3dfad 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java
@@ -25,6 +25,7 @@ import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
 import org.apache.skywalking.banyandb.v1.client.StreamQuery;
 import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
 import org.apache.skywalking.banyandb.v1.client.TimestampRange;
+import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry;
@@ -126,5 +127,9 @@ public abstract class AbstractBanyanDBDAO extends AbstractDAO<BanyanDBStorageCli
         protected PairQueryCondition<Long> ne(String name, long value) {
             return PairQueryCondition.LongQueryCondition.ne(name, value);
         }
+
+        protected PairQueryCondition<String> id(String value) {
+            return PairQueryCondition.IDQueryCondition.eq(Metrics.ID, value);
+        }
     }
 }
diff --git a/test/e2e-v2/script/env b/test/e2e-v2/script/env
index 7c08ebc39c..ed76eedbf8 100644
--- a/test/e2e-v2/script/env
+++ b/test/e2e-v2/script/env
@@ -23,6 +23,6 @@ SW_AGENT_CLIENT_JS_COMMIT=af0565a67d382b683c1dbd94c379b7080db61449
 SW_AGENT_CLIENT_JS_TEST_COMMIT=4f1eb1dcdbde3ec4a38534bf01dded4ab5d2f016
 SW_KUBERNETES_COMMIT_SHA=0f3ec68e5a7e1608cec8688716b848ed15e971e5
 SW_ROVER_COMMIT=d956eaede57b62108b78bca48045bd09ba88e653
-SW_BANYANDB_COMMIT=5a326d7e36a008c5ea10e3ae506309cb29733c53
+SW_BANYANDB_COMMIT=e0828d91a6cb56baacffbeed55c2428ab49c4b36
 
 SW_CTL_COMMIT=e684fae0107045fc23799146d62f04cb68bd5a3b