You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2020/04/28 03:02:30 UTC

[skywalking] branch percentile created (now cbcd8a8)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a change to branch percentile
in repository https://gitbox.apache.org/repos/asf/skywalking.git.


      at cbcd8a8  Support percentile function in the meter system.

This branch includes the following new commits:

     new cbcd8a8  Support percentile function in the meter system.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[skywalking] 01/01: Support percentile function in the meter system.

Posted by wu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch percentile
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit cbcd8a8c23a3544aac3341affb641f2988951382
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Tue Apr 28 11:01:33 2020 +0800

    Support percentile function in the meter system.
---
 .../skywalking/oal/rt/parser/AnalysisResult.java   |   2 +-
 .../oal/rt/parser/PersistenceColumns.java          |  12 +-
 .../skywalking/oal/rt/parser/PersistenceField.java |   4 +-
 .../code-templates/metrics/deserialize.ftl         |   2 +-
 .../server/core/analysis/meter/MeterSystem.java    |  19 +-
 .../analysis/meter/function/HistogramFunction.java |  15 --
 .../meter/function/PercentileFunction.java         | 297 +++++++++++++++++++++
 .../oap/server/core/analysis/metrics/IntList.java  |  87 ++++++
 .../meter/function/PercentileFunctionTest.java     | 208 +++++++++++++++
 .../provider/PrometheusFetcherProvider.java        |  37 +++
 .../elasticsearch/base/ColumnTypeEsMapping.java    |   3 +-
 .../plugin/jdbc/h2/dao/H2TableInstaller.java       |   4 +-
 .../plugin/jdbc/mysql/MySQLTableInstaller.java     |   3 +-
 13 files changed, 658 insertions(+), 35 deletions(-)

diff --git a/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/parser/AnalysisResult.java b/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/parser/AnalysisResult.java
index c0cbe70..31a975b 100644
--- a/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/parser/AnalysisResult.java
+++ b/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/parser/AnalysisResult.java
@@ -139,7 +139,7 @@ public class AnalysisResult {
             } else if (columnType.equals(long.class)) {
                 serializeFields.addLongField(column.getFieldName());
             } else if (StorageDataComplexObject.class.isAssignableFrom(columnType)) {
-                serializeFields.addObjectField(column.getFieldName());
+                serializeFields.addObjectField(column.getFieldName(), columnType.getName());
             } else {
                 throw new IllegalStateException(
                     "Unexpected field type [" + columnType.getSimpleName() + "] of persistence column [" + column
diff --git a/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/parser/PersistenceColumns.java b/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/parser/PersistenceColumns.java
index 0fab3aa..195e3f8 100644
--- a/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/parser/PersistenceColumns.java
+++ b/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/parser/PersistenceColumns.java
@@ -29,23 +29,23 @@ public class PersistenceColumns {
     private List<PersistenceField> objectFields = new LinkedList<>();
 
     public void addStringField(String fieldName) {
-        stringFields.add(new PersistenceField(fieldName));
+        stringFields.add(new PersistenceField(fieldName, "String"));
     }
 
     public void addLongField(String fieldName) {
-        longFields.add(new PersistenceField(fieldName));
+        longFields.add(new PersistenceField(fieldName, "long"));
     }
 
     public void addDoubleField(String fieldName) {
-        doubleFields.add(new PersistenceField(fieldName));
+        doubleFields.add(new PersistenceField(fieldName, "double"));
     }
 
     public void addIntField(String fieldName) {
-        intFields.add(new PersistenceField(fieldName));
+        intFields.add(new PersistenceField(fieldName, "int"));
     }
 
-    public void addObjectField(String fieldName) {
-        objectFields.add(new PersistenceField(fieldName));
+    public void addObjectField(String fieldName, String fieldType) {
+        objectFields.add(new PersistenceField(fieldName, fieldType));
     }
 
     public List<PersistenceField> getStringFields() {
diff --git a/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/parser/PersistenceField.java b/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/parser/PersistenceField.java
index 015cebb..622e7e8 100644
--- a/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/parser/PersistenceField.java
+++ b/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/parser/PersistenceField.java
@@ -29,10 +29,12 @@ public class PersistenceField {
     private String fieldName;
     private String setter;
     private String getter;
+    private String fieldType;
 
-    public PersistenceField(String fieldName) {
+    public PersistenceField(String fieldName, String fieldType) {
         this.fieldName = fieldName;
         this.setter = ClassMethodUtil.toSetMethod(fieldName);
         this.getter = ClassMethodUtil.toGetMethod(fieldName);
+        this.fieldType = fieldType;
     }
 }
diff --git a/oap-server/oal-rt/src/main/resources/code-templates/metrics/deserialize.ftl b/oap-server/oal-rt/src/main/resources/code-templates/metrics/deserialize.ftl
index 2484151..455634f 100644
--- a/oap-server/oal-rt/src/main/resources/code-templates/metrics/deserialize.ftl
+++ b/oap-server/oal-rt/src/main/resources/code-templates/metrics/deserialize.ftl
@@ -16,7 +16,7 @@ public void deserialize(org.apache.skywalking.oap.server.core.remote.grpc.proto.
 </#list>
 
 <#list serializeFields.objectFields as field>
-    ${field.setter}(new org.apache.skywalking.oap.server.core.analysis.metrics.DataTable(remoteData.getDataObjectStrings(${field?index})));
+    ${field.setter}(new ${field.fieldType}(remoteData.getDataObjectStrings(${field?index})));
 </#list>
 
 }
\ No newline at end of file
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/MeterSystem.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/MeterSystem.java
index 2d5173b..6ba9a47 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/MeterSystem.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/MeterSystem.java
@@ -159,13 +159,18 @@ public class MeterSystem implements Service {
             boolean foundDataType = false;
             String acceptance = null;
             for (final Type genericInterface : meterFunction.getGenericInterfaces()) {
-                ParameterizedType parameterizedType = (ParameterizedType) genericInterface;
-                if (parameterizedType.getRawType().getTypeName().equals(AcceptableValue.class.getName())) {
-                    Type[] arguments = parameterizedType.getActualTypeArguments();
-                    if (arguments[0].equals(dataType)) {
-                        foundDataType = true;
-                    } else {
-                        acceptance = arguments[0].getTypeName();
+                if (genericInterface instanceof ParameterizedType) {
+                    ParameterizedType parameterizedType = (ParameterizedType) genericInterface;
+                    if (parameterizedType.getRawType().getTypeName().equals(AcceptableValue.class.getName())) {
+                        Type[] arguments = parameterizedType.getActualTypeArguments();
+                        if (arguments[0].equals(dataType)) {
+                            foundDataType = true;
+                        } else {
+                            acceptance = arguments[0].getTypeName();
+                        }
+                    }
+                    if (foundDataType) {
+                        break;
                     }
                 }
             }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/HistogramFunction.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/HistogramFunction.java
index 16f000a..f1150a6 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/HistogramFunction.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/HistogramFunction.java
@@ -27,7 +27,6 @@ import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.oap.server.core.Const;
 import org.apache.skywalking.oap.server.core.UnexpectedException;
-import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
 import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
 import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
@@ -58,13 +57,6 @@ public abstract class HistogramFunction extends Metrics implements AcceptableVal
     @Setter
     @Column(columnName = DATASET, dataType = Column.ValueDataType.HISTOGRAM, storageOnly = true, defaultValue = 0)
     private DataTable dataset = new DataTable(30);
-    /**
-     * Service ID is required for sort query.
-     */
-    @Setter
-    @Getter
-    @Column(columnName = InstanceTraffic.SERVICE_ID)
-    private String serviceId;
 
     @Override
     public void accept(final MeterEntity entity, final BucketedValues value) {
@@ -76,7 +68,6 @@ public abstract class HistogramFunction extends Metrics implements AcceptableVal
         }
 
         this.entityId = entity.id();
-        this.serviceId = entity.serviceId();
 
         final long[] values = value.getValues();
         for (int i = 0; i < values.length; i++) {
@@ -110,7 +101,6 @@ public abstract class HistogramFunction extends Metrics implements AcceptableVal
         HistogramFunction metrics = (HistogramFunction) createNew();
         metrics.setEntityId(getEntityId());
         metrics.setTimeBucket(toTimeBucketInHour());
-        metrics.setServiceId(getServiceId());
         metrics.setDataset(getDataset());
         return metrics;
     }
@@ -120,7 +110,6 @@ public abstract class HistogramFunction extends Metrics implements AcceptableVal
         HistogramFunction metrics = (HistogramFunction) createNew();
         metrics.setEntityId(getEntityId());
         metrics.setTimeBucket(toTimeBucketInDay());
-        metrics.setServiceId(getServiceId());
         metrics.setDataset(getDataset());
         return metrics;
     }
@@ -135,7 +124,6 @@ public abstract class HistogramFunction extends Metrics implements AcceptableVal
         this.setTimeBucket(remoteData.getDataLongs(0));
 
         this.setEntityId(remoteData.getDataStrings(0));
-        this.setServiceId(remoteData.getDataStrings(1));
 
         this.setDataset(new DataTable(remoteData.getDataObjectStrings(0)));
     }
@@ -146,7 +134,6 @@ public abstract class HistogramFunction extends Metrics implements AcceptableVal
         remoteBuilder.addDataLongs(getTimeBucket());
 
         remoteBuilder.addDataStrings(entityId);
-        remoteBuilder.addDataStrings(serviceId);
 
         remoteBuilder.addDataObjectStrings(dataset.toStorageData());
 
@@ -175,7 +162,6 @@ public abstract class HistogramFunction extends Metrics implements AcceptableVal
             };
             metrics.setDataset(new DataTable((String) dbMap.get(DATASET)));
             metrics.setTimeBucket(((Number) dbMap.get(TIME_BUCKET)).longValue());
-            metrics.setServiceId((String) dbMap.get(InstanceTraffic.SERVICE_ID));
             metrics.setEntityId((String) dbMap.get(ENTITY_ID));
             return metrics;
         }
@@ -185,7 +171,6 @@ public abstract class HistogramFunction extends Metrics implements AcceptableVal
             Map<String, Object> map = new HashMap<>();
             map.put(DATASET, storageData.getDataset());
             map.put(TIME_BUCKET, storageData.getTimeBucket());
-            map.put(InstanceTraffic.SERVICE_ID, storageData.getServiceId());
             map.put(ENTITY_ID, storageData.getEntityId());
             return map;
         }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/PercentileFunction.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/PercentileFunction.java
new file mode 100644
index 0000000..01d767b
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/meter/function/PercentileFunction.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.meter.function;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.IntStream;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
+import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
+import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
+import org.apache.skywalking.oap.server.core.analysis.metrics.IntList;
+import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
+import org.apache.skywalking.oap.server.core.analysis.metrics.MultiIntValuesHolder;
+import org.apache.skywalking.oap.server.core.analysis.metrics.PercentileMetrics;
+import org.apache.skywalking.oap.server.core.query.type.Bucket;
+import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
+import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
+import org.apache.skywalking.oap.server.core.storage.annotation.Column;
+
+/**
+ * PercentileFunction is the implementation of {@link PercentileMetrics} in the meter system. The major difference is
+ * the PercentileFunction accepts the {@link PercentileArgument} as input rather than every single request.
+ */
+@MeterFunction(functionName = "percentile")
+@Slf4j
+@EqualsAndHashCode(of = {
+    "entityId",
+    "timeBucket"
+})
+public abstract class PercentileFunction extends Metrics implements AcceptableValue<PercentileFunction.PercentileArgument>, MultiIntValuesHolder {
+    public static final String DATASET = "dataset";
+    public static final String RANKS = "ranks";
+    public static final String VALUE = "value";
+
+    @Setter
+    @Getter
+    @Column(columnName = ENTITY_ID)
+    private String entityId;
+    @Getter
+    @Setter
+    @Column(columnName = VALUE, dataType = Column.ValueDataType.LABELED_VALUE, storageOnly = true)
+    private DataTable percentileValues = new DataTable(10);
+    @Getter
+    @Setter
+    @Column(columnName = DATASET, storageOnly = true)
+    private DataTable dataset = new DataTable(30);
+    /**
+     * Rank
+     */
+    @Getter
+    @Setter
+    @Column(columnName = RANKS, storageOnly = true)
+    private IntList ranks = new IntList(10);
+
+    private boolean isCalculated = false;
+
+    @Override
+    public void accept(final MeterEntity entity, final PercentileArgument value) {
+        if (dataset.size() > 0) {
+            if (!value.getBucketedValues().isCompatible(dataset)) {
+                throw new IllegalArgumentException(
+                    "Incompatible BucketedValues [" + value + "] for current PercentileFunction[" + dataset + "]");
+            }
+        }
+
+        for (final int rank : value.getRanks()) {
+            if (rank <= 0) {
+                throw new IllegalArgumentException("Illegal rank value " + rank + ", must be positive");
+            }
+        }
+
+        if (ranks.size() > 0) {
+            if (ranks.size() != value.getRanks().length) {
+                throw new IllegalArgumentException(
+                    "Incompatible ranks size = [" + value.getRanks().length + "] for current PercentileFunction[" + ranks
+                        .size() + "]");
+            } else {
+                for (final int rank : value.getRanks()) {
+                    if (!ranks.include(rank)) {
+                        throw new IllegalArgumentException(
+                            "Rank " + rank + " doesn't exist in the previous ranks " + ranks);
+                    }
+                }
+            }
+        } else {
+            for (final int rank : value.getRanks()) {
+                ranks.add(rank);
+            }
+        }
+
+        this.entityId = entity.id();
+
+        final long[] values = value.getBucketedValues().getValues();
+        for (int i = 0; i < values.length; i++) {
+            final long bucket = value.getBucketedValues().getBuckets()[i];
+            String bucketName = bucket == Integer.MIN_VALUE ? Bucket.INFINITE_NEGATIVE : String.valueOf(bucket);
+            final long bucketValue = values[i];
+            dataset.valueAccumulation(bucketName, bucketValue);
+        }
+
+        this.isCalculated = false;
+    }
+
+    @Override
+    public void combine(final Metrics metrics) {
+        PercentileFunction percentile = (PercentileFunction) metrics;
+
+        if (!dataset.keysEqual(percentile.getDataset())) {
+            log.warn("Incompatible input [{}}] for current HistogramFunction[{}], entity {}",
+                     percentile, this, entityId
+            );
+            return;
+        }
+        if (ranks.size() > 0) {
+            IntList ranksOfThat = percentile.getRanks();
+            if (this.ranks.size() != ranks.size()) {
+                log.warn("Incompatible ranks size = [{}}] for current PercentileFunction[{}]",
+                         ranks.size(), this.ranks.size()
+                );
+                return;
+            } else {
+                if (!this.ranks.equals(percentile.getRanks())) {
+                    log.warn("Rank {} doesn't exist in the previous ranks {}", percentile.getRanks(), ranks);
+                    return;
+                }
+            }
+        }
+
+        this.dataset.append(percentile.dataset);
+
+        this.isCalculated = false;
+    }
+
+    @Override
+    public void calculate() {
+        if (!isCalculated) {
+            long total = dataset.sumOfValues();
+
+            int[] roofs = new int[ranks.size()];
+            for (int i = 0; i < ranks.size(); i++) {
+                roofs[i] = Math.round(total * ranks.get(i) * 1.0f / 100);
+            }
+
+            int count = 0;
+            final List<String> sortedKeys = dataset.sortedKeys(Comparator.comparingInt(Integer::parseInt));
+
+            int loopIndex = 0;
+
+            for (String key : sortedKeys) {
+                final Long value = dataset.get(key);
+
+                count += value;
+                for (int rankIdx = loopIndex; rankIdx < roofs.length; rankIdx++) {
+                    int roof = roofs[rankIdx];
+
+                    if (count >= roof) {
+                        percentileValues.put(String.valueOf(ranks.get(rankIdx)), Long.parseLong(key));
+                        loopIndex++;
+                    } else {
+                        break;
+                    }
+                }
+            }
+        }
+    }
+
+    @Override
+    public Metrics toHour() {
+        PercentileFunction metrics = (PercentileFunction) createNew();
+        metrics.setEntityId(getEntityId());
+        metrics.setTimeBucket(toTimeBucketInHour());
+        metrics.setDataset(getDataset());
+        metrics.setRanks(getRanks());
+        metrics.setPercentileValues(getPercentileValues());
+        return metrics;
+    }
+
+    @Override
+    public Metrics toDay() {
+        PercentileFunction metrics = (PercentileFunction) createNew();
+        metrics.setEntityId(getEntityId());
+        metrics.setTimeBucket(toTimeBucketInDay());
+        metrics.setDataset(getDataset());
+        metrics.setRanks(getRanks());
+        metrics.setPercentileValues(getPercentileValues());
+        return metrics;
+    }
+
+    @Override
+    public int[] getValues() {
+        return percentileValues.sortedValues(Comparator.comparingInt(Integer::parseInt))
+                               .stream()
+                               .flatMapToInt(l -> IntStream.of(l.intValue()))
+                               .toArray();
+    }
+
+    @Override
+    public int remoteHashCode() {
+        return entityId.hashCode();
+    }
+
+    @Override
+    public void deserialize(final RemoteData remoteData) {
+        this.setTimeBucket(remoteData.getDataLongs(0));
+
+        this.setEntityId(remoteData.getDataStrings(0));
+
+        this.setDataset(new DataTable(remoteData.getDataObjectStrings(0)));
+        this.setRanks(new IntList(remoteData.getDataObjectStrings(1)));
+        this.setPercentileValues(new DataTable(remoteData.getDataObjectStrings(2)));
+    }
+
+    @Override
+    public RemoteData.Builder serialize() {
+        RemoteData.Builder remoteBuilder = RemoteData.newBuilder();
+        remoteBuilder.addDataLongs(getTimeBucket());
+
+        remoteBuilder.addDataStrings(entityId);
+
+        remoteBuilder.addDataObjectStrings(dataset.toStorageData());
+        remoteBuilder.addDataObjectStrings(ranks.toStorageData());
+        remoteBuilder.addDataObjectStrings(percentileValues.toStorageData());
+
+        return remoteBuilder;
+    }
+
+    @Override
+    public String id() {
+        return getTimeBucket() + Const.ID_CONNECTOR + entityId;
+    }
+
+    @Override
+    public Class<? extends StorageBuilder> builder() {
+        return PercentileFunctionBuilder.class;
+    }
+
+    @RequiredArgsConstructor
+    @Getter
+    public static class PercentileArgument {
+        private final BucketedValues bucketedValues;
+        private final int[] ranks;
+    }
+
+    public static class PercentileFunctionBuilder implements StorageBuilder<PercentileFunction> {
+
+        @Override
+        public PercentileFunction map2Data(final Map<String, Object> dbMap) {
+            PercentileFunction metrics = new PercentileFunction() {
+                @Override
+                public AcceptableValue<PercentileArgument> createNew() {
+                    throw new UnexpectedException("createNew should not be called");
+                }
+            };
+            metrics.setDataset(new DataTable((String) dbMap.get(DATASET)));
+            metrics.setRanks(new IntList((String) dbMap.get(RANKS)));
+            metrics.setPercentileValues(new DataTable((String) dbMap.get(VALUE)));
+            metrics.setTimeBucket(((Number) dbMap.get(TIME_BUCKET)).longValue());
+            metrics.setEntityId((String) dbMap.get(ENTITY_ID));
+            return metrics;
+        }
+
+        @Override
+        public Map<String, Object> data2Map(final PercentileFunction storageData) {
+            Map<String, Object> map = new HashMap<>();
+            map.put(DATASET, storageData.getDataset());
+            map.put(RANKS, storageData.getRanks());
+            map.put(VALUE, storageData.getPercentileValues());
+            map.put(TIME_BUCKET, storageData.getTimeBucket());
+            map.put(ENTITY_ID, storageData.getEntityId());
+            return map;
+        }
+    }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/IntList.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/IntList.java
new file mode 100644
index 0000000..b5bd9ef
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/metrics/IntList.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.metrics;
+
+import java.util.ArrayList;
+import java.util.List;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObject;
+
+/**
+ * IntList is a serializable array list carrying int values.
+ */
+@ToString
+@EqualsAndHashCode
+public class IntList implements StorageDataComplexObject<IntList> {
+    private List<Integer> data;
+
+    public IntList(int initialSize) {
+        this.data = new ArrayList(initialSize);
+    }
+
+    public IntList(String valueString) {
+        toObject(valueString);
+    }
+
+    public int size() {
+        return data.size();
+    }
+
+    public boolean include(int value) {
+        return data.contains(value);
+    }
+
+    @Override
+    public String toStorageData() {
+        StringBuilder builder = new StringBuilder();
+
+        this.data.forEach(element -> {
+            if (builder.length() != 0) {
+                // For the first element.
+                builder.append(Const.ARRAY_SPLIT);
+            }
+            builder.append(element);
+        });
+        return builder.toString();
+    }
+
+    @Override
+    public void toObject(final String data) {
+        String[] elements = data.split(Const.ARRAY_PARSER_SPLIT);
+        this.data = new ArrayList<>(elements.length);
+        for (String element : elements) {
+            this.data.add(Integer.parseInt(element));
+        }
+    }
+
+    @Override
+    public void copyFrom(final IntList source) {
+        this.data.addAll(source.data);
+    }
+
+    public void add(final int rank) {
+        this.data.add(rank);
+    }
+
+    public int get(final int idx) {
+        return this.data.get(idx);
+    }
+}
diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/meter/function/PercentileFunctionTest.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/meter/function/PercentileFunctionTest.java
new file mode 100644
index 0000000..6409d7b
--- /dev/null
+++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/meter/function/PercentileFunctionTest.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.meter.function;
+
+import java.util.Map;
+import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
+import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
+import org.apache.skywalking.oap.server.core.analysis.metrics.IntList;
+import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class PercentileFunctionTest {
+    private static final int[] BUCKETS = new int[] {
+        0,
+        50,
+        100,
+        250
+    };
+
+    private static final int[] BUCKETS_2ND = new int[] {
+        0,
+        51,
+        100,
+        250
+    };
+
+    private static final int[] RANKS = new int[] {
+        50,
+        90
+    };
+
+    @Test
+    public void testFunction() {
+        PercentileFunctionInst inst = new PercentileFunctionInst();
+        inst.accept(
+            MeterEntity.newService("service-test"),
+            new PercentileFunction.PercentileArgument(
+                new BucketedValues(
+                    BUCKETS,
+                    new long[] {
+                        10,
+                        20,
+                        30,
+                        40
+                    }
+                ),
+                RANKS
+            )
+        );
+
+        inst.accept(
+            MeterEntity.newService("service-test"),
+            new PercentileFunction.PercentileArgument(
+                new BucketedValues(
+                    BUCKETS,
+                    new long[] {
+                        10,
+                        20,
+                        30,
+                        40
+                    }
+                ),
+                RANKS
+            )
+        );
+
+        inst.calculate();
+        final int[] values = inst.getValues();
+        /**
+         * Expected percentile dataset
+         * <pre>
+         *     0  , 20
+         *     50 , 40
+         *     100, 60 <- P50
+         *     250, 80 <- P90
+         * </pre>
+         */
+        Assert.assertArrayEquals(new int[] {
+            100,
+            250
+        }, values);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testIncompatible() {
+        PercentileFunctionInst inst = new PercentileFunctionInst();
+        inst.accept(
+            MeterEntity.newService("service-test"),
+            new PercentileFunction.PercentileArgument(
+                new BucketedValues(
+                    BUCKETS,
+                    new long[] {
+                        10,
+                        20,
+                        30,
+                        40
+                    }
+                ),
+                RANKS
+            )
+        );
+
+        inst.accept(
+            MeterEntity.newService("service-test"),
+            new PercentileFunction.PercentileArgument(
+                new BucketedValues(
+                    BUCKETS_2ND,
+                    new long[] {
+                        10,
+                        20,
+                        30,
+                        40
+                    }
+                ),
+                RANKS
+            )
+        );
+    }
+
+    @Test
+    public void testSerialization() {
+        PercentileFunctionInst inst = new PercentileFunctionInst();
+        inst.accept(
+            MeterEntity.newService("service-test"),
+            new PercentileFunction.PercentileArgument(
+                new BucketedValues(
+                    BUCKETS,
+                    new long[] {
+                        10,
+                        20,
+                        30,
+                        40
+                    }
+                ),
+                RANKS
+            )
+        );
+
+        PercentileFunctionInst inst2 = new PercentileFunctionInst();
+        inst2.deserialize(inst.serialize().build());
+
+        Assert.assertEquals(inst, inst2);
+        // HistogramFunction equal doesn't include dataset.
+        Assert.assertEquals(inst.getDataset(), inst2.getDataset());
+        Assert.assertEquals(inst.getRanks(), inst2.getRanks());
+        Assert.assertEquals(0, inst2.getPercentileValues().size());
+    }
+
+    @Test
+    public void testBuilder() throws IllegalAccessException, InstantiationException {
+        PercentileFunctionInst inst = new PercentileFunctionInst();
+        inst.accept(
+            MeterEntity.newService("service-test"),
+            new PercentileFunction.PercentileArgument(
+                new BucketedValues(
+                    BUCKETS,
+                    new long[] {
+                        10,
+                        20,
+                        30,
+                        40
+                    }
+                ),
+                RANKS
+            )
+        );
+        inst.calculate();
+
+        final StorageBuilder storageBuilder = inst.builder().newInstance();
+
+        // Simulate the storage layer do, convert the datatable to string.
+        final Map map = storageBuilder.data2Map(inst);
+        map.put(PercentileFunction.DATASET, ((DataTable) map.get(PercentileFunction.DATASET)).toStorageData());
+        map.put(PercentileFunction.VALUE, ((DataTable) map.get(PercentileFunction.VALUE)).toStorageData());
+        map.put(PercentileFunction.RANKS, ((IntList) map.get(PercentileFunction.RANKS)).toStorageData());
+
+        final PercentileFunction inst2 = (PercentileFunction) storageBuilder.map2Data(map);
+        Assert.assertEquals(inst, inst2);
+        // HistogramFunction equal doesn't include dataset.
+        Assert.assertEquals(inst.getDataset(), inst2.getDataset());
+        Assert.assertEquals(inst.getPercentileValues(), inst2.getPercentileValues());
+        Assert.assertEquals(inst.getRanks(), inst2.getRanks());
+    }
+
+    private static class PercentileFunctionInst extends PercentileFunction {
+        @Override
+        public AcceptableValue<PercentileArgument> createNew() {
+            return new PercentileFunctionInst();
+        }
+    }
+}
diff --git a/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherProvider.java b/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherProvider.java
index 9b5efcd..0e2a00f 100644
--- a/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherProvider.java
+++ b/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherProvider.java
@@ -27,6 +27,7 @@ import org.apache.skywalking.oap.server.core.analysis.meter.MeterSystem;
 import org.apache.skywalking.oap.server.core.analysis.meter.ScopeType;
 import org.apache.skywalking.oap.server.core.analysis.meter.function.AcceptableValue;
 import org.apache.skywalking.oap.server.core.analysis.meter.function.BucketedValues;
+import org.apache.skywalking.oap.server.core.analysis.meter.function.PercentileFunction;
 import org.apache.skywalking.oap.server.fetcher.prometheus.module.PrometheusFetcherModule;
 import org.apache.skywalking.oap.server.library.module.ModuleConfig;
 import org.apache.skywalking.oap.server.library.module.ModuleDefine;
@@ -64,6 +65,10 @@ public class PrometheusFetcherProvider extends ModuleProvider {
             final MeterSystem meterSystem = MeterSystem.meterSystem(getManager());
             meterSystem.create("test_long_metrics", "avg", ScopeType.SERVICE, Long.class);
             meterSystem.create("test_histogram_metrics", "histogram", ScopeType.SERVICE, BucketedValues.class);
+            meterSystem.create(
+                "test_percentile_metrics", "percentile", ScopeType.SERVICE,
+                PercentileFunction.PercentileArgument.class
+            );
         }
     }
 
@@ -109,6 +114,38 @@ public class PrometheusFetcherProvider extends ModuleProvider {
                         }
                     ));
                     service.doStreamingCalculation(histogramMetrics);
+
+                    // Percentile Example
+                    final AcceptableValue<PercentileFunction.PercentileArgument> testPercentileMetrics = service.buildMetrics(
+                        "test_percentile_metrics", PercentileFunction.PercentileArgument.class);
+                    testPercentileMetrics.setTimeBucket(TimeBucket.getMinuteTimeBucket(System.currentTimeMillis()));
+                    testPercentileMetrics.accept(
+                        MeterEntity.newService("service-test"),
+                        new PercentileFunction.PercentileArgument(
+                            new BucketedValues(
+                                // Buckets
+                                new int[] {
+                                    0,
+                                    51,
+                                    100,
+                                    250
+                                },
+                                // Values
+                                new long[] {
+                                    10,
+                                    20,
+                                    30,
+                                    40
+                                }
+                            ),
+                            // Ranks
+                            new int[] {
+                                50,
+                                90
+                            }
+                        )
+                    );
+                    service.doStreamingCalculation(testPercentileMetrics);
                 }
             }, 2, 2, TimeUnit.SECONDS);
         }
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ColumnTypeEsMapping.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ColumnTypeEsMapping.java
index 7eefe65..5509602 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ColumnTypeEsMapping.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ColumnTypeEsMapping.java
@@ -22,6 +22,7 @@ import com.google.gson.JsonObject;
 import org.apache.skywalking.oap.server.core.analysis.NodeType;
 import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
 import org.apache.skywalking.oap.server.core.storage.model.DataTypeMapping;
+import org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObject;
 
 public class ColumnTypeEsMapping implements DataTypeMapping {
 
@@ -35,7 +36,7 @@ public class ColumnTypeEsMapping implements DataTypeMapping {
             return "double";
         } else if (String.class.equals(type)) {
             return "keyword";
-        } else if (DataTable.class.equals(type)) {
+        } else if (StorageDataComplexObject.class.isAssignableFrom(type)) {
             return "text";
         } else if (byte[].class.equals(type)) {
             return "binary";
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TableInstaller.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TableInstaller.java
index 4d10bf4..53e5266 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TableInstaller.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TableInstaller.java
@@ -23,12 +23,12 @@ import java.sql.Connection;
 import java.sql.SQLException;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.oap.server.core.analysis.NodeType;
-import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
 import org.apache.skywalking.oap.server.core.storage.StorageException;
 import org.apache.skywalking.oap.server.core.storage.model.ColumnName;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
 import org.apache.skywalking.oap.server.core.storage.model.ModelColumn;
 import org.apache.skywalking.oap.server.core.storage.model.ModelInstaller;
+import org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObject;
 import org.apache.skywalking.oap.server.library.client.Client;
 import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
 import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
@@ -98,7 +98,7 @@ public class H2TableInstaller extends ModelInstaller {
             return "DOUBLE";
         } else if (String.class.equals(type)) {
             return "VARCHAR(" + column.getLength() + ")";
-        } else if (DataTable.class.equals(type)) {
+        } else if (StorageDataComplexObject.class.isAssignableFrom(type)) {
             return "VARCHAR(20000)";
         } else if (byte[].class.equals(type)) {
             return "MEDIUMTEXT";
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLTableInstaller.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLTableInstaller.java
index 839fefb..89dc19c 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLTableInstaller.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLTableInstaller.java
@@ -27,6 +27,7 @@ import org.apache.skywalking.oap.server.core.storage.StorageException;
 import org.apache.skywalking.oap.server.core.storage.model.ExtraQueryIndex;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
 import org.apache.skywalking.oap.server.core.storage.model.ModelColumn;
+import org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObject;
 import org.apache.skywalking.oap.server.library.client.Client;
 import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
 import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
@@ -105,7 +106,7 @@ public class MySQLTableInstaller extends H2TableInstaller {
 
     @Override
     protected String getColumnType(final ModelColumn column) {
-        if (DataTable.class.equals(column.getType())) {
+        if (StorageDataComplexObject.class.isAssignableFrom(column.getType())) {
             return "MEDIUMTEXT";
         }
         return super.getColumnType(column);