You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/11/05 15:02:58 UTC

[iotdb] branch master updated: Add a function quantile in UDF library (#7912)

This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new f58cc6c223 Add a function quantile in UDF library (#7912)
f58cc6c223 is described below

commit f58cc6c223249534abb06a4816e52e38625c80f7
Author: czllgzmzl <32...@users.noreply.github.com>
AuthorDate: Sat Nov 5 23:02:52 2022 +0800

    Add a function quantile in UDF library (#7912)
---
 docs/UserGuide/UDF-Library/Data-Profiling.md       |  73 +++++
 docs/zh/UserGuide/UDF-Library/Data-Profiling.md    |  74 +++++
 library-udf/src/assembly/tools/register-UDF.bat    |   1 +
 library-udf/src/assembly/tools/register-UDF.sh     |   1 +
 .../iotdb/library/dprofile/UDAFQuantile.java       | 127 ++++++++
 .../library/dprofile/util/HeapLongKLLSketch.java   | 356 +++++++++++++++++++++
 .../dprofile/util/KLLSketchForQuantile.java        | 172 ++++++++++
 7 files changed, 804 insertions(+)

diff --git a/docs/UserGuide/UDF-Library/Data-Profiling.md b/docs/UserGuide/UDF-Library/Data-Profiling.md
index 7df66d245f..c6c59962c8 100644
--- a/docs/UserGuide/UDF-Library/Data-Profiling.md
+++ b/docs/UserGuide/UDF-Library/Data-Profiling.md
@@ -894,6 +894,79 @@ Output series:
 +-----------------------------+------------------------------------------------------+
 ```
 
+## Quantile
+
+### Usage
+
+The function is used to compute the approximate quantile of a numeric time series. A quantile is the value of the element at a given rank of the sorted series. The function is implemented with the KLL sketch algorithm.
+
+**Name:** QUANTILE
+
+**Input Series:** Only support a single input series. The data type is INT32 / INT64 / FLOAT / DOUBLE.
+
+**Parameter:**
+
++ `rank`: The rank ratio of the quantile. It should be within (0,1] and the default value is 0.5. For instance, a quantile with `rank`=0.5 is the median.
++ `K`: The size of the KLL sketch maintained in the query. It should be within [100,+inf) and the default value is 800. For instance, with `rank`=0.5 and `K`=800, the true rank ratio of the returned value lies within 0.49~0.51 with a probability of at least 99%. A larger K gives a more accurate result.
+
+**Output Series:** Output a single series. The type is the same as input series. The timestamp of the only data point is 0.
+
+**Note:** Missing points, null points and `NaN` in the input series will be ignored.
+
+### Examples
+
+Input series:
+
+```
++-----------------------------+------------+
+|                         Time|root.test.s0|
++-----------------------------+------------+
+|2021-03-17T10:32:17.054+08:00|   0.5319929|
+|2021-03-17T10:32:18.054+08:00|   0.9304316|
+|2021-03-17T10:32:19.054+08:00|  -1.4800133|
+|2021-03-17T10:32:20.054+08:00|   0.6114087|
+|2021-03-17T10:32:21.054+08:00|   2.5163336|
+|2021-03-17T10:32:22.054+08:00|  -1.0845392|
+|2021-03-17T10:32:23.054+08:00|   1.0562582|
+|2021-03-17T10:32:24.054+08:00|   1.3867859|
+|2021-03-17T10:32:25.054+08:00| -0.45429882|
+|2021-03-17T10:32:26.054+08:00|   1.0353678|
+|2021-03-17T10:32:27.054+08:00|   0.7307929|
+|2021-03-17T10:32:28.054+08:00|   2.3167255|
+|2021-03-17T10:32:29.054+08:00|    2.342443|
+|2021-03-17T10:32:30.054+08:00|   1.5809103|
+|2021-03-17T10:32:31.054+08:00|   1.4829416|
+|2021-03-17T10:32:32.054+08:00|   1.5800357|
+|2021-03-17T10:32:33.054+08:00|   0.7124368|
+|2021-03-17T10:32:34.054+08:00| -0.78597564|
+|2021-03-17T10:32:35.054+08:00|   1.2058644|
+|2021-03-17T10:32:36.054+08:00|   1.4215064|
+|2021-03-17T10:32:37.054+08:00|   1.2808295|
+|2021-03-17T10:32:38.054+08:00|  -0.6173715|
+|2021-03-17T10:32:39.054+08:00|  0.06644377|
+|2021-03-17T10:32:40.054+08:00|    2.349338|
+|2021-03-17T10:32:41.054+08:00|   1.7335888|
+|2021-03-17T10:32:42.054+08:00|   1.5872132|
+............
+Total line number = 10000
+```
+
+SQL for query:
+
+```sql
+select quantile(s0, "rank"="0.2", "K"="800") from root.test
+```
+
+Output series:
+
+```
++-----------------------------+-----------------------------------------------+
+|                         Time|quantile(root.test.s0, "rank"="0.2", "K"="800")|
++-----------------------------+-----------------------------------------------+
+|1970-01-01T08:00:00.000+08:00|                             0.1801469624042511|
++-----------------------------+-----------------------------------------------+
+```
+
 ## Period
 
 ### Usage
diff --git a/docs/zh/UserGuide/UDF-Library/Data-Profiling.md b/docs/zh/UserGuide/UDF-Library/Data-Profiling.md
index b5657ad6cf..bc7761be39 100644
--- a/docs/zh/UserGuide/UDF-Library/Data-Profiling.md
+++ b/docs/zh/UserGuide/UDF-Library/Data-Profiling.md
@@ -899,6 +899,80 @@ select percentile(s0, "rank"="0.2", "error"="0.01") from root.test
 +-----------------------------+------------------------------------------------------+
 ```
 
+## Quantile
+
+### 函数简介
+
+本函数用于计算单列数值型数据的近似分位数。本函数基于KLL sketch算法实现。
+
+**函数名:** QUANTILE
+
+**输入序列:** 仅支持单个输入序列,类型为 INT32 / INT64 / FLOAT / DOUBLE。
+
+**参数:**
+
++ `rank`:所求分位数在所有数据中的排名比,取值范围为 (0,1],默认值为 0.5。如设为 0.5 时则计算近似中位数。
++ `K`:允许维护的KLL sketch大小,最小值为100,默认值为800。如`rank`=0.5 且`K`=800,则计算出的分位数的真实排名比有至少99%的可能性在 0.49~0.51之间。
+
+**输出序列:** 输出单个序列,类型与输入序列相同。输出值的时间戳为0。
+
+**提示:** 数据中的空值、缺失值和`NaN`将会被忽略。
+
+### 使用示例
+
+
+输入序列:
+
+```
++-----------------------------+------------+
+|                         Time|root.test.s0|
++-----------------------------+------------+
+|2021-03-17T10:32:17.054+08:00|   0.5319929|
+|2021-03-17T10:32:18.054+08:00|   0.9304316|
+|2021-03-17T10:32:19.054+08:00|  -1.4800133|
+|2021-03-17T10:32:20.054+08:00|   0.6114087|
+|2021-03-17T10:32:21.054+08:00|   2.5163336|
+|2021-03-17T10:32:22.054+08:00|  -1.0845392|
+|2021-03-17T10:32:23.054+08:00|   1.0562582|
+|2021-03-17T10:32:24.054+08:00|   1.3867859|
+|2021-03-17T10:32:25.054+08:00| -0.45429882|
+|2021-03-17T10:32:26.054+08:00|   1.0353678|
+|2021-03-17T10:32:27.054+08:00|   0.7307929|
+|2021-03-17T10:32:28.054+08:00|   2.3167255|
+|2021-03-17T10:32:29.054+08:00|    2.342443|
+|2021-03-17T10:32:30.054+08:00|   1.5809103|
+|2021-03-17T10:32:31.054+08:00|   1.4829416|
+|2021-03-17T10:32:32.054+08:00|   1.5800357|
+|2021-03-17T10:32:33.054+08:00|   0.7124368|
+|2021-03-17T10:32:34.054+08:00| -0.78597564|
+|2021-03-17T10:32:35.054+08:00|   1.2058644|
+|2021-03-17T10:32:36.054+08:00|   1.4215064|
+|2021-03-17T10:32:37.054+08:00|   1.2808295|
+|2021-03-17T10:32:38.054+08:00|  -0.6173715|
+|2021-03-17T10:32:39.054+08:00|  0.06644377|
+|2021-03-17T10:32:40.054+08:00|    2.349338|
+|2021-03-17T10:32:41.054+08:00|   1.7335888|
+|2021-03-17T10:32:42.054+08:00|   1.5872132|
+............
+Total line number = 10000
+```
+
+用于查询的 SQL 语句:
+
+```sql
+select quantile(s0, "rank"="0.2", "K"="800") from root.test
+```
+
+输出序列:
+
+```
++-----------------------------+-----------------------------------------------+
+|                         Time|quantile(root.test.s0, "rank"="0.2", "K"="800")|
++-----------------------------+-----------------------------------------------+
+|1970-01-01T08:00:00.000+08:00|                             0.1801469624042511|
++-----------------------------+-----------------------------------------------+
+```
+
 ## Period
 
 ### 函数简介
diff --git a/library-udf/src/assembly/tools/register-UDF.bat b/library-udf/src/assembly/tools/register-UDF.bat
index 5a565035c6..164ec8cabd 100644
--- a/library-udf/src/assembly/tools/register-UDF.bat
+++ b/library-udf/src/assembly/tools/register-UDF.bat
@@ -33,6 +33,7 @@ call ../sbin/start-cli.bat -h %host% -p %rpcPort% -u %user% -pw %pass% -e "creat
 call ../sbin/start-cli.bat -h %host% -p %rpcPort% -u %user% -pw %pass% -e "create function median as 'org.apache.iotdb.library.dprofile.UDAFMedian'"
 call ../sbin/start-cli.bat -h %host% -p %rpcPort% -u %user% -pw %pass% -e "create function mode as 'org.apache.iotdb.library.dprofile.UDAFMode'"
 call ../sbin/start-cli.bat -h %host% -p %rpcPort% -u %user% -pw %pass% -e "create function percentile as 'org.apache.iotdb.library.dprofile.UDAFPercentile'"
+call ../sbin/start-cli.bat -h %host% -p %rpcPort% -u %user% -pw %pass% -e "create function quantile as 'org.apache.iotdb.library.dprofile.UDAFQuantile'"
 call ../sbin/start-cli.bat -h %host% -p %rpcPort% -u %user% -pw %pass% -e "create function period as 'org.apache.iotdb.library.dprofile.UDAFPeriod'"
 call ../sbin/start-cli.bat -h %host% -p %rpcPort% -u %user% -pw %pass% -e "create function qlb as 'org.apache.iotdb.library.dprofile.UDTFQLB'"
 call ../sbin/start-cli.bat -h %host% -p %rpcPort% -u %user% -pw %pass% -e "create function re_sample as 'org.apache.iotdb.library.dprofile.UDTFResample'"
diff --git a/library-udf/src/assembly/tools/register-UDF.sh b/library-udf/src/assembly/tools/register-UDF.sh
index ea7d7758d0..bf6cbdda68 100644
--- a/library-udf/src/assembly/tools/register-UDF.sh
+++ b/library-udf/src/assembly/tools/register-UDF.sh
@@ -35,6 +35,7 @@ pass=root
 ../sbin/start-cli.sh -h $host -p $rpcPort -u $user -pw $pass -e "create function median as 'org.apache.iotdb.library.dprofile.UDAFMedian'"
 ../sbin/start-cli.sh -h $host -p $rpcPort -u $user -pw $pass -e "create function mode as 'org.apache.iotdb.library.dprofile.UDAFMode'"
 ../sbin/start-cli.sh -h $host -p $rpcPort -u $user -pw $pass -e "create function percentile as 'org.apache.iotdb.library.dprofile.UDAFPercentile'"
+../sbin/start-cli.sh -h $host -p $rpcPort -u $user -pw $pass -e "create function quantile as 'org.apache.iotdb.library.dprofile.UDAFQuantile'"
 ../sbin/start-cli.sh -h $host -p $rpcPort -u $user -pw $pass -e "create function period as 'org.apache.iotdb.library.dprofile.UDAFPeriod'"
 ../sbin/start-cli.sh -h $host -p $rpcPort -u $user -pw $pass -e "create function qlb as 'org.apache.iotdb.library.dprofile.UDTFQLB'"
 ../sbin/start-cli.sh -h $host -p $rpcPort -u $user -pw $pass -e "create function re_sample as 'org.apache.iotdb.library.dprofile.UDTFResample'"
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/dprofile/UDAFQuantile.java b/library-udf/src/main/java/org/apache/iotdb/library/dprofile/UDAFQuantile.java
new file mode 100644
index 0000000000..a806038c83
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/dprofile/UDAFQuantile.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.library.dprofile;
+
+import org.apache.iotdb.commons.udf.utils.UDFDataTypeTransformer;
+import org.apache.iotdb.library.util.Util;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.udf.api.UDTF;
+import org.apache.iotdb.udf.api.access.Row;
+import org.apache.iotdb.udf.api.collector.PointCollector;
+import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.udf.api.customizer.parameter.UDFParameterValidator;
+import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.udf.api.customizer.strategy.RowByRowAccessStrategy;
+import org.apache.iotdb.udf.api.type.Type;
+
+/**
+ * Computes an approximate quantile of a single numeric series with a KLL sketch.
+ *
+ * <p>Every input value is mapped to a {@code long} whose signed order matches the numeric order of
+ * the value and fed into a {@link org.apache.iotdb.library.dprofile.util.HeapLongKLLSketch}. At the
+ * end, the item at 1-based position {@code ceil(rank * N)} of the sketch's (approximate) sorted
+ * order is decoded back to the input type and emitted at timestamp 0. NaN values are skipped.
+ */
+public class UDAFQuantile implements UDTF {
+  private org.apache.iotdb.library.dprofile.util.HeapLongKLLSketch sketch;
+  private double rank;
+  private TSDataType dataType;
+
+  @Override
+  public void validate(UDFParameterValidator validator) throws Exception {
+    validator
+        .validateInputSeriesNumber(1)
+        .validateInputSeriesDataType(0, Type.INT32, Type.INT64, Type.FLOAT, Type.DOUBLE)
+        .validate(
+            K -> (int) K >= 100,
+            "Size K has to be greater or equal than 100.",
+            validator.getParameters().getIntOrDefault("K", 800))
+        .validate(
+            rank -> (double) rank > 0 && (double) rank <= 1,
+            "rank has to be greater than 0 and less than or equal to 1.",
+            validator.getParameters().getDoubleOrDefault("rank", 0.5));
+  }
+
+  @Override
+  public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
+      throws Exception {
+    configurations
+        .setAccessStrategy(new RowByRowAccessStrategy())
+        .setOutputDataType(parameters.getDataType(0));
+    dataType = UDFDataTypeTransformer.transformToTsDataType(parameters.getDataType(0));
+    int K = parameters.getIntOrDefault("K", 800);
+    rank = parameters.getDoubleOrDefault("rank", 0.5);
+    // The sketch takes a byte budget (8 bytes per long); saturate instead of overflowing on huge K.
+    int maxMemoryByte = (int) Math.min(8L * K, Integer.MAX_VALUE);
+    sketch = new org.apache.iotdb.library.dprofile.util.HeapLongKLLSketch(maxMemoryByte);
+  }
+
+  @Override
+  public void transform(Row row, PointCollector collector) throws Exception {
+    if (dataType == TSDataType.INT64) {
+      // Read INT64 directly: going through a double would lose precision above 2^53.
+      sketch.update(row.getLong(0));
+      return;
+    }
+    double value = Util.getValueAsDouble(row);
+    if (Double.isNaN(value)) {
+      return; // NaN has no place in the numeric order and is documented as ignored
+    }
+    sketch.update(dataToLong(value));
+  }
+
+  @Override
+  public void terminate(PointCollector collector) throws Exception {
+    long n = sketch.getN();
+    if (n == 0) {
+      return; // no valid input point: there is no quantile to report
+    }
+    // 1-based position of the requested item in the sorted series, clamped to [1, n].
+    long k = Math.max(1L, Math.min(n, (long) Math.ceil(rank * n)));
+    // findMinValueWithRank(k) is the smallest v with (weighted #items < v) >= k, i.e. one past the
+    // k-th item; step back by one to land on the item itself. If no such v exists below
+    // Long.MAX_VALUE, the k-th item is Long.MAX_VALUE.
+    long bound = sketch.findMinValueWithRank(k);
+    long encoded = sketch.getApproxRank(bound) >= k ? bound - 1 : bound;
+    switch (dataType) {
+      case INT32:
+        collector.putInt(0, (int) encoded);
+        break;
+      case INT64:
+        collector.putLong(0, encoded);
+        break;
+      case FLOAT:
+        collector.putFloat(0, (float) longToResult(encoded));
+        break;
+      case DOUBLE:
+        collector.putDouble(0, longToResult(encoded));
+        break;
+      default:
+        break;
+    }
+  }
+
+  /**
+   * Maps a non-NaN value to a long whose signed order matches the numeric order of the value.
+   *
+   * <p>Integer types are stored as-is. FLOAT (widened to double exactly) and DOUBLE use their
+   * IEEE-754 bits; for negative values (sign bit set) the magnitude bits are flipped so that more
+   * negative numbers become smaller longs, and -0.0 sorts just below +0.0.
+   */
+  private long dataToLong(double value) {
+    switch (dataType) {
+      case INT32:
+      case INT64:
+        return (long) value;
+      default:
+        long bits = Double.doubleToLongBits(value);
+        return bits < 0 ? bits ^ Long.MAX_VALUE : bits;
+    }
+  }
+
+  /** Inverse of {@link #dataToLong(double)}, returning the value as a double. */
+  private double longToResult(long result) {
+    switch (dataType) {
+      case INT32:
+      case INT64:
+        return (double) result;
+      default:
+        // XOR with Long.MAX_VALUE keeps the sign bit, so the same test undoes the encoding.
+        return Double.longBitsToDouble(result < 0 ? result ^ Long.MAX_VALUE : result);
+    }
+  }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/dprofile/util/HeapLongKLLSketch.java b/library-udf/src/main/java/org/apache/iotdb/library/dprofile/util/HeapLongKLLSketch.java
new file mode 100644
index 0000000000..39767fc08e
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/dprofile/util/HeapLongKLLSketch.java
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.library.dprofile.util;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Heap-only KLL sketch over {@code long} values, based on the KLL sketch in Apache DataSketches
+ * (https://github.com/apache/datasketches-java/tree/master/src/main/java/org/apache/datasketches/kll).
+ * It is never serialized.
+ *
+ * <p>All levels share the array {@code num}, right-aligned at {@code maxMemoryNum}: level {@code i}
+ * occupies {@code [levelPos[i], levelPos[i + 1])}. New values are prepended to level 0, which is
+ * sorted lazily (tracked by {@code level0Sorted}); every level above 0 is kept sorted, because the
+ * in-place merges and the rank binary search rely on it.
+ */
+public class HeapLongKLLSketch extends org.apache.iotdb.library.dprofile.util.KLLSketchForQuantile {
+
+  /**
+   * Creates an empty sketch with a single level.
+   *
+   * @param maxMemoryByte memory budget in bytes; at most {@code maxMemoryByte / 8} values (capped
+   *     at {@code 2^20}) are kept
+   */
+  public HeapLongKLLSketch(int maxMemoryByte) { // maxN=7000 for PAGE, 1.6e6 for CHUNK
+    N = 0;
+    calcParameters(maxMemoryByte);
+    calcLevelMaxSize(1);
+  }
+
+  /** Allocates the value array and resets the level bookkeeping. */
+  private void calcParameters(int maxMemoryByte) {
+    maxMemoryNum = calcMaxMemoryNum(maxMemoryByte);
+    num = new long[maxMemoryNum];
+    level0Sorted = false;
+    cntLevel = 0;
+  }
+
+  @Override
+  protected int calcMaxMemoryNum(int maxMemoryByte) {
+    return Math.min(1 << 20, maxMemoryByte / 8);
+  }
+
+  /**
+   * Sets the number of levels to {@code setLevel} (callers pass a value >= {@code cntLevel}) and
+   * recomputes per-level capacities: level {@code i} may hold about {@code newK * (2/3)^(cntLevel
+   * - i - 1)} values (at least 8), with {@code newK} the largest value whose total fits into
+   * {@code maxMemoryNum}. New levels start empty at the current end of the data.
+   */
+  @Override
+  protected void calcLevelMaxSize(int setLevel) {
+    int[] newLevelPos = new int[setLevel + 1];
+    int maxPos = cntLevel > 0 ? Math.max(maxMemoryNum, levelPos[cntLevel]) : maxMemoryNum;
+    for (int i = 0; i < setLevel + 1; i++) {
+      newLevelPos[i] = i < cntLevel ? levelPos[i] : maxPos;
+    }
+    levelPos = newLevelPos;
+    cntLevel = setLevel;
+    levelMaxSize = new int[cntLevel];
+    int newK = 0;
+    // Determine newK bit by bit, keeping each bit only if the resulting sizes still fit.
+    for (int addK = 1 << 28; addK > 0; addK >>>= 1) {
+      int need = 0;
+      for (int i = 0; i < cntLevel; i++) {
+        need +=
+            Math.max(8, (int) Math.round(((newK + addK) * Math.pow(2.0 / 3, cntLevel - i - 1))));
+      }
+      if (need <= maxMemoryNum) {
+        newK += addK;
+      }
+    }
+    for (int i = 0; i < cntLevel; i++) {
+      levelMaxSize[i] = Math.max(8, (int) Math.round((newK * Math.pow(2.0 / 3, cntLevel - i - 1))));
+    }
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder();
+    sb.append("N=").append(N);
+    sb.append(", topLevelMaxSize=").append(levelMaxSize[cntLevel - 1]);
+    sb.append(", cntLevel=").append(cntLevel);
+    sb.append(", numLen=").append(levelPos[cntLevel] - levelPos[0]);
+    sb.append(", levelPos=").append(Arrays.toString(Arrays.copyOf(levelPos, cntLevel)));
+    return sb.toString();
+  }
+
+  /** Debug helper: prints the memory limit, N and every level's capacity to stdout. */
+  public void showLevelMaxSize() {
+    System.out.println("\t\t//maxMemNum:" + maxMemoryNum + "\t//N:" + N);
+    for (int i = 0; i < cntLevel; i++) {
+      System.out.print("\t\t" + levelMaxSize[i] + "\t");
+    }
+    System.out.println();
+    System.out.println("-------------------------------------------------------");
+  }
+
+  /**
+   * Merges sorted {@code a1[L1, R1)} and sorted {@code a2[L2, R2)} into {@code a3} starting at
+   * {@code pos}; ties take the element from {@code a2}. Writing into the same array is safe as long
+   * as the output never overtakes the unread input.
+   *
+   * @return the number of values written
+   */
+  private static int mergeSort(
+      long[] a1, int L1, int R1, long[] a2, int L2, int R2, long[] a3, int pos) {
+    if (L1 == R1) {
+      System.arraycopy(a2, L2, a3, pos, R2 - L2);
+    } else if (L2 == R2) {
+      System.arraycopy(a1, L1, a3, pos, R1 - L1);
+    } else {
+      int p1 = L1, p2 = L2;
+      while (p1 < R1 || p2 < R2) {
+        if (p1 < R1 && (p2 == R2 || a1[p1] < a2[p2])) {
+          a3[pos++] = a1[p1++];
+        } else {
+          a3[pos++] = a2[p2++];
+        }
+      }
+    }
+    return R1 - L1 + R2 - L2;
+  }
+
+  /**
+   * Merges level {@code i} of an old layout ({@code src} / {@code srcLevelPos}, {@code
+   * srcCntLevel} levels) with level {@code i} of {@code another} into {@code dst} at {@code pos};
+   * a level missing on either side counts as empty.
+   *
+   * @return the number of values written
+   */
+  private static int mergeLevel(
+      long[] src,
+      int[] srcLevelPos,
+      int srcCntLevel,
+      KLLSketchForQuantile another,
+      int i,
+      long[] dst,
+      int pos) {
+    boolean inSrc = i < srcCntLevel;
+    boolean inOther = i < another.cntLevel;
+    return mergeSort(
+        src,
+        inSrc ? srcLevelPos[i] : 0,
+        inSrc ? srcLevelPos[i + 1] : 0,
+        another.num,
+        inOther ? another.levelPos[i] : 0,
+        inOther ? another.levelPos[i + 1] : 0,
+        dst,
+        pos);
+  }
+
+  /**
+   * Compacts one level. Level 0 is sorted first if needed, and one item (the smallest) stays behind
+   * when the level size is odd. A random half of the remaining items (every other one, random
+   * offset) is merged into level {@code level + 1}. Everything left of the compacted range is then
+   * shifted right to close the gap. Compacting the top level adds a new level.
+   */
+  private void compactOneLevel(int level) {
+    if (level == cntLevel - 1) {
+      calcLevelMaxSize(cntLevel + 1);
+    }
+    int L1 = levelPos[level], R1 = levelPos[level + 1]; // [L1, R1)
+    if (level == 0 && !level0Sorted) {
+      Arrays.sort(num, L1, R1);
+      level0Sorted = true;
+    }
+    L1 += (R1 - L1) & 1;
+    if (L1 == R1) {
+      return;
+    }
+
+    randomlyHalveDownToLeft(L1, R1);
+
+    int mid = (L1 + R1) >>> 1;
+    mergeSortWithoutSpace(L1, mid, levelPos[level + 1], levelPos[level + 2]);
+    levelPos[level + 1] = mid;
+    // Move [levelPos[0], L1) (leftover item plus all lower levels) so that it ends right at mid.
+    int moved = L1 - levelPos[0];
+    System.arraycopy(num, levelPos[0], num, mid - moved, moved);
+
+    levelPos[level] = levelPos[level + 1] - (L1 - levelPos[level]);
+    int numReduced = (R1 - L1) >>> 1;
+    for (int i = level - 1; i >= 0; i--) {
+      levelPos[i] += numReduced;
+    }
+  }
+
+  /** Compacts the highest level that exceeds its capacity, or the top level if none does. */
+  @Override
+  public void compact() {
+    int compactLevel = cntLevel - 1;
+    for (int i = 0; i < cntLevel; i++) {
+      if (levelPos[i + 1] - levelPos[i] > levelMaxSize[i]) {
+        compactLevel = i;
+      }
+    }
+    compactOneLevel(compactLevel);
+  }
+
+  /**
+   * Merges {@code another} into this sketch without extra memory, compacting whenever level 0 has
+   * no free slot. Each chunk is inserted in front of its level; level {@code i >= 1} is re-sorted
+   * afterwards and level 0 is marked unsorted, keeping the invariants compaction and search need.
+   */
+  public void merge(org.apache.iotdb.library.dprofile.util.KLLSketchForQuantile another) {
+    if (another.cntLevel > cntLevel) {
+      calcLevelMaxSize(another.cntLevel);
+    }
+    for (int i = 0; i < another.cntLevel; i++) {
+      int numToMerge = another.levelPos[i + 1] - another.levelPos[i];
+      int mergingL = another.levelPos[i];
+      while (numToMerge > 0) {
+        if (levelPos[0] == 0) {
+          compact();
+        }
+        int delta = Math.min(numToMerge, levelPos[0]);
+        if (i > 0) { // shift the levels below i left by delta to open a gap in front of level i
+          for (int j = 0; j < i; j++) {
+            levelPos[j] -= delta;
+          }
+          System.arraycopy(num, delta, num, 0, levelPos[i] - delta);
+        }
+        System.arraycopy(another.num, mergingL, num, levelPos[i] - delta, delta);
+        levelPos[i] -= delta;
+        numToMerge -= delta;
+        mergingL += delta;
+        if (i == 0) {
+          level0Sorted = false;
+        } else {
+          Arrays.sort(num, levelPos[i], levelPos[i + 1]);
+        }
+      }
+    }
+    this.N += another.N;
+  }
+
+  /**
+   * Merges {@code another} into this sketch level by level. If the union fits into {@code
+   * maxMemoryNum} it is written in place in front of the current data; otherwise a temporary array
+   * is used, compacted back to the limit, and copied back right-aligned into the original array.
+   */
+  public void mergeWithTempSpace(
+      org.apache.iotdb.library.dprofile.util.KLLSketchForQuantile another) {
+    int[] oldLevelPos = Arrays.copyOf(levelPos, cntLevel + 1);
+    int oldCntLevel = cntLevel;
+    calcLevelMaxSize(Math.max(cntLevel, another.cntLevel));
+    if (getNumLen() + another.getNumLen() <= maxMemoryNum) {
+      int cntPos = oldLevelPos[0] - another.getNumLen();
+      for (int i = 0; i < cntLevel; i++) {
+        levelPos[i] = cntPos;
+        cntPos += mergeLevel(num, oldLevelPos, oldCntLevel, another, i, num, cntPos);
+      }
+      levelPos[cntLevel] = cntPos;
+      // Level 0 of either sketch may have been unsorted, so the merged level 0 may be as well.
+      level0Sorted = false;
+      this.N += another.N;
+    } else {
+      long[] oldNum = num;
+      num = new long[getNumLen() + another.getNumLen()];
+      int numLen = 0;
+      for (int i = 0; i < cntLevel; i++) {
+        levelPos[i] = numLen;
+        numLen += mergeLevel(oldNum, oldLevelPos, oldCntLevel, another, i, num, numLen);
+      }
+      levelPos[cntLevel] = numLen;
+      // Must be cleared before compacting, which sorts level 0 only when this flag is false.
+      level0Sorted = false;
+      while (getNumLen() > maxMemoryNum) {
+        compact();
+      }
+      int newPos0 = maxMemoryNum - getNumLen();
+      System.arraycopy(num, levelPos[0], oldNum, newPos0, getNumLen());
+      int shift = newPos0 - levelPos[0];
+      for (int i = cntLevel; i >= 0; i--) {
+        levelPos[i] += shift;
+      }
+      num = oldNum;
+      this.N += another.N;
+    }
+  }
+
+  /**
+   * Merges all non-null sketches of {@code otherList} into this one: each level is the
+   * concatenation of the corresponding levels, then sorted. Uses a temporary array (and compacts
+   * back to {@code maxMemoryNum}) when the union does not fit in place.
+   */
+  public void mergeWithTempSpace(
+      List<org.apache.iotdb.library.dprofile.util.KLLSketchForQuantile> otherList) {
+    int[] oldLevelPos = Arrays.copyOf(levelPos, cntLevel + 1);
+    int oldCntLevel = cntLevel;
+    int otherNumLen = 0;
+    long otherN = 0;
+    for (org.apache.iotdb.library.dprofile.util.KLLSketchForQuantile another : otherList) {
+      if (another != null) {
+        if (another.cntLevel > cntLevel) {
+          calcLevelMaxSize(another.cntLevel);
+        }
+        otherNumLen += another.getNumLen();
+        otherN += another.getN();
+      }
+    }
+    if (getNumLen() + otherNumLen <= maxMemoryNum) {
+      int cntPos = oldLevelPos[0] - otherNumLen;
+      for (int i = 0; i < cntLevel; i++) {
+        levelPos[i] = cntPos;
+        if (i < oldCntLevel) {
+          System.arraycopy(num, oldLevelPos[i], num, cntPos, oldLevelPos[i + 1] - oldLevelPos[i]);
+          cntPos += oldLevelPos[i + 1] - oldLevelPos[i];
+        }
+        for (org.apache.iotdb.library.dprofile.util.KLLSketchForQuantile another : otherList) {
+          if (another != null && i < another.cntLevel) {
+            System.arraycopy(
+                another.num, another.levelPos[i], num, cntPos, another.getLevelSize(i));
+            cntPos += another.getLevelSize(i);
+          }
+        }
+        Arrays.sort(num, levelPos[i], cntPos);
+      }
+      levelPos[cntLevel] = cntPos;
+      this.N += otherN;
+    } else {
+      long[] oldNum = num;
+      num = new long[getNumLen() + otherNumLen];
+      int numLen = 0;
+      for (int i = 0; i < cntLevel; i++) {
+        levelPos[i] = numLen;
+        if (i < oldCntLevel) {
+          System.arraycopy(
+              oldNum, oldLevelPos[i], num, numLen, oldLevelPos[i + 1] - oldLevelPos[i]);
+          numLen += oldLevelPos[i + 1] - oldLevelPos[i];
+        }
+        for (org.apache.iotdb.library.dprofile.util.KLLSketchForQuantile another : otherList) {
+          if (another != null && i < another.cntLevel) {
+            System.arraycopy(
+                another.num, another.levelPos[i], num, numLen, another.getLevelSize(i));
+            numLen += another.getLevelSize(i);
+          }
+        }
+        Arrays.sort(num, levelPos[i], numLen);
+      }
+      levelPos[cntLevel] = numLen;
+      this.N += otherN;
+      while (getNumLen() > maxMemoryNum) {
+        compact();
+      }
+      int newPos0 = maxMemoryNum - getNumLen();
+      System.arraycopy(num, levelPos[0], oldNum, newPos0, getNumLen());
+      int shift = newPos0 - levelPos[0];
+      for (int i = cntLevel; i >= 0; i--) {
+        levelPos[i] += shift;
+      }
+      num = oldNum;
+    }
+  }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/dprofile/util/KLLSketchForQuantile.java b/library-udf/src/main/java/org/apache/iotdb/library/dprofile/util/KLLSketchForQuantile.java
new file mode 100644
index 0000000000..ee52016595
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/dprofile/util/KLLSketchForQuantile.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.library.dprofile.util;
+
+import java.util.Arrays;
+import java.util.Random;
+
+/**
+ * Base class of a KLL quantile sketch over {@code long} values, based on the KLL sketch in Apache
+ * DataSketches
+ * (https://github.com/apache/datasketches-java/tree/master/src/main/java/org/apache/datasketches/kll).
+ *
+ * <p>Values live in {@code num}; level {@code i} occupies {@code [levelPos[i], levelPos[i + 1])} and
+ * each of its items counts {@code 2^i} times in rank queries. Subclasses provide the memory limit,
+ * the per-level capacities and the compaction policy.
+ */
+public abstract class KLLSketchForQuantile {
+  /** Number of values added to the sketch (including those of merged sketches). */
+  long N;
+  /** Maximum number of values kept in memory, as computed by the subclass. */
+  int maxMemoryNum;
+  long[] num;
+  /** Whether level 0 is currently sorted; it is sorted lazily before searching or compacting. */
+  boolean level0Sorted;
+  /** Number of levels in use. */
+  int cntLevel;
+  /** Level boundaries ({@code cntLevel + 1} entries) and per-level capacities. */
+  int[] levelPos, levelMaxSize;
+  /**
+   * State of the xorshift* generator that decides which half survives a compaction. Forced to be
+   * odd so it is never 0, the fixed point at which xorshift would stop changing.
+   */
+  long XORSHIFT = new Random().nextLong() | 1L;
+
+  public KLLSketchForQuantile() {}
+
+  protected abstract int calcMaxMemoryNum(int maxMemoryByte);
+
+  protected abstract void calcLevelMaxSize(int setLevel);
+
+  /** Returns the number of stored values at {@code level}. */
+  public int getLevelSize(int level) {
+    return levelPos[level + 1] - levelPos[level];
+  }
+
+  /** Debug helper: prints every level's size to stdout. */
+  public void show() {
+    for (int i = 0; i < cntLevel; i++) {
+      System.out.print("\t");
+      System.out.print("[" + (levelPos[i + 1] - levelPos[i]) + "]");
+      System.out.print("\t");
+    }
+    System.out.println();
+  }
+
+  /** Debug helper: prints the values of every level to stdout. */
+  public void showNum() {
+    for (int i = 0; i < cntLevel; i++) {
+      System.out.print("\t|");
+      for (int j = levelPos[i]; j < levelPos[i + 1]; j++) {
+        System.out.print(num[j] + ",");
+      }
+      System.out.print("|\t");
+    }
+    System.out.println();
+  }
+
+  /** Adds a signed long value to level 0, compacting first when no slot is left in front of it. */
+  public void update(long x) {
+    if (levelPos[0] == 0) {
+      compact();
+    }
+    num[--levelPos[0]] = x;
+    N++;
+    level0Sorted = false;
+  }
+
+  protected abstract void compact();
+
+  /**
+   * Returns a pseudo-random bit (0 or 1) from an xorshift* generator. The top bit of the scrambled
+   * output is used: the lowest bit of {@code state * odd} is just the lowest state bit, which is
+   * the weakest bit of xorshift.
+   */
+  protected int getNextRand01() {
+    XORSHIFT ^= XORSHIFT >>> 12;
+    XORSHIFT ^= XORSHIFT << 25;
+    XORSHIFT ^= XORSHIFT >>> 27;
+    return (int) ((XORSHIFT * 0x2545F4914F6CDD1DL) >>> 63);
+  }
+
+  /**
+   * Keeps every other value of {@code num[L, R)} (starting at a random offset 0 or 1) and packs the
+   * kept half into {@code num[L, (L + R) / 2)}. Expects an even-length range.
+   */
+  protected void randomlyHalveDownToLeft(int L, int R) {
+    int delta = getNextRand01();
+    int mid = (L + R) >>> 1;
+    for (int i = L, j = L; i < mid; i++, j += 2) {
+      num[i] = num[j + delta];
+    }
+  }
+
+  /**
+   * Merges sorted {@code num[L1, mid)} with sorted {@code num[L2, R2)} in place, writing the
+   * result to {@code num[mid, mid + (mid - L1) + (R2 - L2))}. Callers arrange the ranges so that
+   * the output never overtakes unread input.
+   */
+  protected void mergeSortWithoutSpace(int L1, int mid, int L2, int R2) {
+    int p1 = L1, p2 = L2, cntPos = mid;
+    while (p1 < mid || p2 < R2) {
+      if (p1 < mid && (p2 == R2 || num[p1] < num[p2])) {
+        num[cntPos++] = num[p1++];
+      } else {
+        num[cntPos++] = num[p2++];
+      }
+    }
+  }
+
+  /**
+   * Returns the weighted count of values strictly less than {@code v} at {@code level} (the count
+   * times {@code 2^level}), sorting level 0 first if needed. Uses a binary search, so the level
+   * must be sorted.
+   */
+  protected int findRankInLevel(int level, long v) {
+    int L = levelPos[level], R = levelPos[level + 1];
+    if (level == 0 && !level0Sorted) {
+      Arrays.sort(num, L, R);
+      level0Sorted = true;
+    }
+    R--;
+    if (L > R || num[L] >= v) {
+      return 0;
+    }
+    while (L < R) {
+      int mid = (L + R + 1) >> 1;
+      if (num[mid] < v) {
+        L = mid;
+      } else {
+        R = mid - 1;
+      }
+    }
+    return (L - levelPos[level] + 1) * (1 << level);
+  }
+
+  /** Returns the approximate number of added values strictly less than {@code v}. */
+  public int getApproxRank(long v) {
+    int approxRank = 0;
+    for (int i = 0; i < cntLevel; i++) {
+      approxRank += findRankInLevel(i, v);
+    }
+    return approxRank;
+  }
+
+  /** Returns the largest {@code v} with {@code getApproxRank(v) <= K}. */
+  public long findMaxValueWithRank(long K) {
+    long L = Long.MIN_VALUE, R = Long.MAX_VALUE, mid;
+    while (L < R) {
+      mid = L + ((R - L) >>> 1);
+      if (mid == L) {
+        mid++;
+      }
+      if (getApproxRank(mid) <= K) {
+        L = mid;
+      } else {
+        R = mid - 1;
+      }
+    }
+    return L;
+  }
+
+  /**
+   * Returns the smallest {@code v} with {@code getApproxRank(v) >= K}, i.e. one past the K-th
+   * smallest stored value; returns {@link Long#MAX_VALUE} when no smaller {@code v} qualifies.
+   */
+  public long findMinValueWithRank(long K) {
+    long L = Long.MIN_VALUE, R = Long.MAX_VALUE, mid;
+    while (L < R) {
+      mid = L + ((R - L) >>> 1);
+      if (mid == R) {
+        mid--;
+      }
+      if (getApproxRank(mid) >= K) {
+        R = mid;
+      } else {
+        L = mid + 1;
+      }
+    }
+    return L;
+  }
+
+  public long getN() {
+    return N;
+  }
+
+  public int getMaxMemoryNum() {
+    return maxMemoryNum;
+  }
+
+  /** Returns the number of values currently stored across all levels. */
+  public int getNumLen() {
+    return levelPos[cntLevel] - levelPos[0];
+  }
+
+  /** Returns true when every added value is still stored, i.e. nothing was compacted away. */
+  public boolean exactResult() {
+    return this.N == this.getNumLen();
+  }
+
+  /**
+   * Returns the value at 0-based position {@code K} of sorted level 0. Only meaningful when {@link
+   * #exactResult()} holds.
+   */
+  public long getExactResult(int K) {
+    int L = levelPos[0], R = levelPos[1];
+    if (!level0Sorted) {
+      Arrays.sort(num, L, R);
+      level0Sorted = true;
+    }
+    return num[L + K];
+  }
+}