You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2022/04/30 06:23:27 UTC

[iotdb] branch master updated: [IoTDB-2991] Equal size bucket sampling UDFs: EQUAL_SIZE_BUCKET_OUTLIER_SAMPLE (#5682)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e7812282f [IoTDB-2991] Equal size bucket sampling UDFs: EQUAL_SIZE_BUCKET_OUTLIER_SAMPLE (#5682)
2e7812282f is described below

commit 2e7812282fce2153be8c2be340478d7563303226
Author: AACEPT <34...@users.noreply.github.com>
AuthorDate: Sat Apr 30 14:23:22 2022 +0800

    [IoTDB-2991] Equal size bucket sampling UDFs: EQUAL_SIZE_BUCKET_OUTLIER_SAMPLE (#5682)
---
 docs/UserGuide/Query-Data/Select-Expression.md     |  74 ++
 docs/zh/UserGuide/Query-Data/Select-Expression.md  |  76 ++
 .../db/integration/IoTDBUDTFBuiltinFunctionIT.java | 138 ++++
 .../db/query/udf/builtin/BuiltinFunction.java      |   4 +-
 .../udf/builtin/UDTFEqualSizeBucketM4Sample.java   |   8 +-
 .../builtin/UDTFEqualSizeBucketOutlierSample.java  | 834 +++++++++++++++++++++
 6 files changed, 1129 insertions(+), 5 deletions(-)

diff --git a/docs/UserGuide/Query-Data/Select-Expression.md b/docs/UserGuide/Query-Data/Select-Expression.md
index e3cf02c00b..c8f9807617 100644
--- a/docs/UserGuide/Query-Data/Select-Expression.md
+++ b/docs/UserGuide/Query-Data/Select-Expression.md
@@ -671,6 +671,80 @@ Result:
 Total line number = 12
 It costs 0.065s
 ```
+#### Equal Size Bucket Outlier Sample
+This function samples the input sequence with equal number of bucket outliers, that is, according to the downsampling ratio given by the user and the number of samples in the bucket, the input sequence is divided into several buckets according to a fixed number of points. Sampling by the given outlier sampling method within each bucket.
+
+| Function Name | Allowed Input Series Data Types | Required Attributes                           | Output Series Data Type | Series Data Type  Description                 |
+|----------|--------------------------------|---------------------------------------|------------|--------------------------------------------------|
+| EQUAL_SIZE_BUCKET_OUTLIER_SAMPLE | INT32 / INT64 / FLOAT / DOUBLE | The value range of `proportion` is `(0, 1]`, the default is `0.1`</br> The value of `type` is `avg` or `stendis` or `cos` or `prenextdis`, the default is `avg` </br>The value of `number` should be greater than 0, the default is `3`| INT32 / INT64 / FLOAT / DOUBLE | Returns outlier samples in equal buckets that match the sampling ratio and the number of samples in the bucket |
+
+Parameter Description
+- `proportion`: sampling ratio
+- `number`: the number of samples in each bucket, default `3`
+- `type`: outlier sampling method, the value is
+  - `avg`: Take the average of the data points in the bucket, and find the `top number` farthest from the average according to the sampling ratio
+  - `stendis`: Take the vertical distance between each data point in the bucket and the first and last data points of the bucket to form a straight line, and according to the sampling ratio, find the `top number` with the largest distance
+  - `cos`: Set a data point in the bucket as b, the data point on the left of b as a, and the data point on the right of b as c, then take the cosine value of the angle between the ab and bc vectors. The larger the angle, the more likely it is an outlier. Find the `top number` with the smallest cos value
+  - `prenextdis`: Let a data point in the bucket be b, the data point to the left of b is a, and the data point to the right of b is c, then take the sum of the lengths of ab and bc as the yardstick, the larger the sum, the more likely it is to be an outlier, and find the `top number` with the largest sum value
+
+##### Demonstrate
+Example data: `root.ln.wf01.wt01.temperature` has a total of `100` ordered data from `0.0-99.0`. Among them, in order to add outliers, we make the number modulo 5 equal to 0 increment by 100.
+```
+IoTDB> select temperature from root.ln.wf01.wt01;
++-----------------------------+-----------------------------+
+|                         Time|root.ln.wf01.wt01.temperature|
++-----------------------------+-----------------------------+
+|1970-01-01T08:00:00.000+08:00|                          0.0|
+|1970-01-01T08:00:00.001+08:00|                          1.0|
+|1970-01-01T08:00:00.002+08:00|                          2.0|
+|1970-01-01T08:00:00.003+08:00|                          3.0|
+|1970-01-01T08:00:00.004+08:00|                          4.0|
+|1970-01-01T08:00:00.005+08:00|                        105.0|
+|1970-01-01T08:00:00.006+08:00|                          6.0|
+|1970-01-01T08:00:00.007+08:00|                          7.0|
+|1970-01-01T08:00:00.008+08:00|                          8.0|
+|1970-01-01T08:00:00.009+08:00|                          9.0|
+|1970-01-01T08:00:00.010+08:00|                         10.0|
+|1970-01-01T08:00:00.011+08:00|                         11.0|
+|1970-01-01T08:00:00.012+08:00|                         12.0|
+|1970-01-01T08:00:00.013+08:00|                         13.0|
+|1970-01-01T08:00:00.014+08:00|                         14.0|
+|1970-01-01T08:00:00.015+08:00|                        115.0|
+|1970-01-01T08:00:00.016+08:00|                         16.0|
+|.............................|.............................|
+|1970-01-01T08:00:00.092+08:00|                         92.0|
+|1970-01-01T08:00:00.093+08:00|                         93.0|
+|1970-01-01T08:00:00.094+08:00|                         94.0|
+|1970-01-01T08:00:00.095+08:00|                        195.0|
+|1970-01-01T08:00:00.096+08:00|                         96.0|
+|1970-01-01T08:00:00.097+08:00|                         97.0|
+|1970-01-01T08:00:00.098+08:00|                         98.0|
+|1970-01-01T08:00:00.099+08:00|                         99.0|
++-----------------------------+-----------------------------+
+```
+Sql:
+```sql
+select equal_size_bucket_outlier_sample(temperature, 'proportion'='0.1', 'type'='avg', 'number'='2') as outlier_avg_sample, equal_size_bucket_outlier_sample(temperature, 'proportion'='0.1', 'type'='stendis', 'number'='2') as outlier_stendis_sample, equal_size_bucket_outlier_sample(temperature, 'proportion'='0.1', 'type'='cos', 'number'='2') as outlier_cos_sample, equal_size_bucket_outlier_sample(temperature, 'proportion'='0.1', 'type'='prenextdis', 'number'='2') as outlier_prenextdis_sam [...]
+```
+Result:
+```
++-----------------------------+------------------+----------------------+------------------+-------------------------+
+|                         Time|outlier_avg_sample|outlier_stendis_sample|outlier_cos_sample|outlier_prenextdis_sample|
++-----------------------------+------------------+----------------------+------------------+-------------------------+
+|1970-01-01T08:00:00.005+08:00|             105.0|                 105.0|             105.0|                    105.0|
+|1970-01-01T08:00:00.015+08:00|             115.0|                 115.0|             115.0|                    115.0|
+|1970-01-01T08:00:00.025+08:00|             125.0|                 125.0|             125.0|                    125.0|
+|1970-01-01T08:00:00.035+08:00|             135.0|                 135.0|             135.0|                    135.0|
+|1970-01-01T08:00:00.045+08:00|             145.0|                 145.0|             145.0|                    145.0|
+|1970-01-01T08:00:00.055+08:00|             155.0|                 155.0|             155.0|                    155.0|
+|1970-01-01T08:00:00.065+08:00|             165.0|                 165.0|             165.0|                    165.0|
+|1970-01-01T08:00:00.075+08:00|             175.0|                 175.0|             175.0|                    175.0|
+|1970-01-01T08:00:00.085+08:00|             185.0|                 185.0|             185.0|                    185.0|
+|1970-01-01T08:00:00.095+08:00|             195.0|                 195.0|             195.0|                    195.0|
++-----------------------------+------------------+----------------------+------------------+-------------------------+
+Total line number = 10
+It costs 0.041s
+```
 
 ### User Defined Timeseries Generating Functions
 
diff --git a/docs/zh/UserGuide/Query-Data/Select-Expression.md b/docs/zh/UserGuide/Query-Data/Select-Expression.md
index 316fe113bd..d96589c761 100644
--- a/docs/zh/UserGuide/Query-Data/Select-Expression.md
+++ b/docs/zh/UserGuide/Query-Data/Select-Expression.md
@@ -675,6 +675,82 @@ Total line number = 12
 It costs 0.065s
 ```
 
+#### 等数量分桶离群值采样
+本函数对输入序列进行等数量分桶离群值采样,即根据用户给定的降采样比例和桶内采样个数将输入序列按固定点数等分为若干桶,在每个桶内通过给定的离群值采样方法进行采样。
+
+| 函数名      | 可接收的输入序列类型                     | 必要的属性参数                               | 输出序列类型     | 功能类型                                             |
+|----------|--------------------------------|---------------------------------------|------------|--------------------------------------------------|
+| EQUAL_SIZE_BUCKET_OUTLIER_SAMPLE   | INT32 / INT64 / FLOAT / DOUBLE | `proportion`取值范围为`(0, 1]`,默认为`0.1`</br>`type`取值为`avg`或`stendis`或`cos`或`prenextdis`,默认为`avg`</br>`number`取值应大于0,默认`3`| INT32 / INT64 / FLOAT / DOUBLE | 返回符合采样比例和桶内采样个数的等分桶离群值采样                |
+
+参数说明
+- `proportion`: 采样比例
+  - `number`: 每个桶内的采样个数,默认`3`
+- `type`: 离群值采样方法,取值为
+    - `avg`: 取桶内数据点的平均值,并根据采样比例,找到距离均值最远的`top number`个
+    - `stendis`: 取桶内每一个数据点距离桶的首末数据点连成直线的垂直距离,并根据采样比例,找到距离最大的`top number`个
+    - `cos`: 设桶内一个数据点为b,b左边的数据点为a,b右边的数据点为c,则取ab与bc向量的夹角的余弦值,值越小,说明形成的角度越大,越可能是异常值。找到cos值最小的`top number`个
+    - `prenextdis`: 设桶内一个数据点为b,b左边的数据点为a,b右边的数据点为c,则取ab与bc的长度之和作为衡量标准,和越大越可能是异常值,找到最大的`top number`个
+
+##### 演示
+测试数据:`root.ln.wf01.wt01.temperature`从`0.0-99.0`共`100`条数据,其中为了加入离群值,我们使得个位数为5的值自增100。
+```
+IoTDB> select temperature from root.ln.wf01.wt01;
++-----------------------------+-----------------------------+
+|                         Time|root.ln.wf01.wt01.temperature|
++-----------------------------+-----------------------------+
+|1970-01-01T08:00:00.000+08:00|                          0.0|
+|1970-01-01T08:00:00.001+08:00|                          1.0|
+|1970-01-01T08:00:00.002+08:00|                          2.0|
+|1970-01-01T08:00:00.003+08:00|                          3.0|
+|1970-01-01T08:00:00.004+08:00|                          4.0|
+|1970-01-01T08:00:00.005+08:00|                        105.0|
+|1970-01-01T08:00:00.006+08:00|                          6.0|
+|1970-01-01T08:00:00.007+08:00|                          7.0|
+|1970-01-01T08:00:00.008+08:00|                          8.0|
+|1970-01-01T08:00:00.009+08:00|                          9.0|
+|1970-01-01T08:00:00.010+08:00|                         10.0|
+|1970-01-01T08:00:00.011+08:00|                         11.0|
+|1970-01-01T08:00:00.012+08:00|                         12.0|
+|1970-01-01T08:00:00.013+08:00|                         13.0|
+|1970-01-01T08:00:00.014+08:00|                         14.0|
+|1970-01-01T08:00:00.015+08:00|                        115.0|
+|1970-01-01T08:00:00.016+08:00|                         16.0|
+|.............................|.............................|
+|1970-01-01T08:00:00.092+08:00|                         92.0|
+|1970-01-01T08:00:00.093+08:00|                         93.0|
+|1970-01-01T08:00:00.094+08:00|                         94.0|
+|1970-01-01T08:00:00.095+08:00|                        195.0|
+|1970-01-01T08:00:00.096+08:00|                         96.0|
+|1970-01-01T08:00:00.097+08:00|                         97.0|
+|1970-01-01T08:00:00.098+08:00|                         98.0|
+|1970-01-01T08:00:00.099+08:00|                         99.0|
++-----------------------------+-----------------------------+
+```
+sql:
+```sql
+select equal_size_bucket_outlier_sample(temperature, 'proportion'='0.1', 'type'='avg', 'number'='2') as outlier_avg_sample, equal_size_bucket_outlier_sample(temperature, 'proportion'='0.1', 'type'='stendis', 'number'='2') as outlier_stendis_sample, equal_size_bucket_outlier_sample(temperature, 'proportion'='0.1', 'type'='cos', 'number'='2') as outlier_cos_sample, equal_size_bucket_outlier_sample(temperature, 'proportion'='0.1', 'type'='prenextdis', 'number'='2') as outlier_prenextdis_sam [...]
+```
+结果:
+```
++-----------------------------+------------------+----------------------+------------------+-------------------------+
+|                         Time|outlier_avg_sample|outlier_stendis_sample|outlier_cos_sample|outlier_prenextdis_sample|
++-----------------------------+------------------+----------------------+------------------+-------------------------+
+|1970-01-01T08:00:00.005+08:00|             105.0|                 105.0|             105.0|                    105.0|
+|1970-01-01T08:00:00.015+08:00|             115.0|                 115.0|             115.0|                    115.0|
+|1970-01-01T08:00:00.025+08:00|             125.0|                 125.0|             125.0|                    125.0|
+|1970-01-01T08:00:00.035+08:00|             135.0|                 135.0|             135.0|                    135.0|
+|1970-01-01T08:00:00.045+08:00|             145.0|                 145.0|             145.0|                    145.0|
+|1970-01-01T08:00:00.055+08:00|             155.0|                 155.0|             155.0|                    155.0|
+|1970-01-01T08:00:00.065+08:00|             165.0|                 165.0|             165.0|                    165.0|
+|1970-01-01T08:00:00.075+08:00|             175.0|                 175.0|             175.0|                    175.0|
+|1970-01-01T08:00:00.085+08:00|             185.0|                 185.0|             185.0|                    185.0|
+|1970-01-01T08:00:00.095+08:00|             195.0|                 195.0|             195.0|                    195.0|
++-----------------------------+------------------+----------------------+------------------+-------------------------+
+Total line number = 10
+It costs 0.041s
+```
+
+
 ### 自定义时间序列生成函数
 
 请参考 [UDF](../Process-Data/UDF-User-Defined-Function.md)。
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBUDTFBuiltinFunctionIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBUDTFBuiltinFunctionIT.java
index bf75cdc5f0..b6349c914f 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBUDTFBuiltinFunctionIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBUDTFBuiltinFunctionIT.java
@@ -767,4 +767,142 @@ public class IoTDBUDTFBuiltinFunctionIT {
       e.printStackTrace();
     }
   }
+
+  @Test
+  public void testEqualBucketSampleForOutlier() {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute("CREATE TIMESERIES root.sg.d6.s1 with datatype=INT32,encoding=PLAIN");
+      statement.execute("CREATE TIMESERIES root.sg.d6.s2 with datatype=INT64,encoding=PLAIN");
+      statement.execute("CREATE TIMESERIES root.sg.d6.s3 with datatype=DOUBLE,encoding=PLAIN");
+      statement.execute("CREATE TIMESERIES root.sg.d6.s4 with datatype=FLOAT,encoding=PLAIN");
+      statement.execute("CREATE TIMESERIES root.sg.d6.s5 with datatype=FLOAT,encoding=PLAIN");
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+    String[] SQL_FOR_SAMPLE_S1 = new String[100];
+    String[] SQL_FOR_SAMPLE_S2 = new String[100];
+    String[] SQL_FOR_SAMPLE_S3 = new String[100];
+    String[] SQL_FOR_SAMPLE_S4 = new String[100];
+    String[] SQL_FOR_SAMPLE_S5 = new String[20];
+    for (int i = 0; i < 100; i++) {
+      SQL_FOR_SAMPLE_S1[i] =
+          String.format(
+              "insert into root.sg.d6(time, s1) values (%d, %d)",
+              i, i % 5 == 0 && i % 10 != 0 ? i + 100 : i);
+      SQL_FOR_SAMPLE_S2[i] =
+          String.format(
+              "insert into root.sg.d6(time, s2) values (%d, %d)", i, i % 10 == 6 ? i + 100 : i);
+      if (i % 10 == 9 || i % 20 == 0) {
+        SQL_FOR_SAMPLE_S2[i] = String.format("insert into root.sg.d6(time, s2) values (%d, 0)", i);
+      }
+      SQL_FOR_SAMPLE_S3[i] =
+          String.format(
+              "insert into root.sg.d6(time, s3) values (%d, %d)", i, i % 10 == 7 ? i + 100 : i);
+      SQL_FOR_SAMPLE_S4[i] =
+          String.format(
+              "insert into root.sg.d6(time, s4) values (%d, %d)", i, i % 10 == 8 ? i + 100 : i);
+    }
+    for (int i = 0; i < 20; i++) {
+      SQL_FOR_SAMPLE_S5[i] =
+          String.format("insert into root.sg.d6(time, s5) values (%d, %d)", i, i);
+    }
+    int[] ANSWER1 = new int[] {105, 115, 125, 135, 145, 155, 165, 175, 185, 195};
+    long[] ANSWER2 = new long[] {106, 116, 126, 136, 146, 156, 166, 176, 186, 196};
+    double[] ANSWER3 = new double[] {107, 117, 127, 137, 147, 157, 167, 177, 187, 197};
+    float[] ANSWER4 = new float[] {108, 118, 128, 138, 148, 158, 168, 178, 188, 198};
+    float[] ANSWER5 = new float[] {0, 2, 4, 6, 8, 10, 12, 14, 16, 18};
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      for (int i = 0; i < 100; i++) {
+        statement.execute(SQL_FOR_SAMPLE_S1[i]);
+        statement.execute(SQL_FOR_SAMPLE_S2[i]);
+        statement.execute(SQL_FOR_SAMPLE_S3[i]);
+        statement.execute(SQL_FOR_SAMPLE_S4[i]);
+      }
+      for (int i = 0; i < 20; i++) {
+        statement.execute(SQL_FOR_SAMPLE_S5[i]);
+      }
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      String functionName = "EQUAL_SIZE_BUCKET_OUTLIER_SAMPLE";
+      double proportionValue = 0.1;
+
+      ResultSet resultSet0 =
+          statement.executeQuery(
+              String.format(
+                  "select "
+                      + "%s(s1, 'proportion'='%f', 'type'='%s', 'number'='%d') "
+                      + "from root.sg.d6",
+                  functionName, proportionValue, "avg", 2));
+      int columnCount0 = resultSet0.getMetaData().getColumnCount();
+      assertEquals(1 + 1, columnCount0);
+      for (int i = 0; i < 10; i++) {
+        resultSet0.next();
+        assertEquals(ANSWER1[i], resultSet0.getInt(2));
+      }
+
+      ResultSet resultSet1 =
+          statement.executeQuery(
+              String.format(
+                  "select "
+                      + "%s(s2, 'proportion'='%f', 'type'='%s', 'number'='%d') "
+                      + "from root.sg.d6",
+                  functionName, proportionValue, "stendis", 2));
+      int columnCount1 = resultSet1.getMetaData().getColumnCount();
+      assertEquals(1 + 1, columnCount1);
+      for (int i = 0; i < 10; i++) {
+        resultSet1.next();
+        assertEquals(ANSWER2[i], resultSet1.getLong(2));
+      }
+
+      ResultSet resultSet2 =
+          statement.executeQuery(
+              String.format(
+                  "select "
+                      + "%s(s3, 'proportion'='%f', 'type'='%s', 'number'='%d') "
+                      + "from root.sg.d6",
+                  functionName, proportionValue, "cos", 2));
+      int columnCount2 = resultSet2.getMetaData().getColumnCount();
+      assertEquals(1 + 1, columnCount2);
+      for (int i = 0; i < 10; i++) {
+        resultSet2.next();
+        assertEquals(ANSWER3[i], resultSet2.getDouble(2), 0.01);
+      }
+
+      ResultSet resultSet3 =
+          statement.executeQuery(
+              String.format(
+                  "select "
+                      + "%s(s4, 'proportion'='%f', 'type'='%s', 'number'='%d') "
+                      + "from root.sg.d6",
+                  functionName, proportionValue, "prenextdis", 2));
+      int columnCount3 = resultSet3.getMetaData().getColumnCount();
+      assertEquals(1 + 1, columnCount3);
+      for (int i = 0; i < 10; i++) {
+        resultSet3.next();
+        assertEquals(ANSWER4[i], resultSet3.getFloat(2), 0.01);
+      }
+
+      ResultSet resultSet4 =
+          statement.executeQuery(
+              String.format(
+                  "select "
+                      + "%s(s5, 'proportion'='%f', 'type'='%s', 'number'='%d') "
+                      + "from root.sg.d6",
+                  functionName, 0.5, "cos", 1));
+      int columnCount4 = resultSet4.getMetaData().getColumnCount();
+      assertEquals(1 + 1, columnCount4);
+      for (int i = 0; i < 10; i++) {
+        resultSet4.next();
+        assertEquals(ANSWER5[i], resultSet4.getFloat(2), 0.01);
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/BuiltinFunction.java b/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/BuiltinFunction.java
index e54e86a821..999e0ea923 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/BuiltinFunction.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/BuiltinFunction.java
@@ -63,7 +63,9 @@ public enum BuiltinFunction {
   EQUAL_SIZE_BUCKET_RANDOM_SAMPLE(
       "EQUAL_SIZE_BUCKET_RANDOM_SAMPLE", UDTFEqualSizeBucketRandomSample.class),
   EQUAL_SIZE_BUCKET_AGG_SAMPLE("EQUAL_SIZE_BUCKET_AGG_SAMPLE", UDTFEqualSizeBucketAggSample.class),
-  EQUAL_SIZE_BUCKET_M4_SAMPLE("EQUAL_SIZE_BUCKET_M4_SAMPLE", UDTFEqualSizeBucketM4Sample.class);
+  EQUAL_SIZE_BUCKET_M4_SAMPLE("EQUAL_SIZE_BUCKET_M4_SAMPLE", UDTFEqualSizeBucketM4Sample.class),
+  EQUAL_SIZE_BUCKET_OUTLIER_SAMPLE(
+      "EQUAL_SIZE_BUCKET_OUTLIER_SAMPLE", UDTFEqualSizeBucketOutlierSample.class);
 
   private final String functionName;
   private final Class<?> functionClass;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/UDTFEqualSizeBucketM4Sample.java b/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/UDTFEqualSizeBucketM4Sample.java
index ee6b0c59c0..b4b7b2c1c5 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/UDTFEqualSizeBucketM4Sample.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/UDTFEqualSizeBucketM4Sample.java
@@ -65,7 +65,7 @@ public class UDTFEqualSizeBucketM4Sample extends UDTFEqualSizeBucketSample {
   }
 
   public void transformInt(RowWindow rowWindow, PointCollector collector) throws IOException {
-    if (rowWindow.windowSize() < 4) {
+    if (rowWindow.windowSize() <= 4) {
       for (int i = 0; i < rowWindow.windowSize(); i++) {
         Row row = rowWindow.getRow(i);
         collector.putInt(row.getTime(), row.getInt(0));
@@ -108,7 +108,7 @@ public class UDTFEqualSizeBucketM4Sample extends UDTFEqualSizeBucketSample {
   }
 
   public void transformLong(RowWindow rowWindow, PointCollector collector) throws IOException {
-    if (rowWindow.windowSize() < 4) {
+    if (rowWindow.windowSize() <= 4) {
       for (int i = 0; i < rowWindow.windowSize(); i++) {
         Row row = rowWindow.getRow(i);
         collector.putLong(row.getTime(), row.getLong(0));
@@ -151,7 +151,7 @@ public class UDTFEqualSizeBucketM4Sample extends UDTFEqualSizeBucketSample {
   }
 
   public void transformFloat(RowWindow rowWindow, PointCollector collector) throws IOException {
-    if (rowWindow.windowSize() < 4) {
+    if (rowWindow.windowSize() <= 4) {
       for (int i = 0; i < rowWindow.windowSize(); i++) {
         Row row = rowWindow.getRow(i);
         collector.putFloat(row.getTime(), row.getFloat(0));
@@ -194,7 +194,7 @@ public class UDTFEqualSizeBucketM4Sample extends UDTFEqualSizeBucketSample {
   }
 
   public void transformDouble(RowWindow rowWindow, PointCollector collector) throws IOException {
-    if (rowWindow.windowSize() < 4) {
+    if (rowWindow.windowSize() <= 4) {
       for (int i = 0; i < rowWindow.windowSize(); i++) {
         Row row = rowWindow.getRow(i);
         collector.putDouble(row.getTime(), row.getDouble(0));
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/UDTFEqualSizeBucketOutlierSample.java b/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/UDTFEqualSizeBucketOutlierSample.java
new file mode 100644
index 0000000000..60a6658b58
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/builtin/UDTFEqualSizeBucketOutlierSample.java
@@ -0,0 +1,834 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.udf.builtin;
+
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.query.udf.api.access.Row;
+import org.apache.iotdb.db.query.udf.api.access.RowWindow;
+import org.apache.iotdb.db.query.udf.api.collector.PointCollector;
+import org.apache.iotdb.db.query.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameterValidator;
+import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
+import org.apache.iotdb.db.query.udf.api.exception.UDFException;
+import org.apache.iotdb.db.query.udf.api.exception.UDFInputSeriesDataTypeNotValidException;
+import org.apache.iotdb.db.query.udf.api.exception.UDFParameterNotValidException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.PriorityQueue;
+
+public class UDTFEqualSizeBucketOutlierSample extends UDTFEqualSizeBucketSample {
+
+  private String type;
+  private int number;
+  private OutlierSampler outlierSampler;
+
+  private interface OutlierSampler {
+
+    void outlierSampleInt(RowWindow rowWindow, PointCollector collector) throws IOException;
+
+    void outlierSampleLong(RowWindow rowWindow, PointCollector collector) throws IOException;
+
+    void outlierSampleFloat(RowWindow rowWindow, PointCollector collector) throws IOException;
+
+    void outlierSampleDouble(RowWindow rowWindow, PointCollector collector) throws IOException;
+  }
+
+  private class AvgOutlierSampler implements OutlierSampler {
+
+    @Override
+    public void outlierSampleInt(RowWindow rowWindow, PointCollector collector) throws IOException {
+      int windowSize = rowWindow.windowSize();
+      if (windowSize <= number) {
+        for (int i = 0; i < windowSize; i++) {
+          Row row = rowWindow.getRow(i);
+          collector.putInt(row.getTime(), row.getInt(0));
+        }
+        return;
+      }
+
+      double avg = 0;
+      PriorityQueue<Pair<Integer, Double>> pq =
+          new PriorityQueue<>(number, Comparator.comparing(o -> o.right));
+      for (int i = 0; i < windowSize; i++) {
+        avg += rowWindow.getRow(i).getInt(0);
+      }
+      avg /= windowSize;
+      for (int i = 0; i < windowSize; i++) {
+        double value = Math.abs(rowWindow.getRow(i).getInt(0) - avg);
+        addToMinHeap(pq, i, value);
+      }
+
+      putPQValueInt(pq, rowWindow, collector);
+    }
+
+    @Override
+    public void outlierSampleLong(RowWindow rowWindow, PointCollector collector)
+        throws IOException {
+      int windowSize = rowWindow.windowSize();
+      if (windowSize <= number) {
+        for (int i = 0; i < windowSize; i++) {
+          Row row = rowWindow.getRow(i);
+          collector.putLong(row.getTime(), row.getLong(0));
+        }
+        return;
+      }
+
+      double avg = 0;
+      PriorityQueue<Pair<Integer, Double>> pq =
+          new PriorityQueue<>(number, Comparator.comparing(o -> o.right));
+      for (int i = 0; i < windowSize; i++) {
+        avg += rowWindow.getRow(i).getLong(0);
+      }
+      avg /= windowSize;
+      for (int i = 0; i < windowSize; i++) {
+        double value = Math.abs(rowWindow.getRow(i).getLong(0) - avg);
+        addToMinHeap(pq, i, value);
+      }
+
+      putPQValueLong(pq, rowWindow, collector);
+    }
+
+    @Override
+    public void outlierSampleFloat(RowWindow rowWindow, PointCollector collector)
+        throws IOException {
+      int windowSize = rowWindow.windowSize();
+      if (windowSize <= number) {
+        for (int i = 0; i < windowSize; i++) {
+          Row row = rowWindow.getRow(i);
+          collector.putFloat(row.getTime(), row.getFloat(0));
+        }
+        return;
+      }
+
+      double avg = 0;
+      PriorityQueue<Pair<Integer, Double>> pq =
+          new PriorityQueue<>(number, Comparator.comparing(o -> o.right));
+      for (int i = 0; i < windowSize; i++) {
+        avg += rowWindow.getRow(i).getFloat(0);
+      }
+      avg /= windowSize;
+      for (int i = 0; i < windowSize; i++) {
+        double value = Math.abs(rowWindow.getRow(i).getFloat(0) - avg);
+        addToMinHeap(pq, i, value);
+      }
+
+      putPQValueFloat(pq, rowWindow, collector);
+    }
+
+    @Override
+    public void outlierSampleDouble(RowWindow rowWindow, PointCollector collector)
+        throws IOException {
+      int windowSize = rowWindow.windowSize();
+      if (windowSize <= number) {
+        for (int i = 0; i < windowSize; i++) {
+          Row row = rowWindow.getRow(i);
+          collector.putDouble(row.getTime(), row.getDouble(0));
+        }
+        return;
+      }
+
+      double avg = 0;
+      PriorityQueue<Pair<Integer, Double>> pq =
+          new PriorityQueue<>(number, Comparator.comparing(o -> o.right));
+      for (int i = 0; i < windowSize; i++) {
+        avg += rowWindow.getRow(i).getDouble(0);
+      }
+      avg /= windowSize;
+      for (int i = 0; i < windowSize; i++) {
+        double value = Math.abs(rowWindow.getRow(i).getDouble(0) - avg);
+        addToMinHeap(pq, i, value);
+      }
+
+      putPQValueDouble(pq, rowWindow, collector);
+    }
+  }
+
+  private class StendisOutlierSampler implements OutlierSampler {
+
+    @Override
+    public void outlierSampleInt(RowWindow rowWindow, PointCollector collector) throws IOException {
+      int windowSize = rowWindow.windowSize();
+      if (isWindowSizeTooSmallInt(rowWindow, collector, windowSize)) {
+        return;
+      }
+
+      long row0x = rowWindow.getRow(0).getTime(),
+          row1x = rowWindow.getRow(windowSize - 1).getTime();
+      int row0y = rowWindow.getRow(0).getInt(0), row1y = rowWindow.getRow(windowSize - 1).getInt(0);
+
+      PriorityQueue<Pair<Integer, Double>> pq =
+          new PriorityQueue<>(number, Comparator.comparing(o -> o.right));
+
+      double A = row0y - row1y;
+      double B = row1x - row0x;
+      double C = row0x * row1y - row1x * row0y;
+      double denominator = Math.sqrt(A * A + B * B);
+
+      for (int i = 1; i < windowSize - 1; i++) {
+        Row row = rowWindow.getRow(i);
+        double value = Math.abs(A * row.getTime() + B * row.getInt(0) + C) / denominator;
+        addToMinHeap(pq, i, value);
+      }
+
+      putPQValueInt(pq, rowWindow, collector);
+    }
+
+    @Override
+    public void outlierSampleLong(RowWindow rowWindow, PointCollector collector)
+        throws IOException {
+      int windowSize = rowWindow.windowSize();
+      if (isWindowSizeTooSmallLong(rowWindow, collector, windowSize)) {
+        return;
+      }
+
+      long row0x = rowWindow.getRow(0).getTime(),
+          row1x = rowWindow.getRow(windowSize - 1).getTime();
+      long row0y = rowWindow.getRow(0).getLong(0),
+          row1y = rowWindow.getRow(windowSize - 1).getLong(0);
+
+      PriorityQueue<Pair<Integer, Double>> pq =
+          new PriorityQueue<>(number, Comparator.comparing(o -> o.right));
+
+      double A = row0y - row1y;
+      double B = row1x - row0x;
+      double C = row0x * row1y - row1x * row0y;
+      double denominator = Math.sqrt(A * A + B * B);
+
+      for (int i = 1; i < windowSize - 1; i++) {
+        Row row = rowWindow.getRow(i);
+        double value = Math.abs(A * row.getTime() + B * row.getLong(0) + C) / denominator;
+        addToMinHeap(pq, i, value);
+      }
+
+      putPQValueLong(pq, rowWindow, collector);
+    }
+
+    @Override
+    public void outlierSampleFloat(RowWindow rowWindow, PointCollector collector)
+        throws IOException {
+      int windowSize = rowWindow.windowSize();
+      if (isWindowSizeTooSmallFloat(rowWindow, collector, windowSize)) {
+        return;
+      }
+
+      long row0x = rowWindow.getRow(0).getTime(),
+          row1x = rowWindow.getRow(windowSize - 1).getTime();
+      float row0y = rowWindow.getRow(0).getFloat(0),
+          row1y = rowWindow.getRow(windowSize - 1).getFloat(0);
+
+      PriorityQueue<Pair<Integer, Double>> pq =
+          new PriorityQueue<>(number, Comparator.comparing(o -> o.right));
+
+      double A = row0y - row1y;
+      double B = row1x - row0x;
+      double C = row0x * row1y - row1x * row0y;
+      double denominator = Math.sqrt(A * A + B * B);
+
+      for (int i = 1; i < windowSize - 1; i++) {
+        Row row = rowWindow.getRow(i);
+        double value = Math.abs(A * row.getTime() + B * row.getFloat(0) + C) / denominator;
+        addToMinHeap(pq, i, value);
+      }
+
+      putPQValueFloat(pq, rowWindow, collector);
+    }
+
+    @Override
+    public void outlierSampleDouble(RowWindow rowWindow, PointCollector collector)
+        throws IOException {
+      int windowSize = rowWindow.windowSize();
+      if (isWindowSizeTooSmallDouble(rowWindow, collector, windowSize)) {
+        return;
+      }
+
+      long row0x = rowWindow.getRow(0).getTime(),
+          row1x = rowWindow.getRow(windowSize - 1).getTime();
+      double row0y = rowWindow.getRow(0).getDouble(0),
+          row1y = rowWindow.getRow(windowSize - 1).getDouble(0);
+
+      PriorityQueue<Pair<Integer, Double>> pq =
+          new PriorityQueue<>(number, Comparator.comparing(o -> o.right));
+
+      double A = row0y - row1y;
+      double B = row1x - row0x;
+      double C = row0x * row1y - row1x * row0y;
+      double denominator = Math.sqrt(A * A + B * B);
+
+      for (int i = 1; i < windowSize - 1; i++) {
+        Row row = rowWindow.getRow(i);
+        double value = Math.abs(A * row.getTime() + B * row.getDouble(0) + C) / denominator;
+        addToMinHeap(pq, i, value);
+      }
+
+      putPQValueDouble(pq, rowWindow, collector);
+    }
+  }
+
+  private class CosOutlierSampler implements OutlierSampler {
+
+    @Override
+    public void outlierSampleInt(RowWindow rowWindow, PointCollector collector) throws IOException {
+      int windowSize = rowWindow.windowSize();
+      if (isWindowSizeTooSmallInt(rowWindow, collector, windowSize)) {
+        return;
+      }
+
+      // o -> -o.right, max heap
+      PriorityQueue<Pair<Integer, Double>> pq =
+          new PriorityQueue<>(number, Comparator.comparing(o -> -o.right));
+
+      long lastTime, currentTime, nextTime, x1, x2;
+      int lastValue, currentValue, nextValue, y1, y2;
+      double value;
+
+      for (int i = 1; i < windowSize - 1; i++) {
+        lastTime = rowWindow.getRow(i - 1).getTime();
+        currentTime = rowWindow.getRow(i).getTime();
+        nextTime = rowWindow.getRow(i + 1).getTime();
+
+        lastValue = rowWindow.getRow(i - 1).getInt(0);
+        currentValue = rowWindow.getRow(i).getInt(0);
+        nextValue = rowWindow.getRow(i + 1).getInt(0);
+
+        x1 = currentTime - lastTime;
+        x2 = nextTime - currentTime;
+        y1 = currentValue - lastValue;
+        y2 = nextValue - currentValue;
+
+        value =
+            (x1 * x2 + y1 * y2)
+                / (Math.sqrt((double) x1 * x1 + y1 * y1) * Math.sqrt((double) x2 * x2 + y2 * y2));
+
+        addToMaxHeap(pq, i, value);
+      }
+
+      putPQValueInt(pq, rowWindow, collector);
+    }
+
+    @Override
+    public void outlierSampleLong(RowWindow rowWindow, PointCollector collector)
+        throws IOException {
+      int windowSize = rowWindow.windowSize();
+      if (isWindowSizeTooSmallLong(rowWindow, collector, windowSize)) {
+        return;
+      }
+
+      // o -> -o.right, max heap
+      PriorityQueue<Pair<Integer, Double>> pq =
+          new PriorityQueue<>(number, Comparator.comparing(o -> -o.right));
+
+      long lastTime, currentTime, nextTime, x1, x2;
+      long lastValue, currentValue, nextValue, y1, y2;
+      double value;
+
+      for (int i = 1; i < windowSize - 1; i++) {
+        lastTime = rowWindow.getRow(i - 1).getTime();
+        currentTime = rowWindow.getRow(i).getTime();
+        nextTime = rowWindow.getRow(i + 1).getTime();
+
+        lastValue = rowWindow.getRow(i - 1).getLong(0);
+        currentValue = rowWindow.getRow(i).getLong(0);
+        nextValue = rowWindow.getRow(i + 1).getLong(0);
+
+        x1 = currentTime - lastTime;
+        x2 = nextTime - currentTime;
+        y1 = currentValue - lastValue;
+        y2 = nextValue - currentValue;
+
+        value =
+            (x1 * x2 + y1 * y2)
+                / (Math.sqrt((double) x1 * x1 + y1 * y1) * Math.sqrt((double) x2 * x2 + y2 * y2));
+
+        addToMaxHeap(pq, i, value);
+      }
+
+      putPQValueLong(pq, rowWindow, collector);
+    }
+
+    @Override
+    public void outlierSampleFloat(RowWindow rowWindow, PointCollector collector)
+        throws IOException {
+      int windowSize = rowWindow.windowSize();
+      if (isWindowSizeTooSmallFloat(rowWindow, collector, windowSize)) {
+        return;
+      }
+
+      // o -> -o.right, max heap
+      PriorityQueue<Pair<Integer, Double>> pq =
+          new PriorityQueue<>(number, Comparator.comparing(o -> -o.right));
+
+      long lastTime, currentTime, nextTime, x1, x2;
+      float lastValue, currentValue, nextValue, y1, y2;
+      double value;
+
+      for (int i = 1; i < windowSize - 1; i++) {
+        lastTime = rowWindow.getRow(i - 1).getTime();
+        currentTime = rowWindow.getRow(i).getTime();
+        nextTime = rowWindow.getRow(i + 1).getTime();
+
+        lastValue = rowWindow.getRow(i - 1).getFloat(0);
+        currentValue = rowWindow.getRow(i).getFloat(0);
+        nextValue = rowWindow.getRow(i + 1).getFloat(0);
+
+        x1 = currentTime - lastTime;
+        x2 = nextTime - currentTime;
+        y1 = currentValue - lastValue;
+        y2 = nextValue - currentValue;
+
+        value = (x1 * x2 + y1 * y2) / (Math.sqrt(x1 * x1 + y1 * y1) * Math.sqrt(x2 * x2 + y2 * y2));
+
+        addToMaxHeap(pq, i, value);
+      }
+
+      putPQValueFloat(pq, rowWindow, collector);
+    }
+
+    @Override
+    public void outlierSampleDouble(RowWindow rowWindow, PointCollector collector)
+        throws IOException {
+      int windowSize = rowWindow.windowSize();
+      if (isWindowSizeTooSmallDouble(rowWindow, collector, windowSize)) {
+        return;
+      }
+
+      // o -> -o.right, max heap
+      PriorityQueue<Pair<Integer, Double>> pq =
+          new PriorityQueue<>(number, Comparator.comparing(o -> -o.right));
+
+      long lastTime, currentTime, nextTime, x1, x2;
+      double lastValue, currentValue, nextValue, y1, y2;
+      double value;
+
+      for (int i = 1; i < windowSize - 1; i++) {
+        lastTime = rowWindow.getRow(i - 1).getTime();
+        currentTime = rowWindow.getRow(i).getTime();
+        nextTime = rowWindow.getRow(i + 1).getTime();
+
+        lastValue = rowWindow.getRow(i - 1).getDouble(0);
+        currentValue = rowWindow.getRow(i).getDouble(0);
+        nextValue = rowWindow.getRow(i + 1).getDouble(0);
+
+        x1 = currentTime - lastTime;
+        x2 = nextTime - currentTime;
+        y1 = currentValue - lastValue;
+        y2 = nextValue - currentValue;
+
+        value = (x1 * x2 + y1 * y2) / (Math.sqrt(x1 * x1 + y1 * y1) * Math.sqrt(x2 * x2 + y2 * y2));
+
+        addToMaxHeap(pq, i, value);
+      }
+
+      putPQValueDouble(pq, rowWindow, collector);
+    }
+  }
+
+  private class PrenextdisOutlierSampler implements OutlierSampler {
+
+    @Override
+    public void outlierSampleInt(RowWindow rowWindow, PointCollector collector) throws IOException {
+      int windowSize = rowWindow.windowSize();
+      if (isWindowSizeTooSmallInt(rowWindow, collector, windowSize)) {
+        return;
+      }
+
+      PriorityQueue<Pair<Integer, Double>> pq =
+          new PriorityQueue<>(number, Comparator.comparing(o -> o.right));
+
+      long lastTime, currentTime, nextTime, x1, x2;
+      int lastValue, currentValue, nextValue, y1, y2;
+      double value;
+
+      for (int i = 1; i < windowSize - 1; i++) {
+        lastTime = rowWindow.getRow(i - 1).getTime();
+        currentTime = rowWindow.getRow(i).getTime();
+        nextTime = rowWindow.getRow(i + 1).getTime();
+
+        lastValue = rowWindow.getRow(i - 1).getInt(0);
+        currentValue = rowWindow.getRow(i).getInt(0);
+        nextValue = rowWindow.getRow(i + 1).getInt(0);
+
+        x1 = Math.abs(currentTime - lastTime);
+        x2 = Math.abs(nextTime - currentTime);
+        y1 = Math.abs(currentValue - lastValue);
+        y2 = Math.abs(nextValue - currentValue);
+
+        value = x1 + y1 + x2 + y2;
+
+        addToMinHeap(pq, i, value);
+      }
+
+      putPQValueInt(pq, rowWindow, collector);
+    }
+
+    @Override
+    public void outlierSampleLong(RowWindow rowWindow, PointCollector collector)
+        throws IOException {
+      int windowSize = rowWindow.windowSize();
+      if (isWindowSizeTooSmallLong(rowWindow, collector, windowSize)) {
+        return;
+      }
+
+      PriorityQueue<Pair<Integer, Double>> pq =
+          new PriorityQueue<>(number, Comparator.comparing(o -> o.right));
+
+      long lastTime, currentTime, nextTime, x1, x2;
+      long lastValue, currentValue, nextValue, y1, y2;
+      double value;
+
+      for (int i = 1; i < windowSize - 1; i++) {
+        lastTime = rowWindow.getRow(i - 1).getTime();
+        currentTime = rowWindow.getRow(i).getTime();
+        nextTime = rowWindow.getRow(i + 1).getTime();
+
+        lastValue = rowWindow.getRow(i - 1).getLong(0);
+        currentValue = rowWindow.getRow(i).getLong(0);
+        nextValue = rowWindow.getRow(i + 1).getLong(0);
+
+        x1 = Math.abs(currentTime - lastTime);
+        x2 = Math.abs(nextTime - currentTime);
+        y1 = Math.abs(currentValue - lastValue);
+        y2 = Math.abs(nextValue - currentValue);
+
+        value = x1 + y1 + x2 + y2;
+
+        addToMinHeap(pq, i, value);
+      }
+
+      putPQValueLong(pq, rowWindow, collector);
+    }
+
+    @Override
+    public void outlierSampleFloat(RowWindow rowWindow, PointCollector collector)
+        throws IOException {
+      int windowSize = rowWindow.windowSize();
+      if (isWindowSizeTooSmallFloat(rowWindow, collector, windowSize)) {
+        return;
+      }
+
+      PriorityQueue<Pair<Integer, Double>> pq =
+          new PriorityQueue<>(number, Comparator.comparing(o -> o.right));
+
+      long lastTime, currentTime, nextTime, x1, x2;
+      float lastValue, currentValue, nextValue, y1, y2;
+      double value;
+
+      for (int i = 1; i < windowSize - 1; i++) {
+        lastTime = rowWindow.getRow(i - 1).getTime();
+        currentTime = rowWindow.getRow(i).getTime();
+        nextTime = rowWindow.getRow(i + 1).getTime();
+
+        lastValue = rowWindow.getRow(i - 1).getFloat(0);
+        currentValue = rowWindow.getRow(i).getFloat(0);
+        nextValue = rowWindow.getRow(i + 1).getFloat(0);
+
+        x1 = Math.abs(currentTime - lastTime);
+        x2 = Math.abs(nextTime - currentTime);
+        y1 = Math.abs(currentValue - lastValue);
+        y2 = Math.abs(nextValue - currentValue);
+
+        value = x1 + y1 + x2 + y2;
+
+        addToMinHeap(pq, i, value);
+      }
+
+      putPQValueFloat(pq, rowWindow, collector);
+    }
+
+    @Override
+    public void outlierSampleDouble(RowWindow rowWindow, PointCollector collector)
+        throws IOException {
+      int windowSize = rowWindow.windowSize();
+      if (isWindowSizeTooSmallDouble(rowWindow, collector, windowSize)) {
+        return;
+      }
+
+      PriorityQueue<Pair<Integer, Double>> pq =
+          new PriorityQueue<>(number, Comparator.comparing(o -> o.right));
+
+      long lastTime, currentTime, nextTime, x1, x2;
+      double lastValue, currentValue, nextValue, y1, y2;
+      double value;
+
+      for (int i = 1; i < windowSize - 1; i++) {
+        lastTime = rowWindow.getRow(i - 1).getTime();
+        currentTime = rowWindow.getRow(i).getTime();
+        nextTime = rowWindow.getRow(i + 1).getTime();
+
+        lastValue = rowWindow.getRow(i - 1).getDouble(0);
+        currentValue = rowWindow.getRow(i).getDouble(0);
+        nextValue = rowWindow.getRow(i + 1).getDouble(0);
+
+        x1 = Math.abs(currentTime - lastTime);
+        x2 = Math.abs(nextTime - currentTime);
+        y1 = Math.abs(currentValue - lastValue);
+        y2 = Math.abs(nextValue - currentValue);
+
+        value = x1 + y1 + x2 + y2;
+
+        addToMinHeap(pq, i, value);
+      }
+
+      putPQValueDouble(pq, rowWindow, collector);
+    }
+  }
+
+  @Override
+  public void validate(UDFParameterValidator validator) throws UDFException, MetadataException {
+    super.validate(validator);
+    type = validator.getParameters().getStringOrDefault("type", "avg").toLowerCase();
+    number = validator.getParameters().getIntOrDefault("number", 3);
+    validator
+        .validate(
+            type ->
+                "avg".equals(type)
+                    || "stendis".equals(type)
+                    || "cos".equals(type)
+                    || "prenextdis".equals(type),
+            "Illegal outlier method. Outlier type should be avg, stendis, cos or prenextdis.",
+            type)
+        .validate(
+            number -> (int) number > 0, "Illegal number. Number should be greater than 0.", number);
+  }
+
+  @Override
+  public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
+      throws Exception {
+    bucketSize *= number;
+    configurations
+        .setAccessStrategy(new SlidingSizeWindowAccessStrategy(bucketSize))
+        .setOutputDataType(dataType);
+    switch (type) {
+      case "avg":
+        outlierSampler = new AvgOutlierSampler();
+        break;
+      case "stendis":
+        outlierSampler = new StendisOutlierSampler();
+        break;
+      case "cos":
+        outlierSampler = new CosOutlierSampler();
+        break;
+      case "prenextdis":
+        outlierSampler = new PrenextdisOutlierSampler();
+        break;
+      default:
+        throw new UDFParameterNotValidException(
+            "Illegal outlier method. Outlier type should be avg, stendis, cos or prenextdis.");
+    }
+  }
+
+  @Override
+  public void transform(RowWindow rowWindow, PointCollector collector)
+      throws IOException, UDFParameterNotValidException {
+    switch (dataType) {
+      case INT32:
+        outlierSampler.outlierSampleInt(rowWindow, collector);
+        break;
+      case INT64:
+        outlierSampler.outlierSampleLong(rowWindow, collector);
+        break;
+      case FLOAT:
+        outlierSampler.outlierSampleFloat(rowWindow, collector);
+        break;
+      case DOUBLE:
+        outlierSampler.outlierSampleDouble(rowWindow, collector);
+        break;
+      default:
+        // This will not happen
+        throw new UDFInputSeriesDataTypeNotValidException(
+            0, dataType, TSDataType.INT32, TSDataType.INT64, TSDataType.FLOAT, TSDataType.DOUBLE);
+    }
+  }
+
+  public void addToMinHeap(PriorityQueue<Pair<Integer, Double>> pq, int i, double value) {
+    if (pq.size() < number) {
+      pq.add(new Pair<>(i, value));
+    } else if (value > pq.peek().right) {
+      pq.poll();
+      pq.add(new Pair<>(i, value));
+    }
+  }
+
+  public void addToMaxHeap(PriorityQueue<Pair<Integer, Double>> pq, int i, double value) {
+    if (pq.size() < number) {
+      pq.add(new Pair<>(i, value));
+    } else if (value < pq.peek().right) {
+      pq.poll();
+      pq.add(new Pair<>(i, value));
+    }
+  }
+
+  public void putPQValueInt(
+      PriorityQueue<Pair<Integer, Double>> pq, RowWindow rowWindow, PointCollector collector)
+      throws IOException {
+    int[] arr = new int[number];
+    for (int i = 0; i < number; i++) {
+      arr[i] = pq.peek().left;
+      pq.poll();
+    }
+    Arrays.sort(arr);
+    for (int i = 0; i < number; i++) {
+      collector.putInt(rowWindow.getRow(arr[i]).getTime(), rowWindow.getRow(arr[i]).getInt(0));
+    }
+  }
+
+  public void putPQValueLong(
+      PriorityQueue<Pair<Integer, Double>> pq, RowWindow rowWindow, PointCollector collector)
+      throws IOException {
+    int[] arr = new int[number];
+    for (int i = 0; i < number; i++) {
+      arr[i] = pq.peek().left;
+      pq.poll();
+    }
+    Arrays.sort(arr);
+    for (int i = 0; i < number; i++) {
+      collector.putLong(rowWindow.getRow(arr[i]).getTime(), rowWindow.getRow(arr[i]).getLong(0));
+    }
+  }
+
+  public void putPQValueFloat(
+      PriorityQueue<Pair<Integer, Double>> pq, RowWindow rowWindow, PointCollector collector)
+      throws IOException {
+    int[] arr = new int[number];
+    for (int i = 0; i < number; i++) {
+      arr[i] = pq.peek().left;
+      pq.poll();
+    }
+    Arrays.sort(arr);
+    for (int i = 0; i < number; i++) {
+      collector.putFloat(rowWindow.getRow(arr[i]).getTime(), rowWindow.getRow(arr[i]).getFloat(0));
+    }
+  }
+
+  public void putPQValueDouble(
+      PriorityQueue<Pair<Integer, Double>> pq, RowWindow rowWindow, PointCollector collector)
+      throws IOException {
+    int[] arr = new int[number];
+    for (int i = 0; i < number; i++) {
+      arr[i] = pq.peek().left;
+      pq.poll();
+    }
+    Arrays.sort(arr);
+    for (int i = 0; i < number; i++) {
+      collector.putDouble(
+          rowWindow.getRow(arr[i]).getTime(), rowWindow.getRow(arr[i]).getDouble(0));
+    }
+  }
+
+  public boolean isWindowSizeTooSmallInt(
+      RowWindow rowWindow, PointCollector collector, int windowSize) throws IOException {
+    if (windowSize <= number) {
+      for (int i = 0; i < windowSize; i++) {
+        Row row = rowWindow.getRow(i);
+        collector.putInt(row.getTime(), row.getInt(0));
+      }
+      return true;
+    } else if (windowSize == number + 1) {
+      for (int i = 0; i < windowSize - 1; i++) {
+        Row row = rowWindow.getRow(i);
+        collector.putInt(row.getTime(), row.getInt(0));
+      }
+      return true;
+    } else if (windowSize == number + 2) {
+      for (int i = 1; i < windowSize - 1; i++) {
+        Row row = rowWindow.getRow(i);
+        collector.putInt(row.getTime(), row.getInt(0));
+      }
+      return true;
+    }
+    return false;
+  }
+
+  public boolean isWindowSizeTooSmallLong(
+      RowWindow rowWindow, PointCollector collector, int windowSize) throws IOException {
+    if (windowSize <= number) {
+      for (int i = 0; i < windowSize; i++) {
+        Row row = rowWindow.getRow(i);
+        collector.putLong(row.getTime(), row.getLong(0));
+      }
+      return true;
+    } else if (windowSize == number + 1) {
+      for (int i = 0; i < windowSize - 1; i++) {
+        Row row = rowWindow.getRow(i);
+        collector.putLong(row.getTime(), row.getLong(0));
+      }
+      return true;
+    } else if (windowSize == number + 2) {
+      for (int i = 1; i < windowSize - 1; i++) {
+        Row row = rowWindow.getRow(i);
+        collector.putLong(row.getTime(), row.getLong(0));
+      }
+      return true;
+    }
+    return false;
+  }
+
+  public boolean isWindowSizeTooSmallFloat(
+      RowWindow rowWindow, PointCollector collector, int windowSize) throws IOException {
+    if (windowSize <= number) {
+      for (int i = 0; i < windowSize; i++) {
+        Row row = rowWindow.getRow(i);
+        collector.putFloat(row.getTime(), row.getFloat(0));
+      }
+      return true;
+    } else if (windowSize == number + 1) {
+      for (int i = 0; i < windowSize - 1; i++) {
+        Row row = rowWindow.getRow(i);
+        collector.putFloat(row.getTime(), row.getFloat(0));
+      }
+      return true;
+    } else if (windowSize == number + 2) {
+      for (int i = 1; i < windowSize - 1; i++) {
+        Row row = rowWindow.getRow(i);
+        collector.putFloat(row.getTime(), row.getFloat(0));
+      }
+      return true;
+    }
+    return false;
+  }
+
+  public boolean isWindowSizeTooSmallDouble(
+      RowWindow rowWindow, PointCollector collector, int windowSize) throws IOException {
+    if (windowSize <= number) {
+      for (int i = 0; i < windowSize; i++) {
+        Row row = rowWindow.getRow(i);
+        collector.putDouble(row.getTime(), row.getDouble(0));
+      }
+      return true;
+    } else if (windowSize == number + 1) {
+      for (int i = 0; i < windowSize - 1; i++) {
+        Row row = rowWindow.getRow(i);
+        collector.putDouble(row.getTime(), row.getDouble(0));
+      }
+      return true;
+    } else if (windowSize == number + 2) {
+      for (int i = 1; i < windowSize - 1; i++) {
+        Row row = rowWindow.getRow(i);
+        collector.putDouble(row.getTime(), row.getDouble(0));
+      }
+      return true;
+    }
+    return false;
+  }
+}