You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/11/05 15:02:58 UTC
[iotdb] branch master updated: Add a function quantile in UDF library (#7912)
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new f58cc6c223 Add a function quantile in UDF library (#7912)
f58cc6c223 is described below
commit f58cc6c223249534abb06a4816e52e38625c80f7
Author: czllgzmzl <32...@users.noreply.github.com>
AuthorDate: Sat Nov 5 23:02:52 2022 +0800
Add a function quantile in UDF library (#7912)
---
docs/UserGuide/UDF-Library/Data-Profiling.md | 73 +++++
docs/zh/UserGuide/UDF-Library/Data-Profiling.md | 74 +++++
library-udf/src/assembly/tools/register-UDF.bat | 1 +
library-udf/src/assembly/tools/register-UDF.sh | 1 +
.../iotdb/library/dprofile/UDAFQuantile.java | 127 ++++++++
.../library/dprofile/util/HeapLongKLLSketch.java | 356 +++++++++++++++++++++
.../dprofile/util/KLLSketchForQuantile.java | 172 ++++++++++
7 files changed, 804 insertions(+)
diff --git a/docs/UserGuide/UDF-Library/Data-Profiling.md b/docs/UserGuide/UDF-Library/Data-Profiling.md
index 7df66d245f..c6c59962c8 100644
--- a/docs/UserGuide/UDF-Library/Data-Profiling.md
+++ b/docs/UserGuide/UDF-Library/Data-Profiling.md
@@ -894,6 +894,79 @@ Output series:
+-----------------------------+------------------------------------------------------+
```
+## Quantile
+
+### Usage
+
+The function is used to compute the approximate quantile of a numeric time series. A quantile is the value of the element at a given rank in the sorted series.
+
+**Name:** QUANTILE
+
+**Input Series:** Only support a single input series. The data type is INT32 / INT64 / FLOAT / DOUBLE.
+
+**Parameter:**
+
++ `rank`: The rank of the quantile. It should be within (0,1] and the default value is 0.5. For instance, a quantile with `rank`=0.5 is the median.
++ `K`: The size of the KLL sketch maintained in the query. It should be within [100,+inf) and the default value is 800. For instance, the 0.5-quantile computed by a KLL sketch with K=800 items is a value whose true rank lies within 0.49~0.51 with a confidence of at least 99%. The result will be more accurate as K increases.
+
+**Output Series:** Output a single series. The type is the same as input series. The timestamp of the only data point is 0.
+
+**Note:** Missing points, null points and `NaN` in the input series will be ignored.
+
+### Examples
+
+Input series:
+
+```
++-----------------------------+------------+
+| Time|root.test.s0|
++-----------------------------+------------+
+|2021-03-17T10:32:17.054+08:00| 0.5319929|
+|2021-03-17T10:32:18.054+08:00| 0.9304316|
+|2021-03-17T10:32:19.054+08:00| -1.4800133|
+|2021-03-17T10:32:20.054+08:00| 0.6114087|
+|2021-03-17T10:32:21.054+08:00| 2.5163336|
+|2021-03-17T10:32:22.054+08:00| -1.0845392|
+|2021-03-17T10:32:23.054+08:00| 1.0562582|
+|2021-03-17T10:32:24.054+08:00| 1.3867859|
+|2021-03-17T10:32:25.054+08:00| -0.45429882|
+|2021-03-17T10:32:26.054+08:00| 1.0353678|
+|2021-03-17T10:32:27.054+08:00| 0.7307929|
+|2021-03-17T10:32:28.054+08:00| 2.3167255|
+|2021-03-17T10:32:29.054+08:00| 2.342443|
+|2021-03-17T10:32:30.054+08:00| 1.5809103|
+|2021-03-17T10:32:31.054+08:00| 1.4829416|
+|2021-03-17T10:32:32.054+08:00| 1.5800357|
+|2021-03-17T10:32:33.054+08:00| 0.7124368|
+|2021-03-17T10:32:34.054+08:00| -0.78597564|
+|2021-03-17T10:32:35.054+08:00| 1.2058644|
+|2021-03-17T10:32:36.054+08:00| 1.4215064|
+|2021-03-17T10:32:37.054+08:00| 1.2808295|
+|2021-03-17T10:32:38.054+08:00| -0.6173715|
+|2021-03-17T10:32:39.054+08:00| 0.06644377|
+|2021-03-17T10:32:40.054+08:00| 2.349338|
+|2021-03-17T10:32:41.054+08:00| 1.7335888|
+|2021-03-17T10:32:42.054+08:00| 1.5872132|
+............
+Total line number = 10000
+```
+
+SQL for query:
+
+```sql
+select quantile(s0, "rank"="0.2", "K"="800") from root.test
+```
+
+Output series:
+
+```
++-----------------------------+------------------------------------------------------+
+| Time|quantile(root.test.s0, "rank"="0.2", "K"="800")|
++-----------------------------+------------------------------------------------------+
+|1970-01-01T08:00:00.000+08:00| 0.1801469624042511|
++-----------------------------+------------------------------------------------------+
+```
+
## Period
### Usage
diff --git a/docs/zh/UserGuide/UDF-Library/Data-Profiling.md b/docs/zh/UserGuide/UDF-Library/Data-Profiling.md
index b5657ad6cf..bc7761be39 100644
--- a/docs/zh/UserGuide/UDF-Library/Data-Profiling.md
+++ b/docs/zh/UserGuide/UDF-Library/Data-Profiling.md
@@ -899,6 +899,80 @@ select percentile(s0, "rank"="0.2", "error"="0.01") from root.test
+-----------------------------+------------------------------------------------------+
```
+## Quantile
+
+### 函数简介
+
+本函数用于计算单列数值型数据的近似分位数。本函数基于KLL sketch算法实现。
+
+**函数名:** QUANTILE
+
+**输入序列:** 仅支持单个输入序列,类型为 INT32 / INT64 / FLOAT / DOUBLE。
+
+**参数:**
+
++ `rank`:所求分位数在所有数据中的排名比,取值范围为 (0,1],默认值为 0.5。如当设为 0.5时则计算近似中位数。
++ `K`:允许维护的KLL sketch大小,最小值为100,默认值为800。如`rank`=0.5 且`K`=800,则计算出的分位数的真实排名比有至少99%的可能性在 0.49~0.51之间。
+
+**输出序列:** 输出单个序列,类型与输入序列相同。输出值的时间戳为0。
+
+**提示:** 数据中的空值、缺失值和`NaN`将会被忽略。
+
+### 使用示例
+
+
+输入序列:
+
+```
++-----------------------------+------------+
+| Time|root.test.s0|
++-----------------------------+------------+
+|2021-03-17T10:32:17.054+08:00| 0.5319929|
+|2021-03-17T10:32:18.054+08:00| 0.9304316|
+|2021-03-17T10:32:19.054+08:00| -1.4800133|
+|2021-03-17T10:32:20.054+08:00| 0.6114087|
+|2021-03-17T10:32:21.054+08:00| 2.5163336|
+|2021-03-17T10:32:22.054+08:00| -1.0845392|
+|2021-03-17T10:32:23.054+08:00| 1.0562582|
+|2021-03-17T10:32:24.054+08:00| 1.3867859|
+|2021-03-17T10:32:25.054+08:00| -0.45429882|
+|2021-03-17T10:32:26.054+08:00| 1.0353678|
+|2021-03-17T10:32:27.054+08:00| 0.7307929|
+|2021-03-17T10:32:28.054+08:00| 2.3167255|
+|2021-03-17T10:32:29.054+08:00| 2.342443|
+|2021-03-17T10:32:30.054+08:00| 1.5809103|
+|2021-03-17T10:32:31.054+08:00| 1.4829416|
+|2021-03-17T10:32:32.054+08:00| 1.5800357|
+|2021-03-17T10:32:33.054+08:00| 0.7124368|
+|2021-03-17T10:32:34.054+08:00| -0.78597564|
+|2021-03-17T10:32:35.054+08:00| 1.2058644|
+|2021-03-17T10:32:36.054+08:00| 1.4215064|
+|2021-03-17T10:32:37.054+08:00| 1.2808295|
+|2021-03-17T10:32:38.054+08:00| -0.6173715|
+|2021-03-17T10:32:39.054+08:00| 0.06644377|
+|2021-03-17T10:32:40.054+08:00| 2.349338|
+|2021-03-17T10:32:41.054+08:00| 1.7335888|
+|2021-03-17T10:32:42.054+08:00| 1.5872132|
+............
+Total line number = 10000
+```
+
+用于查询的 SQL 语句:
+
+```sql
+select quantile(s0, "rank"="0.2", "K"="800") from root.test
+```
+
+输出序列:
+
+```
++-----------------------------+------------------------------------------------------+
+| Time|quantile(root.test.s0, "rank"="0.2", "K"="800")|
++-----------------------------+------------------------------------------------------+
+|1970-01-01T08:00:00.000+08:00| 0.1801469624042511|
++-----------------------------+------------------------------------------------------+
+```
+
## Period
### 函数简介
diff --git a/library-udf/src/assembly/tools/register-UDF.bat b/library-udf/src/assembly/tools/register-UDF.bat
index 5a565035c6..164ec8cabd 100644
--- a/library-udf/src/assembly/tools/register-UDF.bat
+++ b/library-udf/src/assembly/tools/register-UDF.bat
@@ -33,6 +33,7 @@ call ../sbin/start-cli.bat -h %host% -p %rpcPort% -u %user% -pw %pass% -e "creat
call ../sbin/start-cli.bat -h %host% -p %rpcPort% -u %user% -pw %pass% -e "create function median as 'org.apache.iotdb.library.dprofile.UDAFMedian'"
call ../sbin/start-cli.bat -h %host% -p %rpcPort% -u %user% -pw %pass% -e "create function mode as 'org.apache.iotdb.library.dprofile.UDAFMode'"
call ../sbin/start-cli.bat -h %host% -p %rpcPort% -u %user% -pw %pass% -e "create function percentile as 'org.apache.iotdb.library.dprofile.UDAFPercentile'"
+call ../sbin/start-cli.bat -h %host% -p %rpcPort% -u %user% -pw %pass% -e "create function quantile as 'org.apache.iotdb.library.dprofile.UDAFQuantile'"
call ../sbin/start-cli.bat -h %host% -p %rpcPort% -u %user% -pw %pass% -e "create function period as 'org.apache.iotdb.library.dprofile.UDAFPeriod'"
call ../sbin/start-cli.bat -h %host% -p %rpcPort% -u %user% -pw %pass% -e "create function qlb as 'org.apache.iotdb.library.dprofile.UDTFQLB'"
call ../sbin/start-cli.bat -h %host% -p %rpcPort% -u %user% -pw %pass% -e "create function re_sample as 'org.apache.iotdb.library.dprofile.UDTFResample'"
diff --git a/library-udf/src/assembly/tools/register-UDF.sh b/library-udf/src/assembly/tools/register-UDF.sh
index ea7d7758d0..bf6cbdda68 100644
--- a/library-udf/src/assembly/tools/register-UDF.sh
+++ b/library-udf/src/assembly/tools/register-UDF.sh
@@ -35,6 +35,7 @@ pass=root
../sbin/start-cli.sh -h $host -p $rpcPort -u $user -pw $pass -e "create function median as 'org.apache.iotdb.library.dprofile.UDAFMedian'"
../sbin/start-cli.sh -h $host -p $rpcPort -u $user -pw $pass -e "create function mode as 'org.apache.iotdb.library.dprofile.UDAFMode'"
../sbin/start-cli.sh -h $host -p $rpcPort -u $user -pw $pass -e "create function percentile as 'org.apache.iotdb.library.dprofile.UDAFPercentile'"
+../sbin/start-cli.sh -h $host -p $rpcPort -u $user -pw $pass -e "create function quantile as 'org.apache.iotdb.library.dprofile.UDAFQuantile'"
../sbin/start-cli.sh -h $host -p $rpcPort -u $user -pw $pass -e "create function period as 'org.apache.iotdb.library.dprofile.UDAFPeriod'"
../sbin/start-cli.sh -h $host -p $rpcPort -u $user -pw $pass -e "create function qlb as 'org.apache.iotdb.library.dprofile.UDTFQLB'"
../sbin/start-cli.sh -h $host -p $rpcPort -u $user -pw $pass -e "create function re_sample as 'org.apache.iotdb.library.dprofile.UDTFResample'"
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/dprofile/UDAFQuantile.java b/library-udf/src/main/java/org/apache/iotdb/library/dprofile/UDAFQuantile.java
new file mode 100644
index 0000000000..a806038c83
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/dprofile/UDAFQuantile.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.library.dprofile;
+
+import org.apache.iotdb.commons.udf.utils.UDFDataTypeTransformer;
+import org.apache.iotdb.library.util.Util;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.udf.api.UDTF;
+import org.apache.iotdb.udf.api.access.Row;
+import org.apache.iotdb.udf.api.collector.PointCollector;
+import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.udf.api.customizer.parameter.UDFParameterValidator;
+import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.udf.api.customizer.strategy.RowByRowAccessStrategy;
+import org.apache.iotdb.udf.api.type.Type;
+
+/** calculate the approximate percentile */
+public class UDAFQuantile implements UDTF {
+ private org.apache.iotdb.library.dprofile.util.HeapLongKLLSketch sketch;
+ private double rank;
+ private TSDataType dataType;
+
+ @Override
+ public void validate(UDFParameterValidator validator) throws Exception {
+ validator
+ .validateInputSeriesNumber(1)
+ .validateInputSeriesDataType(0, Type.INT32, Type.INT64, Type.FLOAT, Type.DOUBLE)
+ .validate(
+ K -> (int) K >= 100,
+ "Size K has to be greater or equal than 100.",
+ validator.getParameters().getIntOrDefault("K", 800))
+ .validate(
+ rank -> (double) rank > 0 && (double) rank <= 1,
+ "rank has to be greater than 0 and less than or equal to 1.",
+ validator.getParameters().getDoubleOrDefault("rank", 0.5));
+ }
+
+ @Override
+ public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
+ throws Exception {
+ configurations
+ .setAccessStrategy(new RowByRowAccessStrategy())
+ .setOutputDataType(parameters.getDataType(0));
+ dataType = UDFDataTypeTransformer.transformToTsDataType(parameters.getDataType(0));
+ int K = parameters.getIntOrDefault("K", 800);
+ rank = parameters.getDoubleOrDefault("rank", 0.5);
+
+ sketch = new org.apache.iotdb.library.dprofile.util.HeapLongKLLSketch(K * 8);
+ }
+
+ @Override
+ public void transform(Row row, PointCollector collector) throws Exception {
+ double res = Util.getValueAsDouble(row);
+ sketch.update(dataToLong(res));
+ }
+
+ @Override
+ public void terminate(PointCollector collector) throws Exception {
+ long result = sketch.findMinValueWithRank((long) (rank * sketch.getN()));
+ double res = longToResult(result);
+ switch (dataType) {
+ case INT32:
+ collector.putInt(0, (int) res);
+ break;
+ case INT64:
+ collector.putLong(0, (long) res);
+ break;
+ case FLOAT:
+ collector.putFloat(0, (float) res);
+ break;
+ case DOUBLE:
+ collector.putDouble(0, res);
+ }
+ }
+
+ private long dataToLong(Object data) {
+ long result;
+ switch (dataType) {
+ case INT32:
+ return (int) data;
+ case FLOAT:
+ result = Float.floatToIntBits((float) data);
+ return (float) data >= 0f ? result : result ^ Long.MAX_VALUE;
+ case INT64:
+ return (long) data;
+ case DOUBLE:
+ result = Double.doubleToLongBits((double) data);
+ return (double) data >= 0d ? result : result ^ Long.MAX_VALUE;
+ default:
+ return (long) data;
+ }
+ }
+
+ private double longToResult(long result) {
+ switch (dataType) {
+ case INT32:
+ return (double) (result);
+ case FLOAT:
+ result = (result >>> 31) == 0 ? result : result ^ Long.MAX_VALUE;
+ return Float.intBitsToFloat((int) (result));
+ case INT64:
+ return (double) (result);
+ case DOUBLE:
+ result = (result >>> 63) == 0 ? result : result ^ Long.MAX_VALUE;
+ return Double.longBitsToDouble(result);
+ default:
+ return (double) (result);
+ }
+ }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/dprofile/util/HeapLongKLLSketch.java b/library-udf/src/main/java/org/apache/iotdb/library/dprofile/util/HeapLongKLLSketch.java
new file mode 100644
index 0000000000..39767fc08e
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/dprofile/util/HeapLongKLLSketch.java
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.library.dprofile.util;
+
+import java.util.Arrays;
+import java.util.List;
+
+// Heap-only KLL sketch for long values, based on the KLL sketch in Apache DataSketches. See
+// https://github.com/apache/datasketches-java/tree/master/src/main/java/org/apache/datasketches/kll
+// All levels share the single array num; level i is the slice [levelPos[i], levelPos[i + 1]).
+// works only in heap memory. don't need to serialize or deserialize
+public class HeapLongKLLSketch extends org.apache.iotdb.library.dprofile.util.KLLSketchForQuantile {
+ /** Creates an empty one-level sketch; the budget is given in bytes (see calcMaxMemoryNum). */
+ public HeapLongKLLSketch(int maxMemoryByte) { // maxN=7000 for PAGE, 1.6e6 for CHUNK
+ N = 0;
+ calcParameters(maxMemoryByte);
+ calcLevelMaxSize(1);
+ }
+ /** Allocates the item array for the byte budget and resets the level bookkeeping. */
+ private void calcParameters(int maxMemoryByte) {
+ maxMemoryNum = calcMaxMemoryNum(maxMemoryByte);
+ num = new long[maxMemoryNum];
+ level0Sorted = false;
+ cntLevel = 0;
+ }
+ /** Converts a byte budget into an item count: 8 bytes per item, at most 2^20 items. */
+ @Override
+ protected int calcMaxMemoryNum(int maxMemoryByte) {
+ return Math.min(1 << 20, maxMemoryByte / 8);
+ }
+ /** Sets the level count to setLevel and recomputes every level's size limit. */
+ @Override
+ protected void calcLevelMaxSize(int setLevel) { // set cntLevel. cntLevel won't decrease
+ int[] tmpArr = new int[setLevel + 1];
+ int maxPos = cntLevel > 0 ? Math.max(maxMemoryNum, levelPos[cntLevel]) : maxMemoryNum;
+ for (int i = 0; i < setLevel + 1; i++) tmpArr[i] = i < cntLevel ? levelPos[i] : maxPos;
+ levelPos = tmpArr; // existing boundaries are kept; new levels start out empty at maxPos
+ cntLevel = setLevel;
+ levelMaxSize = new int[cntLevel];
+ int newK = 0;
+ for (int addK = 1 << 28; addK > 0; addK >>>= 1) { // find a new K to fit the memory limit.
+ int need = 0;
+ for (int i = 0; i < cntLevel; i++)
+ need +=
+ Math.max(8, (int) Math.round(((newK + addK) * Math.pow(2.0 / 3, cntLevel - i - 1))));
+ if (need <= maxMemoryNum) newK += addK;
+ }
+ for (int i = 0; i < cntLevel; i++)
+ levelMaxSize[i] = Math.max(8, (int) Math.round((newK * Math.pow(2.0 / 3, cntLevel - i - 1))));
+ // (limits above: newK * (2/3)^(number of levels above i), with a floor of 8 items)
+ }
+ /** Concatenates N, the top limit, the level count, the item count and all level starts. */
+ public String toString() {
+ final StringBuilder sb = new StringBuilder();
+ sb.append(N); // NOTE(review): the numbers are appended without any separator
+ sb.append(levelMaxSize[cntLevel - 1]);
+ sb.append(cntLevel);
+ sb.append((levelPos[cntLevel] - levelPos[0]));
+ for (int i = 0; i < cntLevel; i++) sb.append(levelPos[i]);
+ return sb.toString();
+ }
+ /** Prints maxMemoryNum, N and every level's size limit to standard output. */
+ public void showLevelMaxSize() {
+ int numLEN = levelPos[cntLevel] - levelPos[0]; // computed but not used below
+ System.out.println("\t\t//maxMemNum:" + maxMemoryNum + "\t//N:" + N);
+ for (int i = 0; i < cntLevel; i++) System.out.print("\t\t" + levelMaxSize[i] + "\t");
+ System.out.println();
+ System.out.println("-------------------------------------------------------");
+ }
+ /** Merges sorted a1[L1, R1) and a2[L2, R2) into a3 from pos on; returns the item count. */
+ private static int mergeSort(
+ long[] a1, int L1, int R1, long[] a2, int L2, int R2, long[] a3, int pos) {
+ if (L1 == R1) System.arraycopy(a2, L2, a3, pos, R2 - L2);
+ else if (L2 == R2) System.arraycopy(a1, L1, a3, pos, R1 - L1);
+ else {
+ int p1 = L1, p2 = L2;
+ while (p1 < R1 || p2 < R2)
+ if (p1 < R1 && (p2 == R2 || a1[p1] < a2[p2])) a3[pos++] = a1[p1++];
+ else a3[pos++] = a2[p2++];
+ }
+ return R1 - L1 + R2 - L2;
+ }
+ /** Halves one level, merges the survivors into the next level and frees the gained slots. */
+ private void compactOneLevel(int level) { // compact half of data when numToReduce is small
+ if (level == cntLevel - 1) calcLevelMaxSize(cntLevel + 1);
+ int L1 = levelPos[level], R1 = levelPos[level + 1]; // [L,R)
+ // Level 0 may hold unsorted items (update() clears level0Sorted), so sort it first.
+ if (level == 0 && !level0Sorted) {
+ Arrays.sort(num, L1, R1);
+ level0Sorted = true;
+ }
+ L1 += (R1 - L1) & 1; // with an odd count the first item stays behind in this level
+ if (L1 == R1) return;
+ // Keep every second item (random parity), packed into the left half [L1, mid).
+ randomlyHalveDownToLeft(L1, R1);
+ // Merge the survivors with the next level; the result is written from index mid onwards.
+ int mid = (L1 + R1) >>> 1;
+ mergeSortWithoutSpace(L1, mid, levelPos[level + 1], levelPos[level + 2]);
+ levelPos[level + 1] = mid;
+ int newP = levelPos[level + 1] - 1, oldP = L1 - 1;
+ for (int i = oldP; i >= levelPos[0]; i--) num[newP--] = num[oldP--];
+ // Everything in front of L1 was moved right by numReduced slots; shift the boundaries too.
+ levelPos[level] = levelPos[level + 1] - (L1 - levelPos[level]);
+ int numReduced = (R1 - L1) >>> 1;
+ for (int i = level - 1; i >= 0; i--) levelPos[i] += numReduced;
+ }
+ /** Compacts the highest level that exceeds its size limit, or the top level if none does. */
+ @Override
+ public void compact() {
+ int compactLevel = cntLevel - 1;
+ double mxRate = 0; // only referenced by the commented-out heuristic below
+ for (int i = 0; i < cntLevel; i++)
+ if (levelPos[i + 1] - levelPos[i] > levelMaxSize[i]) {
+ // double rate = 1.0*(levelPos[i+1]-levelPos[i])/Math.pow(2,i);
+ // if(rate>mxRate&&(i<cntLevel-1 || mxRate==0)) {
+ compactLevel = i;
+ // mxRate=rate;
+ // }
+ // break;
+ }
+
+ compactOneLevel(compactLevel);
+ }
+ /** Copies every level of another sketch in front of the matching level of this sketch. */
+ public void merge(org.apache.iotdb.library.dprofile.util.KLLSketchForQuantile another) {
+ // NOTE(review): the copied items are placed in front of level i without merging or sorting
+ // and level0Sorted is not reset, while findRankInLevel binary-searches each level.
+ // Confirm that callers do not rely on sorted levels after merge().
+ if (another.cntLevel > cntLevel) calcLevelMaxSize(another.cntLevel);
+ for (int i = 0; i < another.cntLevel; i++) {
+ int numToMerge = another.levelPos[i + 1] - another.levelPos[i];
+ if (numToMerge == 0) continue;
+ int mergingL = another.levelPos[i];
+ while (numToMerge > 0) {
+ // Free slots exist only in front of level 0, i.e. in num[0, levelPos[0]).
+ if (levelPos[0] == 0) compact();
+ // NOTE(review): compactOneLevel() can return without freeing a slot (level with fewer
+ // than two items); levelPos[0] then stays 0, delta below is 0 and this loop would not
+ // make progress. Confirm this cannot happen for the sketches merged here.
+ // delta is the number of items that fit into the free prefix in this round.
+ int delta = Math.min(numToMerge, levelPos[0]);
+ if (i > 0) { // move to give space for level i
+ for (int j = 0; j < i; j++) levelPos[j] -= delta;
+ System.arraycopy(num, delta, num, 0, levelPos[i] - delta);
+ }
+ System.arraycopy(another.num, mergingL, num, levelPos[i] - delta, delta);
+ levelPos[i] -= delta;
+ numToMerge -= delta;
+ mergingL += delta;
+ }
+ }
+ this.N += another.N;
+ // System.out.println("[MERGE result]");
+ // show();
+ // System.out.println();
+ }
+ /** Merges another sketch level by level with mergeSort; may allocate a temporary array. */
+ public void mergeWithTempSpace(
+ org.apache.iotdb.library.dprofile.util.KLLSketchForQuantile another) {
+ // NOTE(review): mergeSort expects sorted runs, but level 0 of either sketch may be unsorted
+ // (level0Sorted is not checked here). Confirm that callers pass sketches whose level 0
+ // is already sorted.
+ int[] oldLevelPos = levelPos;
+ int oldCntLevel = cntLevel;
+ calcLevelMaxSize(Math.max(cntLevel, another.cntLevel));
+ if (getNumLen() + another.getNumLen() <= maxMemoryNum) {
+ int cntPos = oldLevelPos[0] - another.getNumLen();
+ for (int i = 0; i < cntLevel; i++) {
+ int tmpL = cntPos;
+ // Both sketches fit: merge in place, writing towards the front of num.
+ // Three cases per level: both sketches have it, only this one, or only the other one;
+ // an empty range (0, 0) stands in for the missing side. The write position starts
+ // another.getNumLen() slots in front of the data, so it trails the items still to be read.
+ if (i < oldCntLevel && i < another.cntLevel)
+ cntPos +=
+ mergeSort(
+ num,
+ oldLevelPos[i],
+ oldLevelPos[i + 1],
+ another.num,
+ another.levelPos[i],
+ another.levelPos[i + 1],
+ num,
+ cntPos);
+ else if (i < oldCntLevel)
+ cntPos +=
+ mergeSort(num, oldLevelPos[i], oldLevelPos[i + 1], another.num, 0, 0, num, cntPos);
+ else if (i < another.cntLevel)
+ cntPos +=
+ mergeSort(
+ num,
+ 0,
+ 0,
+ another.num,
+ another.levelPos[i],
+ another.levelPos[i + 1],
+ num,
+ cntPos);
+ levelPos[i] = tmpL;
+ }
+ levelPos[cntLevel] = cntPos;
+ this.N += another.N;
+ } else {
+ long[] oldNum = num;
+ num = new long[getNumLen() + another.getNumLen()];
+ int numLen = 0;
+ for (int i = 0; i < cntLevel; i++) {
+ int tmpL = numLen;
+ if (i < oldCntLevel && i < another.cntLevel)
+ numLen +=
+ mergeSort(
+ oldNum,
+ oldLevelPos[i],
+ oldLevelPos[i + 1],
+ another.num,
+ another.levelPos[i],
+ another.levelPos[i + 1],
+ num,
+ numLen);
+ else if (i < oldCntLevel)
+ numLen +=
+ mergeSort(oldNum, oldLevelPos[i], oldLevelPos[i + 1], another.num, 0, 0, num, numLen);
+ else if (i < another.cntLevel)
+ numLen +=
+ mergeSort(
+ oldNum,
+ 0,
+ 0,
+ another.num,
+ another.levelPos[i],
+ another.levelPos[i + 1],
+ num,
+ numLen);
+ levelPos[i] = tmpL;
+ }
+ levelPos[cntLevel] = numLen;
+ // The temporary array now holds all items of both sketches;
+ // compact it until the content fits into the original budget again.
+ while (getNumLen() > maxMemoryNum) compact();
+ // Copy the remaining items to the end of the original array
+ // and rebase the level boundaries accordingly.
+ int newPos0 = maxMemoryNum - getNumLen();
+ System.arraycopy(num, levelPos[0], oldNum, newPos0, getNumLen());
+ for (int i = cntLevel; i >= 0; i--) levelPos[i] = levelPos[i] - levelPos[0] + newPos0;
+ num = oldNum;
+ this.N += another.N;
+ }
+ // System.out.println("\t\t??\t\t"+Arrays.toString(num));
+ // System.out.println("\t\t??\t\t"+Arrays.toString(levelPos));
+ // System.out.println("-------------------------------.............---------");
+ // System.out.println("[MERGE result]");
+ // show();
+ // System.out.println();
+ }
+ /** Merges several sketches at once; null entries of the list are skipped. */
+ public void mergeWithTempSpace(
+ List<org.apache.iotdb.library.dprofile.util.KLLSketchForQuantile> otherList) {
+ // Unlike the single-sketch variant, each level is built by copying the items of all
+ // sketches next to each other and sorting the combined range with Arrays.sort.
+ // If everything fits into maxMemoryNum the work happens inside num; otherwise a temporary
+ // array of the combined size is filled, compacted until it fits and copied back.
+ // The first loop below only grows the level count and sums up the sizes and counts
+ // of the other sketches.
+ int[] oldLevelPos = Arrays.copyOf(levelPos, cntLevel + 1);
+ int oldCntLevel = cntLevel;
+ int otherNumLen = 0;
+ long otherN = 0;
+ // System.out.print("\t\t\t\t[mergeWithTempSpace] others:");
+ for (org.apache.iotdb.library.dprofile.util.KLLSketchForQuantile another : otherList)
+ if (another != null) {
+ // System.out.print("\t"+another.getN());
+ if (another.cntLevel > cntLevel) calcLevelMaxSize(another.cntLevel);
+ otherNumLen += another.getNumLen();
+ otherN += another.getN();
+ }
+ // System.out.println();
+ // System.out.println("[mergeWithTempSpace]\totherNumLen:"+otherNumLen);
+ if (getNumLen() + otherNumLen <= maxMemoryNum) {
+ int cntPos = oldLevelPos[0] - otherNumLen;
+ for (int i = 0; i < cntLevel; i++) {
+ levelPos[i] = cntPos;
+ if (i < oldCntLevel) {
+ System.arraycopy(num, oldLevelPos[i], num, cntPos, oldLevelPos[i + 1] - oldLevelPos[i]);
+ cntPos += oldLevelPos[i + 1] - oldLevelPos[i];
+ }
+ for (org.apache.iotdb.library.dprofile.util.KLLSketchForQuantile another : otherList)
+ if (another != null && i < another.cntLevel) {
+ System.arraycopy(
+ another.num, another.levelPos[i], num, cntPos, another.getLevelSize(i));
+ cntPos += another.getLevelSize(i);
+ }
+ Arrays.sort(num, levelPos[i], cntPos);
+ // System.out.println("\t\t!!\t"+cntPos);
+ }
+ levelPos[cntLevel] = cntPos;
+ this.N += otherN;
+ } else {
+ long[] oldNum = num;
+ num = new long[getNumLen() + otherNumLen];
+ // System.out.println("\t\t\t\ttmp_num:"+num.length+"
+ // old_num:"+levelPos[0]+"..."+levelPos[oldCntLevel]);
+ int numLen = 0;
+ for (int i = 0; i < cntLevel; i++) {
+ levelPos[i] = numLen;
+ if (i < oldCntLevel) {
+ // System.out.println("\t\t\tlv"+i+"\toldPos:"+oldLevelPos[i]+"\t"+numLen+"
+ // this_level_old_len:"+(oldLevelPos[i + 1] - oldLevelPos[i]));
+ // System.out.println("\t\t\t"+oldNum[oldLevelPos[i + 1]-1]);
+ System.arraycopy(
+ oldNum, oldLevelPos[i], num, numLen, oldLevelPos[i + 1] - oldLevelPos[i]);
+ numLen += oldLevelPos[i + 1] - oldLevelPos[i];
+ }
+ for (org.apache.iotdb.library.dprofile.util.KLLSketchForQuantile another : otherList)
+ if (another != null && i < another.cntLevel) {
+ System.arraycopy(
+ another.num, another.levelPos[i], num, numLen, another.getLevelSize(i));
+ numLen += another.getLevelSize(i);
+ }
+ Arrays.sort(num, levelPos[i], numLen);
+ }
+ levelPos[cntLevel] = numLen;
+ this.N += otherN;
+ // The temporary array now holds all items of all sketches;
+ // compact it until the content fits into the original budget again.
+ while (getNumLen() > maxMemoryNum) compact();
+ // Copy the remaining items to the end of the original array
+ // and rebase the level boundaries accordingly.
+ int newPos0 = maxMemoryNum - getNumLen();
+ System.arraycopy(num, levelPos[0], oldNum, newPos0, getNumLen());
+ for (int i = cntLevel; i >= 0; i--) levelPos[i] += newPos0 - levelPos[0];
+ num = oldNum;
+ }
+ // System.out.println("\t\t??\t\t"+Arrays.toString(num));
+ // System.out.println("\t\t??\t\t"+Arrays.toString(levelPos));
+ // System.out.println("-------------------------------.............---------");
+ // System.out.println("[MERGE result]");
+ // show();
+ // System.out.println();
+ }
+}
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/dprofile/util/KLLSketchForQuantile.java b/library-udf/src/main/java/org/apache/iotdb/library/dprofile/util/KLLSketchForQuantile.java
new file mode 100644
index 0000000000..ee52016595
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/dprofile/util/KLLSketchForQuantile.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.library.dprofile.util;
+
+import java.util.Arrays;
+import java.util.Random;
+
+// based on KLL Sketch in DataSketch. See
+// https://github.com/apache/datasketches-java/tree/master/src/main/java/org/apache/datasketches/kll
+/**
+ * Base class of a KLL-style quantile sketch over {@code long} values.
+ *
+ * <p>All levels share the single array {@link #num}: level {@code i} occupies the slice
+ * {@code [levelPos[i], levelPos[i + 1])} and each of its items counts with weight {@code 2^i}
+ * when a rank is estimated. {@link #update(long)} writes new items in front of level 0, so the
+ * used part of the array grows towards index 0, and {@link #compact()} is called once no free
+ * slot is left. Subclasses provide the capacity, the per-level limits and the compaction.
+ */
+public abstract class KLLSketchForQuantile {
+  /** Number of values summarized by the sketch. */
+  long N;
+
+  /** Capacity limit in items; assigned by subclasses. */
+  int maxMemoryNum;
+
+  /** Storage shared by all levels. */
+  long[] num;
+
+  /** Whether the items of level 0 are currently sorted in ascending order. */
+  boolean level0Sorted;
+
+  /** Number of levels. */
+  int cntLevel;
+
+  /** Start index of every level in {@link #num}; {@code levelPos[cntLevel]} is the end. */
+  int[] levelPos;
+
+  /** Per-level size limits; maintained by subclasses. */
+  int[] levelMaxSize;
+
+  /**
+   * State of the xorshift generator behind {@link #getNextRand01()}. It is seeded with 64 random
+   * bits and forced to be non-zero: zero is a fixed point of the xorshift steps, so a zero seed
+   * would make the generator return 0 forever.
+   */
+  long XORSHIFT = new Random().nextLong() | 1L;
+
+  public KLLSketchForQuantile() {}
+
+  /** Converts a memory budget in bytes into the number of items the sketch may hold. */
+  protected abstract int calcMaxMemoryNum(int maxMemoryByte);
+
+  /** Sets the number of levels to {@code setLevel} and recomputes the per-level size limits. */
+  protected abstract void calcLevelMaxSize(int setLevel);
+
+  /** Returns the number of items currently stored in {@code level}. */
+  public int getLevelSize(int level) {
+    return levelPos[level + 1] - levelPos[level];
+  }
+
+  /** Prints the size of every level to standard output (debugging aid). */
+  public void show() {
+    for (int i = 0; i < cntLevel; i++) {
+      System.out.print("\t");
+      System.out.print("[" + (levelPos[i + 1] - levelPos[i]) + "]");
+      System.out.print("\t");
+    }
+    System.out.println();
+  }
+
+  /** Prints the items of every level to standard output (debugging aid). */
+  public void showNum() {
+    for (int i = 0; i < cntLevel; i++) {
+      System.out.print("\t|");
+      for (int j = levelPos[i]; j < levelPos[i + 1]; j++) {
+        System.out.print(num[j] + ",");
+      }
+      System.out.print("|\t");
+    }
+    System.out.println();
+  }
+
+  /**
+   * Adds one value, compared as a signed {@code long}, in front of level 0. When no free slot is
+   * left in front of level 0, {@link #compact()} is called first.
+   */
+  public void update(long x) {
+    if (levelPos[0] == 0) {
+      compact();
+    }
+    num[--levelPos[0]] = x;
+    N++;
+    level0Sorted = false;
+  }
+
+  /** Frees space in {@link #num} by compacting one level into the next one. */
+  protected abstract void compact();
+
+  /** Returns a pseudo-random bit (0 or 1) from an xorshift* generator. */
+  protected int getNextRand01() {
+    XORSHIFT ^= XORSHIFT >>> 12;
+    XORSHIFT ^= XORSHIFT << 25;
+    XORSHIFT ^= XORSHIFT >>> 27;
+    return (int) ((XORSHIFT * 0x2545F4914F6CDD1DL) & 1);
+  }
+
+  /**
+   * Keeps every second item of {@code [L, R)}, starting at {@code L} or {@code L + 1} with equal
+   * chance, and packs the survivors into the left half {@code [L, (L + R) / 2)}.
+   */
+  protected void randomlyHalveDownToLeft(int L, int R) {
+    int delta = getNextRand01();
+    int mid = (L + R) >>> 1;
+    for (int i = L, j = L; i < mid; i++, j += 2) {
+      num[i] = num[j + delta];
+    }
+  }
+
+  /**
+   * Merges the sorted runs {@code [L1, mid)} and {@code [L2, R2)} of {@link #num} in place. The
+   * output is written from index {@code mid} onwards, so the gap between {@code mid} and {@code
+   * L2} must offer at least {@code mid - L1} slots; otherwise unread items of the second run
+   * would be overwritten.
+   */
+  protected void mergeSortWithoutSpace(int L1, int mid, int L2, int R2) {
+    int p1 = L1;
+    int p2 = L2;
+    int cntPos = mid;
+    while (p1 < mid || p2 < R2) {
+      if (p1 < mid && (p2 == R2 || num[p1] < num[p2])) {
+        num[cntPos++] = num[p1++];
+      } else {
+        num[cntPos++] = num[p2++];
+      }
+    }
+  }
+
+  /**
+   * Returns the number of items of {@code level} that are strictly smaller than {@code v},
+   * multiplied by the level weight {@code 2^level}. Level 0 is sorted first if necessary.
+   */
+  protected int findRankInLevel(int level, long v) {
+    int L = levelPos[level];
+    int R = levelPos[level + 1];
+    if (level == 0 && !level0Sorted) {
+      Arrays.sort(num, L, R);
+      level0Sorted = true;
+    }
+    R--;
+    if (L > R || num[L] >= v) {
+      return 0;
+    }
+    // Binary search for the last index whose item is smaller than v.
+    while (L < R) {
+      int mid = (L + R + 1) >> 1;
+      if (num[mid] < v) {
+        L = mid;
+      } else {
+        R = mid - 1;
+      }
+    }
+    return (L - levelPos[level] + 1) * (1 << level);
+  }
+
+  /** Returns the estimated number of values strictly smaller than {@code v}. */
+  public int getApproxRank(long v) {
+    int approxRank = 0;
+    for (int i = 0; i < cntLevel; i++) {
+      approxRank += findRankInLevel(i, v);
+    }
+    return approxRank;
+  }
+
+  /**
+   * Returns the largest {@code long} whose approximate rank is at most {@code K}, found by binary
+   * search over the whole {@code long} range.
+   */
+  public long findMaxValueWithRank(long K) {
+    long L = Long.MIN_VALUE;
+    long R = Long.MAX_VALUE;
+    while (L < R) {
+      // (R - L) may overflow, but the unsigned shift still yields half of the true distance.
+      long mid = L + ((R - L) >>> 1);
+      if (mid == L) {
+        mid++;
+      }
+      if (getApproxRank(mid) <= K) {
+        L = mid;
+      } else {
+        R = mid - 1;
+      }
+    }
+    return L;
+  }
+
+  /**
+   * Returns the smallest {@code long} whose approximate rank is at least {@code K}, found by
+   * binary search over the whole {@code long} range.
+   */
+  public long findMinValueWithRank(long K) {
+    long L = Long.MIN_VALUE;
+    long R = Long.MAX_VALUE;
+    while (L < R) {
+      // (R - L) may overflow, but the unsigned shift still yields half of the true distance.
+      long mid = L + ((R - L) >>> 1);
+      if (mid == R) {
+        mid--;
+      }
+      if (getApproxRank(mid) >= K) {
+        R = mid;
+      } else {
+        L = mid + 1;
+      }
+    }
+    return L;
+  }
+
+  /** Returns the number of values summarized by the sketch. */
+  public long getN() {
+    return N;
+  }
+
+  /** Returns the capacity limit in items. */
+  public int getMaxMemoryNum() {
+    return maxMemoryNum;
+  }
+
+  /** Returns the number of items currently stored over all levels. */
+  public int getNumLen() {
+    return levelPos[cntLevel] - levelPos[0];
+  }
+
+  /** Returns whether every summarized value is still stored, i.e. nothing was discarded yet. */
+  public boolean exactResult() {
+    return this.N == this.getNumLen();
+  }
+
+  /**
+   * Returns the item at 0-based offset {@code K} of the sorted level 0. Only level 0 is looked
+   * at, and {@code K} is not range-checked.
+   */
+  public long getExactResult(int K) {
+    int L = levelPos[0];
+    int R = levelPos[1];
+    if (!level0Sorted) {
+      Arrays.sort(num, L, R);
+      level0Sorted = true;
+    }
+    return num[L + K];
+  }
+}