You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2020/04/26 12:22:03 UTC
[skywalking] 01/01: Support histogram function in Meter system.
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch new-func
in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit 82631dc1bfc870d1d547513dabae66ec360d645a
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Sun Apr 26 20:21:02 2020 +0800
Support histogram function in Meter system.
---
.../server/core/analysis/meter/MeterEntity.java | 4 +
.../server/core/analysis/meter/MeterSystem.java | 6 +
.../analysis/meter/function/AcceptableValue.java | 2 +
.../meter/function/{Avg.java => AvgFunction.java} | 29 +++-
.../analysis/meter/function/BucketedValues.java | 61 ++++++++
.../function/{Avg.java => HistogramFunction.java} | 109 +++++++++-----
.../server/core/analysis/metrics/DataTable.java | 25 ++++
.../core/analysis/metrics/HistogramMetrics.java | 8 +-
.../core/analysis/metrics/PercentileMetrics.java | 8 +-
.../meter/function/HistogramFunctionTest.java | 164 +++++++++++++++++++++
.../core/analysis/metrics/DataTableTestCase.java | 10 +-
.../provider/PrometheusFetcherProvider.java | 27 +++-
12 files changed, 392 insertions(+), 61 deletions(-)
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/MeterEntity.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/MeterEntity.java
index a006dbe..388e2d7 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/MeterEntity.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/MeterEntity.java
@@ -59,6 +59,10 @@ public class MeterEntity {
}
}
+ public String serviceId() {
+ return IDManager.ServiceID.buildId(serviceName, true);
+ }
+
/**
* Create a service level meter entity.
*/
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/MeterSystem.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/MeterSystem.java
index b8d6e34..2d5173b 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/MeterSystem.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/MeterSystem.java
@@ -40,6 +40,7 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.StreamDefinition;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.meter.function.AcceptableValue;
import org.apache.skywalking.oap.server.core.analysis.meter.function.MeterFunction;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
@@ -266,6 +267,11 @@ public class MeterSystem implements Service {
* @param acceptableValue should only be created through {@link #create(String, String, ScopeType, Class)}
*/
public void doStreamingCalculation(AcceptableValue acceptableValue) {
+ final long timeBucket = acceptableValue.getTimeBucket();
+ if (timeBucket == 0L) {
+ // Avoid data without a timestamp, which could be harmful to the storage.
+ acceptableValue.setTimeBucket(TimeBucket.getMinuteTimeBucket(System.currentTimeMillis()));
+ }
MetricsStreamProcessor.getInstance().in((Metrics) acceptableValue);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/AcceptableValue.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/AcceptableValue.java
index 9bd9cdd..7663a1e 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/AcceptableValue.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/AcceptableValue.java
@@ -38,4 +38,6 @@ public interface AcceptableValue<T> {
Class<? extends StorageBuilder> builder();
void setTimeBucket(long timeBucket);
+
+ long getTimeBucket();
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/Avg.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/AvgFunction.java
similarity index 80%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/Avg.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/AvgFunction.java
index e6ed28a..424cbce 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/Avg.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/AvgFunction.java
@@ -25,6 +25,7 @@ import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.UnexpectedException;
+import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
import org.apache.skywalking.oap.server.core.analysis.metrics.LongAvgMetrics;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
@@ -37,17 +38,25 @@ import org.apache.skywalking.oap.server.core.storage.annotation.Column;
"entityId",
"timeBucket"
})
-public abstract class Avg extends LongAvgMetrics implements AcceptableValue<Long> {
+public abstract class AvgFunction extends LongAvgMetrics implements AcceptableValue<Long> {
@Setter
@Getter
@Column(columnName = ENTITY_ID)
private String entityId;
+ /**
+ * Service ID is required for sort query.
+ */
+ @Setter
+ @Getter
+ @Column(columnName = InstanceTraffic.SERVICE_ID)
+ private String serviceId;
@Override
public Metrics toHour() {
- Avg metrics = (Avg) createNew();
+ AvgFunction metrics = (AvgFunction) createNew();
metrics.setEntityId(getEntityId());
metrics.setTimeBucket(toTimeBucketInHour());
+ metrics.setServiceId(getServiceId());
metrics.setSummation(getSummation());
metrics.setCount(getCount());
return metrics;
@@ -55,9 +64,10 @@ public abstract class Avg extends LongAvgMetrics implements AcceptableValue<Long
@Override
public Metrics toDay() {
- Avg metrics = (Avg) createNew();
+ AvgFunction metrics = (AvgFunction) createNew();
metrics.setEntityId(getEntityId());
metrics.setTimeBucket(toTimeBucketInDay());
+ metrics.setServiceId(getServiceId());
metrics.setSummation(getSummation());
metrics.setCount(getCount());
return metrics;
@@ -75,6 +85,7 @@ public abstract class Avg extends LongAvgMetrics implements AcceptableValue<Long
setTimeBucket(remoteData.getDataLongs(2));
this.entityId = remoteData.getDataStrings(0);
+ this.serviceId = remoteData.getDataStrings(1);
}
@Override
@@ -85,6 +96,7 @@ public abstract class Avg extends LongAvgMetrics implements AcceptableValue<Long
remoteBuilder.addDataLongs(getTimeBucket());
remoteBuilder.addDataStrings(entityId);
+ remoteBuilder.addDataStrings(serviceId);
return remoteBuilder;
}
@@ -97,6 +109,7 @@ public abstract class Avg extends LongAvgMetrics implements AcceptableValue<Long
@Override
public void accept(final MeterEntity entity, final Long value) {
this.entityId = entity.id();
+ this.serviceId = entity.serviceId();
this.summation += value;
this.count += 1;
}
@@ -106,10 +119,10 @@ public abstract class Avg extends LongAvgMetrics implements AcceptableValue<Long
return AvgStorageBuilder.class;
}
- public static class AvgStorageBuilder implements StorageBuilder<Avg> {
+ public static class AvgStorageBuilder implements StorageBuilder<AvgFunction> {
@Override
- public Avg map2Data(final Map<String, Object> dbMap) {
- Avg metrics = new Avg() {
+ public AvgFunction map2Data(final Map<String, Object> dbMap) {
+ AvgFunction metrics = new AvgFunction() {
@Override
public AcceptableValue<Long> createNew() {
throw new UnexpectedException("createNew should not be called");
@@ -119,17 +132,19 @@ public abstract class Avg extends LongAvgMetrics implements AcceptableValue<Long
metrics.setValue(((Number) dbMap.get(VALUE)).longValue());
metrics.setCount(((Number) dbMap.get(COUNT)).longValue());
metrics.setTimeBucket(((Number) dbMap.get(TIME_BUCKET)).longValue());
+ metrics.setServiceId((String) dbMap.get(InstanceTraffic.SERVICE_ID));
metrics.setEntityId((String) dbMap.get(ENTITY_ID));
return metrics;
}
@Override
- public Map<String, Object> data2Map(final Avg storageData) {
+ public Map<String, Object> data2Map(final AvgFunction storageData) {
Map<String, Object> map = new HashMap<>();
map.put(SUMMATION, storageData.getSummation());
map.put(VALUE, storageData.getValue());
map.put(COUNT, storageData.getCount());
map.put(TIME_BUCKET, storageData.getTimeBucket());
+ map.put(InstanceTraffic.SERVICE_ID, storageData.getServiceId());
map.put(ENTITY_ID, storageData.getEntityId());
return map;
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/BucketedValues.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/BucketedValues.java
new file mode 100644
index 0000000..857f2e7
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/BucketedValues.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.meter.function;
+
+import java.util.Arrays;
+import lombok.Getter;
+import lombok.ToString;
+import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
+
+/**
+ * BucketedValues represents a value set whose elements are grouped by bucket.
+ */
+@ToString
+@Getter
+public class BucketedValues {
+ /**
+ * Each element of buckets is the minimal value of that bucket; the max is defined by the next element.
+ * Such as 0, 10, 50, 100 means buckets are [0, 10), [10, 50), [50, 100), [100, +infinity).
+ */
+ private final long[] buckets;
+ /**
+ * {@link #buckets} and {@link #values} arrays should have the same length. Each element of values represents
+ * the amount in the bucket at the same index.
+ */
+ private final long[] values;
+
+ public BucketedValues(final long[] buckets, final long[] values) {
+ if (buckets == null || values == null || buckets.length == 0 || values.length == 0) {
+ throw new IllegalArgumentException("buckets and values can't be null or empty.");
+ }
+ if (buckets.length != values.length) {
+ throw new IllegalArgumentException("The length of buckets and values should be same.");
+ }
+ this.buckets = buckets;
+ this.values = values;
+ }
+
+ /**
+ * @return true if the keys of the given dataset, parsed as longs and sorted, equal {@link #buckets}.
+ */
+ public boolean isCompatible(DataTable dataset) {
+ final long[] existedBuckets = dataset.keys().stream().mapToLong(Long::parseLong).sorted().toArray();
+ return Arrays.equals(buckets, existedBuckets);
+ }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/Avg.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/HistogramFunction.java
similarity index 51%
rename from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/Avg.java
rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/HistogramFunction.java
index e6ed28a..8dbcc7d 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/Avg.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/HistogramFunction.java
@@ -23,43 +23,90 @@ import java.util.Map;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.UnexpectedException;
+import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
-import org.apache.skywalking.oap.server.core.analysis.metrics.LongAvgMetrics;
+import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
-@MeterFunction(functionName = "avg")
+@MeterFunction(functionName = "histogram")
+@Slf4j
@EqualsAndHashCode(of = {
"entityId",
"timeBucket"
})
-public abstract class Avg extends LongAvgMetrics implements AcceptableValue<Long> {
+public abstract class HistogramFunction extends Metrics implements AcceptableValue<BucketedValues> {
+ public static final String DATASET = "dataset";
+
@Setter
@Getter
@Column(columnName = ENTITY_ID)
private String entityId;
+ @Getter
+ @Setter
+ @Column(columnName = DATASET, dataType = Column.ValueDataType.HISTOGRAM, storageOnly = true, defaultValue = 0)
+ private DataTable dataset = new DataTable(30);
+ /**
+ * Service ID is required for sort query.
+ */
+ @Setter
+ @Getter
+ @Column(columnName = InstanceTraffic.SERVICE_ID)
+ private String serviceId;
+
+ @Override
+ public void accept(final MeterEntity entity, final BucketedValues value) {
+ if (dataset.size() > 0) {
+ if (!value.isCompatible(dataset)) {
+ throw new IllegalArgumentException(
+ "Incompatible BucketedValues [" + value + "] for current HistogramFunction[" + dataset + "]");
+ }
+ }
+
+ this.entityId = entity.id();
+ this.serviceId = entity.serviceId();
+
+ final long[] values = value.getValues();
+ for (int i = 0; i < values.length; i++) {
+ final long bucket = value.getBuckets()[i];
+ final long bucketValue = values[i];
+ dataset.valueAccumulation(String.valueOf(bucket), bucketValue);
+ }
+ }
+
+ @Override
+ public void combine(final Metrics metrics) {
+ HistogramFunction histogram = (HistogramFunction) metrics;
+ this.dataset.append(histogram.dataset);
+ }
+
+ @Override
+ public void calculate() {
+
+ }
@Override
public Metrics toHour() {
- Avg metrics = (Avg) createNew();
+ HistogramFunction metrics = (HistogramFunction) createNew();
metrics.setEntityId(getEntityId());
metrics.setTimeBucket(toTimeBucketInHour());
- metrics.setSummation(getSummation());
- metrics.setCount(getCount());
+ metrics.setServiceId(getServiceId());
+ metrics.setDataset(getDataset());
return metrics;
}
@Override
public Metrics toDay() {
- Avg metrics = (Avg) createNew();
+ HistogramFunction metrics = (HistogramFunction) createNew();
metrics.setEntityId(getEntityId());
metrics.setTimeBucket(toTimeBucketInDay());
- metrics.setSummation(getSummation());
- metrics.setCount(getCount());
+ metrics.setServiceId(getServiceId());
+ metrics.setDataset(getDataset());
return metrics;
}
@@ -70,21 +117,23 @@ public abstract class Avg extends LongAvgMetrics implements AcceptableValue<Long
@Override
public void deserialize(final RemoteData remoteData) {
- this.count = remoteData.getDataLongs(0);
- this.summation = remoteData.getDataLongs(1);
- setTimeBucket(remoteData.getDataLongs(2));
+ this.setTimeBucket(remoteData.getDataLongs(0));
+
+ this.setEntityId(remoteData.getDataStrings(0));
+ this.setServiceId(remoteData.getDataStrings(1));
- this.entityId = remoteData.getDataStrings(0);
+ this.setDataset(new DataTable(remoteData.getDataTableStrings(0)));
}
@Override
public RemoteData.Builder serialize() {
RemoteData.Builder remoteBuilder = RemoteData.newBuilder();
- remoteBuilder.addDataLongs(count);
- remoteBuilder.addDataLongs(summation);
remoteBuilder.addDataLongs(getTimeBucket());
remoteBuilder.addDataStrings(entityId);
+ remoteBuilder.addDataStrings(serviceId);
+
+ remoteBuilder.addDataTableStrings(dataset.toStorageData());
return remoteBuilder;
}
@@ -95,41 +144,33 @@ public abstract class Avg extends LongAvgMetrics implements AcceptableValue<Long
}
@Override
- public void accept(final MeterEntity entity, final Long value) {
- this.entityId = entity.id();
- this.summation += value;
- this.count += 1;
- }
-
- @Override
public Class<? extends StorageBuilder> builder() {
- return AvgStorageBuilder.class;
+ return HistogramFunctionBuilder.class;
}
- public static class AvgStorageBuilder implements StorageBuilder<Avg> {
+ public static class HistogramFunctionBuilder implements StorageBuilder<HistogramFunction> {
+
@Override
- public Avg map2Data(final Map<String, Object> dbMap) {
- Avg metrics = new Avg() {
+ public HistogramFunction map2Data(final Map<String, Object> dbMap) {
+ HistogramFunction metrics = new HistogramFunction() {
@Override
- public AcceptableValue<Long> createNew() {
+ public AcceptableValue<BucketedValues> createNew() {
throw new UnexpectedException("createNew should not be called");
}
};
- metrics.setSummation(((Number) dbMap.get(SUMMATION)).longValue());
- metrics.setValue(((Number) dbMap.get(VALUE)).longValue());
- metrics.setCount(((Number) dbMap.get(COUNT)).longValue());
+ metrics.setDataset(new DataTable((String) dbMap.get(DATASET)));
metrics.setTimeBucket(((Number) dbMap.get(TIME_BUCKET)).longValue());
+ metrics.setServiceId((String) dbMap.get(InstanceTraffic.SERVICE_ID));
metrics.setEntityId((String) dbMap.get(ENTITY_ID));
return metrics;
}
@Override
- public Map<String, Object> data2Map(final Avg storageData) {
+ public Map<String, Object> data2Map(final HistogramFunction storageData) {
Map<String, Object> map = new HashMap<>();
- map.put(SUMMATION, storageData.getSummation());
- map.put(VALUE, storageData.getValue());
- map.put(COUNT, storageData.getCount());
+ map.put(DATASET, storageData.getDataset());
map.put(TIME_BUCKET, storageData.getTimeBucket());
+ map.put(InstanceTraffic.SERVICE_ID, storageData.getServiceId());
map.put(ENTITY_ID, storageData.getEntityId());
return map;
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/DataTable.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/DataTable.java
index 92c7eec..661f8f8 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/DataTable.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/DataTable.java
@@ -21,13 +21,18 @@ import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
+import java.util.Set;
import java.util.stream.Collectors;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObject;
/**
* DataTable includes a hashmap to store string key and long value. It enhanced the serialization capability.
*/
+@ToString
+@EqualsAndHashCode
public class DataTable implements StorageDataComplexObject<DataTable> {
private HashMap<String, Long> data;
@@ -52,10 +57,30 @@ public class DataTable implements StorageDataComplexObject<DataTable> {
data.put(key, value);
}
+ /**
+ * Accumulate the given value onto the existing value of the given key.
+ */
+ public void valueAccumulation(String key, Long value) {
+ Long element = data.get(key);
+ if (element == null) {
+ element = value;
+ } else {
+ element += value;
+ }
+ data.put(key, element);
+ }
+
+ /**
+ * @return the sum of all values.
+ */
public long sumOfValues() {
return data.values().stream().mapToLong(element -> element).sum();
}
+ public Set<String> keys() {
+ return data.keySet();
+ }
+
public List<String> sortedKeys(Comparator<String> keyComparator) {
return data.keySet().stream().sorted(keyComparator).collect(Collectors.toList());
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/HistogramMetrics.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/HistogramMetrics.java
index d6957bf..76bebfc 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/HistogramMetrics.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/HistogramMetrics.java
@@ -71,13 +71,7 @@ public abstract class HistogramMetrics extends Metrics {
}
String idx = String.valueOf(index * step);
- Long element = dataset.get(idx);
- if (element == null) {
- element = 1L;
- } else {
- element++;
- }
- dataset.put(idx, element);
+ dataset.valueAccumulation(idx, 1L);
}
@Override
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/PercentileMetrics.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/PercentileMetrics.java
index d14c769..d4e6209 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/PercentileMetrics.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/PercentileMetrics.java
@@ -73,13 +73,7 @@ public abstract class PercentileMetrics extends Metrics implements MultiIntValue
this.precision = precision;
String index = String.valueOf(value / precision);
- Long element = dataset.get(index);
- if (element == null) {
- element = 1L;
- } else {
- element++;
- }
- dataset.put(index, element);
+ dataset.valueAccumulation(index, 1L);
}
@Override
diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/meter/function/HistogramFunctionTest.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/meter/function/HistogramFunctionTest.java
new file mode 100644
index 0000000..5b483ec
--- /dev/null
+++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/meter/function/HistogramFunctionTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.meter.function;
+
+import java.util.Comparator;
+import java.util.Map;
+import java.util.stream.IntStream;
+import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
+import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
+import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.skywalking.oap.server.core.analysis.meter.function.HistogramFunction.DATASET;
+
+public class HistogramFunctionTest {
+ private static final long[] BUCKETS = new long[] {
+ 0,
+ 50,
+ 100,
+ 250
+ };
+
+ private static final long[] BUCKETS_2ND = new long[] {
+ 0,
+ 51,
+ 100,
+ 250
+ };
+
+ @Test
+ public void testFunction() {
+ HistogramFunctionInst inst = new HistogramFunctionInst();
+ inst.accept(
+ MeterEntity.newService("service-test"),
+ new BucketedValues(
+ BUCKETS, new long[] {
+ 0,
+ 4,
+ 10,
+ 10
+ })
+ );
+
+ inst.accept(
+ MeterEntity.newService("service-test"),
+ new BucketedValues(
+ BUCKETS, new long[] {
+ 1,
+ 2,
+ 3,
+ 4
+ })
+ );
+
+ final int[] results = inst.getDataset().sortedValues(Comparator.comparingInt(Integer::parseInt)).stream()
+ .flatMapToInt(l -> IntStream.of(l.intValue()))
+ .toArray();
+ Assert.assertArrayEquals(new int[] {
+ 1,
+ 6,
+ 13,
+ 14
+ }, results);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testIncompatible() {
+ HistogramFunctionInst inst = new HistogramFunctionInst();
+ inst.accept(
+ MeterEntity.newService("service-test"),
+ new BucketedValues(
+ BUCKETS, new long[] {
+ 0,
+ 4,
+ 10,
+ 10
+ })
+ );
+
+ inst.accept(
+ MeterEntity.newService("service-test"),
+ new BucketedValues(
+ BUCKETS_2ND, new long[] {
+ 1,
+ 2,
+ 3,
+ 4
+ })
+ );
+ }
+
+ @Test
+ public void testSerialization() {
+ HistogramFunctionInst inst = new HistogramFunctionInst();
+ inst.accept(
+ MeterEntity.newService("service-test"),
+ new BucketedValues(
+ BUCKETS, new long[] {
+ 1,
+ 4,
+ 10,
+ 10
+ })
+ );
+
+ final HistogramFunctionInst inst2 = new HistogramFunctionInst();
+ inst2.deserialize(inst.serialize().build());
+
+ Assert.assertEquals(inst, inst2);
+ // HistogramFunction#equals doesn't include the dataset, so compare it separately.
+ Assert.assertEquals(inst.getDataset(), inst2.getDataset());
+ }
+
+ @Test
+ public void testBuilder() throws IllegalAccessException, InstantiationException {
+ HistogramFunctionInst inst = new HistogramFunctionInst();
+ inst.accept(
+ MeterEntity.newService("service-test"),
+ new BucketedValues(
+ BUCKETS, new long[] {
+ 1,
+ 4,
+ 10,
+ 10
+ })
+ );
+
+ final StorageBuilder storageBuilder = inst.builder().newInstance();
+
+ // Simulate what the storage layer does: convert the DataTable to a string.
+ final Map map = storageBuilder.data2Map(inst);
+ map.put(DATASET, ((DataTable) map.get(DATASET)).toStorageData());
+
+ final HistogramFunction inst2 = (HistogramFunction) storageBuilder.map2Data(map);
+ Assert.assertEquals(inst, inst2);
+ // HistogramFunction#equals doesn't include the dataset, so compare it separately.
+ Assert.assertEquals(inst.getDataset(), inst2.getDataset());
+ }
+
+ private static class HistogramFunctionInst extends HistogramFunction {
+
+ @Override
+ public AcceptableValue<BucketedValues> createNew() {
+ return new HistogramFunctionInst();
+ }
+ }
+}
diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/metrics/DataTableTestCase.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/metrics/DataTableTestCase.java
index 08b80af..ea370f9 100644
--- a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/metrics/DataTableTestCase.java
+++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/metrics/DataTableTestCase.java
@@ -28,11 +28,11 @@ public class DataTableTestCase {
@Before
public void init() {
dataTable = new DataTable();
- dataTable.put("5", 500L);
- dataTable.put("6", 600L);
- dataTable.put("1", 100L);
- dataTable.put("2", 200L);
- dataTable.put("7", 700L);
+ dataTable.valueAccumulation("5", 500L);
+ dataTable.valueAccumulation("6", 600L);
+ dataTable.valueAccumulation("1", 100L);
+ dataTable.valueAccumulation("2", 200L);
+ dataTable.valueAccumulation("7", 700L);
}
@Test
diff --git a/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherProvider.java b/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherProvider.java
index 6d942e1..f4d4065 100644
--- a/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherProvider.java
+++ b/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherProvider.java
@@ -26,6 +26,7 @@ import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
import org.apache.skywalking.oap.server.core.analysis.meter.MeterSystem;
import org.apache.skywalking.oap.server.core.analysis.meter.ScopeType;
import org.apache.skywalking.oap.server.core.analysis.meter.function.AcceptableValue;
+import org.apache.skywalking.oap.server.core.analysis.meter.function.BucketedValues;
import org.apache.skywalking.oap.server.fetcher.prometheus.module.PrometheusFetcherModule;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
@@ -62,6 +63,7 @@ public class PrometheusFetcherProvider extends ModuleProvider {
// We should create it based on metrics configuration.
final MeterSystem meterSystem = MeterSystem.meterSystem(getManager());
meterSystem.create("test_long_metrics", "avg", ScopeType.SERVICE, Long.class);
+ meterSystem.create("test_histogram_metrics", "histogram", ScopeType.SERVICE, BucketedValues.class);
}
}
@@ -78,10 +80,33 @@ public class PrometheusFetcherProvider extends ModuleProvider {
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
+ final MeterEntity servEntity = MeterEntity.newService("mock_service");
+
+ // Long Avg Example: feed a constant sample into the "test_long_metrics" avg meter.
final AcceptableValue<Long> value = service.buildMetrics("test_long_metrics", Long.class);
- value.accept(MeterEntity.newService("abc"), 5L);
+ value.accept(servEntity, 5L);
value.setTimeBucket(TimeBucket.getMinuteTimeBucket(System.currentTimeMillis()));
service.doStreamingCalculation(value);
+
+ // Histogram Example: feed a fixed bucket set into the "test_histogram_metrics" histogram meter.
+ final AcceptableValue<BucketedValues> histogramMetrics = service.buildMetrics(
+ "test_histogram_metrics", BucketedValues.class);
+ histogramMetrics.setTimeBucket(TimeBucket.getMinuteTimeBucket(System.currentTimeMillis()));
+ histogramMetrics.accept(servEntity, new BucketedValues(
+ new long[] {
+ 0,
+ 50,
+ 100,
+ 250
+ },
+ new long[] {
+ 0,
+ 4,
+ 10,
+ 10
+ }
+ ));
+ service.doStreamingCalculation(histogramMetrics);
}
}, 2, 2, TimeUnit.SECONDS);
}