You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2020/04/26 12:22:03 UTC

[skywalking] 01/01: Support histogram function in Meter system.

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch new-func
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit 82631dc1bfc870d1d547513dabae66ec360d645a
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Sun Apr 26 20:21:02 2020 +0800

    Support histogram function in Meter system.
---
 .../server/core/analysis/meter/MeterEntity.java    |   4 +
 .../server/core/analysis/meter/MeterSystem.java    |   6 +
 .../analysis/meter/function/AcceptableValue.java   |   2 +
 .../meter/function/{Avg.java => AvgFunction.java}  |  29 +++-
 .../analysis/meter/function/BucketedValues.java    |  61 ++++++++
 .../function/{Avg.java => HistogramFunction.java}  | 109 +++++++++-----
 .../server/core/analysis/metrics/DataTable.java    |  25 ++++
 .../core/analysis/metrics/HistogramMetrics.java    |   8 +-
 .../core/analysis/metrics/PercentileMetrics.java   |   8 +-
 .../meter/function/HistogramFunctionTest.java      | 164 +++++++++++++++++++++
 .../core/analysis/metrics/DataTableTestCase.java   |  10 +-
 .../provider/PrometheusFetcherProvider.java        |  27 +++-
 12 files changed, 392 insertions(+), 61 deletions(-)

diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/MeterEntity.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/MeterEntity.java
index a006dbe..388e2d7 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/MeterEntity.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/MeterEntity.java
@@ -59,6 +59,10 @@ public class MeterEntity {
         }
     }
 
+    public String serviceId() {
+        return IDManager.ServiceID.buildId(serviceName, true);
+    }
+
     /**
      * Create a service level meter entity.
      */
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/MeterSystem.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/MeterSystem.java
index b8d6e34..2d5173b 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/MeterSystem.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/MeterSystem.java
@@ -40,6 +40,7 @@ import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.oap.server.core.UnexpectedException;
 import org.apache.skywalking.oap.server.core.analysis.StreamDefinition;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
 import org.apache.skywalking.oap.server.core.analysis.meter.function.AcceptableValue;
 import org.apache.skywalking.oap.server.core.analysis.meter.function.MeterFunction;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
@@ -266,6 +267,11 @@ public class MeterSystem implements Service {
      * @param acceptableValue should only be created through {@link #create(String, String, ScopeType, Class)}
      */
     public void doStreamingCalculation(AcceptableValue acceptableValue) {
+        final long timeBucket = acceptableValue.getTimeBucket();
+        if (timeBucket == 0L) {
+            // No time bucket set; use the current minute, as data without a timestamp could harm the storage.
+            acceptableValue.setTimeBucket(TimeBucket.getMinuteTimeBucket(System.currentTimeMillis()));
+        }
         MetricsStreamProcessor.getInstance().in((Metrics) acceptableValue);
     }
 
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/AcceptableValue.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/AcceptableValue.java
index 9bd9cdd..7663a1e 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/AcceptableValue.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/AcceptableValue.java
@@ -38,4 +38,6 @@ public interface AcceptableValue<T> {
     Class<? extends StorageBuilder> builder();
 
     void setTimeBucket(long timeBucket);
+
+    long getTimeBucket();
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/Avg.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/AvgFunction.java
similarity index 80%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/Avg.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/AvgFunction.java
index e6ed28a..424cbce 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/Avg.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/AvgFunction.java
@@ -25,6 +25,7 @@ import lombok.Getter;
 import lombok.Setter;
 import org.apache.skywalking.oap.server.core.Const;
 import org.apache.skywalking.oap.server.core.UnexpectedException;
+import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
 import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
 import org.apache.skywalking.oap.server.core.analysis.metrics.LongAvgMetrics;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
@@ -37,17 +38,25 @@ import org.apache.skywalking.oap.server.core.storage.annotation.Column;
     "entityId",
     "timeBucket"
 })
-public abstract class Avg extends LongAvgMetrics implements AcceptableValue<Long> {
+public abstract class AvgFunction extends LongAvgMetrics implements AcceptableValue<Long> {
     @Setter
     @Getter
     @Column(columnName = ENTITY_ID)
     private String entityId;
+    /**
+     * Service ID is required for sort query.
+     */
+    @Setter
+    @Getter
+    @Column(columnName = InstanceTraffic.SERVICE_ID)
+    private String serviceId;
 
     @Override
     public Metrics toHour() {
-        Avg metrics = (Avg) createNew();
+        AvgFunction metrics = (AvgFunction) createNew();
         metrics.setEntityId(getEntityId());
         metrics.setTimeBucket(toTimeBucketInHour());
+        metrics.setServiceId(getServiceId());
         metrics.setSummation(getSummation());
         metrics.setCount(getCount());
         return metrics;
@@ -55,9 +64,10 @@ public abstract class Avg extends LongAvgMetrics implements AcceptableValue<Long
 
     @Override
     public Metrics toDay() {
-        Avg metrics = (Avg) createNew();
+        AvgFunction metrics = (AvgFunction) createNew();
         metrics.setEntityId(getEntityId());
         metrics.setTimeBucket(toTimeBucketInDay());
+        metrics.setServiceId(getServiceId());
         metrics.setSummation(getSummation());
         metrics.setCount(getCount());
         return metrics;
@@ -75,6 +85,7 @@ public abstract class Avg extends LongAvgMetrics implements AcceptableValue<Long
         setTimeBucket(remoteData.getDataLongs(2));
 
         this.entityId = remoteData.getDataStrings(0);
+        this.serviceId = remoteData.getDataStrings(1);
     }
 
     @Override
@@ -85,6 +96,7 @@ public abstract class Avg extends LongAvgMetrics implements AcceptableValue<Long
         remoteBuilder.addDataLongs(getTimeBucket());
 
         remoteBuilder.addDataStrings(entityId);
+        remoteBuilder.addDataStrings(serviceId);
 
         return remoteBuilder;
     }
@@ -97,6 +109,7 @@ public abstract class Avg extends LongAvgMetrics implements AcceptableValue<Long
     @Override
     public void accept(final MeterEntity entity, final Long value) {
         this.entityId = entity.id();
+        this.serviceId = entity.serviceId();
         this.summation += value;
         this.count += 1;
     }
@@ -106,10 +119,10 @@ public abstract class Avg extends LongAvgMetrics implements AcceptableValue<Long
         return AvgStorageBuilder.class;
     }
 
-    public static class AvgStorageBuilder implements StorageBuilder<Avg> {
+    public static class AvgStorageBuilder implements StorageBuilder<AvgFunction> {
         @Override
-        public Avg map2Data(final Map<String, Object> dbMap) {
-            Avg metrics = new Avg() {
+        public AvgFunction map2Data(final Map<String, Object> dbMap) {
+            AvgFunction metrics = new AvgFunction() {
                 @Override
                 public AcceptableValue<Long> createNew() {
                     throw new UnexpectedException("createNew should not be called");
@@ -119,17 +132,19 @@ public abstract class Avg extends LongAvgMetrics implements AcceptableValue<Long
             metrics.setValue(((Number) dbMap.get(VALUE)).longValue());
             metrics.setCount(((Number) dbMap.get(COUNT)).longValue());
             metrics.setTimeBucket(((Number) dbMap.get(TIME_BUCKET)).longValue());
+            metrics.setServiceId((String) dbMap.get(InstanceTraffic.SERVICE_ID));
             metrics.setEntityId((String) dbMap.get(ENTITY_ID));
             return metrics;
         }
 
         @Override
-        public Map<String, Object> data2Map(final Avg storageData) {
+        public Map<String, Object> data2Map(final AvgFunction storageData) {
             Map<String, Object> map = new HashMap<>();
             map.put(SUMMATION, storageData.getSummation());
             map.put(VALUE, storageData.getValue());
             map.put(COUNT, storageData.getCount());
             map.put(TIME_BUCKET, storageData.getTimeBucket());
+            map.put(InstanceTraffic.SERVICE_ID, storageData.getServiceId());
             map.put(ENTITY_ID, storageData.getEntityId());
             return map;
         }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/BucketedValues.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/BucketedValues.java
new file mode 100644
index 0000000..857f2e7
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/BucketedValues.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.meter.function;
+
+import java.util.Arrays;
+import lombok.Getter;
+import lombok.ToString;
+import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
+
+/**
+ * BucketedValues represents a value set, whose elements are grouped into buckets by value range.
+ */
+@ToString
+@Getter
+public class BucketedValues {
+    /**
+     * Each element is the minimal (inclusive) value of its bucket; the max is defined by the next element.
+     * For example, 0, 10, 50, 100 means the buckets are [0, 10), [10, 50), [50, 100), [100, +infinity).
+     */
+    private final long[] buckets;
+    /**
+     * {@link #buckets} and {@link #values} arrays must have the same length. Each element in the values represents
+     * the amount in the bucket at the same index.
+     */
+    private final long[] values;
+
+    public BucketedValues(final long[] buckets, final long[] values) {
+        if (buckets == null || values == null || buckets.length == 0 || values.length == 0) {
+            throw new IllegalArgumentException("buckets and values can't be null or empty.");
+        }
+        if (buckets.length != values.length) {
+            throw new IllegalArgumentException("The length of buckets and values should be same.");
+        }
+        this.buckets = buckets;
+        this.values = values;
+    }
+
+    /**
+     * @return true if the dataset's keys, sorted ascending, equal {@link #buckets} (which must be ascending too).
+     */
+    public boolean isCompatible(DataTable dataset) {
+        final long[] existedBuckets = dataset.keys().stream().mapToLong(Long::parseLong).sorted().toArray();
+        return Arrays.equals(buckets, existedBuckets);
+    }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/Avg.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/HistogramFunction.java
similarity index 51%
rename from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/Avg.java
rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/HistogramFunction.java
index e6ed28a..8dbcc7d 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/Avg.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/HistogramFunction.java
@@ -23,43 +23,90 @@ import java.util.Map;
 import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.oap.server.core.Const;
 import org.apache.skywalking.oap.server.core.UnexpectedException;
+import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
 import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
-import org.apache.skywalking.oap.server.core.analysis.metrics.LongAvgMetrics;
+import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
 import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
 import org.apache.skywalking.oap.server.core.storage.annotation.Column;
 
-@MeterFunction(functionName = "avg")
+@MeterFunction(functionName = "histogram")
+@Slf4j
 @EqualsAndHashCode(of = {
     "entityId",
     "timeBucket"
 })
-public abstract class Avg extends LongAvgMetrics implements AcceptableValue<Long> {
+public abstract class HistogramFunction extends Metrics implements AcceptableValue<BucketedValues> {
+    public static final String DATASET = "dataset";
+
     @Setter
     @Getter
     @Column(columnName = ENTITY_ID)
     private String entityId;
+    @Getter
+    @Setter
+    @Column(columnName = DATASET, dataType = Column.ValueDataType.HISTOGRAM, storageOnly = true, defaultValue = 0)
+    private DataTable dataset = new DataTable(30);
+    /**
+     * Service ID is required for sort query.
+     */
+    @Setter
+    @Getter
+    @Column(columnName = InstanceTraffic.SERVICE_ID)
+    private String serviceId;
+
+    @Override
+    public void accept(final MeterEntity entity, final BucketedValues value) {
+        // A non-empty dataset only accepts values that use the very same buckets.
+        if (dataset.size() > 0 && !value.isCompatible(dataset)) {
+            throw new IllegalArgumentException(
+                "Incompatible BucketedValues [" + value + "] for current HistogramFunction[" + dataset + "]");
+        }
+
+        this.entityId = entity.id();
+        this.serviceId = entity.serviceId();
+
+        // Add each amount to the dataset entry keyed by its bucket's string form.
+        final long[] bucketKeys = value.getBuckets();
+        final long[] amounts = value.getValues();
+        int idx = 0;
+        for (final long amount : amounts) {
+            dataset.valueAccumulation(String.valueOf(bucketKeys[idx++]), amount);
+        }
+    }
+
+    @Override
+    public void combine(final Metrics metrics) {
+        HistogramFunction histogram = (HistogramFunction) metrics;
+        this.dataset.append(histogram.dataset);
+    }
+
+    @Override
+    public void calculate() {
+
+    }
 
     @Override
     public Metrics toHour() {
-        Avg metrics = (Avg) createNew();
+        HistogramFunction metrics = (HistogramFunction) createNew();
         metrics.setEntityId(getEntityId());
         metrics.setTimeBucket(toTimeBucketInHour());
-        metrics.setSummation(getSummation());
-        metrics.setCount(getCount());
+        metrics.setServiceId(getServiceId());
+        metrics.getDataset().append(getDataset()); // copy into the new table; never share the mutable one
         return metrics;
     }
 
     @Override
     public Metrics toDay() {
-        Avg metrics = (Avg) createNew();
+        HistogramFunction metrics = (HistogramFunction) createNew();
         metrics.setEntityId(getEntityId());
         metrics.setTimeBucket(toTimeBucketInDay());
-        metrics.setSummation(getSummation());
-        metrics.setCount(getCount());
+        metrics.setServiceId(getServiceId());
+        metrics.getDataset().append(getDataset()); // copy into the new table; never share the mutable one
         return metrics;
     }
 
@@ -70,21 +117,23 @@ public abstract class Avg extends LongAvgMetrics implements AcceptableValue<Long
 
     @Override
     public void deserialize(final RemoteData remoteData) {
-        this.count = remoteData.getDataLongs(0);
-        this.summation = remoteData.getDataLongs(1);
-        setTimeBucket(remoteData.getDataLongs(2));
+        this.setTimeBucket(remoteData.getDataLongs(0));
+
+        this.setEntityId(remoteData.getDataStrings(0));
+        this.setServiceId(remoteData.getDataStrings(1));
 
-        this.entityId = remoteData.getDataStrings(0);
+        this.setDataset(new DataTable(remoteData.getDataTableStrings(0)));
     }
 
     @Override
     public RemoteData.Builder serialize() {
         RemoteData.Builder remoteBuilder = RemoteData.newBuilder();
-        remoteBuilder.addDataLongs(count);
-        remoteBuilder.addDataLongs(summation);
         remoteBuilder.addDataLongs(getTimeBucket());
 
         remoteBuilder.addDataStrings(entityId);
+        remoteBuilder.addDataStrings(serviceId);
+
+        remoteBuilder.addDataTableStrings(dataset.toStorageData());
 
         return remoteBuilder;
     }
@@ -95,41 +144,33 @@ public abstract class Avg extends LongAvgMetrics implements AcceptableValue<Long
     }
 
     @Override
-    public void accept(final MeterEntity entity, final Long value) {
-        this.entityId = entity.id();
-        this.summation += value;
-        this.count += 1;
-    }
-
-    @Override
     public Class<? extends StorageBuilder> builder() {
-        return AvgStorageBuilder.class;
+        return HistogramFunctionBuilder.class;
     }
 
-    public static class AvgStorageBuilder implements StorageBuilder<Avg> {
+    public static class HistogramFunctionBuilder implements StorageBuilder<HistogramFunction> {
+
         @Override
-        public Avg map2Data(final Map<String, Object> dbMap) {
-            Avg metrics = new Avg() {
+        public HistogramFunction map2Data(final Map<String, Object> dbMap) {
+            HistogramFunction metrics = new HistogramFunction() {
                 @Override
-                public AcceptableValue<Long> createNew() {
+                public AcceptableValue<BucketedValues> createNew() {
                     throw new UnexpectedException("createNew should not be called");
                 }
             };
-            metrics.setSummation(((Number) dbMap.get(SUMMATION)).longValue());
-            metrics.setValue(((Number) dbMap.get(VALUE)).longValue());
-            metrics.setCount(((Number) dbMap.get(COUNT)).longValue());
+            metrics.setDataset(new DataTable((String) dbMap.get(DATASET)));
             metrics.setTimeBucket(((Number) dbMap.get(TIME_BUCKET)).longValue());
+            metrics.setServiceId((String) dbMap.get(InstanceTraffic.SERVICE_ID));
             metrics.setEntityId((String) dbMap.get(ENTITY_ID));
             return metrics;
         }
 
         @Override
-        public Map<String, Object> data2Map(final Avg storageData) {
+        public Map<String, Object> data2Map(final HistogramFunction storageData) {
             Map<String, Object> map = new HashMap<>();
-            map.put(SUMMATION, storageData.getSummation());
-            map.put(VALUE, storageData.getValue());
-            map.put(COUNT, storageData.getCount());
+            map.put(DATASET, storageData.getDataset());
             map.put(TIME_BUCKET, storageData.getTimeBucket());
+            map.put(InstanceTraffic.SERVICE_ID, storageData.getServiceId());
             map.put(ENTITY_ID, storageData.getEntityId());
             return map;
         }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/DataTable.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/DataTable.java
index 92c7eec..661f8f8 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/DataTable.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/DataTable.java
@@ -21,13 +21,18 @@ import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Set;
 import java.util.stream.Collectors;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
 import org.apache.skywalking.oap.server.core.Const;
 import org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObject;
 
 /**
  * DataTable includes a hashmap to store string key and long value. It enhanced the serialization capability.
  */
+@ToString
+@EqualsAndHashCode
 public class DataTable implements StorageDataComplexObject<DataTable> {
     private HashMap<String, Long> data;
 
@@ -52,10 +57,30 @@ public class DataTable implements StorageDataComplexObject<DataTable> {
         data.put(key, value);
     }
 
+    /**
+     * Accumulate the value with existing value in the same given key.
+     */
+    public void valueAccumulation(String key, Long value) {
+        Long element = data.get(key);
+        if (element == null) {
+            element = value;
+        } else {
+            element += value;
+        }
+        data.put(key, element);
+    }
+
+    /**
+     * @return the sum of all values.
+     */
     public long sumOfValues() {
         return data.values().stream().mapToLong(element -> element).sum();
     }
 
+    public Set<String> keys() {
+        return data.keySet();
+    }
+
     public List<String> sortedKeys(Comparator<String> keyComparator) {
         return data.keySet().stream().sorted(keyComparator).collect(Collectors.toList());
     }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/HistogramMetrics.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/HistogramMetrics.java
index d6957bf..76bebfc 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/HistogramMetrics.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/HistogramMetrics.java
@@ -71,13 +71,7 @@ public abstract class HistogramMetrics extends Metrics {
         }
         String idx = String.valueOf(index * step);
 
-        Long element = dataset.get(idx);
-        if (element == null) {
-            element = 1L;
-        } else {
-            element++;
-        }
-        dataset.put(idx, element);
+        dataset.valueAccumulation(idx, 1L);
     }
 
     @Override
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/PercentileMetrics.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/PercentileMetrics.java
index d14c769..d4e6209 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/PercentileMetrics.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/PercentileMetrics.java
@@ -73,13 +73,7 @@ public abstract class PercentileMetrics extends Metrics implements MultiIntValue
         this.precision = precision;
 
         String index = String.valueOf(value / precision);
-        Long element = dataset.get(index);
-        if (element == null) {
-            element = 1L;
-        } else {
-            element++;
-        }
-        dataset.put(index, element);
+        dataset.valueAccumulation(index, 1L);
     }
 
     @Override
diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/meter/function/HistogramFunctionTest.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/meter/function/HistogramFunctionTest.java
new file mode 100644
index 0000000..5b483ec
--- /dev/null
+++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/meter/function/HistogramFunctionTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.meter.function;
+
+import java.util.Comparator;
+import java.util.Map;
+import java.util.stream.IntStream;
+import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
+import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
+import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.skywalking.oap.server.core.analysis.meter.function.HistogramFunction.DATASET;
+
+public class HistogramFunctionTest {
+    private static final long[] BUCKETS = new long[] {
+        0,
+        50,
+        100,
+        250
+    };
+
+    private static final long[] BUCKETS_2ND = new long[] {
+        0,
+        51,
+        100,
+        250
+    };
+
+    @Test
+    public void testFunction() {
+        HistogramFunctionInst inst = new HistogramFunctionInst();
+        inst.accept(
+            MeterEntity.newService("service-test"),
+            new BucketedValues(
+                BUCKETS, new long[] {
+                0,
+                4,
+                10,
+                10
+            })
+        );
+
+        inst.accept(
+            MeterEntity.newService("service-test"),
+            new BucketedValues(
+                BUCKETS, new long[] {
+                1,
+                2,
+                3,
+                4
+            })
+        );
+
+        final int[] results = inst.getDataset().sortedValues(Comparator.comparingInt(Integer::parseInt)).stream()
+                                  .flatMapToInt(l -> IntStream.of(l.intValue()))
+                                  .toArray();
+        Assert.assertArrayEquals(new int[] {
+            1,
+            6,
+            13,
+            14
+        }, results);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testIncompatible() {
+        HistogramFunctionInst inst = new HistogramFunctionInst();
+        inst.accept(
+            MeterEntity.newService("service-test"),
+            new BucketedValues(
+                BUCKETS, new long[] {
+                0,
+                4,
+                10,
+                10
+            })
+        );
+
+        inst.accept(
+            MeterEntity.newService("service-test"),
+            new BucketedValues(
+                BUCKETS_2ND, new long[] {
+                1,
+                2,
+                3,
+                4
+            })
+        );
+    }
+
+    @Test
+    public void testSerialization() {
+        HistogramFunctionInst inst = new HistogramFunctionInst();
+        inst.accept(
+            MeterEntity.newService("service-test"),
+            new BucketedValues(
+                BUCKETS, new long[] {
+                1,
+                4,
+                10,
+                10
+            })
+        );
+
+        final HistogramFunctionInst inst2 = new HistogramFunctionInst();
+        inst2.deserialize(inst.serialize().build());
+
+        Assert.assertEquals(inst, inst2);
+        // HistogramFunction#equals doesn't cover the dataset, so compare the datasets explicitly.
+        Assert.assertEquals(inst.getDataset(), inst2.getDataset());
+    }
+
+    @Test
+    public void testBuilder() throws IllegalAccessException, InstantiationException {
+        HistogramFunctionInst inst = new HistogramFunctionInst();
+        inst.accept(
+            MeterEntity.newService("service-test"),
+            new BucketedValues(
+                BUCKETS, new long[] {
+                1,
+                4,
+                10,
+                10
+            })
+        );
+
+        final StorageBuilder storageBuilder = inst.builder().newInstance();
+
+        // Simulate what the storage layer does: convert the DataTable to its string form.
+        final Map map = storageBuilder.data2Map(inst);
+        map.put(DATASET, ((DataTable) map.get(DATASET)).toStorageData());
+
+        final HistogramFunction inst2 = (HistogramFunction) storageBuilder.map2Data(map);
+        Assert.assertEquals(inst, inst2);
+        // HistogramFunction#equals doesn't cover the dataset, so compare the datasets explicitly.
+        Assert.assertEquals(inst.getDataset(), inst2.getDataset());
+    }
+
+    private static class HistogramFunctionInst extends HistogramFunction {
+
+        @Override
+        public AcceptableValue<BucketedValues> createNew() {
+            return new HistogramFunctionInst();
+        }
+    }
+}
diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/metrics/DataTableTestCase.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/metrics/DataTableTestCase.java
index 08b80af..ea370f9 100644
--- a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/metrics/DataTableTestCase.java
+++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/metrics/DataTableTestCase.java
@@ -28,11 +28,11 @@ public class DataTableTestCase {
     @Before
     public void init() {
         dataTable = new DataTable();
-        dataTable.put("5", 500L);
-        dataTable.put("6", 600L);
-        dataTable.put("1", 100L);
-        dataTable.put("2", 200L);
-        dataTable.put("7", 700L);
+        dataTable.valueAccumulation("5", 500L);
+        dataTable.valueAccumulation("6", 600L);
+        dataTable.valueAccumulation("1", 100L);
+        dataTable.valueAccumulation("2", 200L);
+        dataTable.valueAccumulation("7", 700L);
     }
 
     @Test
diff --git a/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherProvider.java b/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherProvider.java
index 6d942e1..f4d4065 100644
--- a/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherProvider.java
+++ b/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherProvider.java
@@ -26,6 +26,7 @@ import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
 import org.apache.skywalking.oap.server.core.analysis.meter.MeterSystem;
 import org.apache.skywalking.oap.server.core.analysis.meter.ScopeType;
 import org.apache.skywalking.oap.server.core.analysis.meter.function.AcceptableValue;
+import org.apache.skywalking.oap.server.core.analysis.meter.function.BucketedValues;
 import org.apache.skywalking.oap.server.fetcher.prometheus.module.PrometheusFetcherModule;
 import org.apache.skywalking.oap.server.library.module.ModuleConfig;
 import org.apache.skywalking.oap.server.library.module.ModuleDefine;
@@ -62,6 +63,7 @@ public class PrometheusFetcherProvider extends ModuleProvider {
             // We should create it based on metrics configuration.
             final MeterSystem meterSystem = MeterSystem.meterSystem(getManager());
             meterSystem.create("test_long_metrics", "avg", ScopeType.SERVICE, Long.class);
+            meterSystem.create("test_histogram_metrics", "histogram", ScopeType.SERVICE, BucketedValues.class);
         }
     }
 
@@ -78,10 +80,33 @@ public class PrometheusFetcherProvider extends ModuleProvider {
             Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(new Runnable() {
                 @Override
                 public void run() {
+                    final MeterEntity servEntity = MeterEntity.newService("mock_service");
+
+                    // Long Avg Example
                     final AcceptableValue<Long> value = service.buildMetrics("test_long_metrics", Long.class);
-                    value.accept(MeterEntity.newService("abc"), 5L);
+                    value.accept(servEntity, 5L);
                     value.setTimeBucket(TimeBucket.getMinuteTimeBucket(System.currentTimeMillis()));
                     service.doStreamingCalculation(value);
+
+                    // Histogram Example: the time bucket must be set on the histogram metric itself.
+                    final AcceptableValue<BucketedValues> histogramMetrics = service.buildMetrics(
+                        "test_histogram_metrics", BucketedValues.class);
+                    histogramMetrics.setTimeBucket(TimeBucket.getMinuteTimeBucket(System.currentTimeMillis()));
+                    histogramMetrics.accept(servEntity, new BucketedValues(
+                        new long[] {
+                            0,
+                            50,
+                            100,
+                            250
+                        },
+                        new long[] {
+                            0,
+                            4,
+                            10,
+                            10
+                        }
+                    ));
+                    service.doStreamingCalculation(histogramMetrics);
                 }
             }, 2, 2, TimeUnit.SECONDS);
         }