You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ke...@apache.org on 2021/02/23 14:49:35 UTC

[skywalking] 01/01: Add `sum` function in meter system

This is an automated email from the ASF dual-hosted git repository.

kezhenxu94 pushed a commit to branch feature/meter-system/func-sum
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit 2496c53666d3ae912c7ea2ead2a7523ec219dff4
Author: kezhenxu94 <ke...@apache.org>
AuthorDate: Tue Feb 23 21:48:33 2021 +0800

    Add `sum` function in meter system
---
 CHANGES.md                                         |   1 +
 docs/en/concepts-and-designs/mal.md                |   4 +-
 .../oap/meter/analyzer/dsl/DownsamplingType.java   |   2 +-
 .../oap/meter/analyzer/dsl/SampleFamily.java       |   6 +-
 .../analysis/meter/function/sum/SumFunction.java   | 186 +++++++++++++++++++++
 .../java/org/apache/skywalking/e2e/log/LogE2E.java |   6 +-
 6 files changed, 198 insertions(+), 7 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 1e596f0..e97c4a5 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -28,6 +28,7 @@ Release Notes.
 * Fix kubernetes.client.opeanapi.ApiException.
 * Remove filename suffix in the meter active file config.
 * Introduce log analysis language (LAL).
+* Add `sum` function in meter system.
 
 #### UI
 * Update selector scroller to show in all pages.
diff --git a/docs/en/concepts-and-designs/mal.md b/docs/en/concepts-and-designs/mal.md
index b1edc70..695a2c1 100644
--- a/docs/en/concepts-and-designs/mal.md
+++ b/docs/en/concepts-and-designs/mal.md
@@ -177,16 +177,16 @@ MAL should instruct meter-system how to do downsampling for metrics. It doesn't
 Down sampling operations are as global function in MAL:
 
  - avg
+ - sum
  - latest (TODO)
  - min (TODO)
  - max (TODO)
  - mean (TODO)
- - sum (TODO)
  - count (TODO)
 
 The default one is `avg` if not specific an operation.
 
-If user want get latest time from `last_server_state_sync_time_in_seconds`:
+If user want to get the latest time from `last_server_state_sync_time_in_seconds`:
 
 ```
 latest(last_server_state_sync_time_in_seconds.tagEqual('production', 'catalog'))
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/DownsamplingType.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/DownsamplingType.java
index 4427dee..587122f 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/DownsamplingType.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/DownsamplingType.java
@@ -22,5 +22,5 @@ package org.apache.skywalking.oap.meter.analyzer.dsl;
  * DownsamplingType indicates the downsampling type of meter function
  */
 public enum DownsamplingType {
-    AVG, LATEST
+    AVG, SUM, LATEST
 }
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/SampleFamily.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/SampleFamily.java
index bb2329c..378137a 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/SampleFamily.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/SampleFamily.java
@@ -163,6 +163,7 @@ public class SampleFamily {
 
     /* Aggregation operators */
     public SampleFamily sum(List<String> by) {
+        ExpressionParsingContext.get().ifPresent(ctx -> ctx.downsampling = DownsamplingType.SUM);
         return aggregate(by, Double::sum);
     }
 
@@ -175,7 +176,10 @@ public class SampleFamily {
     }
 
     public SampleFamily avg(List<String> by) {
-        ExpressionParsingContext.get().ifPresent(ctx -> ctx.aggregationLabels.addAll(by));
+        ExpressionParsingContext.get().ifPresent(ctx -> {
+            ctx.aggregationLabels.addAll(by);
+            ctx.downsampling = DownsamplingType.AVG;
+        });
         if (this == EMPTY) {
             return EMPTY;
         }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/sum/SumFunction.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/sum/SumFunction.java
new file mode 100644
index 0000000..a0ccbab
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/sum/SumFunction.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.meter.function.sum;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
+import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
+import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
+import org.apache.skywalking.oap.server.core.analysis.meter.function.AcceptableValue;
+import org.apache.skywalking.oap.server.core.analysis.meter.function.MeterFunction;
+import org.apache.skywalking.oap.server.core.analysis.metrics.LongValueHolder;
+import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
+import org.apache.skywalking.oap.server.core.analysis.metrics.annotation.Entrance;
+import org.apache.skywalking.oap.server.core.analysis.metrics.annotation.SourceFrom;
+import org.apache.skywalking.oap.server.core.query.sql.Function;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
+import org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder;
+import org.apache.skywalking.oap.server.core.storage.annotation.Column;
+
+@ToString
+@MeterFunction(functionName = "sum")
+public abstract class SumFunction extends Metrics implements AcceptableValue<Long>, LongValueHolder {
+    protected static final String VALUE = "value";
+
+    @Setter
+    @Getter
+    @Column(columnName = ENTITY_ID, length = 512)
+    private String entityId;
+
+    @Setter
+    @Getter
+    @Column(columnName = InstanceTraffic.SERVICE_ID)
+    private String serviceId;
+
+    @Getter
+    @Setter
+    @Column(columnName = VALUE, dataType = Column.ValueDataType.COMMON_VALUE, function = Function.Sum)
+    private long value;
+
+    @Entrance
+    public final void combine(@SourceFrom long value) {
+        setValue(this.value + value);
+    }
+
+    @Override
+    public final boolean combine(Metrics metrics) {
+        final SumFunction sumFunc = (SumFunction) metrics;
+        combine(sumFunc.getValue());
+        return true;
+    }
+
+    @Override
+    public final void calculate() {
+    }
+
+    @Override
+    public Metrics toHour() {
+        final SumFunction metrics = (SumFunction) createNew();
+        metrics.setEntityId(getEntityId());
+        metrics.setTimeBucket(toTimeBucketInHour());
+        metrics.setServiceId(getServiceId());
+        metrics.setValue(getValue());
+        return metrics;
+    }
+
+    @Override
+    public Metrics toDay() {
+        final SumFunction metrics = (SumFunction) createNew();
+        metrics.setEntityId(getEntityId());
+        metrics.setTimeBucket(toTimeBucketInDay());
+        metrics.setServiceId(getServiceId());
+        metrics.setValue(getValue());
+        return metrics;
+    }
+
+    @Override
+    public int remoteHashCode() {
+        return getEntityId().hashCode();
+    }
+
+    @Override
+    public void deserialize(final RemoteData remoteData) {
+        setValue(remoteData.getDataLongs(0));
+        setTimeBucket(remoteData.getDataLongs(1));
+
+        setEntityId(remoteData.getDataStrings(0));
+        setServiceId(remoteData.getDataStrings(1));
+    }
+
+    @Override
+    public RemoteData.Builder serialize() {
+        final RemoteData.Builder remoteBuilder = RemoteData.newBuilder();
+
+        remoteBuilder.addDataLongs(getValue());
+        remoteBuilder.addDataLongs(getTimeBucket());
+
+        remoteBuilder.addDataStrings(getEntityId());
+        remoteBuilder.addDataStrings(getServiceId());
+
+        return remoteBuilder;
+    }
+
+    @Override
+    public String id() {
+        return getTimeBucket() + Const.ID_CONNECTOR + getEntityId();
+    }
+
+    @Override
+    public void accept(final MeterEntity entity, final Long value) {
+        setEntityId(entity.id());
+        setServiceId(entity.serviceId());
+        setValue(getValue() + value);
+    }
+
+    @Override
+    public Class<? extends StorageHashMapBuilder<?>> builder() {
+        return SumStorageBuilder.class;
+    }
+
+    public static class SumStorageBuilder implements StorageHashMapBuilder<SumFunction> {
+        @Override
+        public SumFunction storage2Entity(final Map<String, Object> dbMap) {
+            final SumFunction metrics = new SumFunction() {
+                @Override
+                public AcceptableValue<Long> createNew() {
+                    throw new UnexpectedException("createNew should not be called");
+                }
+            };
+            metrics.setValue(((Number) dbMap.get(VALUE)).longValue());
+            metrics.setTimeBucket(((Number) dbMap.get(TIME_BUCKET)).longValue());
+            metrics.setServiceId((String) dbMap.get(InstanceTraffic.SERVICE_ID));
+            metrics.setEntityId((String) dbMap.get(ENTITY_ID));
+            return metrics;
+        }
+
+        @Override
+        public Map<String, Object> entity2Storage(final SumFunction storageData) {
+            final Map<String, Object> map = new HashMap<>();
+            map.put(VALUE, storageData.getValue());
+            map.put(TIME_BUCKET, storageData.getTimeBucket());
+            map.put(InstanceTraffic.SERVICE_ID, storageData.getServiceId());
+            map.put(ENTITY_ID, storageData.getEntityId());
+            return map;
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof SumFunction)) {
+            return false;
+        }
+        final SumFunction function = (SumFunction) o;
+        return Objects.equals(getEntityId(), function.getEntityId())
+            && Objects.equals(getTimeBucket(), function.getTimeBucket());
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(getEntityId(), getTimeBucket());
+    }
+}
diff --git a/test/e2e/e2e-test/src/test/java/org/apache/skywalking/e2e/log/LogE2E.java b/test/e2e/e2e-test/src/test/java/org/apache/skywalking/e2e/log/LogE2E.java
index 23cb4d1..b503f11 100644
--- a/test/e2e/e2e-test/src/test/java/org/apache/skywalking/e2e/log/LogE2E.java
+++ b/test/e2e/e2e-test/src/test/java/org/apache/skywalking/e2e/log/LogE2E.java
@@ -153,9 +153,9 @@ public class LogE2E extends SkyWalkingTestAdapter {
 
             LOGGER.info("{}: {}", metricsName, instanceMetrics);
             final AtLeastOneOfMetricsMatcher instanceRespTimeMatcher = new AtLeastOneOfMetricsMatcher();
-            final MetricsValueMatcher greaterThanZero = new MetricsValueMatcher();
-            greaterThanZero.setValue("gt 0");
-            instanceRespTimeMatcher.setValue(greaterThanZero);
+            final MetricsValueMatcher greaterThanOne = new MetricsValueMatcher();
+            greaterThanOne.setValue("gt 1");
+            instanceRespTimeMatcher.setValue(greaterThanOne);
             instanceRespTimeMatcher.verify(instanceMetrics.getValues());
         }
     }