You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ke...@apache.org on 2021/02/23 14:49:35 UTC
[skywalking] 01/01: Add `sum` function in meter system
This is an automated email from the ASF dual-hosted git repository.
kezhenxu94 pushed a commit to branch feature/meter-system/func-sum
in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit 2496c53666d3ae912c7ea2ead2a7523ec219dff4
Author: kezhenxu94 <ke...@apache.org>
AuthorDate: Tue Feb 23 21:48:33 2021 +0800
Add `sum` function in meter system
---
CHANGES.md | 1 +
docs/en/concepts-and-designs/mal.md | 4 +-
.../oap/meter/analyzer/dsl/DownsamplingType.java | 2 +-
.../oap/meter/analyzer/dsl/SampleFamily.java | 6 +-
.../analysis/meter/function/sum/SumFunction.java | 186 +++++++++++++++++++++
.../java/org/apache/skywalking/e2e/log/LogE2E.java | 6 +-
6 files changed, 198 insertions(+), 7 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 1e596f0..e97c4a5 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -28,6 +28,7 @@ Release Notes.
* Fix kubernetes.client.opeanapi.ApiException.
* Remove filename suffix in the meter active file config.
* Introduce log analysis language (LAL).
+* Add `sum` function in meter system.
#### UI
* Update selector scroller to show in all pages.
diff --git a/docs/en/concepts-and-designs/mal.md b/docs/en/concepts-and-designs/mal.md
index b1edc70..695a2c1 100644
--- a/docs/en/concepts-and-designs/mal.md
+++ b/docs/en/concepts-and-designs/mal.md
@@ -177,16 +177,16 @@ MAL should instruct meter-system how to do downsampling for metrics. It doesn't
Down sampling operations are as global function in MAL:
- avg
+ - sum
- latest (TODO)
- min (TODO)
- max (TODO)
- mean (TODO)
- - sum (TODO)
- count (TODO)
The default one is `avg` if not specific an operation.
-If user want get latest time from `last_server_state_sync_time_in_seconds`:
+If user want to get the latest time from `last_server_state_sync_time_in_seconds`:
```
latest(last_server_state_sync_time_in_seconds.tagEqual('production', 'catalog'))
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/DownsamplingType.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/DownsamplingType.java
index 4427dee..587122f 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/DownsamplingType.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/DownsamplingType.java
@@ -22,5 +22,5 @@ package org.apache.skywalking.oap.meter.analyzer.dsl;
* DownsamplingType indicates the downsampling type of meter function
*/
public enum DownsamplingType {
- AVG, LATEST
+ AVG, SUM, LATEST
}
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/SampleFamily.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/SampleFamily.java
index bb2329c..378137a 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/SampleFamily.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/SampleFamily.java
@@ -163,6 +163,7 @@ public class SampleFamily {
/* Aggregation operators */
public SampleFamily sum(List<String> by) {
+ ExpressionParsingContext.get().ifPresent(ctx -> ctx.downsampling = DownsamplingType.SUM);
return aggregate(by, Double::sum);
}
@@ -175,7 +176,10 @@ public class SampleFamily {
}
public SampleFamily avg(List<String> by) {
- ExpressionParsingContext.get().ifPresent(ctx -> ctx.aggregationLabels.addAll(by));
+ ExpressionParsingContext.get().ifPresent(ctx -> {
+ ctx.aggregationLabels.addAll(by);
+ ctx.downsampling = DownsamplingType.AVG;
+ });
if (this == EMPTY) {
return EMPTY;
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/sum/SumFunction.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/sum/SumFunction.java
new file mode 100644
index 0000000..a0ccbab
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/sum/SumFunction.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.meter.function.sum;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
+import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
+import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
+import org.apache.skywalking.oap.server.core.analysis.meter.function.AcceptableValue;
+import org.apache.skywalking.oap.server.core.analysis.meter.function.MeterFunction;
+import org.apache.skywalking.oap.server.core.analysis.metrics.LongValueHolder;
+import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
+import org.apache.skywalking.oap.server.core.analysis.metrics.annotation.Entrance;
+import org.apache.skywalking.oap.server.core.analysis.metrics.annotation.SourceFrom;
+import org.apache.skywalking.oap.server.core.query.sql.Function;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
+import org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder;
+import org.apache.skywalking.oap.server.core.storage.annotation.Column;
+
+@ToString
+@MeterFunction(functionName = "sum") // meter function named "sum": keeps a running total of every long value passed to accept()/combine()
+public abstract class SumFunction extends Metrics implements AcceptableValue<Long>, LongValueHolder {
+ protected static final String VALUE = "value";
+ /** Identifier of the entity this total belongs to; stored in the ENTITY_ID column and used by id(), remoteHashCode(), equals() and hashCode(). */
+ @Setter
+ @Getter
+ @Column(columnName = ENTITY_ID, length = 512)
+ private String entityId;
+ /** Service identifier copied from the MeterEntity in accept(); stored in the InstanceTraffic.SERVICE_ID column. */
+ @Setter
+ @Getter
+ @Column(columnName = InstanceTraffic.SERVICE_ID)
+ private String serviceId;
+ /** Running total; its column is declared with Function.Sum. */
+ @Getter
+ @Setter
+ @Column(columnName = VALUE, dataType = Column.ValueDataType.COMMON_VALUE, function = Function.Sum)
+ private long value;
+ /** Adds {@code value} to the running total. */
+ @Entrance
+ public final void combine(@SourceFrom long value) {
+ setValue(this.value + value);
+ }
+ /** Folds the total of another SumFunction into this one; the argument is cast without a type check and the result is always true. */
+ @Override
+ public final boolean combine(Metrics metrics) {
+ final SumFunction sumFunc = (SumFunction) metrics;
+ combine(sumFunc.getValue());
+ return true;
+ }
+ /** No-op: the body is empty; the total is only updated in combine() and accept(). */
+ @Override
+ public final void calculate() {
+ }
+ /** Copies entity id, service id and value into a createNew() instance whose time bucket is toTimeBucketInHour(). */
+ @Override
+ public Metrics toHour() {
+ final SumFunction metrics = (SumFunction) createNew();
+ metrics.setEntityId(getEntityId());
+ metrics.setTimeBucket(toTimeBucketInHour());
+ metrics.setServiceId(getServiceId());
+ metrics.setValue(getValue());
+ return metrics;
+ }
+ /** Copies entity id, service id and value into a createNew() instance whose time bucket is toTimeBucketInDay(). */
+ @Override
+ public Metrics toDay() {
+ final SumFunction metrics = (SumFunction) createNew();
+ metrics.setEntityId(getEntityId());
+ metrics.setTimeBucket(toTimeBucketInDay());
+ metrics.setServiceId(getServiceId());
+ metrics.setValue(getValue());
+ return metrics;
+ }
+ /** Hash derived from the entity id only; throws NullPointerException if the entity id has not been set. */
+ @Override
+ public int remoteHashCode() {
+ return getEntityId().hashCode();
+ }
+ /** Restores state from RemoteData; the indexes mirror serialize(): longs = [value, timeBucket], strings = [entityId, serviceId]. */
+ @Override
+ public void deserialize(final RemoteData remoteData) {
+ setValue(remoteData.getDataLongs(0));
+ setTimeBucket(remoteData.getDataLongs(1));
+
+ setEntityId(remoteData.getDataStrings(0));
+ setServiceId(remoteData.getDataStrings(1));
+ }
+ /** Writes state into a new RemoteData builder: longs = [value, timeBucket], strings = [entityId, serviceId]. */
+ @Override
+ public RemoteData.Builder serialize() {
+ final RemoteData.Builder remoteBuilder = RemoteData.newBuilder();
+
+ remoteBuilder.addDataLongs(getValue());
+ remoteBuilder.addDataLongs(getTimeBucket());
+
+ remoteBuilder.addDataStrings(getEntityId());
+ remoteBuilder.addDataStrings(getServiceId());
+
+ return remoteBuilder;
+ }
+ /** Builds the id as: time bucket, Const.ID_CONNECTOR, entity id. */
+ @Override
+ public String id() {
+ return getTimeBucket() + Const.ID_CONNECTOR + getEntityId();
+ }
+ /** Records the entity's id and service id, then adds {@code value} to the total; a null {@code value} fails on unboxing. */
+ @Override
+ public void accept(final MeterEntity entity, final Long value) {
+ setEntityId(entity.id());
+ setServiceId(entity.serviceId());
+ setValue(getValue() + value);
+ }
+ /** Returns SumStorageBuilder as this function's StorageHashMapBuilder class. */
+ @Override
+ public Class<? extends StorageHashMapBuilder<?>> builder() {
+ return SumStorageBuilder.class;
+ }
+ /** Converts between SumFunction and a map keyed by VALUE, TIME_BUCKET, InstanceTraffic.SERVICE_ID and ENTITY_ID. */
+ public static class SumStorageBuilder implements StorageHashMapBuilder<SumFunction> {
+ @Override
+ public SumFunction storage2Entity(final Map<String, Object> dbMap) {
+ final SumFunction metrics = new SumFunction() { // anonymous subclass: createNew() throws, so toHour()/toDay() cannot be used on the returned instance
+ @Override
+ public AcceptableValue<Long> createNew() {
+ throw new UnexpectedException("createNew should not be called");
+ }
+ };
+ metrics.setValue(((Number) dbMap.get(VALUE)).longValue()); // cast through Number, so any Number subtype in the map is accepted
+ metrics.setTimeBucket(((Number) dbMap.get(TIME_BUCKET)).longValue());
+ metrics.setServiceId((String) dbMap.get(InstanceTraffic.SERVICE_ID));
+ metrics.setEntityId((String) dbMap.get(ENTITY_ID));
+ return metrics;
+ }
+ /** Flattens the metric into a HashMap using the same four keys that storage2Entity() reads. */
+ @Override
+ public Map<String, Object> entity2Storage(final SumFunction storageData) {
+ final Map<String, Object> map = new HashMap<>();
+ map.put(VALUE, storageData.getValue());
+ map.put(TIME_BUCKET, storageData.getTimeBucket());
+ map.put(InstanceTraffic.SERVICE_ID, storageData.getServiceId());
+ map.put(ENTITY_ID, storageData.getEntityId());
+ return map;
+ }
+ }
+ /** Equality is based on entity id and time bucket only; the value and service id are not compared. */
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof SumFunction)) {
+ return false;
+ }
+ final SumFunction function = (SumFunction) o;
+ return Objects.equals(getEntityId(), function.getEntityId())
+ && Objects.equals(getTimeBucket(), function.getTimeBucket());
+ }
+ /** Hashes the same two fields that equals() compares: entity id and time bucket. */
+ @Override
+ public int hashCode() {
+ return Objects.hash(getEntityId(), getTimeBucket());
+ }
+}
diff --git a/test/e2e/e2e-test/src/test/java/org/apache/skywalking/e2e/log/LogE2E.java b/test/e2e/e2e-test/src/test/java/org/apache/skywalking/e2e/log/LogE2E.java
index 23cb4d1..b503f11 100644
--- a/test/e2e/e2e-test/src/test/java/org/apache/skywalking/e2e/log/LogE2E.java
+++ b/test/e2e/e2e-test/src/test/java/org/apache/skywalking/e2e/log/LogE2E.java
@@ -153,9 +153,9 @@ public class LogE2E extends SkyWalkingTestAdapter {
LOGGER.info("{}: {}", metricsName, instanceMetrics);
final AtLeastOneOfMetricsMatcher instanceRespTimeMatcher = new AtLeastOneOfMetricsMatcher();
- final MetricsValueMatcher greaterThanZero = new MetricsValueMatcher();
- greaterThanZero.setValue("gt 0");
- instanceRespTimeMatcher.setValue(greaterThanZero);
+ final MetricsValueMatcher greaterThanOne = new MetricsValueMatcher();
+ greaterThanOne.setValue("gt 1");
+ instanceRespTimeMatcher.setValue(greaterThanOne);
instanceRespTimeMatcher.verify(instanceMetrics.getValues());
}
}