You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by lu...@apache.org on 2022/10/24 15:07:59 UTC
[skywalking] 01/01: update to banyandb client 0.2
This is an automated email from the ASF dual-hosted git repository.
lujiajing pushed a commit to branch update-banyandb
in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit 658d20e53e3c52708687e6b82e36ee56c57debd5
Author: Megrez Lu <lu...@gmail.com>
AuthorDate: Mon Oct 24 23:06:42 2022 +0800
update to banyandb client 0.2
Signed-off-by: Megrez Lu <lu...@gmail.com>
---
oap-server-bom/pom.xml | 2 +-
.../plugin/banyandb/BanyanDBStorageClient.java | 2 +-
.../storage/plugin/banyandb/MetadataRegistry.java | 5 +-
.../banyandb/measure/BanyanDBMetadataQueryDAO.java | 118 ++++++++++-----------
.../banyandb/measure/BanyanDBMetricsDAO.java | 2 +-
.../banyandb/measure/BanyanDBMetricsQueryDAO.java | 24 +++--
.../banyandb/stream/AbstractBanyanDBDAO.java | 5 +
test/e2e-v2/script/env | 2 +-
8 files changed, 84 insertions(+), 76 deletions(-)
diff --git a/oap-server-bom/pom.xml b/oap-server-bom/pom.xml
index bd1792d1ab..8b40a3f488 100644
--- a/oap-server-bom/pom.xml
+++ b/oap-server-bom/pom.xml
@@ -73,7 +73,7 @@
<awaitility.version>3.0.0</awaitility.version>
<httpcore.version>4.4.13</httpcore.version>
<commons-compress.version>1.21</commons-compress.version>
- <banyandb-java-client.version>0.1.0</banyandb-java-client.version>
+ <banyandb-java-client.version>0.2.0-SNAPSHOT</banyandb-java-client.version>
<kafka-clients.version>2.4.1</kafka-clients.version>
<spring-kafka-test.version>2.4.6.RELEASE</spring-kafka-test.version>
</properties>
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
index 9ba378125c..f2408f8695 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
@@ -118,7 +118,7 @@ public class BanyanDBStorageClient implements Client, HealthCheckable {
public void define(Property property) throws IOException {
try {
- this.client.save(property);
+ this.client.apply(property);
this.healthChecker.health();
} catch (BanyanDBException ex) {
healthChecker.unHealth(ex);
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
index 9287d5c710..eddc974fa9 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
@@ -33,6 +33,7 @@ import org.apache.skywalking.banyandb.v1.client.metadata.Catalog;
import org.apache.skywalking.banyandb.v1.client.metadata.Duration;
import org.apache.skywalking.banyandb.v1.client.metadata.Group;
import org.apache.skywalking.banyandb.v1.client.metadata.IndexRule;
+import org.apache.skywalking.banyandb.v1.client.metadata.IntervalRule;
import org.apache.skywalking.banyandb.v1.client.metadata.Measure;
import org.apache.skywalking.banyandb.v1.client.metadata.NamedSchema;
import org.apache.skywalking.banyandb.v1.client.metadata.Stream;
@@ -322,9 +323,9 @@ public enum MetadataRegistry {
}
switch (kind) {
case STREAM:
- return client.define(Group.create(this.group, Catalog.STREAM, this.shard, 0, Duration.ofDays(7)));
+ return client.define(Group.create(this.group, Catalog.STREAM, this.shard, IntervalRule.create(IntervalRule.Unit.HOUR, 4), IntervalRule.create(IntervalRule.Unit.DAY, 1), IntervalRule.create(IntervalRule.Unit.DAY, 7)));
case MEASURE:
- return client.define(Group.create(this.group, Catalog.MEASURE, this.shard, 12, Duration.ofDays(7)));
+ return client.define(Group.create(this.group, Catalog.MEASURE, this.shard, IntervalRule.create(IntervalRule.Unit.HOUR, 4), IntervalRule.create(IntervalRule.Unit.HOUR, 24), IntervalRule.create(IntervalRule.Unit.DAY, 7)));
default:
throw new IllegalStateException("should not reach here");
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java
index e666929a76..1139604908 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java
@@ -167,7 +167,7 @@ public class BanyanDBMetadataQueryDAO extends AbstractBanyanDBDAO implements IMe
@Override
protected void apply(MeasureQuery query) {
if (StringUtil.isNotEmpty(instanceId)) {
- query.andWithID(instanceId);
+ query.and(id(instanceId));
}
}
});
@@ -204,17 +204,17 @@ public class BanyanDBMetadataQueryDAO extends AbstractBanyanDBDAO implements IMe
@Override
public List<Process> listProcesses(String serviceId, ProfilingSupportStatus supportStatus, long lastPingStartTimeBucket, long lastPingEndTimeBucket) throws IOException {
MeasureQueryResponse resp = query(ProcessTraffic.INDEX_NAME,
- PROCESS_TRAFFIC_TAGS,
- Collections.emptySet(),
- new QueryBuilder<MeasureQuery>() {
- @Override
- protected void apply(MeasureQuery query) {
- query.and(eq(ProcessTraffic.SERVICE_ID, serviceId));
- query.and(gte(ProcessTraffic.LAST_PING_TIME_BUCKET, lastPingStartTimeBucket));
- query.and(eq(ProcessTraffic.PROFILING_SUPPORT_STATUS, supportStatus.value()));
- query.and(ne(ProcessTraffic.DETECT_TYPE, ProcessDetectType.VIRTUAL.value()));
- }
- });
+ PROCESS_TRAFFIC_TAGS,
+ Collections.emptySet(),
+ new QueryBuilder<MeasureQuery>() {
+ @Override
+ protected void apply(MeasureQuery query) {
+ query.and(eq(ProcessTraffic.SERVICE_ID, serviceId));
+ query.and(gte(ProcessTraffic.LAST_PING_TIME_BUCKET, lastPingStartTimeBucket));
+ query.and(eq(ProcessTraffic.PROFILING_SUPPORT_STATUS, supportStatus.value()));
+ query.and(ne(ProcessTraffic.DETECT_TYPE, ProcessDetectType.VIRTUAL.value()));
+ }
+ });
final List<Process> processes = new ArrayList<>();
@@ -230,18 +230,18 @@ public class BanyanDBMetadataQueryDAO extends AbstractBanyanDBDAO implements IMe
long lastPingStartTimeBucket = duration.getStartTimeBucket();
long lastPingEndTimeBucket = duration.getEndTimeBucket();
MeasureQueryResponse resp = query(ProcessTraffic.INDEX_NAME,
- PROCESS_TRAFFIC_TAGS,
- Collections.emptySet(),
- new QueryBuilder<MeasureQuery>() {
- @Override
- protected void apply(MeasureQuery query) {
- query.and(eq(ProcessTraffic.INSTANCE_ID, serviceInstanceId));
- query.and(gte(ProcessTraffic.LAST_PING_TIME_BUCKET, lastPingStartTimeBucket));
- if (!includeVirtual) {
- query.and(ne(ProcessTraffic.DETECT_TYPE, ProcessDetectType.VIRTUAL.value()));
+ PROCESS_TRAFFIC_TAGS,
+ Collections.emptySet(),
+ new QueryBuilder<MeasureQuery>() {
+ @Override
+ protected void apply(MeasureQuery query) {
+ query.and(eq(ProcessTraffic.INSTANCE_ID, serviceInstanceId));
+ query.and(gte(ProcessTraffic.LAST_PING_TIME_BUCKET, lastPingStartTimeBucket));
+ if (!includeVirtual) {
+ query.and(ne(ProcessTraffic.DETECT_TYPE, ProcessDetectType.VIRTUAL.value()));
+ }
}
- }
- });
+ });
final List<Process> processes = new ArrayList<>();
@@ -255,15 +255,15 @@ public class BanyanDBMetadataQueryDAO extends AbstractBanyanDBDAO implements IMe
@Override
public List<Process> listProcesses(String agentId) throws IOException {
MeasureQueryResponse resp = query(ProcessTraffic.INDEX_NAME,
- PROCESS_TRAFFIC_TAGS,
- Collections.emptySet(),
- new QueryBuilder<MeasureQuery>() {
- @Override
- protected void apply(MeasureQuery query) {
- query.and(eq(ProcessTraffic.AGENT_ID, agentId));
- query.and(ne(ProcessTraffic.DETECT_TYPE, ProcessDetectType.VIRTUAL.value()));
- }
- });
+ PROCESS_TRAFFIC_TAGS,
+ Collections.emptySet(),
+ new QueryBuilder<MeasureQuery>() {
+ @Override
+ protected void apply(MeasureQuery query) {
+ query.and(eq(ProcessTraffic.AGENT_ID, agentId));
+ query.and(ne(ProcessTraffic.DETECT_TYPE, ProcessDetectType.VIRTUAL.value()));
+ }
+ });
final List<Process> processes = new ArrayList<>();
@@ -277,41 +277,41 @@ public class BanyanDBMetadataQueryDAO extends AbstractBanyanDBDAO implements IMe
@Override
public long getProcessCount(String serviceId, ProfilingSupportStatus profilingSupportStatus, long lastPingStartTimeBucket, long lastPingEndTimeBucket) throws IOException {
MeasureQueryResponse resp = query(ProcessTraffic.INDEX_NAME,
- PROCESS_TRAFFIC_TAGS,
- Collections.emptySet(),
- new QueryBuilder<MeasureQuery>() {
- @Override
- protected void apply(MeasureQuery query) {
- query.and(eq(ProcessTraffic.SERVICE_ID, serviceId));
- query.and(gte(ProcessTraffic.LAST_PING_TIME_BUCKET, lastPingStartTimeBucket));
- query.and(eq(ProcessTraffic.PROFILING_SUPPORT_STATUS, profilingSupportStatus.value()));
- query.and(ne(ProcessTraffic.DETECT_TYPE, ProcessDetectType.VIRTUAL.value()));
- }
- });
+ PROCESS_TRAFFIC_TAGS,
+ Collections.emptySet(),
+ new QueryBuilder<MeasureQuery>() {
+ @Override
+ protected void apply(MeasureQuery query) {
+ query.and(eq(ProcessTraffic.SERVICE_ID, serviceId));
+ query.and(gte(ProcessTraffic.LAST_PING_TIME_BUCKET, lastPingStartTimeBucket));
+ query.and(eq(ProcessTraffic.PROFILING_SUPPORT_STATUS, profilingSupportStatus.value()));
+ query.and(ne(ProcessTraffic.DETECT_TYPE, ProcessDetectType.VIRTUAL.value()));
+ }
+ });
return resp.getDataPoints()
- .stream()
- .collect(Collectors.groupingBy((Function<DataPoint, String>) dataPoint -> dataPoint.getTagValue(ProcessTraffic.PROPERTIES)))
- .size();
+ .stream()
+ .collect(Collectors.groupingBy((Function<DataPoint, String>) dataPoint -> dataPoint.getTagValue(ProcessTraffic.PROPERTIES)))
+ .size();
}
@Override
public long getProcessCount(String instanceId) throws IOException {
MeasureQueryResponse resp = query(ProcessTraffic.INDEX_NAME,
- PROCESS_TRAFFIC_TAGS,
- Collections.emptySet(),
- new QueryBuilder<MeasureQuery>() {
- @Override
- protected void apply(MeasureQuery query) {
- query.and(eq(ProcessTraffic.INSTANCE_ID, instanceId));
- query.and(ne(ProcessTraffic.DETECT_TYPE, ProcessDetectType.VIRTUAL.value()));
- }
- });
+ PROCESS_TRAFFIC_TAGS,
+ Collections.emptySet(),
+ new QueryBuilder<MeasureQuery>() {
+ @Override
+ protected void apply(MeasureQuery query) {
+ query.and(eq(ProcessTraffic.INSTANCE_ID, instanceId));
+ query.and(ne(ProcessTraffic.DETECT_TYPE, ProcessDetectType.VIRTUAL.value()));
+ }
+ });
return resp.getDataPoints()
- .stream()
- .collect(Collectors.groupingBy((Function<DataPoint, String>) dataPoint -> dataPoint.getTagValue(ProcessTraffic.PROPERTIES)))
- .size();
+ .stream()
+ .collect(Collectors.groupingBy((Function<DataPoint, String>) dataPoint -> dataPoint.getTagValue(ProcessTraffic.PROPERTIES)))
+ .size();
}
@Override
@@ -323,7 +323,7 @@ public class BanyanDBMetadataQueryDAO extends AbstractBanyanDBDAO implements IMe
@Override
protected void apply(MeasureQuery query) {
if (StringUtil.isNotEmpty(processId)) {
- query.andWithID(processId);
+ query.and(id(processId));
}
}
});
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java
index f0343e5579..b33cc4a3a7 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java
@@ -61,7 +61,7 @@ public class BanyanDBMetricsDAO extends AbstractBanyanDBDAO implements IMetricsD
MeasureQueryResponse resp = query(model.getName(), schema.getTags(), schema.getFields(), new QueryBuilder<MeasureQuery>() {
@Override
protected void apply(MeasureQuery query) {
- query.andWithID(missCachedMetric.id());
+ query.and(id(missCachedMetric.id()));
}
});
if (resp.size() == 0) {
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsQueryDAO.java
index 74cc787625..f169403f77 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsQueryDAO.java
@@ -19,6 +19,7 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure;
import com.google.common.collect.ImmutableSet;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -26,6 +27,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+
import org.apache.skywalking.banyandb.v1.client.DataPoint;
import org.apache.skywalking.banyandb.v1.client.MeasureQuery;
import org.apache.skywalking.banyandb.v1.client.MeasureQueryResponse;
@@ -168,9 +170,9 @@ public class BanyanDBMetricsQueryDAO extends AbstractBanyanDBDAO implements IMet
}
return Util.sortValues(
- Util.composeLabelValue(condition, labels, ids, dataTableMap),
- ids,
- ValueColumnMetadata.INSTANCE.getDefaultValue(condition.getName())
+ Util.composeLabelValue(condition, labels, ids, dataTableMap),
+ ids,
+ ValueColumnMetadata.INSTANCE.getDefaultValue(condition.getName())
);
}
@@ -207,16 +209,16 @@ public class BanyanDBMetricsQueryDAO extends AbstractBanyanDBDAO implements IMet
private Map<String, DataPoint> queryIDs(String modelName, String valueColumnName, List<String> measureIDs) throws IOException {
Map<String, DataPoint> map = new HashMap<>(measureIDs.size());
- for (final String id : measureIDs) {
- MeasureQueryResponse resp = query(modelName, Collections.emptySet(), ImmutableSet.of(valueColumnName), new QueryBuilder<MeasureQuery>() {
- @Override
- protected void apply(MeasureQuery query) {
- query.andWithID(id);
+ MeasureQueryResponse resp = query(modelName, Collections.emptySet(), ImmutableSet.of(valueColumnName), new QueryBuilder<MeasureQuery>() {
+ @Override
+ protected void apply(MeasureQuery query) {
+ for (final String measureID : measureIDs) {
+ query.or(id(measureID));
}
- });
- if (resp.size() > 0) {
- map.putIfAbsent(resp.getDataPoints().get(0).getId(), resp.getDataPoints().get(0));
}
+ });
+ if (resp.size() > 0) {
+ map.putIfAbsent(resp.getDataPoints().get(0).getId(), resp.getDataPoints().get(0));
}
return map;
}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java
index bee2069770..bbe3e3dfad 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java
@@ -25,6 +25,7 @@ import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
import org.apache.skywalking.banyandb.v1.client.StreamQuery;
import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
import org.apache.skywalking.banyandb.v1.client.TimestampRange;
+import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry;
@@ -126,5 +127,9 @@ public abstract class AbstractBanyanDBDAO extends AbstractDAO<BanyanDBStorageCli
protected PairQueryCondition<Long> ne(String name, long value) {
return PairQueryCondition.LongQueryCondition.ne(name, value);
}
+
+ protected PairQueryCondition<String> id(String value) {
+ return PairQueryCondition.IDQueryCondition.eq(Metrics.ID, value);
+ }
}
}
diff --git a/test/e2e-v2/script/env b/test/e2e-v2/script/env
index 7c08ebc39c..ed76eedbf8 100644
--- a/test/e2e-v2/script/env
+++ b/test/e2e-v2/script/env
@@ -23,6 +23,6 @@ SW_AGENT_CLIENT_JS_COMMIT=af0565a67d382b683c1dbd94c379b7080db61449
SW_AGENT_CLIENT_JS_TEST_COMMIT=4f1eb1dcdbde3ec4a38534bf01dded4ab5d2f016
SW_KUBERNETES_COMMIT_SHA=0f3ec68e5a7e1608cec8688716b848ed15e971e5
SW_ROVER_COMMIT=d956eaede57b62108b78bca48045bd09ba88e653
-SW_BANYANDB_COMMIT=5a326d7e36a008c5ea10e3ae506309cb29733c53
+SW_BANYANDB_COMMIT=e0828d91a6cb56baacffbeed55c2428ab49c4b36
SW_CTL_COMMIT=e684fae0107045fc23799146d62f04cb68bd5a3b